In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, avg, current_timestamp
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

In [None]:
# Визначення структур даних для JSON
schema_athlete_bio = StructType([
    StructField("athlete_id", IntegerType(), True),
    StructField("height", FloatType(), True),
    StructField("weight", FloatType(), True),
    StructField("sex", StringType(), True),
    StructField("country_noc", StringType(), True)
])

schema_event_results = StructType([
    StructField("event_id", IntegerType(), True),
    StructField("athlete_id", IntegerType(), True),
    StructField("medal", StringType(), True),
    StructField("timestamp", StringType(), True)
])


In [None]:
# Частина 1. 
# Створення стримуючої архітектури
if __name__ == "__main__":
    # Ініціалізація Spark сесії
    spark = SparkSession.builder \
        .appName("StreamingPipeline") \
        .master("spark://217.61.58.159:7077") \
        .config("spark.ui.enabled", "true") \
        .config("spark.ui.port", "8080") \
        .getOrCreate()

In [None]:
# Зчитуємо дані з MySQL для таблиці athlete_bio
    df_athlete_bio = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:mysql://217.61.57.46:3306/neo_data") \
        .option("dbtable", "athlete_bio") \
        .option("user", "neo_data_admin") \
        .option("password", "Proyahaxuqithab9oplp") \
        .load()

In [None]:
# Фільтрація записів із неповними даними
    df_athlete_bio = df_athlete_bio.filter(
        col("height").isNotNull() & col("weight").isNotNull()
    )

In [None]:
# Зчитуємо дані з Kafka топіка
    df_event_results = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "77.81.230.104:9092") \
        .option("subscribe", "athlete_event_results") \
        .option("kafka.security.protocol", "PLAINTEXT") \
        .option("kafka.sasl.jaas.config", 
                f"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"VawEzo1ikLtrA8Ug8THa\";") \
        .load()

In [None]:
# Декодування JSON даних із Kafka
    df_event_results = df_event_results.selectExpr("CAST(value AS STRING)")
    df_event_results = df_event_results.withColumn("value", from_json(col("value"), schema_event_results))
    df_event_results = df_event_results.select("value.*")


In [None]:
# Злиття даних із athlete_bio та athlete_event_results
combined_df = df_athlete_bio.join(df_event_results, on="athlete_id", how="inner")

# Запис результатів у Kafka топік
    query = combined_df.writeStream \
        .outputMode("append") \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "77.81.230.104:9092") \
        .option("topic", "processed_athlete_data") \
        .start()

    query.awaitTermination()

In [None]:
# Частина 2. Створення Batch Data Lake:
if __name__ == "__main__":
    # Ініціалізація Spark сесії
    spark = SparkSession.builder \
        .appName("BatchDataLake") \
        .master("spark://217.61.58.159:7077") \
        .getOrCreate()

In [None]:
# Завантаження файлів з FTP серверу
url = "https://ftp.goit.study/neoversity/"
    files = ["athlete_bio.txt", "athlete_event_results.txt"]

    for file in files:
        local_file_path = f"bronze/{file}"
        full_url = url + file
        print(f"Downloading {file} from {full_url}")

        # Зчитування файлу та збереження у форматі Parquet
        df = spark.read.option("header", True).csv(full_url)
        df.write.parquet(local_file_path)

        print(f"File {file} saved as Parquet in bronze layer.")

In [None]:
 # Зчитуємо таблиці з бронзою
    bronze_athlete_bio = spark.read.parquet("bronze/athlete_bio.txt")
    bronze_event_results = spark.read.parquet("bronze/athlete_event_results.txt")

# Очищення даних та видалення дублювань
    cleaned_athlete_bio = bronze_athlete_bio.dropDuplicates()
    cleaned_event_results = bronze_event_results.dropDuplicates()

# Запис у срібну
    cleaned_athlete_bio.write.parquet("silver/athlete_bio_silver.txt")
    cleaned_event_results.write.parquet("silver/athlete_event_results_silver.txt")

    print("Data cleaned and saved to silver layer.")

# Зчитуємо таблиці зі срібною
    silver_athlete_bio = spark.read.parquet("silver/athlete_bio_silver.txt")
    silver_event_results = spark.read.parquet("silver/athlete_event_results_silver.txt")

# Об'єднуєнмо данні
    joined_df = silver_athlete_bio.join(silver_event_results, on="athlete_id", how="inner")

# Обчислення середніх значень weight і height
    avg_stats_df = joined_df.groupBy("sex", "country_noc") \
                            .agg(
                                avg("weight").alias("avg_weight"),
                                avg("height").alias("avg_height")
                            )


In [None]:
# Додати час виконання
    avg_stats_df = avg_stats_df.withColumn("timestamp", current_timestamp())

# Запис у золотий шар
    avg_stats_df.write.parquet("gold/avg_stats")

    print("Processed data saved to gold layer.")