In [7]:
# Spark bootstrap: this section must be included at the beginning of each new notebook.
# Remember to change the app name passed to appName() below.
# If you're using VirtualBox, change the path below to '/home/user/spark-2.1.1-bin-hadoop2.7'.
import findspark
# Point findspark at the local Spark installation. This call is deliberately kept ahead of
# `import pyspark` -- presumably it is what makes pyspark importable (TODO confirm), so do
# not reorder these lines.
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
# Get (or create) the session for the 'CrimeAnalysis' app. `spark` is the handle the later
# cells use for spark.read.csv(...) and spark.sql(...).
spark = SparkSession.builder.appName('CrimeAnalysis').getOrCreate()

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:38395)
Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 827, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 963, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:38395)

## Import datasets

In [None]:
# Both CSV files are read with the same options: the first line is the header
# and column types are inferred from the data.
csv_options = {'header': True, 'inferSchema': True}

# Load the crime data, then show its schema and column names.
df_crime = spark.read.csv('./Datasets/Crimes_-_2012.csv', **csv_options)
df_crime.printSchema()
print(df_crime.columns)

# Load the census data, then show its schema and column names.
df_census = spark.read.csv(
    './Datasets/Census_Data_-_Selected_socioeconomic_indicators_in_Chicago__2008___2012.csv',
    **csv_options
)
df_census.printSchema()
print(df_census.columns)

## Explore the data

In [None]:
# Import pandas.
import pandas as pd

# Fetch the first five crime rows and render them as a pandas table.
crime_head = df_crime.take(5)
pd.DataFrame(data=crime_head, columns=df_crime.columns)

In [None]:
# Fetch the first five census rows and render them as a pandas table.
census_head = df_census.take(5)
pd.DataFrame(data=census_head, columns=df_census.columns)

## Verify the data quality

In [None]:
# Summary statistics for the crime data, displayed as a pandas DataFrame.
crime_stats = df_crime.describe()
crime_stats.toPandas()

In [None]:
# Summary statistics for the census data, displayed as a pandas DataFrame.
census_stats = df_census.describe()
census_stats.toPandas()

## Clean the data

In [None]:
# Crime data: drop a row only when its "Community Area" value is missing.
df_crime_clean = df_crime.dropna(subset=['Community Area'])

# Row counts before and after the drop, so the effect of the cleaning is visible.
crime_rows_before = df_crime.count()
crime_rows_after = df_crime_clean.count()
print("Old crime data frame length:", crime_rows_before,
      "\nNew crime data frame length:", crime_rows_after)

# Census data: drop a row only when its "Community Area Number" value is missing.
df_census_clean = df_census.dropna(subset=['Community Area Number'])

# Row counts before and after the drop.
census_rows_before = df_census.count()
census_rows_after = df_census_clean.count()
print("Old census data frame length:", census_rows_before,
      "\nNew census data frame length:", census_rows_after)

## Construct the data - Derive new columns

In [None]:
import pyspark.sql.functions as f

# "Date" is split on '/' and the field at index 1 becomes "Month".
# NOTE(review): index 1 is the SECOND '/'-separated field. That is the month only if
# "Date" is stored day/month/year -- confirm against the sample rows shown below.
split_col = f.split(df_crime_clean['Date'], '/')

# "TIME" is parsed with the 12-hour pattern 'hh:mm:ss aa' and re-formatted as a
# 24-hour 'HH' string.
hour_of_day = f.from_unixtime(f.unix_timestamp("TIME", 'hh:mm:ss aa'), 'HH')

df_crime_clean = (
    df_crime_clean
    .withColumn('Month', split_col.getItem(1))
    .withColumn("Hour", hour_of_day)
)

# Show the source and derived columns side by side for the first 15 rows.
check_columns = ['Date', 'TIME', 'Month', 'Hour']
derived_sample = df_crime_clean.select('Date', 'TIME', 'Month', 'Hour').take(15)
pd.DataFrame(derived_sample, columns=check_columns)



## Construct the data - Aggregation

In [6]:
# Expose the cleaned DataFrames to Spark SQL as temporary views.
df_crime_clean.createOrReplaceTempView('tb_crime_clean')
df_census_clean.createOrReplaceTempView('tb_census_clean')

# Record count per (Month, crime type), for analysis.
crime_by_month_sql = (
    "SELECT Month, `Primary Type` CrimeType, count(*) RecordCount "
    "FROM tb_crime_clean GROUP BY Month,`Primary Type` "
)
df_crime_month = spark.sql(crime_by_month_sql)

# Record count per (community area name, crime type), for analysis.
# The community name comes from the census view, matched on the area number.
crime_by_community_sql = (
    "SELECT t2.`COMMUNITY AREA NAME` Community, t1.`Primary Type` CrimeType, count(*) RecordCount "
    "FROM tb_crime_clean t1, tb_census_clean t2 "
    "WHERE t1.`Community Area` = t2.`Community Area Number` "
    "GROUP BY t2.`COMMUNITY AREA NAME`, t1.`Primary Type` "
)
df_crime_community = spark.sql(crime_by_community_sql)

# Record count per (Hour, crime type, community area): the base table for the model.
crime_by_hour_sql = (
    "SELECT Hour, `Primary Type` CrimeType, `Community Area` CommunityArea, count(*) RecordCount "
    "FROM tb_crime_clean "
    "GROUP BY Hour, `Primary Type`, `Community Area`  "
)
df_crime_model = spark.sql(crime_by_hour_sql)

# Attach the census indicators to every aggregated crime row. The left outer join
# keeps crime rows even when no census row matches their community area.
same_community = df_crime_model["CommunityArea"] == df_census_clean["Community Area Number"]
df_join = df_crime_model.join(df_census_clean, on=same_community, how="left_outer")

# Preview the first five joined rows as a pandas table.
join_head = df_join.take(5)
pd.DataFrame(join_head, columns=df_join.columns)


NameError: name 'df_crime_clean' is not defined

## Data Transformation - Reduce the data

In [None]:
# Keep only the rows whose crime type is "THEFT"; the prediction model is built on this subset.
is_theft = df_join['CrimeType'] == "THEFT"
df_theft = df_join.where(is_theft)

# Summary statistics for the THEFT subset, displayed as a pandas DataFrame.
theft_stats = df_theft.describe()
theft_stats.toPandas()


## Data Transformation - Project the data

In [None]:
# Derive a new flag "IsCrimeHigh" as the target of prediction
from pyspark.sql.functions import udf

def _crime_level_flag(RecordCount):
    """Return "1" when RecordCount is at least 40, otherwise "0"."""
    if RecordCount >= 40:
        return "1"
    return "0"


# Wrap the flag function as a Spark UDF and add its result as the "IsCrimeHigh" column.
IsCrimeHigh = udf(_crime_level_flag)
df_theft = df_theft.withColumn("IsCrimeHigh", IsCrimeHigh(df_theft["RecordCount"]))

# Preview the first five rows, including the new flag.
theft_head = df_theft.take(5)
pd.DataFrame(theft_head, columns=df_theft.columns)


In [None]:
# Number of rows in each IsCrimeHigh class, displayed as a pandas DataFrame.
class_counts = df_theft.groupby('IsCrimeHigh').count()
class_counts.toPandas()

In [None]:
# Feature selection 
import numpy as np
import seaborn as sn

# Pairwise correlation between the theft record count and the census indicators.
#
# df_theft is a Spark DataFrame. Spark's DataFrame.corr(col1, col2) takes two column
# names and returns a single number, so calling .corr() with no arguments raises
# TypeError. The selected columns are therefore collected to pandas first, and pandas'
# DataFrame.corr() builds the full correlation matrix.
#
# NOTE(review): "PER CAPITA INCOME " keeps its trailing space from the original code --
# presumably it matches the CSV header exactly; confirm against the printed census schema.
corr_columns = ["RecordCount", "PERCENT OF HOUSING CROWDED", "PERCENT HOUSEHOLDS BELOW POVERTY",
                "PERCENT AGED 16+ UNEMPLOYED", "PERCENT AGED 25+ WITHOUT HIGH SCHOOL DIPLOMA",
                "PERCENT AGED UNDER 18 OR OVER 64", "PER CAPITA INCOME ", "HARDSHIP INDEX"]
corrMatt = df_theft.select(*corr_columns).toPandas().corr()

# Heatmap of the matrix, left disabled as in the original cell. It relies on `plt`
# (matplotlib.pyplot), which the visible cells never import.
#mask = np.array(corrMatt)
#mask[np.tril_indices_from(mask)] = False
#fig,ax = plt.subplots()
#fig.set_size_inches(20, 10)
#sn.heatmap(corrMatt, mask = mask, vmax = 0.8, square = True, annot = True)
