In [10]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

In [11]:
import logging
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as psf

In [12]:
spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("KafkaSparkStructuredStreaming1") \
        .config("spark.ui.port", 3000) \
        .getOrCreate()

In [13]:
df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "sanfran.police.calls") \
        .option('startingOffsets', 'earliest') \
        .option('maxOffsetsPerTrigger', 10) \
        .option('maxRatePerPartition', 10) \
        .option('stopGracefullyOnShutdown', "true") \
        .load()

In [14]:
schema = StructType([
    StructField('crime_id', StringType(), True),
    StructField('original_crime_type_name', StringType(), True),
    StructField('report_date', StringType(), True),
    StructField('call_date', StringType(), True),
    StructField('offense_date', StringType(), True),
    StructField('call_time', StringType(), True),
    StructField('call_date_time', TimestampType(), True),
    StructField('disposition', StringType(), True),
    StructField('address', StringType(), True),
    StructField('city', StringType(), True),
    StructField('state', StringType(), True),
    StructField('agency_id', StringType(), True),
    StructField('address_type', StringType(), True),
    StructField('common_location', StringType(), True),
])

In [15]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [16]:
kafka_df = df.selectExpr("CAST(value as string)")

In [17]:
service_table = kafka_df\
        .select(psf.from_json(psf.col('value'), schema).alias("DF"))\
        .select("DF.*")

In [31]:
distinct_table = service_table \
        .select("original_crime_type_name", "disposition", "call_date_time") \
        .distinct() \
        .withWatermark("call_date_time", "1 minute")

In [19]:
agg_df = distinct_table \
        .dropna() \
        .select("original_crime_type_name") \
        .groupby("original_crime_type_name") \
        .agg({"original_crime_type_name": "count"}) \
        .orderBy("count(original_crime_type_name)", ascending=True)

In [38]:
query = agg_df \
        .writeStream \
        .format("console") \
        .outputMode("Complete") \
        .start()


In [45]:
import time

In [47]:

query = agg_df \
        .writeStream \
        .queryName('fights') \
        .format("memory") \
        .outputMode("complete") \
        .start()

for x in range(5):
  spark.sql("select * from fights").show()
  time.sleep(10)

IllegalArgumentException: Cannot start query with name fights as a query with that name is already active in this SparkSession

In [None]:
radio_code_json_filepath = "radio_code.json"
    radio_code_df = spark.read.json(radio_code_json_filepath)

    radio_code_df = radio_code_df.withColumnRenamed("disposition_code", "disposition")

    join_query = agg_df.join(radio_code_df, col("agg_df.disposition") == col("radio_code_df.disposition"), "left_outer")

    join_query.awaitTermination()
