# Data cleaning using Spark Streaming

This notebook reads data from the `ingest` topic on our Kafka distributed queue, cleans each message, and writes the result to the Kafka topic `ingest-cleaned`.

Spark Structured Streaming treats a live data stream as a table to which new rows are continuously appended. We can run queries on this table to our heart's content. Each new event adds a new row to the table. Spark then updates the query results incrementally: rather than recomputing everything, it processes only the change (the delta).

Spark updates the result table when new data arrives. It also takes care of maintaining running aggregations, ensuring data consistency, and handling fault tolerance. Because Spark does this for us, we can focus on the essentials.
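The following minimal pure-Python sketch illustrates the idea of incremental updates. It does not show how Spark implements stateful aggregation internally. It keeps a running count per key as state, and each new micro-batch updates that state with its own rows instead of recounting the whole table. The micro-batches and the `title` key are hypothetical.

```python
from collections import Counter

def update_counts(state, new_rows, key):
    """Fold one micro-batch of rows into the running counts (the 'delta' update)."""
    for row in new_rows:
        state[key(row)] += 1
    return state

# Hypothetical micro-batches arriving over time
batches = [
    [{"title": "EMS"}, {"title": "Fire"}],
    [{"title": "EMS"}],
    [{"title": "Traffic"}, {"title": "EMS"}],
]

running = Counter()
for batch in batches:
    # Only the new rows are processed; earlier batches are never re-read
    update_counts(running, batch, key=lambda row: row["title"])

print(dict(running))  # {'EMS': 3, 'Fire': 1, 'Traffic': 1}
```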

#### Input

We now use the Spark Structured Streaming API to clean our event stream. We read from a Kafka source with a `DataStreamReader` (`spark.readStream`). We added events to our Kafka distributed queue in notebook `1_read_and_POST.ipynb` and are now ready to process them.



#### Output

We write the resulting data to a Kafka sink: each row of our DataFrame is written to the Kafka topic `ingest-cleaned`. We use the output mode `append`. In this mode, only the rows added to the result table since the last trigger are written to the sink.
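The toy pure-Python simulation below (not Spark code) illustrates what `append` means. It shows what a sink receives per trigger in `append` mode and contrasts this with `complete` mode, which rewrites the whole result table every time. In Spark, `complete` mode is only allowed for queries with aggregations, so `append` is the natural choice for row-by-row cleaning like ours. The sample batches are hypothetical.

```python
def simulate_sink(arrivals, transform, mode):
    """Return what a sink receives per trigger for a simple row-wise query.

    arrivals:  list of micro-batches, each a list of input rows
    transform: row-wise cleaning function applied to every input row
    mode:      "append" (only new result rows) or "complete" (whole result table)
    """
    result_table = []
    writes = []
    for new_rows in arrivals:
        new_results = [transform(row) for row in new_rows]
        result_table.extend(new_results)
        if mode == "append":
            # Rows written earlier are never sent again
            writes.append(new_results)
        elif mode == "complete":
            # The entire result table is sent on every trigger
            writes.append(list(result_table))
        else:
            raise ValueError(f"unsupported mode: {mode}")
    return writes

batches = [["a ", "b"], [" c"]]
print(simulate_sink(batches, str.strip, "append"))    # [['a', 'b'], ['c']]
print(simulate_sink(batches, str.strip, "complete"))  # [['a', 'b'], ['a', 'b', 'c']]
```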

## Cleaning the data

In [1]:
%%bash
# Ensure the required Python 3 dependencies are installed.
python3 -m pip install kafka-python



We now create a Spark session. We also tell Spark to load the Kafka connector for Structured Streaming (`spark-sql-kafka-0-10`) by setting `PYSPARK_SUBMIT_ARGS`.

In [2]:
from IPython.display import display, clear_output
from time import sleep

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

import pyspark 
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Run Spark locally with two worker threads (reuses an existing session if there is one)
spark = SparkSession.builder.master('local[2]').getOrCreate()
sc = spark.sparkContext
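The connector version in `PYSPARK_SUBMIT_ARGS` has to match the installed Spark version and its Scala build. The variable also only takes effect if it is set before the first `SparkSession` is created in the notebook. The small sketch below builds that string from the versions. The helper names are our own. In the notebook, one could pass `pyspark.__version__` instead of the hard-coded `"3.0.0"`.

```python
def kafka_package(spark_version, scala_version="2.12"):
    """Maven coordinate of the Structured Streaming Kafka connector for a Spark/Scala version."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"

def pyspark_submit_args(packages):
    """Build a PYSPARK_SUBMIT_ARGS value; the trailing 'pyspark-shell' is required."""
    return "--packages " + ",".join(packages) + " pyspark-shell"

args = pyspark_submit_args([kafka_package("3.0.0")])
print(args)  # --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell
```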


We now create a streaming DataFrame that represents the events received from the Kafka topic `ingest`.

In [11]:
input = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers","localhost:9092")
    # Change `ingest` to your topic of choice.
    .option("subscribe", "ingest")
    # earliest: start reading from the beginning of the queue
    # this will also read all messages already present on the Kafka topic
    .option("startingOffsets", "earliest")
    .load()
)


We can't simply run the query and look at its output, because a streaming query never finishes on its own. For debugging purposes, we start the query with an in-memory sink, wait a few seconds so that we have some results, and show the contents of the in-memory table.

Afterwards, we stop the running query so that the in-memory table does not keep growing until we run out of memory.
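The debugging pattern described above can be wrapped in a small helper. The sketch below uses the hypothetical name `peek_stream`. Instead of a fixed `sleep(2)`, it calls `StreamingQuery.processAllAvailable()`, which blocks until the data currently available in the source has been processed. That method is intended for testing. If data keeps arriving on the topic, it may block for a long time. The query is always stopped afterwards, even if processing fails.

```python
def peek_stream(spark, streaming_df, query_name="test_query", limit=10):
    """Run `streaming_df` into an in-memory table and return a small pandas sample.

    The query is stopped in all cases so the in-memory table does not keep growing.
    """
    query = (
        streaming_df.writeStream
        .outputMode("append")
        .format("memory")
        .queryName(query_name)
        .start()
    )
    try:
        # Wait until the data currently available in the source has been processed
        query.processAllAvailable()
        return spark.table(query_name).limit(limit).toPandas()
    finally:
        query.stop()

# Usage in this notebook (not executed here):
# display(peek_stream(spark, stream_decoded))
```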

In [12]:
stream_decoded = (
    input
    .withColumn("value", input["value"].cast("string"))
    .select("value", "timestamp")
    )

In [13]:
try:
    # In case the previous query wasn't stopped
    tq.stop()
except:
    pass

tq = (
    # Create an output stream
    stream_decoded.writeStream               
    # Only write new rows to the output; 'append' suits row-by-row cleaning
    # without aggregations
    .outputMode("append")
    # Write output stream to an in-memory Spark table (a DataFrame)
    .format("memory")               
    # The name of the output table will be the same as the name of the query
    .queryName("test_query")
    # Submit the query to Spark and execute it
    .start()
)

sleep(2)

# When the status says "Waiting for data to arrive", that means the query
# has finished its current iteration and is waiting for new messages from
# Kafka.
display(tq.status)

memory_sink = spark.table("test_query")
# Show result table in Jupyter Notebook. Since Jupyter Notebooks have native support for showing pandas tables,
# we convert the Spark DataFrame.
display(memory_sink.toPandas())

# Stop the query
tq.stop()

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

|  | value | timestamp |
|---|---|---|
| 0 | {"lat": 40.297875899999994, "lng": -75.5812935... | 2020-11-26 11:11:49.990 |
| 1 | {"lat": 40.2580614, "lng": -75.26467990000002,... | 2020-11-26 11:11:50.359 |
| 2 | {"lat": 40.121181799999995, "lng": -75.3519752... | 2020-11-26 11:11:51.034 |
| 3 | {"lat": 40.116153000000004, "lng": -75.343513,... | 2020-11-26 11:11:51.659 |
| 4 | {"lat": 40.251492, "lng": -75.6033497, "desc":... | 2020-11-26 11:11:52.623 |
| ... | ... | ... |
| 5777 | {"lat": 40.114239000000005, "lng": -75.3385079... | 2020-11-26 21:26:02.046 |
| 5778 | {"lat": 40.1179476, "lng": -75.20984759999999,... | 2020-11-26 21:26:02.175 |
| 5779 | {"lat": 40.1990064, "lng": -75.3000584, "desc"... | 2020-11-26 21:26:02.226 |
| 5780 | {"lat": 40.143325700000005, "lng": -75.4228189... | 2020-11-26 21:26:02.295 |


We use the `from_json` function to parse each JSON message into a single struct column. We then flatten this column so that each field of the struct becomes a column in our DataFrame.

In [14]:
# lat,lng,desc,zip,title,timeStamp,twp,addr,e

schema = StructType([
    StructField("lat", DoubleType()),
    StructField("lng", DoubleType()),
    StructField("desc", StringType()),
    StructField("zip", FloatType()),
    StructField("title", StringType()),
    StructField("timeStamp", TimestampType()),
    StructField("twp", StringType()),
    StructField("addr", StringType()),
    StructField("e", IntegerType()),
])

decoded_json_stream = (
    stream_decoded.withColumn("nineoneone", from_json(col("value"), schema))
)
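To unit-test the cleaning logic outside Spark, the effect of `from_json` followed by `select("nineoneone.*")` can be mirrored in plain Python. Each JSON message becomes one flat record with one typed value per schema field, and missing fields become `None`. This is a sketch, not how Spark parses JSON. In particular, we assume that `timeStamp` is sent as a `YYYY-MM-DD HH:MM:SS` string, matching how it is displayed in the output further down. The sample message is made up from values in that output.

```python
import json
from datetime import datetime

# Field name -> converter, mirroring the StructType above.
# Assumption: timeStamp arrives as a "YYYY-MM-DD HH:MM:SS" string.
SCHEMA = {
    "lat": float,
    "lng": float,
    "desc": str,
    "zip": float,
    "title": str,
    "timeStamp": lambda text: datetime.strptime(text, "%Y-%m-%d %H:%M:%S"),
    "twp": str,
    "addr": str,
    "e": int,
}

def parse_message(value):
    """Parse one Kafka message value (a JSON string) into a flat dict of typed fields."""
    raw = json.loads(value)  # Python's json module also accepts NaN literals
    return {
        field: None if raw.get(field) is None else convert(raw[field])
        for field, convert in SCHEMA.items()
    }

message = ('{"lat": 40.2580614, "zip": 19446.0, "title": "EMS: DIABETIC EMERGENCY", '
           '"timeStamp": "2015-12-10 17:29:21", "e": 1}')
record = parse_message(message)
print(record["title"], record["zip"], record["desc"])  # EMS: DIABETIC EMERGENCY 19446.0 None
```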


In [15]:
flattened_stream = (
    decoded_json_stream
    .select("nineoneone.*")
)

# Split 'title' (e.g. "EMS: BACK PAINS/INJURY") into the two requested columns,
# trimming the space that follows the colon
split_col = split(flattened_stream['title'], ':')
flattened_stream = (
    flattened_stream
    .withColumn('majorTitle', trim(split_col.getItem(0)))
    .withColumn('minorTitle', trim(split_col.getItem(1)))
)

# Replace NaN with null so that the integer cast below yields null for missing ZIP codes
flattened_stream = flattened_stream.replace(float('nan'), None)
flattened_stream = flattened_stream.withColumn("zip", flattened_stream["zip"].cast(IntegerType()))
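The sketch below applies the same cleaning steps to a single record in pure Python, so they can be unit-tested without a Spark cluster. It mirrors the cell above for the `title` and `zip` fields only, whereas the Spark `replace` call affects every numeric column. It also strips the whitespace around the split parts. `clean_record` is a hypothetical helper, not part of the pipeline.

```python
import math

def clean_record(record):
    """Apply the title split and ZIP clean-up from the cell above to one flat record (dict)."""
    cleaned = dict(record)
    # Split on ':' and keep the first two parts, like split(...).getItem(0) / getItem(1);
    # a missing second part becomes None
    title = record.get("title")
    parts = title.split(":") if title is not None else []
    cleaned["majorTitle"] = parts[0].strip() if parts else None
    cleaned["minorTitle"] = parts[1].strip() if len(parts) > 1 else None
    # NaN -> None, then store the ZIP code as an integer
    zip_value = record.get("zip")
    if zip_value is None or (isinstance(zip_value, float) and math.isnan(zip_value)):
        cleaned["zip"] = None
    else:
        cleaned["zip"] = int(zip_value)
    return cleaned

print(clean_record({"title": "EMS: BACK PAINS/INJURY", "zip": 19525.0}))
# {'title': 'EMS: BACK PAINS/INJURY', 'zip': 19525, 'majorTitle': 'EMS', 'minorTitle': 'BACK PAINS/INJURY'}
```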



Let's take a look at our flattened stream. We convert a sample to pandas for display and use `dtypes` to check that each column has the expected type.

In [25]:
try:
    # In case the previous query wasn't stopped
    tq.stop()
except:
    pass

tq = (
    # Create an output stream
    flattened_stream.writeStream               
    # Only write new rows to the output
    .outputMode("append")           
    # Write output stream to an in-memory Spark table (a DataFrame)
    .format("memory")               
    # The name of the output table will be the same as the name of the query
    .queryName("test_query")
    # Submit the query to Spark and execute it
    .start()
)

sleep(2)

# When the status says "Waiting for data to arrive", that means the query
# has finished its current iteration and is waiting for new messages from
# Kafka.
display(tq.status)

memory_sink = spark.table("test_query")


# Show result table in Jupyter Notebook. Since Jupyter Notebooks have native support for showing pandas tables,
# we convert the Spark DataFrame.
display(memory_sink.toPandas().head(10))
display(memory_sink.dtypes)

# Stop the query
tq.stop()

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

|  | lat | lng | desc | zip | title | timeStamp | twp | addr | e | majorTitle | minorTitle |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 40.297876 | -75.581294 | REINDEER CT & DEAD END; NEW HANOVER; Station ... | 19525.0 | EMS: BACK PAINS/INJURY | 2015-12-10 17:10:52 | NEW HANOVER | REINDEER CT & DEAD END | 1 | EMS | BACK PAINS/INJURY |
| 1 | 40.258061 | -75.26468 | BRIAR PATH & WHITEMARSH LN; HATFIELD TOWNSHIP... | 19446.0 | EMS: DIABETIC EMERGENCY | 2015-12-10 17:29:21 | HATFIELD TOWNSHIP | BRIAR PATH & WHITEMARSH LN | 1 | EMS | DIABETIC EMERGENCY |
| 2 | 40.121182 | -75.351975 | HAWS AVE; NORRISTOWN; 2015-12-10 @ 14:39:21-St... | 19401.0 | Fire: GAS-ODOR/LEAK | 2015-12-10 14:39:21 | NORRISTOWN | HAWS AVE | 1 | Fire | GAS-ODOR/LEAK |
| 3 | 40.116153 | -75.343513 | AIRY ST & SWEDE ST; NORRISTOWN; Station 308A;... | 19401.0 | EMS: CARDIAC EMERGENCY | 2015-12-10 16:47:36 | NORRISTOWN | AIRY ST & SWEDE ST | 1 | EMS | CARDIAC EMERGENCY |
| 4 | 40.251492 | -75.60335 | CHERRYWOOD CT & DEAD END; LOWER POTTSGROVE; S... | NaN | EMS: DIZZINESS | 2015-12-10 16:56:52 | LOWER POTTSGROVE | CHERRYWOOD CT & DEAD END | 1 | EMS | DIZZINESS |
| 5 | 40.253473 | -75.283245 | CANNON AVE & W 9TH ST; LANSDALE; Station 345;... | 19446.0 | EMS: HEAD INJURY | 2015-12-10 15:39:04 | LANSDALE | CANNON AVE & W 9TH ST | 1 | EMS | HEAD INJURY |
| 6 | 40.182111 | -75.127795 | LAUREL AVE & OAKDALE AVE; HORSHAM; Station 35... | 19044.0 | EMS: NAUSEA/VOMITING | 2015-12-10 16:46:48 | HORSHAM | LAUREL AVE & OAKDALE AVE | 1 | EMS | NAUSEA/VOMITING |
| 7 | 40.217286 | -75.405182 | COLLEGEVILLE RD & LYWISKI RD; SKIPPACK; Stati... | 19426.0 | EMS: RESPIRATORY EMERGENCY | 2015-12-10 16:17:05 | SKIPPACK | COLLEGEVILLE RD & LYWISKI RD | 1 | EMS | RESPIRATORY EMERGENCY |
| 8 | 40.289027 | -75.39959 | MAIN ST & OLD SUMNEYTOWN PIKE; LOWER SALFORD;... | 19438.0 | EMS: SYNCOPAL EPISODE | 2015-12-10 16:51:42 | LOWER SALFORD | MAIN ST & OLD SUMNEYTOWN PIKE | 1 | EMS | SYNCOPAL EPISODE |
| 9 | 40.102398 | -75.291458 | BLUEROUTE & RAMP I476 NB TO CHEMICAL RD; PLYM... | 19462.0 | Traffic: VEHICLE ACCIDENT - | 2015-12-10 17:35:41 | PLYMOUTH | BLUEROUTE & RAMP I476 NB TO CHEMICAL RD | 1 | Traffic | VEHICLE ACCIDENT - |


[('lat', 'double'),
 ('lng', 'double'),
 ('desc', 'string'),
 ('zip', 'int'),
 ('title', 'string'),
 ('timeStamp', 'timestamp'),
 ('twp', 'string'),
 ('addr', 'string'),
 ('e', 'int'),
 ('majorTitle', 'string'),
 ('minorTitle', 'string')]

Notice above that the ZIP column contains NaNs (for example in row 4). This is also why pandas displays the ZIP codes as floats.

Our EDA (see project lab 1) showed that the only columns that contained NULL/NaN values were:
* ZIP
* twp (Township)

These columns are of minor importance and will probably not be used, so we do not drop any rows from our dataset.

However, the township is also included in the description of each row, so it might be possible to fill in the missing townships from the description field. This field consists of values separated by semicolons, and the township is the second value. Unfortunately, if the township is missing from an entry, it is also missing from the description. The only way to fix these entries would then be to look up other entries with the same ZIP code and use their township. However, there are about 266 times more missing ZIP codes than missing townships, so we opted not to do this.
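The two ideas from the paragraph above can be sketched in plain Python. The first reads the township from `desc`. The second fills a missing `twp` with the most common township among records that share its ZIP code. The helpers and sample records are hypothetical. The sketch also shows why the second idea cannot help when the ZIP code is missing as well.

```python
from collections import Counter, defaultdict

def township_from_desc(desc):
    """Return the second ';'-separated value of a description (the township), or None."""
    parts = [part.strip() for part in (desc or "").split(";")]
    if len(parts) < 2 or not parts[1]:
        return None
    return parts[1]

def fill_township_by_zip(records):
    """Fill missing 'twp' values with the most common township among records with the same ZIP."""
    townships_per_zip = defaultdict(Counter)
    for record in records:
        if record.get("twp") and record.get("zip") is not None:
            townships_per_zip[record["zip"]][record["twp"]] += 1

    filled = []
    for record in records:
        record = dict(record)
        # Only possible when the ZIP code is known and seen with a township elsewhere
        if not record.get("twp") and record.get("zip") in townships_per_zip:
            record["twp"] = townships_per_zip[record["zip"]].most_common(1)[0][0]
        filled.append(record)
    return filled

print(township_from_desc("CANNON AVE & W 9TH ST; LANSDALE; Station 345;"))  # LANSDALE
```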

We could also try to fix the ZIP codes by looking at the township. However, a township can have multiple ZIP codes, so we would have to narrow the search by also comparing the streets. For some of the entries we are trying to fix, the same street may appear only rarely in this dataset.

Instead, we decided to pass the latitude and longitude to the Google Maps Geocoding API (reverse geocoding) to find the ZIP code of each location.
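The notebook below extracts the ZIP code by splitting `formatted_address`. There is an alternative, sketched here under our reading of the Geocoding API's JSON response: each result has a list of `address_components`, each with a `short_name` and a list of `types`. The sketch picks the component whose `types` contain `"postal_code"`, which does not depend on how the address string is laid out. The function name and the sample payload are hypothetical. The check for a 5-digit code starting with `1` follows the notebook's own Pennsylvania heuristic.

```python
def zip_from_geocode_response(payload, prefix="1"):
    """Return the first 5-digit postal code starting with `prefix` in a Geocoding payload, or None."""
    for result in payload.get("results", []):
        for component in result.get("address_components", []):
            code = component.get("short_name", "")
            if (
                "postal_code" in component.get("types", [])
                and len(code) == 5
                and code.isdigit()
                and code.startswith(prefix)
            ):
                return code
    return None

# Hypothetical payload with the structure described above
sample = {
    "results": [
        {
            "address_components": [
                {"long_name": "Norristown", "short_name": "Norristown", "types": ["locality", "political"]},
                {"long_name": "19401", "short_name": "19401", "types": ["postal_code"]},
            ]
        }
    ]
}
print(zip_from_geocode_response(sample))  # 19401
```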

**Unfortunately, we were unable to figure out how to update our stream with the DataFrame that contains the corrected ZIP codes (i.e. no longer any NULL values). We understand that the approach below only modifies the `memory_sink` PySpark DataFrame, a static snapshot of the stream, but we wanted to show that our function fills in the ZIP codes correctly. We also tried applying the same transformation to `flattened_stream` instead of `memory_sink`, but the result was an empty stream. As a consequence, the stream itself does not contain the corrected ZIP codes, because we only changed a local PySpark DataFrame in memory. Please let us know how we can 'persist' these changes to our stream object, so that we can then write it to our Kafka topic `ingest-cleaned`.**
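One option we have not tested in this notebook is Structured Streaming's `foreachBatch`, sketched here with hypothetical names. `foreachBatch` hands each micro-batch to a Python function as an ordinary (static) DataFrame. Cleaning code that works on `memory_sink` could therefore also be applied to each part of the stream, and the result written to Kafka with the batch writer. Note that `foreachBatch` only gives at-least-once guarantees, so a micro-batch may be written again after a failure. The commented usage assumes a hypothetical transformation function `fill_and_serialize` that applies the ZIP UDF and `to_json(struct("*")).alias("value")`.

```python
def make_kafka_batch_writer(transform, topic, bootstrap_servers="localhost:9092"):
    """Return a function for DataStreamWriter.foreachBatch.

    The returned function applies `transform` to every micro-batch (a static DataFrame)
    and writes the result to Kafka. `transform` must return a DataFrame with a string
    or binary `value` column.
    """
    def write_batch(batch_df, batch_id):
        (
            transform(batch_df)
            .write.format("kafka")
            .option("kafka.bootstrap.servers", bootstrap_servers)
            .option("topic", topic)
            .save()
        )
    return write_batch

# Usage (hypothetical `fill_and_serialize`, not executed here):
# query = (
#     flattened_stream.writeStream
#     .foreachBatch(make_kafka_batch_writer(fill_and_serialize, "ingest-cleaned"))
#     .option("checkpointLocation", "checkpoints-cleanup")
#     .start()
# )
```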

In [34]:
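import os
import requests
from pyspark.sql.functions import udf

# Assumption: the API key is provided through the (hypothetical) environment variable
# GOOGLE_MAPS_API_KEY instead of being hard-coded in the notebook.
GOOGLE_MAPS_API_KEY = os.environ.get("GOOGLE_MAPS_API_KEY", "")
GEOCODE_URL = "https://maps.googleapis.com/maps/api/geocode/json"

# Fetch the ZIP code of a 911 call from the Google Maps Geocoding API based on its
# latitude and longitude. Returns the ZIP code as a string, or '-1' if none is found.
def reverse_geocode(lat, lng, zip_code):
    # Keep ZIP codes that are already present; only look up the missing ones
    if zip_code is not None:
        return str(zip_code)
    try:
        response = requests.get(
            GEOCODE_URL,
            params={"latlng": f"{lat},{lng}", "key": GOOGLE_MAPS_API_KEY},
            timeout=10,
        )
        for result in response.json().get("results", []):
            # formatted_address looks like "..., Norristown, PA 19401, USA"
            parts = result.get("formatted_address", "").split(",")
            tokens = parts[-2].split() if len(parts) >= 2 else []
            zipcode = tokens[-1] if tokens else ""
            # Accept a 5-character ZIP code starting with 1 (-> Pennsylvania)
            if len(zipcode) == 5 and zipcode[0] == "1":
                return zipcode
        return "-1"
    except Exception:
        # Network errors, invalid JSON or unexpected payloads
        return "-1"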

In [11]:
# Attempt (disabled): apply the ZIP-filling UDF directly to the streaming DataFrame.
# Note that this lambda passes only two arguments to reverse_geocode, which expects
# three (lat, lng, zip), so every call raises a TypeError inside the UDF. It also
# returns a string although the UDF is declared with IntegerType.

# zip_udf = udf(lambda x: reverse_geocode(x[0], x[1]), IntegerType())

# Update the zip column.
# In our runs, the code below resulted in an empty stream, even though similar
# withColumn calls worked in earlier cells.

# flattened_stream = flattened_stream.withColumn('zip', zip_udf(struct(['lat', 'lng'])))

Let's apply this function to a snapshot of the stream stored in the in-memory table.

In [36]:
try:
    # In case the previous query wasn't stopped
    tq.stop()
except:
    pass

tq = (
    # Create an output stream
    flattened_stream.writeStream               
    # Only write new rows to the output
    .outputMode("append")           
    # Write output stream to an in-memory Spark table (a DataFrame)
    .format("memory")               
    # The name of the output table will be the same as the name of the query
    .queryName("test_query")
    # Submit the query to Spark and execute it
    .start()
)

sleep(2)

# When the status says "Waiting for data to arrive", that means the query
# has finished its current iteration and is waiting for new messages from
# Kafka.
display(tq.status)
memory_sink = spark.table("test_query")

# This fills in the ZIP codes correctly on the memory_sink DataFrame, but how do we persist
# these changes to our streaming DataFrame? Applying the same UDF to the streaming
# DataFrame resulted in an empty streaming DataFrame.
# UDF that calls reverse_geocode and fills in the ZIP code
zip_udf = udf(lambda x: int(reverse_geocode(x[0], x[1], x[2])), IntegerType())
# Update the zip column appropriately
memory_sink = memory_sink.withColumn('zip', zip_udf(struct(['lat', 'lng', 'zip'])))

# Show result table in Jupyter Notebook. Since Jupyter Notebooks have native support for showing pandas tables,
# we convert the Spark DataFrame.
display(memory_sink.toPandas().head(10))
display(memory_sink.dtypes)


# Stop the query
tq.stop()

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

Notice in the above table that the NaNs were replaced by ZIP codes. Unfortunately, as mentioned earlier, we do not know how to persist these cleaning changes to our stream so that they are then written to our Kafka topic.

## Write entries to ingest-cleaned Kafka topic

Finally, we want to write the cleaned 911 entries to the `ingest-cleaned` Kafka topic. The Kafka sink expects a DataFrame with a `value` column (string or binary) and, optionally, a `key` column.

To create the `value` column, we first combine all columns of the DataFrame into one struct using the `struct` function. We then serialize the struct to JSON using `to_json`. Finally, we keep only that column, renamed to `value`, using `select` and `alias`.

In [348]:
output_stream = flattened_stream.select(to_json(struct("*")).alias("value"))

In [349]:
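import shutil

try:
    # In case the previous query wasn't stopped
    tq.stop()
except NameError:
    pass

# Remove the old checkpoint directory; otherwise Spark tries to resume from it, which
# can cause confusing runtime errors. shutil.rmtree also removes non-empty directories
# (os.rmdir does not).
shutil.rmtree("checkpoints-cleanup", ignore_errors=True)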

# Prepare df for Kafka and write to kafka
tq = (
    output_stream
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "ingest-cleaned")
    .option("checkpointLocation", "checkpoints-cleanup")
    .start()
)

sleep(2)
display(tq.status)


{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}
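To check what actually ends up on `ingest-cleaned`, the messages can be read back with `kafka-python`, which the first cell installs. The sketch below decodes the JSON values from any iterable of messages with a bytes `.value` attribute, so it can be tried without a broker. The commented usage shows a `KafkaConsumer` setup that assumes the broker at `localhost:9092` used throughout this notebook. The helper name `read_cleaned_messages` is our own.

```python
import json

def read_cleaned_messages(consumer, max_messages=10):
    """Decode up to `max_messages` JSON values from an iterable of Kafka messages."""
    rows = []
    for message in consumer:
        rows.append(json.loads(message.value.decode("utf-8")))
        if len(rows) >= max_messages:
            break
    return rows

# Usage against the real topic (requires a running broker; not executed here):
# from kafka import KafkaConsumer
# consumer = KafkaConsumer(
#     "ingest-cleaned",
#     bootstrap_servers="localhost:9092",
#     auto_offset_reset="earliest",
#     consumer_timeout_ms=5000,  # stop iterating after 5 s without new messages
# )
# print(read_cleaned_messages(consumer))
```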