In [1]:
from pyspark.sql import SparkSession

In [2]:
# Build (or reuse) the Spark session for this notebook, running locally on all cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('traffic-streaming')
    # Maven coordinates of the Kafka source connector for Structured Streaming.
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
    # Local jar providing the MySQL JDBC driver used by the sink further down.
    .config('spark.jars','/home/jovyan/new_stream/jar/mysql-connector-j-9.3.0.jar')
    .getOrCreate()
)

In [3]:
# Streaming source: read from Kafka through the broker at ed-kafka:29092.
# The resulting streaming DataFrame exposes the raw message payload in its
# `value` column, which is decoded in a later cell.
df = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers','ed-kafka:29092')
    # NOTE(review): the topic is spelled 'trafic-data' — confirm it matches the producer.
    .option('Subscribe','trafic-data')
    # The option key is `startingOffsets` (plural). The singular spelling is not a
    # recognised option, so it was ignored and the stream started from the default
    # position ("latest" for streaming queries) instead of the beginning of the topic.
    .option('startingOffsets','earliest')
    .load()
)

In [4]:
from pyspark.sql.types import StructType,MapType,ArrayType,IntegerType,StringType,DecimalType,StructField,TimestampType,DoubleType

# Expected layout of one JSON vehicle message, used to parse the Kafka payload.
schema = (
    StructType()
    .add("vehicle_id", StringType(), False)
    .add("timestamp", StringType(), False)  # declared as a string; not parsed into a timestamp here
    .add("speed_kmh", IntegerType(), True)
    .add("recent_speeds", ArrayType(IntegerType()), True)
    # Nested struct holding the coordinates.
    .add(
        "location",
        StructType()
        .add("lat", DoubleType(), True)
        .add("lon", DoubleType(), True),
        True,
    )
    .add("road_id", StringType(), True)
    # Array of (sensor_id, signal_strength) structs.
    .add(
        "nearby_sensors",
        ArrayType(
            StructType()
            .add("sensor_id", StringType(), True)
            .add("signal_strength", IntegerType(), True)
        ),
        True,
    )
)


In [5]:
from pyspark.sql.functions import from_json,explode,col

# Decode the Kafka `value` payload as a JSON string, parse it with `schema`,
# and promote the parsed struct's fields to top-level columns.
df_extracted = (
    df
    .select(from_json(col('value').cast('string'), schema).alias('value'))
    .select('value.*')
)

In [6]:
# Flatten the sensor array: one output row per element of `nearby_sensors`,
# with the element's struct fields appended as top-level columns.
# Note: this cell rebinds df_extracted from its own previous value, so it is
# meant to run exactly once after the parsing cell.
df_extracted = (
    df_extracted
    .withColumn('nearby_sensors', explode(col('nearby_sensors')))
    .select('*', 'nearby_sensors.*')
    .drop('nearby_sensors')
)

In [7]:
# Flatten the speed history: one output row per element of `recent_speeds`.
# Combined with the sensor explode in the previous cell, each message yields
# one row per (sensor, recent speed) pair.
df_recent_speed = (
    df_extracted
    .withColumn('recent_speeds', explode(col('recent_speeds')))
)

In [8]:
# Final column layout for the sink: lift lat/lon out of the `location` struct,
# keep the remaining columns in a fixed order, and expose `timestamp` as `time`.
final_df = df_recent_speed.select(
    'vehicle_id',
    'speed_kmh',
    'recent_speeds',
    'road_id',
    'sensor_id',
    'signal_strength',
    col('location.lat').alias('lat'),
    col('location.lon').alias('lon'),
    col('timestamp').alias('time'),
)

In [None]:
from dotenv import load_dotenv
import os

# Load the database connection settings from the local dotenv file into the
# process environment, then read them back as module-level constants.
load_dotenv('secret.env')

URL = os.getenv("DATABASE_URL")
# NOTE(review): USERNAME is a name some operating systems/shells already define;
# load_dotenv does not override existing variables by default — confirm the value
# picked up here is the database user and not the login name.
USER = os.getenv("USERNAME")
PASSWORD = os.getenv("PASS")

# Fail fast when a setting is absent. Otherwise the missing value is formatted
# into the JDBC options as the literal string 'None' and only surfaces later as
# an opaque connection error inside the streaming query.
_missing_settings = [
    name
    for name, value in (("DATABASE_URL", URL), ("USERNAME", USER), ("PASS", PASSWORD))
    if value is None
]
if _missing_settings:
    raise RuntimeError(
        "Missing database settings (expected in secret.env or the environment): "
        + ", ".join(_missing_settings)
    )


def write_to_mysql(df,batch_id):
    """Append one micro-batch to the ``vehicle_data`` table over JDBC.

    Intended as a ``foreachBatch`` callback.

    Args:
        df: The DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch; accepted to satisfy the
            callback signature but not used.
    """
    # Connection settings come from the module-level URL / USER / PASSWORD values.
    jdbc_options = {
        'url': f'{URL}',
        'dbtable': 'vehicle_data',
        'user': f'{USER}',
        'password': f'{PASSWORD}',
        'driver': 'com.mysql.cj.jdbc.Driver',
    }

    batch_writer = df.write.format('jdbc').options(**jdbc_options).mode('append')
    batch_writer.save()
    

# Start the streaming query: every micro-batch of final_df is handed to
# write_to_mysql, with progress tracked under the checkpoint directory.
query = (
    final_df.writeStream
    .foreachBatch(write_to_mysql)
    .outputMode("append")
    # NOTE(review): the checkpoint lives under /tmp — confirm that location
    # survives restarts in this environment.
    .option('checkpointLocation','/tmp/checkpoint_stream')
    .trigger(processingTime='5 second')
    .start()
)

# Block this cell until the streaming query stops or fails.
query.awaitTermination()
    