In [None]:
#importing the jar files and SparkSession

from pyspark.sql import SparkSession
import pyspark
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars ./abris_2.12-3.2.0.jar --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'

In [None]:
# Creating the SparkSession

spark = SparkSession \
    .builder \
    .config("spark.executor.memory", "20g")\
    .config("spark.driver.memory", "20g")\
    .config("spark.memory.offHeap.enabled","true")\
    .config("spark.memory.offHeap.size","20g")\
    .appName("my_pyspark_kafka") \
    .getOrCreate()
print(spark)

In [None]:
# Importing data from kafka topic into spark data frame


readDF = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers","broker:29092") \
.option("subscribe","newtopic") \
.option("startingOffsets","earliest") \
.load() \
.selectExpr("CAST(value AS STRING)")



In [None]:
# resultDF = readDF \
# .writeStream \
# .format("console") \
# .outputMode("append") \
# .option("checkpointLocation", "checkpoint/")\
# .option("truncate", "false")\
# .option("numRows",20) \
# .start().awaitTermination()

In [None]:
# Summons_Number,Plate_ID,Registration_State,Plate_Type,Issue_Date,Violation_Code,Vehicle_Body_Type,Vehicle_Make,Street_Code1,Street_Code2,Street_Code3,Violation_Precinct,Issuer_Precinct,Violation_Time,Violation_County,House_Number,Street_Name,Intersecting_Street,Vehicle_Year

In [None]:
# Importing sql funtion adn types library for type casting and split function

from pyspark.sql.functions import *
from pyspark.sql.types import *


# Defining the schema and changing the values from json to struct format for processing in SQL, as it uses SQL Struct types

# from pyspark.sql.functions import from_json, col
# from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField('Summons_Number', StringType(), True),
        StructField('Plate_ID', StringType(), True),
        StructField('Registration_State', StringType(), True),
        StructField('Plate_Type', StringType(), True),
        StructField('Issue_Date', StringType(), True),
        StructField('Violation_Code', StringType(), True),
        StructField('Vehicle_Body_Type', StringType(), True),
        StructField('Vehicle_Make', StringType(), True),
        StructField('Street_Code1', StringType(), True),
        StructField('Street_Code2', StringType(), True),
        StructField('Street_Code3', StringType(), True),
        StructField('Violation_Precinct', StringType(), True),
        StructField('Issuer_Precinct', StringType(), True),
        StructField('Violation_Time', StringType(), True),
        StructField('Violation_County', StringType(), True),
        StructField('Street_Name', StringType(), True),
        StructField('Vehicle_Year', StringType(), True),
    ]
)

readDF.withColumn("value", from_json("value", schema))\
    .select(col('value.*'))

In [None]:
readDF.printSchema()

In [None]:
# values are like this at this point of time Struct{Zip=601,Latitude=18.18005....}
# Seaparting the values using "," here

newDF= readDF.withColumn("Summons_Number", split(col("value"), ",").getItem(0))\
.withColumn("Plate_ID", split(col("value"), ",").getItem(1))\
.withColumn("Registration_State", split(col("value"), ",").getItem(2))\
.withColumn("Plate_Type", split(col("value"), ",").getItem(3))\
.withColumn("Issue_Date", split(col("value"), ",").getItem(4))\
.withColumn("Violation_Code", split(col("value"), ",").getItem(5))\
.withColumn("Vehicle_Body_Type", split(col("value"), ",").getItem(6))\
.withColumn("Vehicle_Make", split(col("value"), ",").getItem(7))\
.withColumn("Street_Code1", split(col("value"), ",").getItem(8))\
.withColumn("Street_Code2", split(col("value"), ",").getItem(9))\
.withColumn("Street_Code3", split(col("value"), ",").getItem(10))\
.withColumn("Violation_Precinct", split(col("value"), ",").getItem(11))\
.withColumn("Issuer_Precinct", split(col("value"), ",").getItem(12))\
.withColumn("Violation_Time", split(col("value"), ",").getItem(13))\
.withColumn("Violation_County", split(col("value"), ",").getItem(14))\
.withColumn("Street_Name", split(col("value"), ",").getItem(15))\
.withColumn("Vehicle_Year", split(col("value"), ",").getItem(16))


In [None]:
# {Summons_Number=1447171287|Plate_ID=GYU6351|Registration_State=NY|Plate_Type=PAS|Issue_Date=2019-07-03T00:00:00.000|
# Violation_Code=20|Vehicle_Body_Type=SDN |Vehicle_Make=HONDA|Street_Code1=36050|Street_Code2=10610|Street_Code3=10810|
# Violation_Precinct=28 |Issuer_Precinct=28 |Violation_Time=0918A|Violation_County=NY|Street_Name=W 114 ST           |
# Vehicle_Year=2009}




# # Separating the value using "=" to get the desired value

DF = newDF.withColumn("Summons_Number", split(col("Summons_Number"), "=").getItem(1))\
.withColumn("Plate_ID", split(col("Plate_ID"), "=").getItem(1))\
.withColumn("Registration_State", split(col("Registration_State"), "=").getItem(1))\
.withColumn("Plate_Type", split(col("Plate_Type"), "=").getItem(1))\
.withColumn("Issue_Date", split(col("Issue_Date"), "=").getItem(1))\
.withColumn("Violation_Code", split(col("Violation_Code"), "=").getItem(1))\
.withColumn("Vehicle_Body_Type", split(col("Vehicle_Body_Type"), "=").getItem(1))\
.withColumn("Vehicle_Make", split(col("Vehicle_Make"), "=").getItem(1))\
.withColumn("Street_Code1", split(col("Street_Code1"), "=").getItem(1))\
.withColumn("Street_Code2", split(col("Street_Code2"), "=").getItem(1))\
.withColumn("Street_Code3", split(col("Street_Code3"), "=").getItem(1))\
.withColumn("Violation_Precinct", split(col("Violation_Precinct"), "=").getItem(1))\
.withColumn("Issuer_Precinct", split(col("Issuer_Precinct"), "=").getItem(1))\
.withColumn("Violation_Time", split(col("Violation_Time"), "=").getItem(1))\
.withColumn("Violation_County", split(col("Violation_County"), "=").getItem(1))\
.withColumn("Street_Name", split(col("Street_Name"), "=").getItem(1))\
.withColumn("Vehicle_Year", split(col("Vehicle_Year"), "=").getItem(1))\
.withColumn("Vehicle_Year",split(col("Vehicle_Year"),"}").getItem(0)) #this is bcz I was getting } at the end of each value in this column


In [None]:
# select the columns
# finalDF = DF.select("Summons_Number","Plate_ID","Registration_State","Plate_Type","Issue_Date","Violation_Code","Vehicle_Body_Type","Vehicle_Make","Street_Code1","Street_Code2","Street_Code3","Violation_Precinct","Issuer_Precinct","Violation_Time","Violation_County","Street_Name","Vehicle_Year")

In [None]:
# "Summons_Number:int64,Violation_Code:int32,Street_Code1:int32,Street_Code2:int32,
# Street_Code3:int32,Violation_Precinct:int32,Issuer_Precinct:int32"

In [None]:
finalDF = DF.selectExpr("CAST(Summons_Number AS INT)",\
                        "CAST(Plate_ID AS STRING)",\
                        "CAST(Registration_State AS STRING)",\
                        "CAST(Plate_Type AS STRING)",\
                        "CAST(Issue_Date AS STRING)",\
                        "CAST(Violation_Code AS INT)",\
                        "CAST(Vehicle_Body_Type AS STRING)",\
                        "CAST(Vehicle_Make AS STRING)",\
                        "CAST(Street_Code1 AS INT)","CAST(Street_Code2 AS INT)","CAST(Street_Code3 AS INT)",\
                        "CAST(Violation_Precinct AS INT)",\
                        "CAST(Issuer_Precinct AS INT)",\
                        "CAST(Violation_Time AS STRING)",\
                        "CAST(Violation_County AS STRING)",\
                        "CAST(Street_Name AS STRING)",\
                        "CAST(Vehicle_Year AS INT)")


In [None]:
# resultDF = finalDF \
# .writeStream \
# .format("console") \
# .outputMode("append") \
# .option("checkpointLocation", "checkpoint/")\
# .option("truncate", "false")\
# .option("numRows",20) \
# .start().awaitTermination()

In [None]:
# Trial run

resultDF = finalDF\
    .writeStream\
    .format("csv")\
    .option("format", "append")\
    .option("header","true")\
    .option("startingOffsets","earliest")\
    .option("checkpointLocation", "checkpoint3/")\
    .option("path", "kafka_csv_data/")\
    .outputMode("append") \
    .start()