In [1]:
import os
import sys

from pyspark.sql import SparkSession
import pyspark.sql.types as pst
import pyspark.sql.functions as psf

In [2]:
# Runtime configuration; every setting can be overridden through an
# environment variable of the same name.
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")

# JSON lookup file mapping disposition codes to human-readable descriptions.
RADIO_CODE_JSON_FILEPATH = os.environ.get("RADIO_CODE_JSON_FILEPATH", "./radio_code.json")

# Kafka connection settings. The topic is configurable like the broker URL and
# defaults to the project's police-calls topic.
KAFKA_BROKER_URL = os.environ.get("KAFKA_BROKER_URL", "localhost:9092")
KAFKA_TOPIC = os.environ.get("KAFKA_TOPIC", "udacity.project.spark-streaming.police")


# Expected layout of each JSON payload on the Kafka topic, as (name, type)
# pairs; trailing comments show sample values from one record.
# NOTE(review): the *_date fields arrive as full timestamps
# ("2018-12-31T00:00:00.000") but are typed DateType, so only the calendar date
# is kept. This relies on Spark's lenient date parsing - re-check on upgrades.
_SERVICE_FIELDS = [
    ("crime_id", pst.StringType()),                  # "183653763"
    ("original_crime_type_name", pst.StringType()),  # "Traffic Stop"
    ("report_date", pst.DateType()),                 # "2018-12-31T00:00:00.000"
    ("call_date", pst.DateType()),                   # "2018-12-31T00:00:00.000"
    ("offense_date", pst.DateType()),                # "2018-12-31T00:00:00.000"
    ("call_time", pst.StringType()),                 # "23:57"
    ("call_date_time", pst.TimestampType()),         # "2018-12-31T23:57:00.000"
    ("disposition", pst.StringType()),               # "ADM"
    ("address", pst.StringType()),                   # "Geary Bl/divisadero St"
    ("city", pst.StringType()),                      # "San Francisco"
    ("state", pst.StringType()),                     # "CA"
    ("agency_id", pst.StringType()),                 # "1"
    ("address_type", pst.StringType()),              # "Intersection"
    ("common_location", pst.StringType()),           # ""
]

# Every field keeps StructField's default nullable=True.
schema = pst.StructType(
    [pst.StructField(field_name, field_type) for field_name, field_type in _SERVICE_FIELDS]
)

In [3]:
# Build (or reuse) the SparkSession. The master defaults to local mode on all
# cores; set SPARK_MASTER (e.g. "spark://host:7077") to target a standalone
# cluster instead.
SPARK_MASTER = os.environ.get("SPARK_MASTER", "local[*]")

spark = (
    SparkSession
    .builder
    .master(SPARK_MASTER)
    .appName("KafkaSparkStructuredStreaming")
    .getOrCreate()
)

# Apply the configured log verbosity (LOG_LEVEL was previously unused).
spark.sparkContext.setLogLevel(LOG_LEVEL)

spark

In [4]:
# Upper bound on Kafka offsets consumed per micro-batch (rate limiting).
MAX_OFFSETS_PER_TRIGGER = 6000

# Subscribe to the police-calls topic from the earliest retained offset.
# "stopGracefullyOnShutdown" was dropped: it is not a Kafka source option (it is
# the DStreams conf spark.streaming.stopGracefullyOnShutdown), so it had no effect.
df = (
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKER_URL)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", MAX_OFFSETS_PER_TRIGGER)
    .load()
)
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



# Base Query

In [5]:
# Kafka delivers `value` as binary, so cast it to a string before parsing.
kafka_df = df.selectExpr("CAST(value AS STRING)")

# Parse each JSON payload with `schema`, then flatten the struct's fields into
# top-level columns. The "service" view exposes the result to SQL.
parsed = psf.from_json(psf.col("value"), schema)
service_table = (
    kafka_df
    .select(parsed.alias("parsed"))
    .select("parsed.*")
)
service_table.createOrReplaceTempView("service")

In [6]:
# Stream the parsed rows into the in-memory table "service_mem" so they can be
# inspected with spark.sql. The query has no aggregation, so "append" emits
# the same rows "update" did, and append is a documented memory-sink mode.
SERVICE_QUERY_NAME = "service_mem"

# Query names must be unique among active queries. Stop one left running by a
# previous execution of this cell so the cell can be re-run safely.
for active_query in spark.streams.active:
    if active_query.name == SERVICE_QUERY_NAME:
        active_query.stop()

query = (
    service_table
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName(SERVICE_QUERY_NAME)
    .start()
)

In [17]:
# Peek at the first two captured rows, one field per line.
service_preview = spark.sql("SELECT * from service_mem")
service_preview.show(n=2, vertical=True)

-RECORD 0----------------------------------------
 crime_id                 | 182751009            
 original_crime_type_name | Suspicious Person    
 report_date              | 2018-10-02           
 call_date                | 2018-10-02           
 offense_date             | 2018-10-02           
 call_time                | 09:11                
 call_date_time           | 2018-10-02 09:11:00  
 disposition              | GOA                  
 address                  | 300 Block Of Hyde St 
 city                     | San Francisco        
 state                    | CA                   
 agency_id                | 1                    
 address_type             | Premise Address      
 common_location          |                      
-RECORD 1----------------------------------------
 crime_id                 | 182751010            
 original_crime_type_name | Passing Call         
 report_date              | 2018-10-02           
 call_date                | 2018-10-02           


In [18]:
# Halt the streaming query; lastProgress still reports its final micro-batch.
query.stop()
final_progress = query.lastProgress
final_progress

{'id': '0116aed6-3d32-43c0-8caa-a0995b6a39f2',
 'runId': '168758d2-f103-43b4-b725-ff0d8ea43237',
 'name': 'service_mem',
 'timestamp': '2020-02-22T12:23:44.858Z',
 'batchId': 27,
 'numInputRows': 68,
 'inputRowsPerSecond': 618.1818181818181,
 'processedRowsPerSecond': 456.37583892617454,
 'durationMs': {'addBatch': 45,
  'getBatch': 0,
  'getEndOffset': 0,
  'queryPlanning': 22,
  'setOffsetRange': 2,
  'triggerExecution': 149,
  'walCommit': 16},
 'stateOperators': [],
 'sources': [{'description': 'KafkaV2[Subscribe[udacity.project.spark-streaming.police]]',
   'startOffset': {'udacity.project.spark-streaming.police': {'0': 11736}},
   'endOffset': {'udacity.project.spark-streaming.police': {'0': 11804}},
   'numInputRows': 68,
   'inputRowsPerSecond': 618.1818181818181,
   'processedRowsPerSecond': 456.37583892617454}],
 'sink': {'description': 'MemorySink'}}

# Window Query 1

In [35]:
# Count calls per event-time window, crime type and disposition. The watermark
# bounds how late a record may arrive before its window's state can be dropped.
WINDOW_DURATION = "30 minutes"
WATERMARK_DELAY = "1 hour"

window_agg_df = (
    service_table
    .withWatermark("call_date_time", WATERMARK_DELAY)
    # Reference columns by name so they resolve against the watermarked plan,
    # keeping the window tied to the watermarked event-time column.
    .groupBy(
        psf.window(psf.col("call_date_time"), WINDOW_DURATION),
        psf.col("original_crime_type_name"),
        psf.col("disposition"),
    )
    .count()
)
window_agg_df.createOrReplaceTempView("windowed_crime_types")

In [36]:
# Stream the windowed counts into the in-memory table "agg_mem".
# NOTE(review): in update mode each trigger emits only the groups whose count
# changed. The memory sink appears to append those rows, so agg_mem may hold
# several rows per (window, crime type, disposition); the largest count is the
# current one. Confirm against the Spark version in use.
AGG_QUERY_NAME = "agg_mem"

# Query names must be unique among active queries. Stop one left running by a
# previous execution of this cell so the cell can be re-run safely.
for active_query in spark.streams.active:
    if active_query.name == AGG_QUERY_NAME:
        active_query.stop()

query = (
    window_agg_df
    .writeStream
    .outputMode("update")
    .format("memory")
    .queryName(AGG_QUERY_NAME)
    .start()
)

In [44]:
# Show up to 20 captured aggregate rows with full (untruncated) window values.
agg_preview = spark.sql("""
    SELECT * from agg_mem
""")
agg_preview.show(n=20, truncate=False, vertical=False)

+------------------------------------------+------------------------+------------+-----+
|window                                    |original_crime_type_name|disposition |count|
+------------------------------------------+------------------------+------------+-----+
|[2018-10-02 10:30:00, 2018-10-02 11:00:00]|Traf Violation Cite     |HAN         |1    |
|[2018-10-02 14:30:00, 2018-10-02 15:00:00]|Threats / Harassment    |ND          |1    |
|[2018-10-02 17:00:00, 2018-10-02 17:30:00]|Burglary                |REP         |2    |
|[2018-10-02 17:30:00, 2018-10-02 18:00:00]|Passing Call            |HAN         |3    |
|[2018-10-02 20:30:00, 2018-10-02 21:00:00]|Trespasser              |GOA         |2    |
|[2018-10-02 21:30:00, 2018-10-02 22:00:00]|Trespasser              |HAN         |1    |
|[2018-10-02 23:30:00, 2018-10-03 00:00:00]|Meet W/citizen          |HAN         |1    |
|[2018-10-03 06:00:00, 2018-10-03 06:30:00]|Audible Alarm           |NCR         |1    |
|[2018-10-03 06:30:00

In [45]:
# Halt the aggregation query; lastProgress still reports its final micro-batch,
# including event-time watermark and state-store metrics.
query.stop()
final_progress = query.lastProgress
final_progress

{'id': 'dac0272f-4a87-4058-aa4d-3c4ddd6b04bb',
 'runId': 'cf71d657-79ef-4a9c-8961-607ffc311a0a',
 'name': 'agg_mem',
 'timestamp': '2020-02-22T12:32:38.934Z',
 'batchId': 1,
 'numInputRows': 6000,
 'inputRowsPerSecond': 1146.1318051575931,
 'processedRowsPerSecond': 2951.3034923757996,
 'durationMs': {'addBatch': 1961,
  'getBatch': 0,
  'getEndOffset': 0,
  'queryPlanning': 43,
  'setOffsetRange': 1,
  'triggerExecution': 2033,
  'walCommit': 16},
 'eventTime': {'avg': '2018-10-05T23:41:15.140Z',
  'max': '2018-10-07T02:33:00.000Z',
  'min': '2018-10-04T17:52:00.000Z',
  'watermark': '2018-10-04T16:52:00.000Z'},
 'stateOperators': [{'numRowsTotal': 8538,
   'numRowsUpdated': 4333,
   'memoryUsedBytes': 2369767,
   'customMetrics': {'loadedMapCacheHitCount': 400,
    'loadedMapCacheMissCount': 0,
    'stateOnCurrentVersionSizeBytes': 2140999}}],
 'sources': [{'description': 'KafkaV2[Subscribe[udacity.project.spark-streaming.police]]',
   'startOffset': {'udacity.project.spark-streaming

# Join Query 2

In [46]:
# Static lookup table (disposition_code -> description). It is read with
# multiLine enabled so a JSON document may span several lines.
radio_code_df = (
    spark.read
    .option("multiLine", True)
    .json(RADIO_CODE_JSON_FILEPATH)
)
radio_code_df.createOrReplaceTempView("radio_code")

In [47]:
# Sanity-check the lookup table's first few entries.
radio_preview = spark.sql("SELECT * from radio_code")
radio_preview.show(n=5, truncate=False)

+-----------+----------------+
|description|disposition_code|
+-----------+----------------+
|Abated     |ABA             |
|Admonished |ADM             |
|Advised    |ADV             |
|Arrest     |ARR             |
|Cancel     |CAN             |
+-----------+----------------+
only showing top 5 rows



In [48]:
# Enrich each windowed count with its disposition description (stream-static
# LEFT JOIN, so groups with an unknown code keep a NULL description).
# NOTE(review): the saved schema output below lists `disposition_code`, which
# this SELECT does not project. That output looks stale - re-run to refresh.
join_sql = """
    SELECT ct.*, r.description
    FROM windowed_crime_types as ct
    LEFT JOIN radio_code as r on r.disposition_code = ct.disposition
"""
join_query = spark.sql(join_sql)
join_query

DataFrame[window: struct<start:timestamp,end:timestamp>, original_crime_type_name: string, disposition: string, count: bigint, description: string, disposition_code: string]

In [49]:
# Stream the enriched windowed counts into the in-memory table "join_mem".
# NOTE(review): as with agg_mem, update-mode rows appear to be appended per
# trigger, so a group may show up several times with increasing counts.
JOIN_QUERY_NAME = "join_mem"

# Query names must be unique among active queries. Stop one left running by a
# previous execution of this cell so the cell can be re-run safely.
for active_query in spark.streams.active:
    if active_query.name == JOIN_QUERY_NAME:
        active_query.stop()

query = (
    join_query
    .writeStream
    .outputMode("update")
    .format("memory")
    .queryName(JOIN_QUERY_NAME)
    .start()
)

In [56]:
# Peek at two enriched rows, one field per line, without truncation.
# NOTE(review): this cell was executed (In [56]) after the stop cell (In [55]);
# run top-to-bottom to reproduce.
join_preview = spark.sql("""
    SELECT * from join_mem
""")
join_preview.show(n=2, truncate=False, vertical=True)

-RECORD 0--------------------------------------------------------------
 window                   | [2018-10-02 10:30:00, 2018-10-02 11:00:00] 
 original_crime_type_name | Traf Violation Cite                        
 disposition              | HAN                                        
 count                    | 1                                          
 description              | Handled                                    
 disposition_code         | HAN                                        
-RECORD 1--------------------------------------------------------------
 window                   | [2018-10-02 14:30:00, 2018-10-02 15:00:00] 
 original_crime_type_name | Threats / Harassment                       
 disposition              | ND                                         
 count                    | 1                                          
 description              | No Disposition                             
 disposition_code         | ND                                  

In [55]:
# Halt the join query; lastProgress still reports its final micro-batch.
query.stop()
final_progress = query.lastProgress
final_progress

{'id': '35c01e4d-ff47-4e88-bddb-a03d447b1fec',
 'runId': '4b2a3314-6cf3-4e74-b876-0f1dc95e3cd8',
 'name': 'join_mem',
 'timestamp': '2020-02-22T12:33:34.960Z',
 'batchId': 3,
 'numInputRows': 6000,
 'inputRowsPerSecond': 2983.5902536051713,
 'processedRowsPerSecond': 3227.5416890801507,
 'durationMs': {'addBatch': 1786,
  'getBatch': 0,
  'getEndOffset': 0,
  'queryPlanning': 42,
  'setOffsetRange': 2,
  'triggerExecution': 1859,
  'walCommit': 11},
 'eventTime': {'avg': '2018-10-10T20:58:40.429Z',
  'max': '2018-10-12T06:27:00.000Z',
  'min': '2018-10-09T15:00:00.000Z',
  'watermark': '2018-10-09T14:00:00.000Z'},
 'stateOperators': [{'numRowsTotal': 17173,
   'numRowsUpdated': 4347,
   'memoryUsedBytes': 4975391,
   'customMetrics': {'loadedMapCacheHitCount': 1200,
    'loadedMapCacheMissCount': 0,
    'stateOnCurrentVersionSizeBytes': 4348359}}],
 'sources': [{'description': 'KafkaV2[Subscribe[udacity.project.spark-streaming.police]]',
   'startOffset': {'udacity.project.spark-stream