In [None]:
import os
from pyspark.sql import SparkSession
from configs import kafka_configs, db_configs

# Spark initialisation and configuration.

# Packages handed to spark-submit: the Kafka SQL connector and the MySQL JDBC
# connector. This is set before the session is built below.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,mysql:mysql-connector-java:8.0.32 pyspark-shell'

# Build (or reuse) the SparkSession used by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("OlympicStreaming")
    .master("local[*]")
    # Local connector jar, in addition to the Maven coordinates above.
    .config("spark.jars", "mysql-connector-java-8.0.32.jar")
    .config("spark.sql.streaming.schemaInference", True)
    .getOrCreate()
)

# Keep the notebook output readable: only warnings and errors are logged.
spark.sparkContext.setLogLevel("WARN")


In [None]:
# Read the athlete biographical data from MySQL over JDBC.
athlete_bio_df = (
    spark.read.format("jdbc")
    .options(
        url=db_configs["url"],
        # Use the driver class from the shared config, exactly like the other
        # JDBC read/write calls in this notebook, instead of hard-coding the
        # legacy 'com.mysql.jdbc.Driver' alias.
        driver=db_configs["driver"],
        dbtable="athlete_bio",
        user=db_configs["user"],
        password=db_configs["password"],
    )
    .load()
)

# Preview the first rows to sanity-check the load.
athlete_bio_df.show()

+----------+-------------------+------+--------------+------+------+-----------------+-----------+--------------------+--------------------+
|athlete_id|               name|   sex|          born|height|weight|          country|country_noc|         description|       special_notes|
+----------+-------------------+------+--------------+------+------+-----------------+-----------+--------------------+--------------------+
|     65649|       IvankaBonova|Female|    4April1949| 166.0|    55|         Bulgaria|        BUL|PersonalBest40053...|                 nan|
|    112510|   NataliyaUryadova|Female|   15March1977| 184.0|    70|RussianFederation|        RUS|                 nan|ListedinOlympians...|
|    114973|   EssaIsmailRashed|  Male|14December1986| 165.0|    55|            Qatar|        QAT|PersonalBest10000...|ListedinOlympians...|
|     30359|          PterBoros|  Male| 12January1908|      |   nan|          Hungary|        HUN|Between1927and193...|                 nan|
|     50557| 

In [None]:
# Drop rows whose height or weight is not a usable number.
from pyspark.sql.functions import col, isnan

# A non-numeric string (e.g. '') casts to NULL, but the literal string 'nan'
# that appears in this table casts to a floating-point NaN, which is NOT NULL.
# Both cases must be rejected, otherwise NaN rows survive the filter and turn
# every average they take part in into NaN.
bio_clean_df = (
    athlete_bio_df
    .filter(
        col("height").cast("float").isNotNull()
        & ~isnan(col("height").cast("float"))
    )
    .filter(
        col("weight").cast("float").isNotNull()
        & ~isnan(col("weight").cast("float"))
    )
)


In [5]:
# Read the event results from MySQL and publish them to Kafka.
event_df = (
    spark.read.format("jdbc")
    .options(
        url=db_configs["url"],
        dbtable="athlete_event_results",
        user=db_configs["user"],
        password=db_configs["password"],
        driver=db_configs["driver"],
    )
    .load()
)

# Publish to Kafka: each row is serialised to one JSON document and sent as
# the message value to the "athlete_event_results" topic (one-off batch write).
(
    event_df
    .selectExpr("to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "77.81.230.104:9092",
        "topic": "athlete_event_results",
        "kafka.security.protocol": "SASL_PLAINTEXT",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": f"org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password={kafka_configs['password']};",
    })
    .save()
)


In [6]:
# Read the event stream from Kafka and parse the JSON payload.
from pyspark.sql.functions import from_json, schema_of_json

# Streaming source on the same topic the batch cell publishes to.
# NOTE(review): with startingOffsets="latest" a query that starts without an
# existing checkpoint only sees records produced after it starts — confirm the
# records published by the batch write above are not expected to be consumed.
kafka_stream_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "77.81.230.104:9092")
    .option("subscribe", "athlete_event_results")
    .option("startingOffsets", "latest")
    .option("kafka.security.protocol", "SASL_PLAINTEXT")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", f"org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password={kafka_configs['password']};")
    .load()
)

# Parse the JSON. The messages are produced from event_df with
# to_json(struct(*)), so event_df's own schema describes the payload exactly.
# Inferring the schema from a single sample row is fragile: to_json omits NULL
# fields by default, so any column that is NULL in that one row would be
# missing from the inferred schema. It also triggers an extra JDBC job.
json_schema = event_df.schema

parsed_df = (
    kafka_stream_df
    .selectExpr("CAST(value AS STRING)")  # Kafka delivers the value as bytes
    .select(from_json("value", json_schema).alias("data"))
    .select("data.*")  # flatten the parsed struct into top-level columns
)


In [7]:
# Join the parsed events with the cleaned bio data, then aggregate.
from pyspark.sql.functions import current_timestamp, avg

joined_df = (
    parsed_df
    .join(bio_clean_df, "athlete_id", "inner")
    # Both sides carry country_noc; drop the event-side copy so the grouping
    # column below refers to the bio table's value only.
    .drop(parsed_df["country_noc"])
)

# Average height and weight per (sport, medal, sex, country_noc), stamped with
# the time the aggregate was calculated.
aggregated_df = (
    joined_df
    .groupBy("sport", "medal", "sex", "country_noc")
    .agg(
        avg("height").alias("avg_height"),
        avg("weight").alias("avg_weight"),
    )
    .withColumn("calculated_at", current_timestamp())
)


In [None]:
# Stream the aggregates to Kafka and MySQL via foreachBatch.

import os
from pathlib import Path

# Checkpoint directory for the streaming query, under the user's home directory.
# makedirs(exist_ok=True) already tolerates an existing directory, so a separate
# exists() check is redundant (and racy). It also fails fast with
# FileExistsError if the path is occupied by a regular file, instead of
# deferring the failure to the streaming query.
checkpoint_path = str(Path.home() / "spark_checkpoints")
os.makedirs(checkpoint_path, exist_ok=True)

def write_to_kafka_and_db(batch_df, batch_id):
    """Write one micro-batch to the Kafka topic and to the MySQL table.

    Intended to be passed to ``DataStreamWriter.foreachBatch``.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch; used only in log messages.

    Returns:
        None. Empty batches are skipped without writing anything.

    Raises:
        Exception: Any error raised while checking or writing the batch is
            logged and re-raised unchanged.
    """
    # The batch is evaluated several times (emptiness check, Kafka write, JDBC
    # write). Cache it so it is not recomputed for each action, and always
    # release the cache in the ``finally`` below.
    batch_df.persist()
    try:
        if batch_df.isEmpty():
            print(f"Batch {batch_id} is empty, skipping...")
            return

        print(f"Processing batch {batch_id}")

        # Write to the Kafka topic: one JSON document per row as the message value.
        (
            batch_df
            .selectExpr("to_json(struct(*)) AS value")
            .write
            .format("kafka")
            .option("kafka.bootstrap.servers", "77.81.230.104:9092")
            .option("topic", "aggregated_athlete_stats")
            .option("kafka.security.protocol", "SASL_PLAINTEXT")
            .option("kafka.sasl.mechanism", "PLAIN")
            .option(
                "kafka.sasl.jaas.config",
                f"org.apache.kafka.common.security.plain.PlainLoginModule required "
                f"username='admin' password={kafka_configs['password']};",
            )
            .save()
        )

        # Write to the MySQL database: append the same rows to the target table.
        (
            batch_df.write
            .format("jdbc")
            .option("url", db_configs["url"])
            .option("dbtable", "aggregated_athlete_stats")
            .option("user", db_configs["user"])
            .option("password", db_configs["password"])
            .option("driver", db_configs["driver"])
            .mode("append")
            .save()
        )

        print(f"Successfully processed batch {batch_id}")

    except Exception as e:
        print(f"Error processing batch {batch_id}: {str(e)}")
        # Bare raise keeps the original traceback intact.
        raise
    finally:
        batch_df.unpersist()

# Start the streaming query: every 10 seconds the updated aggregates are handed
# to write_to_kafka_and_db, with progress tracked under checkpoint_path.
# NOTE(review): minBatchesToRetain is passed here as a writer option — confirm
# it takes effect this way rather than needing to be set as a session conf.
query = (
    aggregated_df.writeStream
    .outputMode("update")
    .foreachBatch(write_to_kafka_and_db)
    .option("checkpointLocation", checkpoint_path)
    .option("spark.sql.streaming.minBatchesToRetain", "2")
    .trigger(processingTime="10 seconds")
    .start()
)

# Block until the query ends. Stop it cleanly on a manual interrupt; on any
# other failure stop it and re-raise the error.
try:
    print("Starting streaming query...")
    query.awaitTermination()
except KeyboardInterrupt:
    print("\nStopping stream...")
    query.stop()
    print("Stream stopped")
except Exception as stream_error:
    print(f"Stream failed: {str(stream_error)}")
    query.stop()
    raise stream_error