In [25]:
import logging
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as psf

In [26]:
schema = StructType([
    StructField("crime_id", StringType(), True),
    StructField("original_crime_type_name", StringType(), True),
    StructField("report_date", StringType(), True),
    StructField("call_date", StringType(), True),
    StructField("offense_date", StringType(), True),
    StructField("call_time", StringType(), True),
    StructField("call_date_time", StringType(), True),
    StructField("disposition", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("agency_id", StringType(), True),
    StructField("address_type", StringType(), True),
    StructField("common_location", StringType(), True)    
])

In [27]:
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("KafkaSparkStructuredStreaming") \
        .getOrCreate()

In [28]:
    df = spark.read.option("multiline","true") \
      .json("./police-department-calls-for-service.json")

In [29]:
    df.printSchema()

root
 |-- address: string (nullable = true)
 |-- address_type: string (nullable = true)
 |-- agency_id: string (nullable = true)
 |-- call_date: string (nullable = true)
 |-- call_date_time: string (nullable = true)
 |-- call_time: string (nullable = true)
 |-- city: string (nullable = true)
 |-- common_location: string (nullable = true)
 |-- crime_id: string (nullable = true)
 |-- disposition: string (nullable = true)
 |-- offense_date: string (nullable = true)
 |-- original_crime_type_name: string (nullable = true)
 |-- report_date: string (nullable = true)
 |-- state: string (nullable = true)



In [30]:
    kafka_df = df#.selectExpr("CAST(value AS STRING)")

    service_table = kafka_df
    
    service_table.printSchema()

root
 |-- address: string (nullable = true)
 |-- address_type: string (nullable = true)
 |-- agency_id: string (nullable = true)
 |-- call_date: string (nullable = true)
 |-- call_date_time: string (nullable = true)
 |-- call_time: string (nullable = true)
 |-- city: string (nullable = true)
 |-- common_location: string (nullable = true)
 |-- crime_id: string (nullable = true)
 |-- disposition: string (nullable = true)
 |-- offense_date: string (nullable = true)
 |-- original_crime_type_name: string (nullable = true)
 |-- report_date: string (nullable = true)
 |-- state: string (nullable = true)



In [32]:
    distinct_table = service_table.select(
        service_table.call_date_time,
        service_table.original_crime_type_name,
        service_table.disposition
    ) \
    .distinct()
    
    distinct_table.printSchema()
    
    distinct_table.show(30, False)

root
 |-- call_date_time: string (nullable = true)
 |-- original_crime_type_name: string (nullable = true)
 |-- disposition: string (nullable = true)

+--------------------+------------------------+------------+
|      call_date_time|original_crime_type_name| disposition|
+--------------------+------------------------+------------+
|2018-12-31T22:53:...|      Auto Boost / Strip|         REP|
|2018-12-31T17:10:...|         Muni Inspection|Not recorded|
|2018-12-31T17:07:...|         Muni Inspection|Not recorded|
|2018-12-31T16:47:...|            Passing Call|Not recorded|
|2018-12-31T16:38:...|                  Family|         HAN|
|2018-12-31T13:38:...|                  7.2.27|         CAN|
|2018-12-31T13:37:...|      Suspicious Vehicle|          ND|
|2018-12-31T13:17:...|            Passing Call|         HAN|
|2018-12-31T11:32:...|          Stolen Vehicle|         REP|
|2018-12-31T08:31:...|             Grand Theft|         UTL|
|2018-12-31T07:51:...|           Audible Alarm|         

In [37]:
    agg_df = distinct_table.select(
        distinct_table.call_date_time,
        distinct_table.original_crime_type_name, 
        distinct_table.disposition
    ) \
    .withWatermark("call_date_time", "5 minutes") \
    .groupby(psf.window(distinct_table.call_date_time,"30 minutes","1 minute"), distinct_table.original_crime_type_name, distinct_table.disposition) \
    .count() \
    .orderBy("count", ascending=False)
    
    agg_df.printSchema()
    
    agg_df.show(30, False)

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- original_crime_type_name: string (nullable = true)
 |-- disposition: string (nullable = true)
 |-- count: long (nullable = false)

+------------------------------------------+------------------------+-----------+-----+
|window                                    |original_crime_type_name|disposition|count|
+------------------------------------------+------------------------+-----------+-----+
|[2018-11-30 15:15:00, 2018-11-30 15:45:00]|Passing Call            |HAN        |24   |
|[2018-10-30 18:10:00, 2018-10-30 18:40:00]|Passing Call            |HAN        |23   |
|[2018-11-30 15:16:00, 2018-11-30 15:46:00]|Passing Call            |HAN        |23   |
|[2018-11-30 15:17:00, 2018-11-30 15:47:00]|Passing Call            |HAN        |23   |
|[2018-11-30 15:14:00, 2018-11-30 15:44:00]|Passing Call            |HAN        |23   |
|[2018-11-30 15:18:00, 2018-1

In [43]:
    radio_code_json_filepath = "./radio_code.json"
    radio_code_df = spark.read.option("multiline","true").json(radio_code_json_filepath)

    # clean up your data so that the column names match on radio_code_df and agg_df
    # we will want to join on the disposition code

    # TODO rename disposition_code column to disposition
    radio_code_df = radio_code_df.withColumnRenamed("disposition_code", "disposition")
    
    radio_code_df.show(30, False)

    # TODO join on disposition column
    agg_df_join = agg_df \
        .join(radio_code_df, "disposition")
        
    agg_df_join.printSchema()
    
    agg_df_join.show(30, False)

+--------------------+-----------+
|         description|disposition|
+--------------------+-----------+
|              Abated|        ABA|
|          Admonished|        ADM|
|             Advised|        ADV|
|              Arrest|        ARR|
|              Cancel|        CAN|
|     CPSA assignment|        CSA|
|              Cancel|         22|
|               Cited|        CIT|
| Criminal Activation|        CRM|
|     Gone on Arrival|        GOA|
|             Handled|        HAN|
|        Non-Criminal|        NCR|
|      No Disposition|         ND|
|            No Merit|        NOM|
|Premises Appears ...|        PAS|
|              Report|        REP|
|SFFD Medical Staf...|        SFD|
|    Unable to Locate|        UTL|
|Vehicle Appears S...|        VAS|
+--------------------+-----------+

+--------------------------+-----------+
|description               |disposition|
+--------------------------+-----------+
|Abated                    |ABA        |
|Admonished                |AD