In [None]:
pip install kafka

Collecting kafka
  Downloading kafka-1.3.5-py2.py3-none-any.whl.metadata (6.9 kB)
Downloading kafka-1.3.5-py2.py3-none-any.whl (207 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/207.2 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m204.8/207.2 kB[0m [31m7.1 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m207.2/207.2 kB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka
Successfully installed kafka-1.3.5


In [None]:
pip install spark

Collecting spark
  Downloading spark-0.2.1.tar.gz (41 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/41.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.0/41.0 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: spark
  Building wheel for spark (setup.py) ... [?25l[?25hdone
  Created wheel for spark: filename=spark-0.2.1-py3-none-any.whl size=58748 sha256=826348f61169f983ff0f1743f6dfdb7ca682ddd8c9aadc8529dcdc1b5478f501
  Stored in directory: /root/.cache/pip/wheels/67/c2/7c/a53325365fba358ffff35af84a2e14cf88c18052f88acfa5f0
Successfully built spark
Installing collected packages: spark
Successfully installed spark-0.2.1


In [None]:
pip install pyspark



In [12]:
# Встановлення PySpark
!pip install pyspark requests



In [14]:
# Import the HTTP client and define the helper that downloads the source CSV files

import requests


def download_data(local_file_name, base_url="https://ftp.goit.study/neoversity/", timeout=60):
    """Download ``<local_file_name>.csv`` over HTTPS into the current directory.

    Args:
        local_file_name: File name without the ``.csv`` extension. It is used
            both to build the remote URL and as the local file name.
        base_url: URL prefix the file name is appended to.
        timeout: Connect/read timeout in seconds passed to ``requests.get`` so
            a stalled server cannot hang the notebook forever.

    Raises:
        Exception: If the server answers with any status code other than 200.
        requests.exceptions.RequestException: On network errors or timeouts.
    """
    import os

    target_path = local_file_name + ".csv"
    partial_path = target_path + ".part"
    full_url = base_url + target_path

    try:
        # Stream the body so large CSV files are never held fully in memory
        with requests.get(full_url, stream=True, timeout=timeout) as response:
            if response.status_code != 200:
                raise Exception(f"❌ Error {response.status_code}: File could not be downloaded.")
            with open(partial_path, 'wb') as file:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    file.write(chunk)
        # Publish the file only after the download completed, so an interrupted
        # transfer never leaves a truncated CSV under the final name
        os.replace(partial_path, target_path)
    except BaseException:
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise

    print(f"✅ File '{local_file_name}.csv' downloaded successfully.")

In [15]:
# Download both source CSV files: athlete_bio.csv and athlete_event_results.csv

for dataset_name in ("athlete_bio", "athlete_event_results"):
    download_data(dataset_name)

✅ File 'athlete_bio.csv' downloaded successfully.
✅ File 'athlete_event_results.csv' downloaded successfully.


# BRONZE

In [16]:
# Landing -> bronze: read the downloaded CSV files with Spark and store them as Parquet

from pyspark.sql import SparkSession

# Start a Spark session for this stage (or reuse an active one)
spark = (
    SparkSession.builder
    .appName("LandingToBronze")
    .getOrCreate()
)

# Paths of the downloaded CSV files
athlete_bio_csv = "athlete_bio.csv"
athlete_event_csv = "athlete_event_results.csv"

# athlete_bio: CSV (header row, inferred column types) -> bronze/athlete_bio
athlete_bio_df = spark.read.csv(athlete_bio_csv, header=True, inferSchema=True)
athlete_bio_df.write.parquet("bronze/athlete_bio", mode="overwrite")

# Report and preview the first rows
print("✅ athlete_bio таблиця успішно збережена в bronze шарі")
athlete_bio_df.show(5)

# athlete_event_results: CSV (header row, inferred column types) -> bronze/athlete_event_results
athlete_event_df = spark.read.csv(athlete_event_csv, header=True, inferSchema=True)
athlete_event_df.write.parquet("bronze/athlete_event_results", mode="overwrite")

# Report and preview the first rows
print("✅ athlete_event_results таблиця успішно збережена в bronze шарі")
athlete_event_df.show(5)

# This stage is finished; stop the session
spark.stop()

✅ athlete_bio таблиця успішно збережена в bronze шарі
+----------+------------------+------+----------------+------+------+-------------------+-----------+--------------------+--------------------+
|athlete_id|              name|   sex|            born|height|weight|            country|country_noc|         description|       special_notes|
+----------+------------------+------+----------------+------+------+-------------------+-----------+--------------------+--------------------+
|     65649|     Ivanka Bonova|Female|    4 April 1949|   166|    55|           Bulgaria|        BUL|Personal Best: 40...|                NULL|
|    112510| Nataliya Uryadova|Female|   15 March 1977|   184|    70| Russian Federation|        RUS|                NULL|Listed in Olympia...|
|    114973|Essa Ismail Rashed|  Male|14 December 1986|   165|    55|              Qatar|        QAT|Personal Best: 10...|Listed in Olympia...|
|     30359|       Péter Boros|  Male| 12 January 1908|  NULL|  NULL|            H

# BRONZE TO SILVER

In [17]:
# Bronze -> silver: start a Spark session and load the bronze tables

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("BronzeToSilver").getOrCreate()

# Read both bronze Parquet tables
athlete_bio_df, athlete_event_df = (
    spark.read.parquet("bronze/athlete_bio"),
    spark.read.parquet("bronze/athlete_event_results"),
)

In [18]:
# Створюємо функцію очищення тексту та реєструємо як UDF (user defined function)

import re
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Text-cleaning function
def clean_text(text):
    """Strip every character that is not a Latin letter, digit, comma, period or quote.

    Args:
        text: Value to clean. Non-string values are converted with ``str()``.

    Returns:
        The cleaned string, or ``None`` when ``text`` is ``None``. Returning
        ``None`` keeps missing values missing; converting them with ``str()``
        would store the literal text "None" in the column instead.
    """
    if text is None:
        return None
    return re.sub(r'[^a-zA-Z0-9,.\"\']', '', str(text))

# Wrap clean_text as a Spark UDF that returns a string column
clean_text_udf = udf(clean_text, returnType=StringType())

In [19]:
# Застосовуємо очищення тексту до всіх текстових колонок

from pyspark.sql.types import StringType

# Helper that cleans every text column of a DataFrame
def clean_dataframe_text_columns(df):
    """Return ``df`` with ``clean_text_udf`` applied to each column whose dtype is 'string'.

    Columns of any other dtype are left untouched; column names and order are kept.
    """
    string_columns = [name for name, dtype in df.dtypes if dtype == 'string']

    cleaned = df
    for name in string_columns:
        cleaned = cleaned.withColumn(name, clean_text_udf(cleaned[name]))
    return cleaned

# Clean the text columns of both DataFrames
athlete_bio_cleaned_df, athlete_event_cleaned_df = (
    clean_dataframe_text_columns(athlete_bio_df),
    clean_dataframe_text_columns(athlete_event_df),
)

In [20]:
# Remove fully duplicated rows from both cleaned DataFrames

athlete_bio_cleaned_df, athlete_event_cleaned_df = (
    athlete_bio_cleaned_df.dropDuplicates(),
    athlete_event_cleaned_df.dropDuplicates(),
)

In [21]:
# Write the cleaned data to the silver layer (Parquet format)

# Persist both tables, replacing any previous silver output
athlete_bio_cleaned_df.write.parquet("silver/athlete_bio", mode="overwrite")
athlete_event_cleaned_df.write.parquet("silver/athlete_event_results", mode="overwrite")

# Report and preview each table without truncating long values
print("✅ athlete_bio таблиця успішно збережена в silver шарі")
athlete_bio_cleaned_df.show(5, truncate=False)

print("✅ athlete_event_results таблиця успішно збережена в silver шарі")
athlete_event_cleaned_df.show(5, truncate=False)

# This stage is finished; stop the session
spark.stop()

✅ athlete_bio таблиця успішно збережена в silver шарі
+----------+----------------+------+--------------+------+------+-----------+-----------+---------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|athlete_id|name            |sex   |born          |height|weight|country    |country_noc|description                                                          |special_notes                                                                                                                                                                               |
+----------+----------------+------+--------------+------+------+-----------+-----------+---------------------------------------------------------------------+-----------------------------------------------------------------------------------------

# SILVER TO GOLD

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, current_timestamp, col
from pyspark.sql.types import FloatType

# Start (or reuse) the Spark session for the silver -> gold job
spark = (
    SparkSession.builder
    .appName("SilverToGoldJob")
    .getOrCreate()
)

# Load the silver tables
bio_df = spark.read.parquet("silver/athlete_bio")
results_df = spark.read.parquet("silver/athlete_event_results")

# height and weight are cast to float before averaging
for measurement in ("height", "weight"):
    bio_df = bio_df.withColumn(measurement, col(measurement).cast(FloatType()))

# Drop country_noc from the bio table so the joined frame does not end up with
# two conflicting country_noc columns (the results table's column is kept)
bio_df = bio_df.drop("country_noc")

# Inner join of results with athlete bios on athlete_id
joined_df = results_df.join(bio_df, on="athlete_id", how="inner")

# Average height and weight per (sport, medal, sex, country_noc),
# plus a column holding the calculation timestamp
grouped = joined_df.groupBy("sport", "medal", "sex", "country_noc")
averages = grouped.agg(
    avg("height").alias("avg_height"),
    avg("weight").alias("avg_weight"),
)
agg_df = averages.withColumn("timestamp", current_timestamp())

# Print the result to the log
agg_df.show(truncate=False)

# Persist to the gold layer, replacing any previous output
agg_df.write.parquet("gold/avg_stats", mode="overwrite")

+------------------+------+------+-----------+------------------+------------------+--------------------------+
|sport             |medal |sex   |country_noc|avg_height        |avg_weight        |timestamp                 |
+------------------+------+------+-----------+------------------+------------------+--------------------------+
|CrossCountrySkiing|None  |Male  |ARM        |170.45454545454547|67.13636363636364 |2025-04-13 09:27:45.672614|
|Swimming          |None  |Male  |DEN        |189.67708333333334|82.20833333333333 |2025-04-13 09:27:45.672614|
|Shooting          |None  |Male  |GDR        |177.59183673469389|76.63265306122449 |2025-04-13 09:27:45.672614|
|IceHockey         |None  |Male  |SVK        |185.5939393939394 |90.61818181818182 |2025-04-13 09:27:45.672614|
|FreestyleSkiing   |None  |Male  |GBR        |172.38888888888889|70.05555555555556 |2025-04-13 09:27:45.672614|
|Sailing           |None  |Male  |BER        |179.04761904761904|81.47619047619048 |2025-04-13 09:27:45.

# СТВОРЕННЯ DAG-файлу

In [23]:
# Встановлення Apache Airflow та супутніх пакетів для належної роботи в Google Colab
!pip install apache-airflow[mysql]==2.8.4 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.4/constraints-3.10.txt"

Collecting apache-airflow==2.8.4 (from apache-airflow[mysql]==2.8.4)
  Downloading apache_airflow-2.8.4-py3-none-any.whl.metadata (55 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/55.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m55.8/55.8 kB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting alembic<2.0,>=1.13.1 (from apache-airflow==2.8.4->apache-airflow[mysql]==2.8.4)
  Downloading alembic-1.13.1-py3-none-any.whl.metadata (7.4 kB)
Collecting argcomplete>=1.10 (from apache-airflow==2.8.4->apache-airflow[mysql]==2.8.4)
  Downloading argcomplete-3.2.3-py3-none-any.whl.metadata (16 kB)
Collecting asgiref (from apache-airflow==2.8.4->apache-airflow[mysql]==2.8.4)
  Downloading asgiref-3.7.2-py3-none-any.whl.metadata (9.2 kB)
Collecting attrs>=22.1.0 (from apache-airflow==2.8.4->apache-airflow[mysql]==2.8.4)
  Downloading attrs-23.2.0-py3-none-any.whl.metadata (9.5 kB)
Collecting blinker (from a

In [24]:
# Replace the installed pluggy (1.4.0 per the log) with the pinned version 1.3.0.
# NOTE(review): the reason for this pin is not recorded in the notebook - presumably an
# Airflow compatibility workaround; confirm it is still required. pip reports that the
# pin conflicts with the preinstalled pytest 8.3.5, which requires pluggy>=1.5.
!pip uninstall -y pluggy
!pip install pluggy==1.3.0

Found existing installation: pluggy 1.4.0
Uninstalling pluggy-1.4.0:
  Successfully uninstalled pluggy-1.4.0
Collecting pluggy==1.3.0
  Downloading pluggy-1.3.0-py3-none-any.whl.metadata (4.3 kB)
Downloading pluggy-1.3.0-py3-none-any.whl (18 kB)
Installing collected packages: pluggy
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
pytest 8.3.5 requires pluggy<2,>=1.5, but you have pluggy 1.3.0 which is incompatible.[0m[31m
[0mSuccessfully installed pluggy-1.3.0


In [25]:
# Replace the preinstalled pytest 8.3.5 with 7.4.4.
# NOTE(review): presumably done to resolve the dependency conflict with pluggy 1.3.0
# reported by the previous cell - confirm.
!pip uninstall -y pytest
!pip install pytest==7.4.4

Found existing installation: pytest 8.3.5
Uninstalling pytest-8.3.5:
  Successfully uninstalled pytest-8.3.5
Collecting pytest==7.4.4
  Downloading pytest-7.4.4-py3-none-any.whl.metadata (7.9 kB)
Downloading pytest-7.4.4-py3-none-any.whl (325 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m325.3/325.3 kB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pytest
Successfully installed pytest-7.4.4


In [26]:
# Print the installed pluggy and pytest metadata to verify the pinned versions
!pip show pluggy pytest

Name: pluggy
Version: 1.3.0
Summary: plugin and hook calling mechanisms for python
Home-page: https://github.com/pytest-dev/pluggy
Author: Holger Krekel
Author-email: holger@merlinux.eu
License: MIT
Location: /usr/local/lib/python3.11/dist-packages
Requires: 
Required-by: apache-airflow, pytest
---
Name: pytest
Version: 7.4.4
Summary: pytest: simple powerful testing with Python
Home-page: https://docs.pytest.org/en/latest/
Author: Holger Krekel, Bruno Oliveira, Ronny Pfannschmidt, Floris Bruynooghe, Brianna Laugher, Florian Bruhin and others
Author-email: 
License: MIT
Location: /usr/local/lib/python3.11/dist-packages
Requires: iniconfig, packaging, pluggy
Required-by: 


In [27]:
# Verify that Airflow is importable and report the installed version
import airflow

airflow_version = airflow.__version__
print(airflow_version)

2.8.4


In [28]:
# Clone the airflow_sandbox GitHub repository into ./airflow_sandbox
!git clone https://github.com/goitacademy/airflow_sandbox.git

Cloning into 'airflow_sandbox'...
remote: Enumerating objects: 3426, done.[K
remote: Counting objects: 100% (638/638), done.[K
remote: Compressing objects: 100% (333/333), done.[K
remote: Total 3426 (delta 489), reused 433 (delta 305), pack-reused 2788 (from 3)[K
Receiving objects: 100% (3426/3426), 243.73 MiB | 26.71 MiB/s, done.
Resolving deltas: 100% (1716/1716), done.


In [29]:
# Switch the notebook's working directory to the cloned repository;
# relative paths in the following cells are resolved from here
%cd airflow_sandbox

/content/airflow_sandbox


In [30]:
# Point `origin` at an authenticated URL without storing the token in the notebook.
# SECURITY: a GitHub personal access token used to be hard-coded on this line. It has
# been exposed through the notebook and must be revoked on GitHub and replaced.
import os
import subprocess
from getpass import getpass

# Read the token from the GITHUB_TOKEN environment variable, or prompt for it
# (getpass does not echo the value, so it never appears in the cell output)
github_token = os.environ.get("GITHUB_TOKEN") or getpass("GitHub token: ")

# An argument list (no shell) keeps the token out of any shell command string.
# Note: git still stores the resulting URL in the local .git/config of the clone.
subprocess.run(
    [
        "git", "remote", "set-url", "origin",
        f"https://{github_token}@github.com/goitacademy/airflow_sandbox.git",
    ],
    check=True,
)

In [31]:
# Make sure the dags/ folder exists inside the repository
import os

# The working directory is already the repository root (see the `%cd airflow_sandbox`
# cell), so the path is given relative to it. The previous "airflow_sandbox/dags"
# created a nested airflow_sandbox/airflow_sandbox/dags directory instead.
os.makedirs("dags", exist_ok=True)

In [36]:
# Проектний DAG-файл з основним кодом

%%writefile fin_p_dag.py
#%%writefile airflow_sandbox/dags/fin_p_dag.py
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

# DAG definition
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1
}

dag = DAG(
    dag_id='fin_p',
    default_args=default_args,
    description='DAG to run Spark ETL pipeline for Data Lake',
    schedule_interval=None,  # On-demand
    catchup=False
)

# Task 1: Landing to Bronze
landing_to_bronze = SparkSubmitOperator(
    task_id='landing_to_bronze',
    application='dags/landing_to_bronze.py',
    conn_id='spark-default',
    verbose=True,
    dag=dag
)

# Task 2: Bronze to Silver
bronze_to_silver = SparkSubmitOperator(
    task_id='bronze_to_silver',
    application='dags/bronze_to_silver.py',
    conn_id='spark-default',
    verbose=True,
    dag=dag
)

# Task 3: Silver to Gold
silver_to_gold = SparkSubmitOperator(
    task_id='silver_to_gold',
    application='dags/silver_to_gold.py',
    conn_id='spark-default',
    verbose=True,
    dag=dag
)

# Define DAG dependencies
landing_to_bronze >> bronze_to_silver >> silver_to_gold

Overwriting fin_p_dag.py


In [37]:
# Check that the DAG file was created in the current directory
!ls fin_p_dag.py

fin_p_dag.py


In [38]:
# Stage all changes under the current directory for the next commit
!git add .