In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, current_timestamp, from_json, schema_of_json
from pyspark.sql.types import StringType
from kafka.admin import KafkaAdminClient, NewTopic
from kafka import KafkaProducer, KafkaConsumer
import json
import uuid

In [2]:
# Налаштування конфігурації SQL бази даних
jdbc_url = "jdbc:mysql://217.61.57.46:3306/olympic_dataset"
jdbc_table_bio = "athlete_bio"
jdbc_table_results = "athlete_event_results"
jdbc_user = "neo_data_admin"
jdbc_password = "Proyahaxuqithab9oplp"
jar_file_path = "/content/mysql-connector-j-8.0.32.jar"

# Ініціалізація SparkSession
spark = SparkSession.builder \
    .appName("Olympic Bio Data") \
    .config("spark.jars", jar_file_path) \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.5") \
    .getOrCreate()

In [3]:
# Крок 1: Читання даних фізичних показників атлетів з SQL бази даних
athletes_bio_df = spark.read.format('jdbc').options(
    url=jdbc_url,
    driver='com.mysql.cj.jdbc.Driver',
    dbtable=jdbc_table_bio,
    user=jdbc_user,
    password=jdbc_password
).load()

athletes_bio_df.show()

+----------+-------------------+------+--------------+------+------+-----------------+-----------+--------------------+--------------------+
|athlete_id|               name|   sex|          born|height|weight|          country|country_noc|         description|       special_notes|
+----------+-------------------+------+--------------+------+------+-----------------+-----------+--------------------+--------------------+
|     65649|       IvankaBonova|Female|    4April1949| 166.0|    55|         Bulgaria|        BUL|PersonalBest40053...|                 nan|
|    112510|   NataliyaUryadova|Female|   15March1977| 184.0|    70|RussianFederation|        RUS|                 nan|ListedinOlympians...|
|    114973|   EssaIsmailRashed|  Male|14December1986| 165.0|    55|            Qatar|        QAT|PersonalBest10000...|ListedinOlympians...|
|     30359|          PterBoros|  Male| 12January1908|      |   nan|          Hungary|        HUN|Between1927and193...|                 nan|
|     50557| 

In [4]:
# Крок 2: Фільтрація даних, де зріст та вага порожні чи не є числами
filtered_bio_df = athletes_bio_df.filter(
    (col("height").isNotNull()) & (col("weight").isNotNull()) &
    (col("height").cast(StringType()).rlike("^[0-9]+(?:\\.[0-9]+)?$")) &
    (col("weight").cast(StringType()).rlike("^[0-9]+(?:\\.[0-9]+)?$"))
)

filtered_bio_df.show()

+----------+-------------------+------+---------------+------+------+-----------------+-----------+--------------------+--------------------+
|athlete_id|               name|   sex|           born|height|weight|          country|country_noc|         description|       special_notes|
+----------+-------------------+------+---------------+------+------+-----------------+-----------+--------------------+--------------------+
|     65649|       IvankaBonova|Female|     4April1949| 166.0|    55|         Bulgaria|        BUL|PersonalBest40053...|                 nan|
|    112510|   NataliyaUryadova|Female|    15March1977| 184.0|    70|RussianFederation|        RUS|                 nan|ListedinOlympians...|
|    114973|   EssaIsmailRashed|  Male| 14December1986| 165.0|    55|            Qatar|        QAT|PersonalBest10000...|ListedinOlympians...|
|    133041|    VincentRiendeau|  Male| 13December1996| 178.0|    68|           Canada|        CAN|                 nan|ListedinOlympians...|
|    1

In [5]:
# Конфігурація Kafka
kafka_config = {
    "bootstrap_servers": ['77.81.230.104:9092'],
    "username": 'admin',
    "password": 'VawEzo1ikLtrA8Ug8THa',
    "security_protocol": 'SASL_PLAINTEXT',
    "sasl_mechanism": 'PLAIN'
}

# Ініціалізація Kafka Admin Client
admin_client = KafkaAdminClient(
    bootstrap_servers=kafka_config['bootstrap_servers'],
    security_protocol=kafka_config['security_protocol'],
    sasl_mechanism=kafka_config['sasl_mechanism'],
    sasl_plain_username=kafka_config['username'],
    sasl_plain_password=kafka_config['password']
)

# Визначення нового топіка
topic_name = "danylob_athlete_event_results"
num_partitions = 1
replication_factor = 1

new_topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)

# Створення нового топіку
try:
    admin_client.create_topics(new_topics=[new_topic], validate_only=False)
    print(f"Topic '{topic_name}' created successfully.")
except Exception as e:
    print(f"An error occurred: {e}")

# Перевірка списку існуючих топіків
existing_topics = admin_client.list_topics()
print("Список існуючих топіків:", existing_topics)



An error occurred: [Error 36] TopicAlreadyExistsError: Request 'CreateTopicsRequest_v3(create_topic_requests=[(topic='danylob_athlete_event_results', num_partitions=1, replication_factor=1, replica_assignment=[], configs=[])], timeout=30000, validate_only=False)' failed with response 'CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='danylob_athlete_event_results', error_code=36, error_message="Topic 'danylob_athlete_event_results' already exists.")])'.
Список існуючих топіків: ['test-topic', 'kilaru_enriched_athlete_avg', 'yuliia_iot_sensors_data', 'ginger_athlete_event_results', 'edwardprombuildingsensors', 'AS_temperature_alerts', 'm_matviiuk_building_sensors', 'oleg_building_sensors', 'humidity_alerts_volodymyr14', 'kari_morozova_building_sensors', 'kry_temperature_alerts', 'hanna_dunska_input_topic', 'kaplan_m_temperature_alerts', 'kari_shop_topic_1', 'churylov_yevhen_0107_athlete_event_results', 'humidity_alerts_hellcat_topic', 'Vitalii_topic_1', 'iot_alerts', 'co

In [6]:
# Створення Kafka Producer
producer = KafkaProducer(
    bootstrap_servers=kafka_config['bootstrap_servers'],
    security_protocol=kafka_config['security_protocol'],
    sasl_mechanism=kafka_config['sasl_mechanism'],
    sasl_plain_username=kafka_config['username'],
    sasl_plain_password=kafka_config['password'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda v: json.dumps(v).encode('utf-8')
)



In [7]:
# Крок 3: Зчитування даних з таблиці athlete_event_results і запис в Kafka топік

# Зчитуємо дані з MySQL таблиці athlete_event_results
athlete_event_results_df = spark.read.format('jdbc').options(
    url=jdbc_url,
    driver='com.mysql.cj.jdbc.Driver',
    dbtable=jdbc_table_results,
    user=jdbc_user,
    password=jdbc_password
).load()

athlete_event_results_df.show()

+------------------+----------+-----------+---------+--------------------+---------+--------------+----------+-------+-----+-----------+
|           edition|edition_id|country_noc|    sport|               event|result_id|       athlete|athlete_id|    pos|medal|isTeamSport|
+------------------+----------+-----------+---------+--------------------+---------+--------------+----------+-------+-----+-----------+
|1908SummerOlympics|         5|        ANZ|Athletics|       100metres,Men|    56265|ErnestHutcheon|     64710|    DNS|  nan|      False|
|1908SummerOlympics|         5|        ANZ|Athletics|       400metres,Men|    56313|   HenryMurray|     64756|    DNS|  nan|      False|
|1908SummerOlympics|         5|        ANZ|Athletics|       800metres,Men|    56338|  HarveySutton|     64808| 3h8r12|  nan|      False|
|1908SummerOlympics|         5|        ANZ|Athletics|       800metres,Men|    56338|    GuyHaskins|    922519|    DNS|  nan|      False|
|1908SummerOlympics|         5|        AN

In [13]:
import time
import random
import uuid

# Записуємо перші 1000 записів у Kafka топік danylob_athlete_event_results
for i, row in enumerate(athlete_event_results_df.take(100)):
    try:
        producer.send("danylob_athlete_event_results", key=str(row['athlete_id']), value=row.asDict())
        producer.flush()
        time.sleep(2)
    except Exception as e:
        print(f"An error occurred while sending message {i}: {e}")

# Якщо потрібно, можна перевірити оброблені дані
print("All messages sent successfully.")

All messages sent successfully.


In [14]:
# Читаємо дані з Kafka топіка danylob_athlete_event_results
consumer = KafkaConsumer(
    'danylob_athlete_event_results',
    bootstrap_servers=kafka_config['bootstrap_servers'],
    security_protocol=kafka_config['security_protocol'],
    sasl_mechanism=kafka_config['sasl_mechanism'],
    sasl_plain_username=kafka_config['username'],
    sasl_plain_password=kafka_config['password'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    auto_offset_reset='earliest',  # Зчитування повідомлень з початку
    enable_auto_commit=True,        # Автоматичне підтвердження зчитаних повідомлень
    group_id='my_consumer_group_3'  # Ідентифікатор групи споживачів
)



In [15]:
# Конвертуємо JSON дані в DataFrame
kafka_data = []
max_records = 100

consumer.subscribe(["danylob_athlete_event_results"])

# Читаємо обмежену кількість повідомлень з Kafka
for _ in range(max_records):
    try:
        message = next(consumer)
        kafka_data.append(message.value)
    except StopIteration:
        break

# Перетворюємо зібрані дані в DataFrame
if kafka_data:
    kafka_df = spark.createDataFrame(kafka_data)
    kafka_df.printSchema()
    kafka_df.show()
else:
    print("No messages received from Kafka.")



root
 |-- athlete: string (nullable = true)
 |-- athlete_id: long (nullable = true)
 |-- country_noc: string (nullable = true)
 |-- edition: string (nullable = true)
 |-- edition_id: long (nullable = true)
 |-- event: string (nullable = true)
 |-- isTeamSport: string (nullable = true)
 |-- medal: string (nullable = true)
 |-- pos: string (nullable = true)
 |-- result_id: long (nullable = true)
 |-- sport: string (nullable = true)

+--------------+----------+-----------+------------------+----------+--------------------+-----------+-----+-------+---------+---------+
|       athlete|athlete_id|country_noc|           edition|edition_id|               event|isTeamSport|medal|    pos|result_id|    sport|
+--------------+----------+-----------+------------------+----------+--------------------+-----------+-----+-------+---------+---------+
|ErnestHutcheon|     64710|        ANZ|1908SummerOlympics|         5|       100metres,Men|      False|  nan|    DNS|    56265|Athletics|
|   HenryMurray| 

In [16]:
# Крок 4: Об'єднання даних з результатами змагань з біологічними даними
joined_df = kafka_df.join(filtered_bio_df, kafka_df.athlete_id == filtered_bio_df.athlete_id, "inner")

In [17]:
joined_df.show()

+----------------+----------+-----------+------------------+----------+--------------------+-----------+------+-------+---------+---------+----------+----------------+----+---------------+------+------+--------------------+-----------+--------------------+--------------------+
|         athlete|athlete_id|country_noc|           edition|edition_id|               event|isTeamSport| medal|    pos|result_id|    sport|athlete_id|            name| sex|           born|height|weight|             country|country_noc|         description|       special_notes|
+----------------+----------+-----------+------------------+----------+--------------------+-----------+------+-------+---------+---------+----------+----------------+----+---------------+------+------+--------------------+-----------+--------------------+--------------------+
|     GeorgeBlake|     64619|        ANZ|1908SummerOlympics|         5|     Lightweight,Men|      False|   nan|    DNS|    21247|   Boxing|     64619|     GeorgeBlake

In [18]:
# Вибір унікальних колонок, щоб уникнути амбігюїті
selected_columns = [
    kafka_df.athlete,
    kafka_df.athlete_id,
    kafka_df.country_noc,  # Вказуємо явно звідки беремо
    kafka_df.edition,
    kafka_df.edition_id,
    kafka_df.event,
    kafka_df.isTeamSport,
    kafka_df.medal,
    kafka_df.pos,
    kafka_df.result_id,
    kafka_df.sport,
    filtered_bio_df.height,
    filtered_bio_df.weight,
    filtered_bio_df.country,
    filtered_bio_df.sex
]

# Створюємо новий DataFrame з вибраними колонками
joined_df = joined_df.select(*selected_columns)

# Знаходимо середній зріст і вагу атлетів
avg_height_weight_df = joined_df.groupBy(
    'sport',
    'medal',
    'sex',
    'country_noc'
).agg(
    avg("height").alias("average_height"),
    avg("weight").alias("average_weight"),
    current_timestamp().alias("calculation_timestamp")
)

avg_height_weight_df.show()

+---------+------+----+-----------+------------------+--------------+---------------------+
|    sport| medal| sex|country_noc|    average_height|average_weight|calculation_timestamp|
+---------+------+----+-----------+------------------+--------------+---------------------+
| Swimming|   nan|Male|        ANZ|             170.0|          65.0| 2025-04-11 15:06:...|
| Swimming|Silver|Male|        ANZ|             170.0|          65.0| 2025-04-11 15:06:...|
|Athletics|Bronze|Male|        ANZ|             184.0|          76.0| 2025-04-11 15:06:...|
| Swimming|Bronze|Male|        ANZ|             170.0|          65.0| 2025-04-11 15:06:...|
|    Rugby|  Gold|Male|        ANZ|             175.0|          75.0| 2025-04-11 15:06:...|
|Athletics|   nan|Male|        ANZ|170.83333333333334|          66.0| 2025-04-11 15:06:...|
|   Boxing|   nan|Male|        ANZ|             167.0|          62.0| 2025-04-11 15:06:...|
+---------+------+----+-----------+------------------+--------------+-----------

In [19]:
def send_to_kafka(batch_df, batch_id):
    # Проходимо по рядках DataFrame за допомогою iterrows для уникнення collect()
    for row in batch_df.toLocalIterator():  # Використовуємо toLocalIterator для ітерації
        # Використовуємо row.asDict(), щоб перетворити рядок у словник
        producer.send("output_topic", value=row.asDict())
    producer.flush()

def write_to_database(batch_df, batch_id):
    if not batch_df.isEmpty():
        batch_df.write.format('jdbc').options(
            url=jdbc_url,
            driver='com.mysql.cj.jdbc.Driver',
            dbtable='average_height_weight',
            user=jdbc_user,
            password=jdbc_password
        ).mode('append').save()

# Використовуємо forEachBatch для стріму
avg_height_weight_df.writeStream \
    .foreachBatch(lambda df, batch_id: send_to_kafka(df, batch_id)) \
    .foreachBatch(lambda df, batch_id: write_to_database(df, batch_id)) \
    .outputMode("update") \
    .start() \
    .awaitTermination()

AnalysisException: [WRITE_STREAM_NOT_ALLOWED] `writeStream` can be called only on streaming Dataset/DataFrame.