<a href="https://colab.research.google.com/github/hfarley/bigdata-finalproject/blob/main/BigDataFinal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [2]:
#install spark. we are using the one that uses hadoop as the underlying scheduler.
# !rm spark-3.2.4-bin-hadoop3.2.tgz 
!wget -q https://downloads.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
!tar xf  spark-3.2.4-bin-hadoop3.2.tgz
!ls -l

#Provides findspark.init() to make pyspark importable as a regular library.
os.environ["SPARK_HOME"] = "spark-3.2.4-bin-hadoop3.2"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 pyspark-shell'

total 296072
-rw-r--r--  1 root root   1975841 Apr 28 22:01 nfd_incidents_xd_seg.parquet
drwxr-xr-x  1 root root      4096 Apr 27 13:35 sample_data
drwxr-xr-x 13 1000 1000      4096 Apr  9 21:17 spark-3.2.4-bin-hadoop3.2
-rw-r--r--  1 root root 301183180 Apr  9 21:35 spark-3.2.4-bin-hadoop3.2.tgz


In [3]:
!pip install -q findspark pyspark
import findspark
findspark.init()

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
# Project goals:
# 1. Query any month of any year in the dataset and show a map of the incidents during that month.
# 2. Given a road segment, show a graph of the density of the times of day
#    at which incidents occur on that road segment.
# 3. Query the average response time for each road segment, and sort it to find
#    which road segments have the highest and lowest response times.



In [5]:
# Create a smaller sample dataset (10,000 rows) to work with locally.

from pyspark.sql import SparkSession

# Number of rows kept in the sample; the output file name below reflects it.
SAMPLE_ROWS = 10000

spark = SparkSession.builder.appName("read_parquet").getOrCreate()

try:
  df = spark.read.parquet("nfd_incidents_xd_seg.parquet")

  # Keep only the first SAMPLE_ROWS rows returned by Spark.
  df_small = df.limit(SAMPLE_ROWS)

  # Overwrite any previous sample so this cell can be re-run without a
  # "path already exists" error.
  df_small.write.mode("overwrite").parquet("incidents_10000.parquet")
finally:
  # Always release the session, even if the read or write fails.
  spark.stop()

Notes: 


*   `.show()` prints the first 20 rows by default
*   in the Spark assignment we only used `SparkContext`, but here we also need a `SparkSession`

For the first task we need the ID, latitude and longitude (or, alternatively, geometry), and one of the time variables. I decided to use the local-time variable, since that is more meaningful for someone trying to visualize the data.

We also add columns for month and year (based on time_local) for querying purposes

* We still need to decide how to visualize this, and get the coordinates needed to draw a map of Davidson County that shows the incidents
* Month and year are specified as command-line arguments



In [None]:
%%file month_incidents.py


import argparse
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month

if __name__ == '__main__':

  parser = argparse.ArgumentParser(description='Process some strings.')
  parser.add_argument('--monthArg', type=int, choices=range(1,13), help='month (1-12)', required=True)
  parser.add_argument('--yearArg', type=int, choices=range(2017,2023), help='year (2017-2022)', required=True)
  args = parser.parse_args()

  # replace this line with the s3 pass when testing over EMR
  conf = SparkConf().setAppName('month_incidents').set('spark.hadoop.validateOutputSpecs', False)
  sc = SparkContext(conf=conf).getOrCreate()
  spark = SparkSession(sc)

  
  try:

    incidents = spark.read.parquet("incidents_100000.parquet")

    incidents = incidents.drop("emdCardNumber", "time_utc", "response_time_sec", "day_of_week", "weekend_or_not", "Incident_ID", "Dist_to_Seg", "XDSegID")

    incidents = incidents.withColumn("year", year(incidents["time_local"]))
    incidents = incidents.withColumn("month", month(incidents["time_local"]))
    
    filtered_incidents = incidents.filter((incidents.year == args.yearArg) & (incidents.month == args.monthArg))
    
    filtered_incidents.show()
    
    # counts.repartition(1).saveAsTextFile("")

  finally:
    # very important: stop the context. Otherwise you may get an error that context is still alive. if you are on colab just restart the runtime if you face problem
    #finally is used to make sure the context is stopped even with errors
    sc.stop()

  pass

Overwriting month_incidents.py


In [None]:
#to test locally
!spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 month_incidents.py --monthArg 2 --yearArg 2017

The professor's sample code uses Plotly Express, but:


*   Plotly Express can only be used with pandas, which means we would have to convert our Spark DataFrame to a pandas DataFrame, and that is a problem given the size of our dataset
*   to use regular Plotly, we have to sign up/create an account to get an access key for the API calls that create the maps we need



In [None]:
%%file road_density_incidents.py

import argparse

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import hour
import plotly.express as px

if __name__ == '__main__':

  # replace this line with the s3 pass when testing over EMR
  conf = SparkConf().setAppName('month_incidents').set('spark.hadoop.validateOutputSpecs', False)
  sc = SparkContext(conf=conf).getOrCreate()
  spark = SparkSession(sc)

  parser = argparse.ArgumentParser(description='Process some strings.')
  parser.add_argument('--XDSegID', type=float, help='Enter desired XDSegID', required=True)
  args = parser.parse_args()

  
  try:
    incidents = spark.read.parquet("incidents_100000.parquet")

    incidents = incidents.drop("emdCardNumber", "time_utc", "latitude", "longitude", "response_time_sec", "geometry", "day_of_week", "weekend_or_not", "Incident_ID", "Dist_to_Seg")
    incidents = incidents.withColumn("hour", hour(incidents["time_local"]))
    ex_id = args.XDSegID
    incidents = incidents.drop("time_local")
    incidents = incidents.filter(incidents.XDSegID == ex_id)
    hours = [val.hour for val in incidents.select('hour').collect()]

    fig = px.histogram(hours, nbins=24, title=f"Incident Hourly Density at Road Segment {ex_id}")
    fig.update_layout(
        xaxis_title="Hour", yaxis_title="Count"
    )
    fig.write_html("incident_segment_hour_density.html")
  

  finally:
    sc.stop()

  pass

Overwriting road_density_incidents.py


In [None]:
!spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 road_density_incidents.py --XDSegID 1524393684.0

:: loading settings :: url = jar:file:/content/spark-3.2.4-bin-hadoop3.2/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-8_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-707d8ca6-0822-432d-aa2d-cb481b027e89;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-8_2.11;2.4.7 in central
	found org.apache.kafka#kafka_2.11;0.8.2.1 in central
	found org.scala-lang.modules#scala-xml_2.11;1.0.2 in central
	found com.yammer.metrics#metrics-core;2.2.0 in central
	found org.slf4j#slf4j-api;1.7.16 in central
	found org.scala-lang.modules#scala-parser-combinators_2.11;1.1.0 in central
	found com.101tec#zkclient;0.3 in central
	found log4j#log4j;1.2.17 in central
	found org.apache.kafka#kafka-clients;0.8.2.1 in central
	found net.jpountz.lz4#lz4;1.2.0 in central
	found org.xerial.snapp

In [42]:
# 3. Query the average response time for each road segment, and sort it to find
#    which road segments have the highest and lowest response times.

from pyspark import SparkContext, SparkConf
from operator import add
from pyspark.sql import SparkSession

conf = SparkConf().setAppName('road_avg_response').set('spark.hadoop.validateOutputSpecs', 'false')
# SparkContext.getOrCreate reuses a live context instead of raising
# "Cannot run multiple SparkContexts at once" when this cell is re-run.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)


In [43]:
# Load the sample, discard the columns this task does not use, and drop rows
# that are missing either the response time or the road segment id.
incidents = (
    spark.read.parquet("incidents_10000.parquet")
    .drop(
        "latitude", "longitude", "emdCardNumber", "time_utc",
        "day_of_week", "weekend_or_not", "Incident_ID", "geometry",
    )
    .na.drop(subset=["response_time_sec", "XDSegID"])
)

In [44]:

# Use a separate name for the RDD view: rebinding `incidents` to its own .rdd
# made this cell fail on a second run, because an RDD has no .rdd attribute.
incidents_rdd = incidents.rdd

# Total response time per road segment.
Seg_response_sum = incidents_rdd.map(lambda row: (row.XDSegID, row.response_time_sec)).reduceByKey(lambda a, b: a + b)
# Number of incidents per road segment.
Seg_count = incidents_rdd.map(lambda row: (row.XDSegID, 1)).reduceByKey(lambda a, b: a + b)

# Only the last expression of a cell is displayed, so preview the counts here
# (the earlier Seg_response_sum.take(10) ran a job whose result was discarded).
Seg_count.take(10)

[(1524393684.0, 12),
 (1524356434.0, 3),
 (449620819.0, 1),
 (396068370.0, 2),
 (156041062.0, 2),
 (1524450692.0, 16),
 (1524630585.0, 8),
 (450433846.0, 3),
 (1524470172.0, 3),
 (449631045.0, 5)]

In [45]:
# Pair each segment's total response time with its incident count, divide to
# get the mean, and rank segments from slowest to fastest average response.
join = Seg_response_sum.join(Seg_count)
output = (
    join
    .mapValues(lambda total_and_count: total_and_count[0] / total_and_count[1])
    .sortBy(lambda seg_and_avg: seg_and_avg[1], ascending=False)
)
output.take(25)

[(1524369685.0, 1550.0),
 (160847340.0, 1384.0),
 (1524578575.0, 1367.0),
 (449617585.0, 1356.0),
 (1524435487.0, 1283.0),
 (1524353049.0, 1275.0),
 (1524321749.0, 1228.6666666666667),
 (1524596883.0, 1215.0),
 (449620896.0, 1179.0),
 (449633453.0, 1163.0),
 (1524321798.0, 1123.0),
 (449620756.0, 1117.0),
 (1524393513.0, 1058.0),
 (449617681.0, 968.0),
 (450428426.0, 960.0),
 (396083671.0, 949.0),
 (450430876.0, 942.0),
 (449633533.0, 936.0),
 (1524292955.0, 934.0),
 (1524308532.0, 930.75),
 (449629747.0, 930.0),
 (1524416516.0, 923.0),
 (1524393609.0, 918.0),
 (1524282578.0, 901.0),
 (1524321726.0, 899.0)]

+-------------+---------------------+
|       XSegID|Average Response Time|
+-------------+---------------------+
|1.524369685E9|               1550.0|
|  1.6084734E8|               1384.0|
|1.524578575E9|               1367.0|
| 4.49617585E8|               1356.0|
|1.524435487E9|               1283.0|
|1.524353049E9|               1275.0|
|1.524321749E9|   1228.6666666666667|
|1.524596883E9|               1215.0|
| 4.49620896E8|               1179.0|
| 4.49633453E8|               1163.0|
|1.524321798E9|               1123.0|
| 4.49620756E8|               1117.0|
|1.524393513E9|               1058.0|
| 4.49617681E8|                968.0|
| 4.50428426E8|                960.0|
| 3.96083671E8|                949.0|
| 4.50430876E8|                942.0|
| 4.49633533E8|                936.0|
|1.524292955E9|                934.0|
|1.524308532E9|               930.75|
+-------------+---------------------+
only showing top 20 rows



In [50]:
# Stop the SparkContext created for the interactive task-3 cells above.
sc.stop()

In [49]:
%%file road_avg_response.py

# 3. query the average response time for each road segment, and sort it to find
#  which road segments have the highest and lowest response times. 

from pyspark import SparkContext, SparkConf
from operator import add
from pyspark.sql import SparkSession


if __name__ == '__main__':

  # replace this line with the s3 pass when testing over EMR
  conf = SparkConf().setAppName('month_incidents').set('spark.hadoop.validateOutputSpecs', False)
  sc = SparkContext(conf=conf).getOrCreate()
  spark = SparkSession(sc)

  
  try:

    incidents = spark.read.parquet("incidents_10000.parquet")

    incidents = incidents.drop("latitude", "longitude", "emdCardNumber", "time_utc", "day_of_week", "weekend_or_not", "Incident_ID", "geometry")
    incidents = incidents.na.drop(subset=["response_time_sec","XDSegID"])
    incidents = incidents.rdd
    seg_response_sum = incidents.map(lambda x: (x.XDSegID, x.response_time_sec)).reduceByKey(lambda a, b: a + b)
    seg_count = incidents.map(lambda x: (x.XDSegID, 1)).reduceByKey(lambda a, b: a + b)

    join = seg_response_sum.join(seg_count)
    output = join.map(lambda a: (a[0], a[1][0]/a[1][1]))
    output = output.sortBy(lambda x: x[1], ascending=False)
    df = output.toDF(['XSegID', 'Average Response Time'])
    df.show()

  finally:
    sc.stop()

  pass

Overwriting road_avg_response.py


In [51]:
!spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 road_avg_response.py

:: loading settings :: url = jar:file:/content/spark-3.2.4-bin-hadoop3.2/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-streaming-kafka-0-8_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d6b7a40a-e73e-41e7-988a-fd14375f5886;1.0
	confs: [default]
	found org.apache.spark#spark-streaming-kafka-0-8_2.11;2.4.7 in central
	found org.apache.kafka#kafka_2.11;0.8.2.1 in central
	found org.scala-lang.modules#scala-xml_2.11;1.0.2 in central
	found com.yammer.metrics#metrics-core;2.2.0 in central
	found org.slf4j#slf4j-api;1.7.16 in central
	found org.scala-lang.modules#scala-parser-combinators_2.11;1.1.0 in central
	found com.101tec#zkclient;0.3 in central
	found log4j#log4j;1.2.17 in central
	found org.apache.kafka#kafka-clients;0.8.2.1 in central
	found net.jpountz.lz4#lz4;1.2.0 in central
	found org.xerial.snapp