In [2]:
import pandas as pd
import numpy as np 
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, decode

In [3]:
spark = SparkSession.builder \
    .appName("first_app") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
    .getOrCreate()

In [4]:
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "44.201.154.178:9092") \
  .option("subscribe", "health_events") \
  .option("startingOffsets", "earliest") \
  .load()

df = df.withColumn("decoded_value", decode(col("value"), "UTF-8"))

In [5]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
 |-- decoded_value: string (nullable = true)



In [6]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType
json_schema = StructType([
    StructField("EventType", StringType(), True),   # Assuming EventType is a string that can be nullable
    StructField("TimeStamp", TimestampType(), True),  # Assuming TimeStamp is a datetime, nullable
    StructField("Location", StringType(), True),    # Assuming Location is a string that can be nullable
    StructField("Severity", IntegerType(), True),   # Assuming Severity is an integer, nullable
    StructField("Details", StringType(), True)      # Assuming Details is a string that can be nullable
])


In [7]:

parsed_df = df.select(
    col("key").cast("string"),
    from_json(decode(col("value"), "UTF-8"), json_schema).alias("data"),
    "topic",
    "partition",
    "offset",
    "timestamp",
    "timestampType"
)

In [8]:
from pyspark.sql.functions import col, from_json

parsed_df = df.select(
    col("key").cast("string"),
    from_json(col("decoded_value").cast("string"), json_schema).alias("data"),
    "topic",
    "partition",
    "offset",
    "timestamp",
    "timestampType"
)


In [9]:
exploded_df = parsed_df.select(
    "key",
    "data.EventType",
    "data.TimeStamp",
    "data.Location",
    "data.Severity",
    "data.Details",
    "topic",
    "partition",
    "offset",
    "timestamp",
    "timestampType"
)
exploded_df.show(truncate=False)

+----+---------------------+---------+-----------+--------+------------------------------------------------+-------------+---------+------+-----------------------+-------------+
|key |EventType            |TimeStamp|Location   |Severity|Details                                         |topic        |partition|offset|timestamp              |timestampType|
+----+---------------------+---------+-----------+--------+------------------------------------------------+-------------+---------+------+-----------------------+-------------+
|null|emergency_incident   |null     |Paris      |null    |This is a simulated emergency_incident event.   |health_events|0        |52360 |2024-04-10 02:14:41.884|0            |
|null|emergency_incident   |null     |London     |null    |This is a simulated emergency_incident event.   |health_events|0        |52361 |2024-04-10 02:14:47.884|0            |
|null|routine_checkup      |null     |Berlin     |null    |This is a simulated routine_checkup event.      |he

In [10]:
pandas_df = exploded_df.toPandas()

  series = series.astype(t, copy=False)


In [12]:
pandas_df = pandas_df.drop_duplicates()