# Some Basic PySpark

## Setup

In [5]:
import os

# Point PySpark at the local Java runtime and Spark installation.
# Raw strings keep every Windows backslash literal and avoid invalid escape
# sequences such as '\J', which Python only tolerates with a warning.
# NOTE(review): these are machine-specific absolute paths; adjust per machine.
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jre1.8.0_271'
os.environ['SPARK_HOME'] = r'C:\Spark\spark-3.0.1-bin-hadoop2.7'

import numpy as np
import pandas as pd

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Low-level entry point (used for the RDD examples): get the existing
# SparkContext or create one.
sc = SparkContext.getOrCreate()

# DataFrame entry point: get the existing SparkSession or create one.
spark = SparkSession.builder.getOrCreate()

# Bare expression so the notebook displays the session object.
spark

## Import Data

In [35]:
from pyspark.sql.functions import col, lit, to_timestamp

# Read the reported-crimes CSV (first row is the header, column types are
# inferred), parse the 'Date' strings into timestamps, and keep rows dated
# up to 2018-11-11.
# NOTE(review): the bound is a date-only string; the recorded outputs suggest
# it is compared as 2018-11-11 00:00:00, so rows later on that day may be
# excluded -- confirm this is intended.
rc = (
    spark.read
    .csv('../data/reported-crimes.csv', header=True, inferSchema=True)
    .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
    .filter(col('Date') <= lit('2018-11-11'))
)

In [15]:
# Convert a 5-row sample to pandas, which renders more readably in the
# notebook than Spark's own text table.
(
    rc.limit(5)
    .toPandas()
)

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
0,11034701,JA366925,2001-01-01 11:00:00,016XX E 86TH PL,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,...,8,45,11,,,2001,08/05/2017 03:50:08 PM,,,
1,11227287,JB147188,2017-10-08 03:00:00,092XX S RACINE AVE,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,RESIDENCE,False,False,...,21,73,2,,,2017,02/11/2018 03:57:41 PM,,,
2,11227583,JB147595,2017-03-28 14:00:00,026XX W 79TH ST,620,BURGLARY,UNLAWFUL ENTRY,OTHER,False,False,...,18,70,5,,,2017,02/11/2018 03:57:41 PM,,,
3,11227293,JB147230,2017-09-09 20:17:00,060XX S EBERHART AVE,810,THEFT,OVER $500,RESIDENCE,False,False,...,20,42,6,,,2017,02/11/2018 03:57:41 PM,,,
4,11227634,JB147599,2017-08-26 10:00:00,001XX W RANDOLPH ST,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,HOTEL/MOTEL,False,False,...,42,32,2,,,2017,02/11/2018 03:57:41 PM,,,


### View Schema

In [11]:
# Print the column names, inferred types and nullability of the crimes DataFrame.
rc.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: integer (nullable = true)
 |-- Y Coordinate: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



## Basic Column Commands

In [17]:
# Select a single column and show its first 5 values.
# Equivalent spellings: rc.select('IUCR') or rc.select(rc.IUCR).
rc.select(col('IUCR')).show(5)

+----+
|IUCR|
+----+
|1153|
|0281|
|0620|
|0810|
|0281|
+----+
only showing top 5 rows



In [19]:
# Select several columns at once and show the first 4 rows.
selected_columns = ['Case Number', 'Date', 'Arrest']
rc.select(*selected_columns).show(4)

+-----------+-------------------+------+
|Case Number|               Date|Arrest|
+-----------+-------------------+------+
|   JA366925|2001-01-01 11:00:00| false|
|   JB147188|2017-10-08 03:00:00| false|
|   JB147595|2017-03-28 14:00:00| false|
|   JB147230|2017-09-09 20:17:00| false|
+-----------+-------------------+------+
only showing top 4 rows



In [21]:
# Add a column named 'One' whose value is the literal 1 on every row,
# then preview 5 rows through pandas.
with_constant = rc.withColumn('One', lit(1))
with_constant.limit(5).toPandas()

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location,One
0,11034701,JA366925,2001-01-01 11:00:00,016XX E 86TH PL,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,...,45,11,,,2001,08/05/2017 03:50:08 PM,,,,1
1,11227287,JB147188,2017-10-08 03:00:00,092XX S RACINE AVE,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,RESIDENCE,False,False,...,73,2,,,2017,02/11/2018 03:57:41 PM,,,,1
2,11227583,JB147595,2017-03-28 14:00:00,026XX W 79TH ST,620,BURGLARY,UNLAWFUL ENTRY,OTHER,False,False,...,70,5,,,2017,02/11/2018 03:57:41 PM,,,,1
3,11227293,JB147230,2017-09-09 20:17:00,060XX S EBERHART AVE,810,THEFT,OVER $500,RESIDENCE,False,False,...,42,6,,,2017,02/11/2018 03:57:41 PM,,,,1
4,11227634,JB147599,2017-08-26 10:00:00,001XX W RANDOLPH ST,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,HOTEL/MOTEL,False,False,...,32,2,,,2017,02/11/2018 03:57:41 PM,,,,1


## Working with Rows

In [31]:
# Build the "new day" DataFrame that is appended to rc in the next cell:
# same file and same Date parsing as rc, filtered to 2018-11-12.
# NOTE(review): equality against a date-only string appears to match only
# rows stamped exactly at midnight (the displayed rows carry no time of day),
# so this is probably not the full day's data -- confirm intent.
new_day = (
    spark.read
    .csv('../data/reported-crimes.csv', header=True, inferSchema=True)
    .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
    .filter(col('Date') == lit('2018-11-12'))
)
new_day.toPandas()

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
0,11505149,JB513151,2018-11-12,003XX S WHIPPLE ST,810,THEFT,OVER $500,STREET,False,False,...,28,27,6,1156099,1898319,2018,11/19/2018 04:22:02 PM,41.876776,-87.702318,"(41.876776356, -87.702317641)"
1,11516594,JB528186,2018-11-12,049XX S PRAIRIE AVE,2826,OTHER OFFENSE,HARASSMENT BY ELECTRONIC MEANS,OTHER,False,False,...,3,38,26,1178879,1872259,2018,11/28/2018 04:14:58 PM,41.804776,-87.619472,"(41.804775828, -87.619472488)"
2,11540042,JB559262,2018-11-12,010XX N DEARBORN ST,1140,DECEPTIVE PRACTICE,EMBEZZLEMENT,CONVENIENCE STORE,True,False,...,2,8,12,1175747,1907348,2018,03/16/2019 04:01:20 PM,41.901133,-87.629905,"(41.901133376, -87.629904979)"


In [37]:
# Append the new day's rows to rc and show the 5 most recent records.
(
    rc.union(new_day)
    .sort(col('Date').desc())
    .limit(5)
    .toPandas()
)

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
0,11516594,JB528186,2018-11-12,049XX S PRAIRIE AVE,2826,OTHER OFFENSE,HARASSMENT BY ELECTRONIC MEANS,OTHER,False,False,...,3,38,26,1178879.0,1872259.0,2018,11/28/2018 04:14:58 PM,41.804776,-87.619472,"(41.804775828, -87.619472488)"
1,11505149,JB513151,2018-11-12,003XX S WHIPPLE ST,810,THEFT,OVER $500,STREET,False,False,...,28,27,6,1156099.0,1898319.0,2018,11/19/2018 04:22:02 PM,41.876776,-87.702318,"(41.876776356, -87.702317641)"
2,11540042,JB559262,2018-11-12,010XX N DEARBORN ST,1140,DECEPTIVE PRACTICE,EMBEZZLEMENT,CONVENIENCE STORE,True,False,...,2,8,12,1175747.0,1907348.0,2018,03/16/2019 04:01:20 PM,41.901133,-87.629905,"(41.901133376, -87.629904979)"
3,11513303,JB523990,2018-11-11,007XX S CICERO AVE,281,CRIMINAL SEXUAL ASSAULT,NON-AGGRAVATED,STREET,False,False,...,24,25,2,1144511.0,1896107.0,2018,03/19/2020 03:44:13 PM,41.870932,-87.744921,"(41.870932257, -87.744921277)"
4,11595518,JC152109,2018-11-11,043XX N GREENVIEW AVE,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,SMALL RETAIL STORE,False,False,...,47,6,11,,,2018,02/15/2019 04:04:58 PM,,,


In [38]:
# Ten primary types with the highest number of reported crimes.
(
    rc.groupBy('Primary Type')
    .count()
    .sort(col('count').desc())
    .limit(10)
    .toPandas()
)

Unnamed: 0,Primary Type,count
0,THEFT,1418446
1,BATTERY,1232228
2,CRIMINAL DAMAGE,771503
3,NARCOTICS,711744
4,OTHER OFFENSE,418858
5,ASSAULT,418509
6,BURGLARY,388031
7,MOTOR VEHICLE THEFT,314131
8,DECEPTIVE PRACTICE,266035
9,ROBBERY,255599


## Challenge Questions

What percentage of reported crimes resulted in an arrest? (The result below is a fraction; multiply by 100 for a percentage.)

In [42]:
# Ratio of rows whose Arrest flag is true to all rows (a fraction, not a percent).
arrest_count = rc.where(col('Arrest')).count()
total_count = rc.count()
arrest_count / total_count

0.2775442315884519

Find the top 3 locations for reported crimes

In [43]:
# Three location descriptions with the most reported crimes.
(
    rc.groupBy('Location Description')
    .count()
    .sort(col('count').desc())
    .limit(3)
    .toPandas()
)

Unnamed: 0,Location Description,count
0,STREET,1770578
1,RESIDENCE,1144972
2,APARTMENT,698336


## Built-in Functions

### String Functions

In [45]:
from pyspark.sql.functions import lower, substring, upper

# Apply three string functions to the same column: lower-case, upper-case,
# and the first four characters (substring positions start at 1).
primary_type = col('Primary Type')
rc.select(
    lower(primary_type),
    upper(primary_type),
    substring(primary_type, 1, 4),
).limit(5).toPandas()

Unnamed: 0,lower(Primary Type),upper(Primary Type),"substring(Primary Type, 1, 4)"
0,deceptive practice,DECEPTIVE PRACTICE,DECE
1,crim sexual assault,CRIM SEXUAL ASSAULT,CRIM
2,burglary,BURGLARY,BURG
3,theft,THEFT,THEF
4,crim sexual assault,CRIM SEXUAL ASSAULT,CRIM


### Numeric Functions

In [48]:
# Note: this import rebinds the names `min` and `max` in the notebook
# namespace to Spark's column aggregates, hiding the Python built-ins for
# every cell that runs afterwards.
from pyspark.sql.functions import max, min

# Smallest and largest 'Community Area' value in a single one-row result.
community_area = col('Community Area')
rc.select(min(community_area), max(community_area)).show(1)

+-------------------+-------------------+
|min(Community Area)|max(Community Area)|
+-------------------+-------------------+
|                  0|                 77|
+-------------------+-------------------+



### Date Functions

What is 3 days earlier than the oldest date and 3 days later than the most recent date?

In [49]:
from pyspark.sql.functions import date_add, date_sub
from pyspark.sql.functions import max as spark_max, min as spark_min

# Import the Spark aggregates under explicit aliases so this cell works on its
# own: with the Python built-ins, min('Date') would just return the character
# 'D' instead of building an aggregate over the Date column.
# Result: the oldest Date minus 3 days and the most recent Date plus 3 days.
rc.select(
    date_sub(spark_min('Date'), 3),
    date_add(spark_max('Date'), 3),
).show(1)

+----------------------+----------------------+
|date_sub(min(Date), 3)|date_add(max(Date), 3)|
+----------------------+----------------------+
|            2000-12-29|            2018-11-14|
+----------------------+----------------------+



### Joins

Load the police station data as well, so it can be joined to the crime data.

In [53]:
# Load the police-station CSV (header row, inferred types) and discard the
# WEBSITE column, then preview 5 rows.
ps = (
    spark.read
    .csv('../data/Police_Stations.csv', header=True, inferSchema=True)
    .drop('WEBSITE')
)
ps.limit(5).toPandas()

Unnamed: 0,DISTRICT,DISTRICT NAME,ADDRESS,CITY,STATE,ZIP,PHONE,FAX,TTY,X COORDINATE,Y COORDINATE,LATITUDE,LONGITUDE,LOCATION
0,1,Central,1718 S State St,Chicago,IL,60616,312-745-4290,312-745-3694,312-745-3693,1176569.052,1891771.704,41.858373,-87.627356,"(41.8583725929, -87.627356171)"
1,6,Gresham,7808 S Halsted St,Chicago,IL,60620,312-745-3617,312-745-3649,312-745-3639,1172283.013,1853022.646,41.752137,-87.644229,"(41.7521368378, -87.6442289066)"
2,11,Harrison,3151 W Harrison St,Chicago,IL,60612,312-746-8386,312-746-4281,312-746-5151,1155244.069,1897148.755,41.873582,-87.705488,"(41.8735822883, -87.705488126)"
3,16,Jefferson Park,5151 N Milwaukee Ave,Chicago,IL,60630,312-742-4480,312-742-4421,312-742-4423,1138480.758,1933660.473,41.974094,-87.766149,"(41.9740944511, -87.7661488432)"
4,Headquarters,Headquarters,3510 S Michigan Ave,Chicago,IL,60653,,,,1177731.401,1881697.404,41.830702,-87.623395,"(41.8307016873, -87.6233953459)"


In [54]:
# Mark rc for caching, then run a count so the data is actually read;
# the count is also the cell's displayed result.
rc.cache().count()

6753601

In [55]:
# Up to 15 distinct DISTRICT values from the station data (one side of the join key).
(
    ps.select('DISTRICT')
    .distinct()
    .limit(15)
    .toPandas()
)

Unnamed: 0,DISTRICT
0,7
1,15
2,11
3,3
4,8
5,22
6,16
7,5
8,18
9,17


In [56]:
# Up to 15 distinct District values from the crime data (other side of the join key).
(
    rc.select('District')
    .distinct()
    .limit(15)
    .toPandas()
)

Unnamed: 0,District
0,31.0
1,12.0
2,22.0
3,
4,1.0
5,6.0
6,16.0
7,3.0
8,20.0
9,5.0


In [61]:
# Left outer join: keep every crime row and attach the station columns of the
# matching district, then preview 5 rows.
district_match = rc.District == ps.DISTRICT
rc.join(ps, on=district_match, how='left_outer').limit(5).toPandas()

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,STATE,ZIP,PHONE,FAX,TTY,X COORDINATE,Y COORDINATE,LATITUDE,LONGITUDE,LOCATION
0,11034701,JA366925,2001-01-01 11:00:00,016XX E 86TH PL,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,...,IL,60617,312-747-7581,312-747-5276,312-747-9169,1193131.299,1837090.265,41.707933,-87.568349,"(41.7079332906, -87.5683491228)"
1,11227287,JB147188,2017-10-08 03:00:00,092XX S RACINE AVE,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,RESIDENCE,False,False,...,IL,60643,312-745-0710,312-745-0814,312-745-0569,1165825.476,1830851.333,41.691435,-87.66852,"(41.6914347795, -87.6685203937)"
2,11227583,JB147595,2017-03-28 14:00:00,026XX W 79TH ST,620,BURGLARY,UNLAWFUL ENTRY,OTHER,False,False,...,IL,60629,312-747-8730,312-747-8545,312-747-8116,1154575.242,1862672.049,41.778987,-87.708864,"(41.778987189, -87.7088638153)"
3,11227293,JB147230,2017-09-09 20:17:00,060XX S EBERHART AVE,810,THEFT,OVER $500,RESIDENCE,False,False,...,IL,60637,312-747-8201,312-747-5479,312-747-9168,1182739.183,1858317.732,41.766431,-87.605748,"(41.7664308925, -87.6057478606)"
4,11227634,JB147599,2017-08-26 10:00:00,001XX W RANDOLPH ST,281,CRIM SEXUAL ASSAULT,NON-AGGRAVATED,HOTEL/MOTEL,False,False,...,IL,60616,312-745-4290,312-745-3694,312-745-3693,1176569.052,1891771.704,41.858373,-87.627356,"(41.8583725929, -87.627356171)"


## Challenge Questions

Find the most frequently reported non-criminal activity

In [64]:
# The dataset spells the non-criminal primary type three different ways;
# keep rows matching any of them.
non_criminal_types = [
    'NON - CRIMINAL',
    'NON-CRIMINAL',
    'NON-CRIMINAL (SUBJECT SPECIFIED)',
]
nc = rc.filter(col('Primary Type').isin(non_criminal_types))

# Most frequent description among those rows.
nc.groupBy('Description').count().sort(col('count').desc()).show(1)

+-------------+-----+
|  Description|count|
+-------------+-----+
|LOST PASSPORT|  107|
+-------------+-----+
only showing top 1 row



Find the day of the week with the most reported crimes

In [67]:
from pyspark.sql.functions import date_format

# 'EEEE' renders each timestamp as its full weekday name; count rows per
# weekday and list them from most to fewest reports.
(
    rc.withColumn('dow', date_format(col('Date'), 'EEEE'))
    .groupBy('dow')
    .count()
    .sort(col('count').desc())
    .show()
)

+---------+-------+
|      dow|  count|
+---------+-------+
|   Friday|1017004|
|Wednesday| 973937|
|  Tuesday| 968074|
| Saturday| 965206|
| Thursday| 964597|
|   Monday| 952779|
|   Sunday| 912004|
+---------+-------+



## Basic RDD Examples

In [68]:
# Read the police-station CSV as an RDD of raw text lines (no CSV parsing).
psrdd = sc.textFile('../data/Police_Stations.csv')
# Show the first line, which is the column header.
psrdd.first()

'DISTRICT,DISTRICT NAME,ADDRESS,CITY,STATE,ZIP,WEBSITE,PHONE,FAX,TTY,X COORDINATE,Y COORDINATE,LATITUDE,LONGITUDE,LOCATION'

In [69]:
# The first line of the file is the header; keep every line that differs from it.
ps_header = psrdd.first()
ps_other = psrdd.filter(lambda row: row != ps_header)

# First remaining line, i.e. the first data record.
ps_other.first()

'1,Central,1718 S State St,Chicago,IL,60616,http://home.chicagopolice.org/community/districts/1st-district-central/,312-745-4290,312-745-3694,312-745-3693,1176569.052,1891771.704,41.85837259,-87.62735617,"(41.8583725929, -87.627356171)"'

How many police stations are there?

In [75]:
# ps_other holds one text line per station (the header line is filtered out),
# so counting its elements gives the station count directly; splitting each
# line into fields first does not change the number of elements.
ps_other.count()

24

Display the district ID, district name, address, and ZIP code for district 7

In [77]:
# Requires detailed knowledge of the dataset
(ps_other.filter(lambda line: line.split(',')[0] == '7')
    .map(lambda line: (line.split(',')[0],
                       line.split(',')[1],
                       line.split(',')[2],
                       line.split(',')[5]
                       )).collect())

[('7', 'Englewood', '1438 W 63rd St', '60636')]