In [1]:
!

In [2]:
!spark-submit --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.27
Branch HEAD
Compiled by user heartsavior on 2024-02-15T11:24:58Z
Revision fd86f85e181fc2dc0f50a096855acf83a6cc5d9c
Url https://github.com/apache/spark
Type --help for more information.


In [3]:
import os
import requests
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, current_timestamp, from_json, to_json, struct
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# from configs import kafka_config

In [4]:
# 📌 Налаштування MySQL
MYSQL_URL = "jdbc:mysql://217.61.57.46:3306/olympic_dataset"
MYSQL_PROPERTIES = {
    "user": "neo_data_admin",
    "password": "Proyahaxuqithab9oplp",
    "driver": "com.mysql.cj.jdbc.Driver"
}

kafka_config = {
    "bootstrap_servers": ['77.81.230.104:9092'],
    "username": 'admin',
    "password": 'VawEzo1ikLtrA8Ug8THa',
    "security_protocol": 'SASL_PLAINTEXT',
    "sasl_mechanism": 'PLAIN'
}

In [5]:
# 📌 Налаштування Kafka
KAFKA_BOOTSTRAP_SERVERS = kafka_config["bootstrap_servers"][0]
KAFKA_TOPIC_INPUT = "alex_athlete_topic_input"
KAFKA_TOPIC_OUTPUT = "alex_athlete_topic_output"

In [6]:
# Використовуємо Maven coordinates - Spark автоматично завантажить всі необхідні залежності
print("📦 Spark автоматично завантажить MySQL та Kafka connectors...")

# 📌 Ініціалізація Spark (використовуємо Maven coordinates для автоматичного завантаження)
spark = SparkSession.builder \
    .appName("Final_project") \
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.33,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4") \
    .config("spark.sql.shuffle.partitions", "4") \
    .config("spark.streaming.backpressure.enabled", "true") \
    .config("spark.sql.session.timeZone", "UTC") \
    .getOrCreate()

print("✅ Spark Session ініціалізовано успішно!")

📦 Spark автоматично завантажить MySQL та Kafka connectors...
✅ Spark Session ініціалізовано успішно!


In [7]:
# 1️⃣ Читання біографічних даних атлетів
print("📖 Читання біографічних даних атлетів...")
athlete_bio_df = spark.read.format("jdbc").options(
    url=MYSQL_URL,
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="olympic_dataset.athlete_bio",
    user=MYSQL_PROPERTIES["user"],
    password=MYSQL_PROPERTIES["password"]
).load()


📖 Читання біографічних даних атлетів...


In [8]:
# 2️⃣ Фільтрація некоректних значень
print("🧹 Фільтрація некоректних значень...")
athlete_bio_cleaned_df = athlete_bio_df.filter(
    (col("height").isNotNull()) & (col("weight").isNotNull()) &
    (col("height").cast("double").isNotNull()) & (col("weight").cast("double").isNotNull())
)

🧹 Фільтрація некоректних значень...


In [9]:

# 3️⃣ Читання та запис результатів змагань у Kafka
print("📤 Відправка даних у Kafka...")
event_results_df = spark.read.format("jdbc").options(
    url=MYSQL_URL,
    driver="com.mysql.cj.jdbc.Driver",
    dbtable="olympic_dataset.athlete_event_results",
    user=MYSQL_PROPERTIES["user"],
    password=MYSQL_PROPERTIES["password"]
).load()

try:
    event_results_df.selectExpr("CAST(athlete_id AS STRING) as key", "to_json(struct(*)) AS value") \
        .write.format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
        .option("topic", KAFKA_TOPIC_INPUT) \
        .option("kafka.security.protocol", kafka_config["security_protocol"]) \
        .option("kafka.sasl.mechanism", kafka_config["sasl_mechanism"]) \
        .option("kafka.sasl.jaas.config",
                f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_config["username"]}" '
                f'password="{kafka_config["password"]}";') \
        .save()
    print("✅ Дані успішно відправлено у Kafka!")
except Exception as e:
    print(f"❌ Помилка відправки у Kafka: {e}")
    # Continue with streaming part even if batch write fails

# 📌 Читання з Kafka (потоково)
print("📥 Налаштування потокового читання з Kafka...")
schema = StructType([
    StructField("edition", StringType(), True),
    StructField("edition_id", StringType(), True),
    StructField("country_noc", StringType(), True),
    StructField("sport", StringType(), True),
    StructField("event", StringType(), True),
    StructField("result_id", StringType(), True),
    StructField("athlete", StringType(), True),
    StructField("athlete_id", StringType(), True),
    StructField("pos", StringType(), True),
    StructField("medal", StringType(), True),
    StructField("isTeamSport", StringType(), True)
])

📤 Відправка даних у Kafka...
✅ Дані успішно відправлено у Kafka!
📥 Налаштування потокового читання з Kafka...


In [15]:
kafka_stream_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
    .option("subscribe", KAFKA_TOPIC_INPUT) \
    .option("kafka.security.protocol", kafka_config["security_protocol"]) \
    .option("kafka.sasl.mechanism", kafka_config["sasl_mechanism"]) \
    .option("kafka.sasl.jaas.config",
            f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_config["username"]}" '
            f'password="{kafka_config["password"]}";') \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", "500") \
    .load()

kafka_json_df = kafka_stream_df.selectExpr("CAST(value AS STRING)").select(from_json("value", schema).alias("data")).select("data.*")

In [16]:
# 4️⃣ Об'єднання з біографічними даними
print("🔗 Об'єднання потокових даних з біографічними даними...")

# Rename columns in athlete_bio to avoid conflicts
athlete_bio_renamed_df = athlete_bio_cleaned_df.select(
    col("athlete_id").alias("bio_athlete_id"),
    col("name").alias("athlete_name"),
    col("sex"),
    col("born"),
    col("height"),
    col("weight"),
    col("country").alias("bio_country"),
    col("country_noc").alias("bio_country_noc"),
    col("description").alias("bio_description"),
    col("special_notes").alias("bio_special_notes")
)

🔗 Об'єднання потокових даних з біографічними даними...


In [17]:
# Join using explicit column references
joined_stream_df = kafka_json_df.join(
    athlete_bio_renamed_df,
    kafka_json_df.athlete_id == athlete_bio_renamed_df.bio_athlete_id,
    how="inner"
).drop("bio_athlete_id")  # Remove duplicate athlete_id column


In [18]:
# 5️⃣ Агрегація за видом спорту, медаллю, статтю, країною
print("📊 Налаштування агрегації...")
# Use the country_noc from Kafka data (event results) for grouping
aggregated_stream_df = joined_stream_df.groupBy("sport", "medal", "sex", "country_noc").agg(
    avg("height").alias("avg_height"),
    avg("weight").alias("avg_weight"),
    current_timestamp().alias("timestamp")
)


📊 Налаштування агрегації...


In [19]:
# ✅ Функція запису у Kafka та MySQL
def foreach_batch_function(batch_df, batch_id):
    print(f"📝 Обробка batch {batch_id}...")
    batch_count = batch_df.count()
    print(f"📊 Batch {batch_id} містить {batch_count} записів")

    if batch_count > 0:
        # Show sample data for debugging
        print(f"🔍 Приклад даних з batch {batch_id}:")
        # batch_df.show(5, truncate=False)

        try:
            # Запис у Kafka
            batch_df.selectExpr("to_json(struct(*)) AS value") \
                .write.format("kafka") \
                .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
                .option("topic", KAFKA_TOPIC_OUTPUT) \
                .option("kafka.security.protocol", kafka_config["security_protocol"]) \
                .option("kafka.sasl.mechanism", kafka_config["sasl_mechanism"]) \
                .option("kafka.sasl.jaas.config",
                        f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_config["username"]}" '
                        f'password="{kafka_config["password"]}";') \
                .save()
            print(f"✅ Batch {batch_id} записано у Kafka")

            # Запис у MySQL
            batch_df.write.format("jdbc").options(
                url=MYSQL_URL,
                driver="com.mysql.cj.jdbc.Driver",
                dbtable="olympic_dataset.aggregated_results",
                user=MYSQL_PROPERTIES["user"],
                password=MYSQL_PROPERTIES["password"]
            ).mode("append").save()
            print(f"✅ Batch {batch_id} записано у MySQL")

        except Exception as e:
            print(f"❌ Помилка обробки batch {batch_id}: {e}")
            import traceback
            traceback.print_exc()
    else:
        print(f"⚠️ Batch {batch_id} порожній - можливо, немає нових даних у топіку")

In [20]:
# 6️⃣ Старт стріму
print("🚀 Запуск потокової обробки...")
query = aggregated_stream_df.writeStream \
    .foreachBatch(foreach_batch_function) \
    .outputMode("update") \
    .option("checkpointLocation", "checkpoint/athlete_pipeline") \
    .start()

🚀 Запуск потокової обробки...


In [21]:
print("✅ Потокова обробка запущена! Натисніть Ctrl+C для зупинки.")
query.awaitTermination()

✅ Потокова обробка запущена! Натисніть Ctrl+C для зупинки.
📝 Обробка batch 3...
📊 Batch 3 містить 58 записів
🔍 Приклад даних з batch 3:
✅ Batch 3 записано у Kafka
✅ Batch 3 записано у MySQL
📝 Обробка batch 4...
📊 Batch 4 містить 88 записів
🔍 Приклад даних з batch 4:
✅ Batch 4 записано у Kafka
✅ Batch 4 записано у MySQL
📝 Обробка batch 5...
📊 Batch 5 містить 61 записів
🔍 Приклад даних з batch 5:
✅ Batch 5 записано у Kafka


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 