# Chicago Crime Data Investigation using PySpark

## Install Spark

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Connecting to security.ubuntu.com (91.189.91.38)] [Connected to cloud.r-pro                                                                               Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
                                                                               Get:4 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease [21.3 kB]
0% [2 InRelease 59.1 kB/88.7 kB 67%] [Connecting to security.ubuntu.com (91.1890% [1 InRelease gpgv 242 kB] [2 InRelease 64.9 kB/88.7 kB 73%] [Connecting to s0% [1 InRelease gpgv 242 kB] [Connecting to security.ubuntu.com (91.189.91.38)]0% [1 InRelease gpgv 242 kB] [Waiting for headers] [Connecting to security.ubun                           

In [None]:
# Seetting up the paths
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

In [None]:
# check the list of files and folders in the current dorectory
!ls

sample_data  spark-2.3.1-bin-hadoop2.7	spark-2.3.1-bin-hadoop2.7.tgz


In [None]:
# import findspark and checking out how to create a spark session
import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
sc

In [None]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

## Downloading and preprocessing Chicago's Reported Crime Data

In [None]:
#!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD
!wget https://data.cityofchicago.org/api/views/qzdf-xmn8/rows.csv?accessType=DOWNLOAD

--2020-10-01 01:47:58--  https://data.cityofchicago.org/api/views/qzdf-xmn8/rows.csv?accessType=DOWNLOAD
Resolving data.cityofchicago.org (data.cityofchicago.org)... 52.206.140.205, 52.206.140.199, 52.206.68.26
Connecting to data.cityofchicago.org (data.cityofchicago.org)|52.206.140.205|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: unspecified [text/csv]
Saving to: ‘rows.csv?accessType=DOWNLOAD’

rows.csv?accessType     [             <=>    ]  34.83M  3.30MB/s    in 10s     

2020-10-01 01:48:09 (3.36 MB/s) - ‘rows.csv?accessType=DOWNLOAD’ saved [36525557]



In [None]:
# check all the files in the directory
!ls

 reported-crimes.csv		 spark-2.3.1-bin-hadoop2.7
'rows.csv?accessType=DOWNLOAD'	 spark-2.3.1-bin-hadoop2.7.tgz
 sample_data			 spark-warehouse


In [None]:
# rename the file to something simple
!mv rows.csv\?accessType\=DOWNLOAD reported-crimes.csv

In [None]:
# check all the files in the directory
!ls

reported-crimes.csv  spark-2.3.1-bin-hadoop2.7	    spark-warehouse
sample_data	     spark-2.3.1-bin-hadoop2.7.tgz


In [None]:
# loading data as a dataframe
from pyspark.sql.functions import to_timestamp,col,lit
rc = spark.read.csv('reported-crimes.csv',header=True).withColumn('Date',to_timestamp(col('Date'),'MM/dd/yyyy hh:mm:ss a')).filter(col('Date') <= lit('2020-09-01'))

In [None]:
# lets check the top five rows
rc.show(5)

+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|12178298|   JD376412|2020-08-10 08:00:00| 078XX S LANGLEY AVE|1130|DECEPTIVE PRACTICE|FRAUD OR CONFIDEN...|           RESIDENCE| false|   false|0624|     006|   6|            69|      11|     

In [None]:
rc.count()

140050

## Working with columns

**Display only the first 5 rows of the column name IUCR **

In [None]:
rc.select(col("IUCR")).show(5)

+----+
|IUCR|
+----+
|1130|
|4387|
|1310|
|1130|
|1154|
+----+
only showing top 5 rows



In [None]:
rc.select("IUCR").show(5)

+----+
|IUCR|
+----+
|1130|
|4387|
|1310|
|1130|
|1154|
+----+
only showing top 5 rows



  **Display only the first 4 rows of the column names Case Number, Date and Arrest**

In [None]:
rc.select("Case Number","Date","Arrest").show(4)

+-----------+-------------------+------+
|Case Number|               Date|Arrest|
+-----------+-------------------+------+
|   JD376412|2020-08-10 08:00:00| false|
|   JD382021|2020-08-29 14:00:00| false|
|   JD382852|2020-08-02 19:46:00| false|
|   JD382444|2020-08-25 16:00:00| false|
+-----------+-------------------+------+
only showing top 4 rows



In [None]:
rc.select(col("Case Number"),col("Date"),col("Arrest")).show(4)

+-----------+-------------------+------+
|Case Number|               Date|Arrest|
+-----------+-------------------+------+
|   JD376412|2020-08-10 08:00:00| false|
|   JD382021|2020-08-29 14:00:00| false|
|   JD382852|2020-08-02 19:46:00| false|
|   JD382444|2020-08-25 16:00:00| false|
+-----------+-------------------+------+
only showing top 4 rows



** Add a column with name One, with entries all 1s **

In [None]:
from pyspark.sql.functions import lit
rc.withColumn('One',lit(1)).show(5)

+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+---+
|      ID|Case Number|               Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|One|
+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+---+
|12178298|   JD376412|2020-08-10 08:00:00| 078XX S LANGLEY AVE|1130|DECEPTIVE PRACTICE|FRAUD OR CONFIDEN...|           RESIDENCE| false|   false|0624|     006|   6|            69|  

** Remove the column IUCR **

In [None]:
rc.drop(col("IUCR")).show(5)

+--------+-----------+-------------------+--------------------+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|12178298|   JD376412|2020-08-10 08:00:00| 078XX S LANGLEY AVE|DECEPTIVE PRACTICE|FRAUD OR CONFIDEN...|           RESIDENCE| false|   false|0624|     006|   6|            69|      11|        null|        null

In [None]:
rc.count()

140050

## Working with rows

**Add the reported crimes for an additional day, 02-September-2020, to our dataset.**

In [None]:
one_day = spark.read.csv('reported-crimes.csv',header=True).withColumn('Date',to_timestamp(col('Date'),'MM/dd/yyyy hh:mm:ss a')).filter(col('Date') == lit('2020-09-02'))
one_day.count()


7

In [None]:
rc = rc.union(one_day)
rc.show(5)

+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|      ID|Case Number|               Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|
+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+
|12178298|   JD376412|2020-08-10 08:00:00| 078XX S LANGLEY AVE|1130|DECEPTIVE PRACTICE|FRAUD OR CONFIDEN...|           RESIDENCE| false|   false|0624|     006|   6|            69|      11|     

In [None]:
rc.count()

140057

6841687

+--------+-----------+-------------------+--------------------+----+--------------------+--------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------------+--------------------------+---------+---------------+-------------+-----+----------------------+----------------+------------+
|      ID|Case Number|               Date|               Block|IUCR|        Primary Type|   Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|    Latitude|    Longitude|            Location|Historical Wards 2003-2015|Zip Codes|Community Areas|Census Tracts|Wards|Boundaries - ZIP Codes|Police Districts|Police Beats|
+--------+-----------+-------------------+--------------------+----+--------------------+--------------+--------------------+------+--------+----+--------+----+--------------+---

**What are the top 10 number of reported crimes by Primary type, in descending order of occurence?**

In [None]:
rc.groupBy(col("Primary type")).count().orderBy(col("count"),ascending=False).show(10)

+-------------------+-----+
|       Primary type|count|
+-------------------+-----+
|            BATTERY|28833|
|              THEFT|28071|
|    CRIMINAL DAMAGE|16952|
|            ASSAULT|12391|
| DECEPTIVE PRACTICE| 8994|
|      OTHER OFFENSE| 8435|
|           BURGLARY| 6296|
|MOTOR VEHICLE THEFT| 6145|
|  WEAPONS VIOLATION| 5237|
|            ROBBERY| 5034|
+-------------------+-----+
only showing top 10 rows



+--------------------+-------+
|        Primary type|  count|
+--------------------+-------+
|               THEFT|1440495|
|             BATTERY|1249262|
|     CRIMINAL DAMAGE| 780494|
|           NARCOTICS| 716461|
|             ASSAULT| 425573|
|       OTHER OFFENSE| 425091|
|            BURGLARY| 391699|
| MOTOR VEHICLE THEFT| 317694|
|  DECEPTIVE PRACTICE| 270421|
|             ROBBERY| 258609|
|   CRIMINAL TRESPASS| 195803|
|   WEAPONS VIOLATION|  72699|
|        PROSTITUTION|  68564|
|PUBLIC PEACE VIOL...|  48315|
|OFFENSE INVOLVING...|  46274|
| CRIM SEXUAL ASSAULT|  27851|
|         SEX OFFENSE|  25612|
|INTERFERENCE WITH...|  15601|
|            GAMBLING|  14438|
|LIQUOR LAW VIOLATION|  14130|
+--------------------+-------+
only showing top 20 rows



## Challenge questions

**What percentage of reported crimes resulted in an arrest?**

In [None]:
rc.filter(col("Arrest")=="true").count()/rc.count()

0.16365479768951213

  **What are the top 3 locations for reported crimes?**

In [None]:
rc.groupBy(col("Location Description")).count().orderBy(col("count"),ascending=False).show(3)

+--------------------+-----+
|Location Description|count|
+--------------------+-----+
|              STREET|32967|
|           RESIDENCE|24083|
|           APARTMENT|23216|
+--------------------+-----+
only showing top 3 rows



## Built-in functions

In [None]:
from pyspark.sql import functions

In [None]:
print(dir(functions))



## String functions

**Display the Primary Type column in lower and upper characters, and the first 4 characters of the column**

In [None]:
from pyspark.sql.functions import lower,upper,substring

In [None]:
help(lower)

Help on function lower in module pyspark.sql.functions:

lower(col)
    Converts a string column to lower case.
    
    .. versionadded:: 1.5



In [None]:
rc.withColumn("New_Col",rc["Primary Type"].substr(1,5)).show()

+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+-------+
|      ID|Case Number|               Date|               Block|IUCR|      Primary Type|         Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year|          Updated On|Latitude|Longitude|Location|New_Col|
+--------+-----------+-------------------+--------------------+----+------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+--------+---------+--------+-------+
|12178298|   JD376412|2020-08-10 08:00:00| 078XX S LANGLEY AVE|1130|DECEPTIVE PRACTICE|FRAUD OR CONFIDEN...|           RESIDENCE| false|   false|0624|     006|   6|     

# MLLib with Apache Spark

In [None]:
train_data_path = "/content/sample_data/california_housing_train.csv"
test_data_path = "/content/sample_data/california_housing_test.csv"

In [None]:
# creating a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
#load data
train =  spark.read.csv(train_data_path,header=True,inferSchema=True)
test = spark.read.csv(test_data_path,header=True,inferSchema=True)

In [None]:
train.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
+---------+--------+----

In [None]:
# creating a feature array
feature_columns = train.columns[2:-1]

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=feature_columns,outputCol="features")
train2 = assembler.transform(train)
train2.show(5)

# applying Linear Regression

In [None]:
from pyspark.ml.regression import LinearRegression
algo = LinearRegression(featuresCol="features",labelCol="median_house_value")
model = algo.fit(train2)

In [None]:
test2 = assembler.transform(test)
eval_result = model.evaluate(test2)

<pyspark.ml.regression.LinearRegressionSummary object at 0x7f1d062984a8>


In [None]:
eval_result.r2

0.545883534674609

In [None]:
eval_result.meanSquaredError

5808966246.710222