### Instructions
In this exercise, you will briefly perform the following tasks:
1. Ready the local environment for running PySpark (refer to the attached guidelines),
2. Initialize a spark context or session and import data into a spark dataframe,
3. Perform basic analyses and check for inconsistencies,
4. Manipulate the data using joins, grouping, slicing,
5. Analyze the required aspects and visualize.

### 1. Refer to these guidelines to set up the environment for PySpark.
Linux/ MacOS: https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f <br>
Windows: https://changhsinlee.com/install-pyspark-windows-jupyter/

#### 1.1. Check if pyspark is installed properly and import required packages

In [5]:
import pyspark
import numpy
import pandas
import seaborn
import matplotlib
from datetime import *

### 2. Import data into a local spark cluster

#### 2.1. Initialize a spark session (or a spark context)

In [8]:
sparkSession = spark.builder.getOrCreate()

#### 2.2. Import all datasets as spark dataframes. Retain the following names for consistency.

In [10]:
# crimes = Read 'pittsburgh_police_blotter.csv'
# employment_2011 = Read 'ACS_11_5YR_B19052_with_ann.csv.csv'
# employment_2012 = Read 'ACS_12_5YR_B19052_with_ann.csv.csv'
# employment_2013 = Read 'ACS_13_5YR_B19052_with_ann.csv.csv'
# employment_2014 = Read 'ACS_14_5YR_B19052_with_ann.csv.csv'
# employment_2015 = Read 'ACS_15_5YR_B19052_with_ann.csv.csv'
# employment_2016 = Read 'ACS_16_5YR_B19052_with_ann.csv.csv'
# employment_2017 = Read 'ACS_17_5YR_B19052_with_ann.csv.csv'

#crimes = sparkSession.read.option("header","true").csv("pittsburgh_police_blotter.csv")
crimes = sparkSession.read.format("csv").option("header", "true").load("/FileStore/tables/blotter_new.csv")
employment_2011 = sparkSession.read.option("header","true").csv("/FileStore/tables/ACS_11_5YR_B19052_with_ann.csv")
employment_2012 = sparkSession.read.option("header","true").csv("/FileStore/tables/ACS_12_5YR_B19052_with_ann.csv")
employment_2013 = sparkSession.read.option("header","true").csv("/FileStore/tables/ACS_13_5YR_B19052_with_ann.csv")
employment_2014 = sparkSession.read.option("header","true").csv("/FileStore/tables/ACS_14_5YR_B19052_with_ann.csv")
employment_2015 = sparkSession.read.option("header","true").csv("/FileStore/tables/ACS_15_5YR_B19052_with_ann.csv")
employment_2016 = sparkSession.read.option("header","true").csv("/FileStore/tables/ACS_16_5YR_B19052_with_ann.csv")
employment_2017 = sparkSession.read.option("header","true").csv("/FileStore/tables/ACS_17_5YR_B19052_with_ann.csv")

In [11]:
# Optional data cleaning
# Your code here

crimes.show(5)

#### 2.3. Push the data to the cluster and view all available tables

In [13]:
# Your code here

# Check if all the data is in the cluster
# Your code here


crimes.createOrReplaceTempView("crimes_Tab")
employment_2011.createOrReplaceTempView("employment_2011_tab")
employment_2012.createOrReplaceTempView("employment_2012_tab")
employment_2013.createOrReplaceTempView("employment_2013_tab")
employment_2014.createOrReplaceTempView("employment_2014_tab")
employment_2015.createOrReplaceTempView("employment_2015_tab")
employment_2016.createOrReplaceTempView("employment_2016_tab")
employment_2017.createOrReplaceTempView("employment_2017_tab")


### 3. Basic analyses and inconsistencies

#### 3.1. View available attributes (columns) of each table

In [16]:
%sql select * from crimes_Tab

PK,CCR,HIERARCHY,INCIDENTTIME,INCIDENTLOCATION,CLEAREDFLAG,INCIDENTNEIGHBORHOOD,INCIDENTZONE,HIERARCHYDESC,OFFENSES,INCIDENTTRACT,X,Y,zipcodes
2075487,5000381,6,1/1/2005 8:00,"5400 Block NORTHUMBERLAND ST PITTSBURGH, PA",N,Squirrel Hill North,4,MOTOR THEFT (OTHER)-PLATE ONLY,3921 Theft by Unlawful Taking or Disposition.,1401.0,-79.93402692,40.43886986,15232
2075783,5001279,7,1/3/2005 2:00,"PARKVIEW PITTSBURGH, PA",N,Squirrel Hill North,4,MTR VEH THEFT (AUTO),3921 Theft by Unlawful Taking or Disposition.,407.0,0.0,0.0,0
2075786,5001488,7,1/3/2005 17:00,"CROSBY AND PAULINE & PITTSBURGH, PA",N,Squirrel Hill North,3,MOTOR VEH THEFT/TRUCK-BUS,3921 Theft by Unlawful Taking or Disposition.,1907.0,0.0,0.0,0
2076278,5002628,7,1/5/2005 19:30,"1100 Block COLFAX ST PITTSBURGH, PA",N,Squirrel Hill North,1,MTR VEH THEFT (AUTO),3921 Theft by Unlawful Taking or Disposition.,2102.0,-80.02432875,40.46254226,15290
2076279,5002647,7,1/5/2005 20:20,"ALDER/SHADY & PITTSBURGH, PA",N,Shadyside,4,MTR VEH THEFT (AUTO),3921 Theft by Unlawful Taking or Disposition.,705.0,0.0,0.0,0
2087291,5032041,99,3/2/2005 19:34,"2400 Block CHAUNCEY DR PITTSBURGH, PA",Y,Bedford Dwellings,2,,9501 Bench Warrant,509.0,-79.97459725,40.44992033,15222
2087312,5032053,6,3/2/2005 16:00,"6400 Block APPLE AV PITTSBURGH, PA",N,Lincoln-Lemington-Belmar,5,THEFT FROM AUTO,3304 Criminal Mischief. / 3934 Theft from Vehicle,1203.0,-79.90639102,40.46855137,15206
2087366,5032060,6,3/2/2005 20:00,"400 Block FORELAND ST PITTSBURGH, PA",N,Lincoln-Lemington-Belmar,1,THEFT FROM BUILDING,3921 Theft by Unlawful Taking or Disposition.,2302.0,-80.00193979999999,40.45413683,15222
2087381,5032065,10,3/2/2005 20:05,"38TH ST & BUTLER ST PITTSBURGH, PA",Y,Lower Lawrenceville,2,HARRASSMENT/THREAT/ATTEMPT/PHY,2709 Harassment (Non-Specific),603.0,-79.96427468,40.46685485,15201
2087421,5032082,99,3/2/2005 20:40,"800 Block GUCKERT ST PITTSBURGH, PA",N,Lower Lawrenceville,1,,9497 Aided Case,2302.0,-79.99651613,40.45489044,15222


In [17]:
# Your code here

df = spark.sql("select * from employment_2011_tab")

df.show()

#### 3.2. Get the sizes of each of these datasets

In [19]:
%sql  select count(*) from crimes_Tab
UNION
select count(*) from employment_2011_tab

count(1)
50
1046


#### 3.3. View the first 5 rows of 'crimes' and 'employment_2011'.

In [21]:
# Your code here

crimes.show(5)
employment_2011.show(5)

In [22]:
# Your code here

employment_2011.printSchema()

#### 3.4. Print summary statistics of 'crimes' and 'employment_2011'.

In [24]:
# Your code here

In [25]:
# Your code here

#### 3.4. Report inconsistencies, if any. (Check for nulls, zeros, etc.)

In [27]:
# Find if there are any nulls or unexpected values, or other quality checks.
# Your code here
%sql select * from crimes_Tab where 

In [28]:
# Your code here


In [29]:
# Your code here


### 4. Data Manipulation

#### Task 1/3. Query all the rows from the table 'crimes' corresponding to incidents reported in the year 2011 and inner join this table with 'employment_2011' on crimes.zipcodes=employment_2011.GEO.id2. Save this view as 'view1'. <br>
For simplicity, retain only 'INCIDENTTIME', 'INCIDENTNEIGHBORHOOD', 'HIERARCHYDESC', 'zipcodes' from 'crimes' and only 'GEO.id2', 'HD01_VD01', 'HD01_VD02', 'HD01_VD03' from 'employment_2011'.

In [32]:
# Your code here

# Upload to the cluster
# Your code here
from pyspark.sql.functions import to_date, year

crimes_df = crimes.select("INCIDENTTIME","INCIDENTNEIGHBORHOOD","HIERARCHYDESC","zipcodes")\
                  .where((year(to_date("INCIDENTTIME", "MM/dd/yyyy"))) == '2005')

crimes_df.show()

In [33]:
employment_2011_df = employment_2011.withColumnRenamed("GEO.id2", "GEO_ID2").withColumnRenamed("GEO.id", "GEO_ID").withColumnRenamed("GEO.display-label","GEO_display_label");

employment_2011_df.createOrReplaceTempView("emp_2011")




In [34]:
%sql select * from crimes_Tab, emp_2011 where zipcodes = GEO_ID2;

PK,CCR,HIERARCHY,INCIDENTTIME,INCIDENTLOCATION,CLEAREDFLAG,INCIDENTNEIGHBORHOOD,INCIDENTZONE,HIERARCHYDESC,OFFENSES,INCIDENTTRACT,X,Y,zipcodes,GEO_ID,GEO_ID2,GEO_display_label,HD01_VD01,HD02_VD01,HD01_VD02,HD02_VD02,HD01_VD03,HD02_VD03
2075487,5000381,6,1/1/2005 8:00,"5400 Block NORTHUMBERLAND ST PITTSBURGH, PA",N,Squirrel Hill North,4,MOTOR THEFT (OTHER)-PLATE ONLY,3921 Theft by Unlawful Taking or Disposition.,1401.0,-79.93402692,40.43886986,15232,8600000US15232,15232,ZCTA5 15232,6501,292,5526,312,975,187
2076278,5002628,7,1/5/2005 19:30,"1100 Block COLFAX ST PITTSBURGH, PA",N,Squirrel Hill North,1,MTR VEH THEFT (AUTO),3921 Theft by Unlawful Taking or Disposition.,2102.0,-80.02432875,40.46254226,15290,8600000US15290,15290,ZCTA5 15290,0,81,0,81,0,81
2087291,5032041,99,3/2/2005 19:34,"2400 Block CHAUNCEY DR PITTSBURGH, PA",Y,Bedford Dwellings,2,,9501 Bench Warrant,509.0,-79.97459725,40.44992033,15222,8600000US15222,15222,ZCTA5 15222,1447,199,1111,176,336,84
2087312,5032053,6,3/2/2005 16:00,"6400 Block APPLE AV PITTSBURGH, PA",N,Lincoln-Lemington-Belmar,5,THEFT FROM AUTO,3304 Criminal Mischief. / 3934 Theft from Vehicle,1203.0,-79.90639102,40.46855137,15206,8600000US15206,15206,ZCTA5 15206,14040,388,10472,432,3568,372
2087366,5032060,6,3/2/2005 20:00,"400 Block FORELAND ST PITTSBURGH, PA",N,Lincoln-Lemington-Belmar,1,THEFT FROM BUILDING,3921 Theft by Unlawful Taking or Disposition.,2302.0,-80.00193979999999,40.45413683,15222,8600000US15222,15222,ZCTA5 15222,1447,199,1111,176,336,84
2087381,5032065,10,3/2/2005 20:05,"38TH ST & BUTLER ST PITTSBURGH, PA",Y,Lower Lawrenceville,2,HARRASSMENT/THREAT/ATTEMPT/PHY,2709 Harassment (Non-Specific),603.0,-79.96427468,40.46685485,15201,8600000US15201,15201,ZCTA5 15201,5873,258,4189,271,1684,235
2087421,5032082,99,3/2/2005 20:40,"800 Block GUCKERT ST PITTSBURGH, PA",N,Lower Lawrenceville,1,,9497 Aided Case,2302.0,-79.99651613,40.45489044,15222,8600000US15222,15222,ZCTA5 15222,1447,199,1111,176,336,84
2087428,5032088,6,3/2/2005 20:00,"400 Block FORELAND ST PITTSBURGH, PA",N,Lower Lawrenceville,1,THEFT FROM BUILDING,3921 Theft by Unlawful Taking or Disposition.,2302.0,-80.00193979999999,40.45413683,15222,8600000US15222,15222,ZCTA5 15222,1447,199,1111,176,336,84
2087429,5032093,14,3/2/2005 19:05,"200 Block FISK ST PITTSBURGH, PA",Y,Central Lawrenceville,2,CRIMINAL MISCHIEF,3304 Criminal Mischief.,902.0,-79.96158405,40.46924037,15201,8600000US15201,15201,ZCTA5 15201,5873,258,4189,271,1684,235
2087438,5032101,99,3/2/2005 17:35,"GARRISON PL & PENN AV PITTSBURGH, PA",N,Golden Triangle/Civic Arena,2,,3743 Accidents Involving Damage to Attended Veh. or Property,201.0,-79.99738969,40.44398999,15219,8600000US15219,15219,ZCTA5 15219,5703,266,3196,272,2507,241


In [35]:
joinExpression = crimes["zipcodes"] == employment_2011_df['GEO_ID2']

view = crimes.join(employment_2011_df, joinExpression)


view.createOrReplaceTempView("View1")

#### Task 2/3. Create a derived quantity called 'EMPLOYMENT_SCORE' from 'view1'. 'EMPLOYMENT_SCORE' is defined as below. Group this dataframe by 'zipcodes' and aggregate by the count (number of rows corresponding to each zip code) and save as 'view2'. The final dataframe will have 2 columns- count and 'EMPLOYMENT_SCORE'.

In [37]:
# EMPLOYMENT_SCORE = 0.2*(HD01_VD01)+0.3*(HD01_VD02)+0.5*(HD01_VD03).

# Your code here

# Upload to the cluster
# Your code here

df = spark.sql("select 0.2*(HD01_VD01)+0.3*(HD01_VD02)+0.5*(HD01_VD03) as employment_score ,count(*) from view1 group by zipcodes,HD01_VD01,HD01_VD02,HD01_VD03")

df.show()

#### Task 3/3. Create a view to indicate the number of crimes reported in the zip code 15232 over all possible years, and the corresponding numbers of employed and unemployed people. Save this as 'view3'. For example, for the zipcode 15104, this will look like the following table: <br>
(The total number of employed people is given by 'HD01_VD02' and the total number of unemployed people is given by 'HD01_VD03').

Sample view3 for zip code 15104.
+-----+----+-----+--------+----------+
|  Zip|Year|Count|Employed|Unemployed|
+-----+----+-----+--------+----------+
|15104|2011|    2|    2433|      1402|
|15104|2012|    3|    2390|      1432|
|15104|2013|    3|    2553|      1442|
|15104|2014|    2|    2362|      1417|
|15104|2015|    4|    2358|      1442|
+-----+----+-----+--------+----------+

In [40]:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType, DoubleType
from datetime import datetime

schema = StructType( [
 StructField("PK" , IntegerType(), True),
 StructField("CCR", IntegerType(), True),
 StructField("HIERARCHY", IntegerType(), True),
 StructField("INCIDENTTIME", DateType(), True),
 StructField("INCIDENTLOCATION", StringType(), True),
 StructField("CLEAREDFLAG", StringType(), True),
 StructField("INCIDENTNEIGHBORHOOD", StringType(), True),
 StructField("INCIDENTZONE", StringType(), True),
 StructField("HIERARCHYDESC", StringType(), True),
 StructField("OFFENSES", StringType(), True),
 StructField("INCIDENTTRACT", DoubleType(), True),
 StructField("X", DoubleType(), True),
 StructField("Y", DoubleType(), True),
 StructField("zipcodes", IntegerType(), True)
 ])
  
crimes_df = spark.read.format("csv").option("header", "true").schema(schema).load("/FileStore/tables/blotter_new.csv")

In [41]:
crimes_df.show()

In [42]:
new_view = view.select(year(to_date("INCIDENTTIME", "MM/dd/yyyy")).alias("YEAR") ,"*").drop("INCIDENTTIME")
new_view.createOrReplaceTempView("new_view1")

#'HD01_VD02' and the total number of unemployed people is given by 'HD01_VD03')

In [43]:

%sql select count(*),ZIPCODES,YEAR, HD01_VD02 as unemployed ,HD01_VD01 as employed from new_view1 group by ZIPCODES,YEAR,HD01_VD02,HD01_VD01;

count(1),ZIPCODES,YEAR,unemployed,employed
70,15222,2005,1111,1447
28,15260,2005,0,0
3,15112,2005,1037,1508
6,15233,2005,847,1118
1,15147,2005,5413,8073
11,15207,2005,3472,5100
1,15234,2005,4810,6323
4,15216,2005,8361,10627
31,15217,2005,8750,11578
28,15210,2005,7667,11274


In [44]:
new_view.printSchema()

### 5. Generating Insights

(To be presented during the screening)