In [1]:
!pip install gradio
!pip install datasets

Collecting gradio
  Downloading gradio-3.23.0-py3-none-any.whl (15.8 MB)
[K     |████████████████████████████████| 15.8 MB 470 kB/s eta 0:00:01     |████████████████████████████▏   | 13.9 MB 6.4 MB/s eta 0:00:01
[?25hCollecting pydantic
  Downloading pydantic-1.10.7-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.2 MB)
[K     |████████████████████████████████| 3.2 MB 2.9 MB/s eta 0:00:01
Collecting httpx
  Downloading httpx-0.23.3-py3-none-any.whl (71 kB)
[K     |████████████████████████████████| 71 kB 2.2 MB/s eta 0:00:01
[?25hCollecting python-multipart
  Downloading python_multipart-0.0.6-py3-none-any.whl (45 kB)
[K     |████████████████████████████████| 45 kB 1.2 MB/s eta 0:00:01
[?25hCollecting uvicorn
  Downloading uvicorn-0.21.1-py3-none-any.whl (57 kB)
[K     |████████████████████████████████| 57 kB 1.4 MB/s eta 0:00:01
[?25hCollecting markdown-it-py[linkify]>=2.0.0
  Downloading markdown_it_py-2.2.0-py3-none-any.whl (84 kB)
[K     |███████████████████████

In [161]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, BooleanType, LongType, IntegerType,StringType
from pyspark.sql.types import *
import pandas as pd

# Event data schema
schema_pv = StructType(
    [StructField("Summons Number",LongType(),True),
     StructField("Plate ID",StringType(),True),
     StructField("Registration State",StringType(),True),
     StructField("Plate Type",StringType(),True),
     StructField("Issue Date",StringType(),True),
     StructField("Violation Code",IntegerType(),True),
     StructField("Vehicle Body Type",StringType(),True),
     StructField("Vehicle Make",StringType(),True),
     StructField("Issuing Agency",StringType(),True),
     StructField("Street Code1",IntegerType(),True),
     StructField("Street Code2",IntegerType(),True),
     StructField("Street Code3",IntegerType(),True),
     StructField("Vehicle Expiration Date",IntegerType(),True),
     StructField("Violation Location",IntegerType(),True),
     StructField("Violation Precinct",IntegerType(),True),
     StructField("Issuer Precinct",IntegerType(),True),
     StructField("Issuer Code",IntegerType(),True),
     StructField("Issuer Command",StringType(),True),
     StructField("Issuer Squad",StringType(),True),
     StructField("Violation Time",StringType(),True),
     StructField("Time First Observed",StringType(),True),
     StructField("Violation County",StringType(),True),
     StructField("Violation In Front Of Or Opposite",StringType(),True),
     StructField("House Number",StringType(),True),
     StructField("Street Name",StringType(),True),
     StructField("Intersecting Street",StringType(),True),
     StructField("Date First Observed",IntegerType(),True),
     StructField("Law Section",IntegerType(),True),
     StructField("Sub Division",StringType(),True),
     StructField("Violation Legal Code",StringType(),True),	 
     StructField("Days Parking In Effect",StringType(),True),
     StructField("From Hours In Effect",StringType(),True),
     StructField("To Hours In Effect",StringType(),True),
     StructField("Vehicle Color",StringType(),True),
     StructField("Unregistered Vehicle?",IntegerType(),True),
     StructField("Vehicle Year",StringType(),True),
     StructField("Meter Number",StringType(),True),
     StructField("Feet From Curb",IntegerType(),True),
     StructField("Violation Post Code",StringType(),True),
     StructField("Violation Description",StringType(),True),
     StructField("No Standing or Stopping Violation",StringType(),True),
     StructField("Hydrant Violation",StringType(),True),
     StructField("Double Parking Violation",StringType(),True),
     StructField("Latitude",StringType(),True),
     StructField("Longitude",StringType(),True),
     StructField("Community Board",StringType(),True),
     StructField("Community Council",StringType(),True),
     StructField("Census Tract",StringType(),True),
     StructField("BIN",StringType(),True),
     StructField("BBL",StringType(),True),
     StructField("NTA",StringType(),True)])


In [157]:
columns_selected = ["Registration State","Plate Type",\
                "Violation Code", "Vehicle Body Type","Vehicle Make","Issuing Agency", "Street Code1", \
               "Street Code2","Street Code3","Violation Location","Violation Precinct", \
               "Issuer Precinct","Issuer Code","Issuer Command",\
               "Violation County","Law Section","Sub Division","Vehicle Color"]

In [171]:
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql.functions import isnull, when, count, col, isnan

def preprocess_data(df,pm):
#     df.printSchema()
#     df.count(), len(df.columns)
    df = df.select(columns_selected)

    # clean up the data as many have incorrect values.
    df = df[(df['Registration State'] != "99") \
        & (df['Plate Type'] != "999") \
        & (df['Violation Code'] != 0)]
    # clean up the data
    # Check if the null value still exist
#     df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

    df = df.na.drop()
    df.dropDuplicates()

    # convert to required type
    cols = [F.col(field[0]).cast('double') if (field[1] == 'int') else F.col(field[0]) for field in df.dtypes]
    df = df.select(cols)

    #use model to transform
#     pm = PipelineModel.load("../pretrained_models/va_model")

    df = pm.transform(df)
    return df


In [172]:
def predict_data(df,pr_model):
      #use ml model to predict
#     pred_model = PipelineModel.load("../pretrained_models/rf_model")
    data = df.select(F.col("features_scaled").alias("features"))
    # use the PipelineModel object to perform prediciton on  data. 
    prediction = pred_model.transform(data)

    # print the results
    # prediction.select('label','prediction','Violation_Location').show(5)
    prediction.select('prediction','Violation_Location').show(10)
    return prediction

In [173]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('NY PV app')
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")         
         .getOrCreate())
sc = spark.sparkContext
# print(spark)

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka-server:9092") \
  .option("subscribe", "park.violation") \
  .option("includeHeaders", "true") \
  .option("startingOffsets", "latest") \
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
raw_path = "pv-data"
checkpoint_path = "pv-data-checkpoint"

# Following code is Working to save json files. 
# queryStream =(
#     df_pv
#     .writeStream
#     .format("json")
#     .queryName("pv_data_ingestion")
#     .option("checkpointLocation", checkpoint_path)
#     .option("path", raw_path)
#     .outputMode("append")
#     .start())

# !@@# The data is not getting converted to desired type with schema_pv. The reason coul dbe gthe data is incorect
# Example Integer type has 8 a well as 8.0. And conversion fails. 
queryMem = df.select(from_json(col("value").cast("string"),schema_pv).alias("data")).select("data.*") \
    .writeStream \
    .format("memory") \
    .queryName("ny_pv_count") \
    .outputMode("update") \
    .start()

# consoleOut = df_pv \
#     .writeStream \
#     .trigger(processingTime='5 seconds') \
#     .format("console") \
#     .queryName("ny_pv_console") \
#     .outputMode("update") \
#     .start()

# consoleOut.awaitTermination()


In [152]:
# from pyspark.sql import SparkSession
# from pyspark.sql.types import StringType

# df = spark \
#   .readStream \
#   .format("kafka") \
#   .option("kafka.bootstrap.servers", "kafka-server:9092") \
#   .option("subscribe", "park.violation") \
#   .option("includeHeaders", "true") \
#   .option("startingOffsets", "latest") \
#   .load()\
#   .select(from_json(col("value").cast("string"),schema_pv).alias("data")).select("data.*") \
#   .writeStream \
#   .format("memory") \
#   .queryName("ny_pv_count") \
#   .start()


In [153]:
# Check active streams
for s in spark.streams.active:
    print("ID:{} | NAME:{}".format(s.id, s.name))

ID:065b3353-1d8f-4a2e-9b23-4211e06c1fbc | NAME:ny_pv_count


# Following will consume data as kafka subscriber and later use spark MLlib to predict the streaming data

In [174]:
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
import sys
import json
from json import loads
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
import gradio as gr
import joblib
import datasets

# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('NY PV app')
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")         
         .getOrCreate())
sc = spark.sparkContext

### Setting up the Python consumer
bootstrap_servers = ['kafka-server:9092']
topicName = 'park.violation'
consumer = KafkaConsumer (topicName, group_id = 'id1',bootstrap_servers = bootstrap_servers,
    auto_offset_reset = 'earliest',value_deserializer=lambda x: loads(x.decode('utf-8')))  ## You can also set it as latest

#use model to transform
pm = PipelineModel.load("../pretrained_models/va_model")
# use pre trained model to infer
pred_model = PipelineModel.load("../pretrained_models/rf_model")

### Reading the message from consumer
# Poll for new messages from Kafka and print them.
try:
    while True:
        msg = consumer.poll(5.0)
        if msg is None:
            print("Waiting...")
#         elif msg.error():
#             print("ERROR: %s".format(msg.error()))
        else:
            print("reading msg...")
            for message in consumer:
                df = pd.json_normalize(message.value)
                df[['Summons Number','Violation Code', 'Street Code1', 'Street Code2', 'Street Code3', 'Vehicle Expiration Date', 'Violation Precinct','Violation Location','Issuer Precinct','Date First Observed','Issuer Code','Unregistered Vehicle?','Feet From Curb','Law Section']] = \
                df[['Summons Number','Violation Code', 'Street Code1', 'Street Code2', 'Street Code3', 'Vehicle Expiration Date', 'Violation Precinct','Violation Location','Issuer Precinct','Date First Observed','Issuer Code','Unregistered Vehicle?','Feet From Curb','Law Section']].apply(pd.to_numeric)
                spark_df = spark.createDataFrame(data=df,schema=schema_pv)
                print('Pre processing data')
                vec_df = preprocess_data(spark_df,pm)
                print('Predicting live data..')
                pr = predict_data(vec_df,pred_model).toPandas()
                print('Expected Violation Location: '),spark_df.select('Violation Location').show(1)
    pass
finally:
    # Leave group and commit final offsets
    consumer.close()
    spark.stop()

reading msg...
Pre processing data
Predicting live data..
+----------+------------------+
|prediction|Violation_Location|
+----------+------------------+
|       1.0|               1.0|
+----------+------------------+

Expected Violation Location: 
+------------------+
|Violation Location|
+------------------+
|                 1|
+------------------+

Pre processing data
Predicting live data..
+----------+------------------+
|prediction|Violation_Location|
+----------+------------------+
|       9.0|              84.0|
+----------+------------------+

Expected Violation Location: 
+------------------+
|Violation Location|
+------------------+
|                71|
+------------------+

Pre processing data
Predicting live data..
+----------+------------------+
|prediction|Violation_Location|
+----------+------------------+
|       9.0|              84.0|
+----------+------------------+

Expected Violation Location: 
+------------------+
|Violation Location|
+------------------+
|       

TypeError: field Violation Location: IntegerType can not accept object nan in type <class 'float'>

In [155]:
from time import sleep
from IPython.display import clear_output
import pandas as pd
from pyspark.sql.functions import from_json

#use model to transform
pm = PipelineModel.load("../pretrained_models/va_model")
# use pre trained model to infer
pred_model = PipelineModel.load("../pretrained_models/rf_model")

# Count rows every 1 seconds while stream is active
try:
    i=1
    # While stream is active, print count
    while len(spark.streams.active) > 0:        
        # Clear output
        clear_output(wait=True)
        print("Run:{}".format(i))
        lst_queries=[]
        for s in spark.streams.active:
            lst_queries.append(s.name)
            
        if "ny_pv_count" in lst_queries:
            # Count number of events            
            df = spark.sql("select * from ny_pv_count")
            print('Calling preprocess_data')
            vec_df = preprocess_data(spark_df,pm)
            print('Calling predict_data')
            pr = predict_data(vec_df,pred_model).toPandas()
            print(pr['Violation_Location'])
        sleep(1)
        i=i+1
        
except KeyboardInterrupt:
    queryMem.stop()    
    print("stream process interrupted")

Run:1
root
 |-- Summons Number: string (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: string (nullable = true)
 |-- Street Code2: string (nullable = true)
 |-- Street Code3: string (nullable = true)
 |-- Vehicle Expiration Date: string (nullable = true)
 |-- Violation Location: string (nullable = true)
 |-- Violation Precinct: string (nullable = true)
 |-- Issuer Precinct: string (nullable = true)
 |-- Issuer Code: string (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Count

TypeError: field Summons Number: LongType can not accept object '1298916276' in type <class 'str'>

In [146]:
spark.stop()

In [None]:
%qtconsole

In [None]:
%connect_info