# Reading and Writing Data with Spark

This notebook contains the code from the previous screencast. The only difference is that the data set is read from a local file instead of a remote cluster. You can see the file by clicking on the "jupyter" icon and opening the folder titled "data".

Run the code cells to see how everything works.

First, let's import SparkConf and SparkSession.

In [1]:
import findspark
findspark.init('/home/yannick/dev/spark-2.4.4-bin-hadoop2.7')

In [2]:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt

%matplotlib inline



Since we're using Spark locally, we already have both a SparkContext and a SparkSession running. We can update some of the parameters, such as our application's name. Let's call it "Explore_Json_File".

In [3]:
spark = SparkSession \
    .builder \
    .appName("Explore_Json_File") \
    .getOrCreate()

Let's check whether the change went through.

In [4]:
spark.sparkContext.getConf().getAll()

[('spark.driver.host', '192.168.219.153'),
 ('spark.driver.port', '42787'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'Explore_Json_File'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.id', 'local-1579372964344')]

In [5]:
#conf = SparkConf().setMaster("local[2]").setAppName("RDD Example")
#sc = SparkContext(conf=conf)
# For RDDs, use SparkContext
# For DataFrames and SQL, use SparkSession

In [5]:
spark

As you can see, the app name is exactly how we set it.

Let's create our first DataFrame from a fairly small sample data set. Throughout this notebook we'll work with a data set of police department calls for service. Each record describes one call, with fields such as the address, the call date and time, the original crime type, and the disposition.

In [6]:
path = "./police-department-calls-for-service.json"
crime_log = spark.read.json(path,multiLine=True)
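
The `multiLine=True` option matters because, by default, `spark.read.json` expects JSON Lines input, with one complete JSON object per line. Since the read above needs `multiLine=True`, the file is presumably a single JSON document spread over several lines. The sketch below uses only Python's standard `json` module (not Spark) to show the difference between the two layouts. The sample records are made up for illustration.

```python
import json

# Hypothetical sample records, used only to illustrate the two layouts.
records = [
    {"crime_id": "1", "disposition": "ADM"},
    {"crime_id": "2", "disposition": "CIT"},
]

# JSON Lines: one complete object per line (Spark's default expectation).
json_lines_text = "\n".join(json.dumps(r) for r in records)

# A single JSON document spread over several lines (needs multiLine=True in Spark).
multi_line_text = json.dumps(records, indent=2)

# Each line of the JSON Lines text parses on its own.
parsed_from_lines = [json.loads(line) for line in json_lines_text.splitlines()]

# The multi-line document only parses as a whole.
parsed_from_document = json.loads(multi_line_text)


def parses_alone(line):
    """Return True if a single line is a complete JSON value by itself."""
    try:
        json.loads(line)
        return True
    except json.JSONDecodeError:
        return False


# The first line of the multi-line document is just "[", which is incomplete.
first_line_ok = parses_alone(multi_line_text.splitlines()[0])
```

Both layouts carry the same records. Only the line-by-line readability differs, which is why a line-oriented reader needs to be told when a document spans lines.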

In [8]:
crime_log.printSchema()
#crime_log.describe()
#crime_log.map(lambda x : x.function())

root
 |-- address: string (nullable = true)
 |-- address_type: string (nullable = true)
 |-- agency_id: string (nullable = true)
 |-- call_date: string (nullable = true)
 |-- call_date_time: string (nullable = true)
 |-- call_time: string (nullable = true)
 |-- city: string (nullable = true)
 |-- common_location: string (nullable = true)
 |-- crime_id: string (nullable = true)
 |-- disposition: string (nullable = true)
 |-- offense_date: string (nullable = true)
 |-- original_crime_type_name: string (nullable = true)
 |-- report_date: string (nullable = true)
 |-- state: string (nullable = true)



In [7]:
crime_log.take(10)
#crime_log.describe()

[Row(address='Geary Bl/divisadero St', address_type='Intersection', agency_id='1', call_date='2018-12-31T00:00:00.000', call_date_time='2018-12-31T23:57:00.000', call_time='23:57', city='San Francisco', common_location='', crime_id='183653763', disposition='ADM', offense_date='2018-12-31T00:00:00.000', original_crime_type_name='Traffic Stop', report_date='2018-12-31T00:00:00.000', state='CA'),
 Row(address='100 Blk Howard St', address_type='Geo-Override', agency_id='1', call_date='2018-12-31T00:00:00.000', call_date_time='2018-12-31T23:54:00.000', call_time='23:54', city='San Francisco', common_location='', crime_id='183653756', disposition='CIT', offense_date='2018-12-31T00:00:00.000', original_crime_type_name='Traf Violation Cite', report_date='2018-12-31T00:00:00.000', state='CA'),
 Row(address='3300 Block Of 20th Av', address_type='Common Location', agency_id='1', call_date='2018-12-31T00:00:00.000', call_date_time='2018-12-31T23:49:00.000', call_time='23:49', city='San Francisco',

In [8]:
#kafka_df = crime_log.selectExpr("CAST(value AS STRING)")
crime_log.count()
crime_log.describe().show()


+-------+------------------+---------------+---------+--------------------+--------------------+---------+-----------+------------------+--------------------+-----------+--------------------+------------------------+--------------------+------+
|summary|           address|   address_type|agency_id|           call_date|      call_date_time|call_time|       city|   common_location|            crime_id|disposition|        offense_date|original_crime_type_name|         report_date| state|
+-------+------------------+---------------+---------+--------------------+--------------------+---------+-----------+------------------+--------------------+-----------+--------------------+------------------------+--------------------+------+
|  count|            199999|         199999|   199999|              199999|              199999|   199999|     199999|            199999|              199999|     199999|              199999|                  199999|              199999|199999|
|   mean| 133.090909

In [11]:
crime_log.describe('crime_id').show()
crime_log.describe('address').show()
crime_log.describe('original_crime_type_name').show()

+-------+--------------------+
|summary|            crime_id|
+-------+--------------------+
|  count|              199999|
|   mean|1.8318701972651362E8|
| stddev|  260420.76950813897|
|    min|           182751007|
|    max|           183653763|
+-------+--------------------+

+-------+------------------+
|summary|           address|
+-------+------------------+
|  count|            199999|
|   mean| 133.0909090909091|
| stddev|243.86613317369614|
|    min|      #1 Church St|
|    max|               Zoo|
+-------+------------------+

+-------+------------------------+
|summary|original_crime_type_name|
+-------+------------------------+
|  count|                  199999|
|   mean|      4069.9766435812558|
| stddev|       9006.415090272823|
|    min|           "Bumper Jack"|
|    max|                      `X|
+-------+------------------------+
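
The summaries above look odd because every column was read as a string. As far as I understand `describe`, it casts string columns to numbers for `mean` and `stddev` (values that are not numeric become null and are ignored), while `min` and `max` compare the strings in lexicographic order. The plain-Python sketch below mimics that behavior on made-up values. It is an illustration of the idea, not Spark's implementation.

```python
def cast_to_double(text):
    """Mimic a string-to-double cast: return None when the text is not numeric."""
    try:
        return float(text)
    except ValueError:
        return None


# Hypothetical values for a string column that mixes numbers and free text.
values = ["Zoo", "#1 Church St", "100", "300", "Traffic Stop"]

# mean: only the values that survive the cast contribute.
numeric = [v for v in map(cast_to_double, values) if v is not None]
mean = sum(numeric) / len(numeric)

# min / max: plain string comparison, so '#' sorts before digits and letters.
smallest = min(values)
largest = max(values)
```

This would explain why `address` reports a numeric mean even though most addresses are text, and why its minimum starts with `#`.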



In [12]:
crime_log.select('original_crime_type_name').dropDuplicates().sort("original_crime_type_name").show()

+------------------------+
|original_crime_type_name|
+------------------------+
|           "Bumper Jack"|
|                  $1,295|
|            $2,520 Worth|
|                    $200|
|                 & 415's|
|             'S Drinking|
|                  ***909|
|                 **909**|
|          **Disregard***|
|                   *909*|
|        *909* Encampment|
|                       .|
|                     .25|
|                 .25/.70|
|                 .25/500|
|                .25/500b|
|                .25/500e|
|                .25/500h|
|                .25/500l|
|                     .26|
+------------------------+
only showing top 20 rows



In [13]:
crime_log.select(['crime_id','original_crime_type_name','call_date_time','call_time']).where(crime_log.crime_id == 182751007).collect()
crime_log.printSchema()

root
 |-- address: string (nullable = true)
 |-- address_type: string (nullable = true)
 |-- agency_id: string (nullable = true)
 |-- call_date: string (nullable = true)
 |-- call_date_time: string (nullable = true)
 |-- call_time: string (nullable = true)
 |-- city: string (nullable = true)
 |-- common_location: string (nullable = true)
 |-- crime_id: string (nullable = true)
 |-- disposition: string (nullable = true)
 |-- offense_date: string (nullable = true)
 |-- original_crime_type_name: string (nullable = true)
 |-- report_date: string (nullable = true)
 |-- state: string (nullable = true)



In [14]:
crime_log.select(['crime_id','original_crime_type_name','call_date_time','call_time']).collect()

[Row(crime_id='183653763', original_crime_type_name='Traffic Stop', call_date_time='2018-12-31T23:57:00.000', call_time='23:57'),
 Row(crime_id='183653756', original_crime_type_name='Traf Violation Cite', call_date_time='2018-12-31T23:54:00.000', call_time='23:54'),
 Row(crime_id='183653746', original_crime_type_name='Passing Call', call_date_time='2018-12-31T23:49:00.000', call_time='23:49'),
 Row(crime_id='183653745', original_crime_type_name='Audible Alarm', call_date_time='2018-12-31T23:47:00.000', call_time='23:47'),
 Row(crime_id='183653737', original_crime_type_name='Traffic Stop', call_date_time='2018-12-31T23:46:00.000', call_time='23:46'),
 Row(crime_id='183653719', original_crime_type_name='Passing Call', call_date_time='2018-12-31T23:38:00.000', call_time='23:38'),
 Row(crime_id='183653722', original_crime_type_name='Traffic Stop', call_date_time='2018-12-31T23:38:00.000', call_time='23:38'),
 Row(crime_id='183653711', original_crime_type_name='Traffic Stop', call_date_time

In [19]:
import logging
import json
from pyspark.sql.types import *
import pyspark.sql.functions as psf

In [25]:
kafka_df = crime_log.selectExpr("CAST(value AS STRING)")

AnalysisException: "cannot resolve '`value`' given input columns: [report_date, offense_date, address_type, agency_id, call_time, crime_id, original_crime_type_name, common_location, city, disposition, address, state, call_date_time, call_date]; line 1 pos 5;\n'Project [unresolvedalias(cast('value as string), None)]\n+- Relation[address#0,address_type#1,agency_id#2,call_date#3,call_date_time#4,call_time#5,city#6,common_location#7,crime_id#8,disposition#9,offense_date#10,original_crime_type_name#11,report_date#12,state#13] json\n"
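
The error arises because `value` is a column that Spark's Kafka source provides: each Kafka record arrives with its payload as bytes in a single `value` field. A DataFrame read from a JSON file already has the parsed fields as separate columns, so there is no `value` to cast. The sketch below shows the relationship in plain Python (no Spark or Kafka involved), using a few fields from the first row shown earlier.

```python
import json

# One row from the output above, reduced to a few fields.
row = {
    "crime_id": "183653763",
    "original_crime_type_name": "Traffic Stop",
    "disposition": "ADM",
}

# A Kafka record carries the whole row as bytes in a single `value` field.
kafka_style_record = {"value": json.dumps(row).encode("utf-8")}

# Rough equivalent of CAST(value AS STRING): decode the bytes back to text...
value_as_string = kafka_style_record["value"].decode("utf-8")

# ...and then parse the text to recover the individual fields.
recovered = json.loads(value_as_string)
```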

In [28]:
distinct_table = crime_log \
        .select('original_crime_type_name', 'disposition', 'call_date_time') \
        .distinct() \
        .withWatermark('call_date_time', "1 minute")
distinct_table.show()

+------------------------+------------+--------------------+
|original_crime_type_name| disposition|      call_date_time|
+------------------------+------------+--------------------+
|       Suspicious Person|         ADV|2018-12-31T20:24:...|
|                     518|         GOA|2018-12-31T15:38:...|
|                    500e|         CAN|2018-12-31T11:39:...|
|            Passing Call|         HAN|2018-12-31T09:55:...|
|                   Drugs|         ADV|2018-12-31T08:54:...|
|                Burglary|         CAN|2018-12-31T07:42:...|
|                     311|         UTL|2018-12-31T00:59:...|
|            Passing Call|         HAN|2018-12-30T23:25:...|
|      Auto Boost / Strip|         REP|2018-12-30T22:14:...|
|      Auto Boost / Strip|         REP|2018-12-30T21:02:...|
|      Traf Violation Tow|         GOA|2018-12-30T19:02:...|
|            Passing Call|         HAN|2018-12-30T15:49:...|
|       Assault / Battery|         REP|2018-12-30T15:33:...|
|      Suspicious Vehicl

In [35]:
agg_df = distinct_table \
        .dropna() \
        .select('original_crime_type_name')\
        .groupby('original_crime_type_name') \
        .agg({'original_crime_type_name': 'count'}) \
        .orderBy('count(original_crime_type_name)', ascending=False)
agg_df.show()

+------------------------+-------------------------------+
|original_crime_type_name|count(original_crime_type_name)|
+------------------------+-------------------------------+
|            Passing Call|                          32123|
|            Traffic Stop|                          13041|
|     Traf Violation Cite|                           9992|
|       Suspicious Person|                           9910|
|      Homeless Complaint|                           5950|
|              Trespasser|                           5799|
|           Audible Alarm|                           5727|
|                  22500e|                           5617|
|        Well Being Check|                           5514|
|         Muni Inspection|                           5159|
|      Suspicious Vehicle|                           5002|
|         Fight No Weapon|                           4185|
|          Noise Nuisance|                           4052|
|      Auto Boost / Strip|                           295
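
To make the chain of operations in the aggregation easier to follow, here is a plain-Python sketch of the same steps (distinct, drop rows with missing values, group and count, sort descending). The rows are hypothetical and far smaller than the real data.

```python
from collections import Counter

# Hypothetical rows: (original_crime_type_name, disposition, call_date_time)
rows = [
    ("Passing Call", "HAN", "2018-12-31T09:55:00.000"),
    ("Passing Call", "HAN", "2018-12-31T09:55:00.000"),  # exact duplicate
    ("Passing Call", "HAN", "2018-12-30T23:25:00.000"),
    ("Traffic Stop", "ADM", "2018-12-31T23:57:00.000"),
    (None, "GOA", "2018-12-31T15:38:00.000"),  # missing crime type
]

# .distinct(): drop rows that are identical in every column.
distinct_rows = set(rows)

# .dropna(): drop rows with a missing value in any column.
complete_rows = [r for r in distinct_rows if None not in r]

# .groupby(...).agg(count): count rows per crime type.
counts = Counter(r[0] for r in complete_rows)

# .orderBy(..., ascending=False): most frequent crime type first.
ranked = counts.most_common()
```

Note that the counts are taken after `distinct()`, so two calls of the same type and disposition in the same minute count once.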

In [7]:
path_r = "./radio_code.json"
radio_log = spark.read.json(path_r, multiLine=True)

In [12]:
radio_log.collect()

# select original_crime_type_name and disposition
distinct_table = crime_log \
        .select('original_crime_type_name', 'disposition', 'call_date_time') \
        .distinct() \
        .withWatermark('call_date_time', "1 minute")

# count the number of original crime type
agg_df = distinct_table \
        .dropna() \
        .select('original_crime_type_name') \
        .groupby('original_crime_type_name') \
        .agg({'original_crime_type_name': 'count'}) \
        .orderBy('count(original_crime_type_name)', ascending=True)
distinct_table.collect()
#agg_df.collect()

[Row(original_crime_type_name='Suspicious Person', disposition='ADV', call_date_time='2018-12-31T20:24:00.000'),
 Row(original_crime_type_name='518', disposition='GOA', call_date_time='2018-12-31T15:38:00.000'),
 Row(original_crime_type_name='500e', disposition='CAN', call_date_time='2018-12-31T11:39:00.000'),
 Row(original_crime_type_name='Passing Call', disposition='HAN', call_date_time='2018-12-31T09:55:00.000'),
 Row(original_crime_type_name='Drugs', disposition='ADV', call_date_time='2018-12-31T08:54:00.000'),
 Row(original_crime_type_name='Burglary', disposition='CAN', call_date_time='2018-12-31T07:42:00.000'),
 Row(original_crime_type_name='311', disposition='UTL', call_date_time='2018-12-31T00:59:00.000'),
 Row(original_crime_type_name='Passing Call', disposition='HAN', call_date_time='2018-12-30T23:25:00.000'),
 Row(original_crime_type_name='Auto Boost / Strip', disposition='REP', call_date_time='2018-12-30T22:14:00.000'),
 Row(original_crime_type_name='Auto Boost / Strip', di

In [24]:
from pyspark.sql.types import *
import pyspark.sql.functions as psf

In [28]:
radio_code_df = radio_log.withColumnRenamed("disposition_code", "disposition")
radio_code_df.collect()
radio_code_df.select('disposition').collect()

[Row(disposition='ABA'),
 Row(disposition='ADM'),
 Row(disposition='ADV'),
 Row(disposition='ARR'),
 Row(disposition='CAN'),
 Row(disposition='CSA'),
 Row(disposition='22'),
 Row(disposition='CIT'),
 Row(disposition='CRM'),
 Row(disposition='GOA'),
 Row(disposition='HAN'),
 Row(disposition='NCR'),
 Row(disposition='ND'),
 Row(disposition='NOM'),
 Row(disposition='PAS'),
 Row(disposition='REP'),
 Row(disposition='SFD'),
 Row(disposition='UTL'),
 Row(disposition='VAS')]

In [34]:
# PySpark compares columns with ==, not Scala's ===. agg_df no longer has a
# disposition column, so the join uses distinct_table, which still carries it.
# Passing the column name joins on it and keeps a single disposition column.
join_query = distinct_table.join(radio_code_df, "disposition", "inner")
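
An inner join on `disposition` keeps only the rows whose code appears in both tables and attaches the matching columns from the lookup table. The plain-Python sketch below shows that behavior on made-up data. The description strings are hypothetical, since the contents of the radio code file beyond the codes are not shown here.

```python
# Hypothetical call records: (original_crime_type_name, disposition)
calls = [
    ("Traffic Stop", "ADM"),
    ("Passing Call", "HAN"),
    ("Burglary", "XYZ"),  # code with no match in the lookup table
]

# Hypothetical lookup table: disposition code -> description (made up).
radio_codes = {"ADM": "description for ADM", "HAN": "description for HAN"}

# Inner join: keep only calls whose disposition has a match,
# and attach the description from the lookup table.
joined = [
    (crime_type, code, radio_codes[code])
    for crime_type, code in calls
    if code in radio_codes
]
```

The unmatched `XYZ` row is dropped, which is the difference between an inner join and a left join.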

In [142]:
import datetime
import pandas as pd
from pyspark.sql.functions import udf

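# The commented-out get_hour below expects epoch timestamps in milliseconds,
# whereas this data set stores dates as strings such as '2018-12-31T23:57:00.000'.
#get_hour = udf(lambda x : datetime.datetime.fromtimestamp(x / 1000.0).hour)

# Placeholder UDF: crime_id is a string, so 2 * x repeats the string
# (for example '12' becomes '1212') instead of doubling a number.
get_hour = udf(lambda x: 2 * x)

crime_log = crime_log.withColumn("added_col_time", get_hour(crime_log.crime_id))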
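
Since `call_date_time` holds strings such as `'2018-12-31T23:57:00.000'`, an hour-extracting function for this data set would need to parse that text rather than an epoch timestamp. The following is a minimal sketch in plain Python. The function name `get_hour_from_iso` is hypothetical, and the format string assumes every value follows the pattern seen in the rows above.

```python
import datetime


def get_hour_from_iso(text):
    """Return the hour (0-23) from a string like '2018-12-31T23:57:00.000'."""
    if text is None:
        # Pass missing values through instead of raising.
        return None
    return datetime.datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%f").hour


sample_hour = get_hour_from_iso("2018-12-31T23:57:00.000")
```

To use it in Spark, one could wrap it with `udf(get_hour_from_iso, IntegerType())` and apply it to `crime_log.call_date_time`.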


In [None]:
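# Spark read every column as a string, so convert to numbers before plotting
crime_log_pd = crime_log.select("crime_id", "agency_id").toPandas().apply(pd.to_numeric, errors="coerce")
plt.scatter(crime_log_pd["crime_id"], crime_log_pd["agency_id"])
plt.ylim(0, 1.2 * crime_log_pd["agency_id"].max())
plt.xlabel("crime_id")
plt.ylabel("agency_id")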