In [0]:
import pandas as pd

In [0]:
!ls

# Install Spark

In [0]:
!apt-get update 
!sudo apt-get install openjdk-8-jre-headless 
!wget -q https://downloads.apache.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os

# Export the Java and Spark install locations in one step.
# NOTE(review): both paths are hard-coded; SPARK_HOME assumes the tarball was
# unpacked under /content — confirm for environments other than Colab.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
})

In [0]:
# findspark.init() runs before the pyspark imports below; it is what makes the
# unpacked Spark distribution importable (presumably located via SPARK_HOME — confirm).
import findspark
findspark.init()
from pyspark import SparkContext

# Reuse the running SparkContext if there is one, otherwise start a new one.
sc = SparkContext.getOrCreate()

import pyspark
from pyspark.sql import  SparkSession
# Same get-or-create pattern for the DataFrame entry point used in the rest of the notebook.
spark = SparkSession.builder.getOrCreate()
# Bare expression so the notebook displays the session object.
spark

# Downloading and preprocessing Chicago's Reported Crime Data

In [0]:
!wget https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD

In [0]:
!ls

In [0]:
# Give the download a plain file name; the backslashes escape '?' and '=' for the shell.
!mv rows.csv\?accessType\=DOWNLOAD reported-crimes.csv

In [0]:
from pyspark.sql.functions import to_timestamp, col, lit

# Read the CSV using its first line as column names, then replace the textual
# 'Date' column with a parsed timestamp (12-hour clock with AM/PM marker).
rc = (
    spark.read.csv('reported-crimes.csv', header=True)
    .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
)
rc.show(5)

# Schemas

In [0]:
rc.printSchema()

In [0]:
from pyspark.sql.types import StringType, StructField, StructType, TimestampType, BooleanType, DoubleType, IntegerType

In [0]:
rc.columns

In [0]:
# (column name, Spark data type) pairs, in file order; the next cell turns each
# pair into a nullable StructField to build an explicit schema.
# NOTE(review): 'Arrest' is typed as a string while 'Domestic' is boolean — confirm
# this is intentional (the arrest-rate cell later compares Arrest with the string 'true').
labels = [
          ('ID', StringType()),
          ('Case Number', StringType()),
          ('Date', TimestampType()),
          ('Block', StringType()),
          ('IUCR', StringType()),
          ('Primary Type', StringType()),
          ('Description', StringType()),
          ('Location Description', StringType()),
          ('Arrest', StringType()),
          ('Domestic', BooleanType()),
          ('Beat', StringType()),
          ('District', StringType()),
          ('Ward', StringType()),
          ('Community Area', StringType()),
          ('FBI Code', StringType()),
          ('X Coordinate', StringType()),
          ('Y Coordinate', StringType()),
          ('Year', IntegerType()),
          ('Updated On', StringType()),
          ('Latitude', DoubleType()),
          ('Longitude', DoubleType()),
          ('Location', StringType()),
 ]

In [0]:
# One nullable field per (name, type) pair declared in `labels`.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in labels
])
schema

In [0]:
# Disabled experiment kept as a bare string literal: the cell only echoes the text
# and `rc` is not reassigned.
# NOTE(review): the read passes no header=True, so with this schema the CSV's
# header line would presumably be parsed as a data row — confirm before re-enabling.
''' rc = spark.read.csv('reported-crimes.csv', schema=schema)
rc.printSchema() '''

In [0]:
rc.show(5)

In [0]:
# Two ways of naming the same column: by string and by DataFrame attribute.
# Both must run before the later cell that drops 'IUCR' from rc.
rc.select('IUCR').show(5)
rc.select(rc.IUCR).show(5)

In [0]:
rc.select(col('IUCR')).show(5)

In [0]:
rc.select('Case Number', 'Date', 'Arrest').show(4)

In [0]:
# `lit` was already imported with the loading cell; re-imported here so the cell is self-contained.
from pyspark.sql.functions import  lit
# Display-only: adds a constant column 'One' to a derived frame; rc itself is not reassigned.
rc.withColumn('One', lit(1)).show(5)

In [0]:
# Rebinds rc to a frame without the 'IUCR' column. Every later cell sees the
# reduced frame, and the earlier IUCR select cells no longer work if re-run after this.
rc = rc.drop('IUCR')
rc.show(5)

# Working with rows

In [0]:
# Re-read the file and keep every crime reported on 2018-11-12.
# 'Date' is a timestamp, so testing it for equality with the string '2018-11-12'
# only matches rows stamped exactly at midnight. Compare calendar dates instead,
# with both sides cast explicitly so the result does not depend on implicit coercion.
one_day = (
    spark.read.csv('reported-crimes.csv', header=True)
    .withColumn('Date', to_timestamp(col('Date'), 'MM/dd/yyyy hh:mm:ss a'))
    .filter(col('Date').cast('date') == lit('2018-11-12').cast('date'))
)
one_day.count()

In [0]:
# rc.union(one_day).orderBy('Date', ascending=False).show(5)

In [0]:
rc.groupBy('Primary Type').count().show()

In [0]:
# Five most frequent primary crime types, largest count first.
(rc.groupBy('Primary Type')
   .count()
   .orderBy(col('count').desc())
   .show(5))

# What percentage of reported crimes resulted in an arrest?

In [0]:
rc.select('Arrest').distinct().show()

In [0]:
rc.printSchema()

In [0]:
# Share of reports that led to an arrest, as a fraction between 0 and 1
# (multiply by 100 to read it as a percentage). 'Arrest' holds the strings 'true'/'false'.
(rc.where(col('Arrest') == 'true').count()
 / rc.select('Arrest').count())

**What are the top 3 locations for reported crimes?**

In [0]:
# Three location descriptions with the most reports, largest count first.
(rc.groupBy('Location Description')
   .count()
   .orderBy(col('count').desc())
   .show(3))

# Functions

In [0]:
from pyspark.sql import functions 
print(dir(functions))

**String Functions**

In [0]:
from pyspark.sql.functions import lower, upper, substring

In [0]:
help(substring)

In [0]:
# Lower-case, upper-case and first-four-characters views of 'Primary Type'
# (substring positions are 1-based).
rc.select(
    lower(col('Primary Type')),
    upper(col('Primary Type')),
    substring(col('Primary Type'), 1, 4),
).show(5)

**Numeric Functions**

In [0]:
# WARNING: from this cell on, `min` and `max` refer to Spark's column aggregates,
# not Python's builtins; plain `min(a, b)` on Python values will no longer work.
from pyspark.sql.functions import min, max

In [0]:
# Earliest and latest report timestamps in the dataset (Spark's min/max aggregates).
rc.select(
    min('Date'),
    max('Date'),
).show(1)

**Date**

In [0]:
from pyspark.sql.functions import date_add, date_sub

In [0]:
help(date_add)
help(date_sub)

In [0]:
# Three days before the earliest report and three days after the latest one.
rc.select(
    date_sub(min('Date'), 3),
    date_add(max('Date'), 3),
).show(1)

In [0]:
from pyspark.sql.functions import to_date, to_timestamp, lit

In [0]:
df = spark.createDataFrame([('2020-12-25 13:30:00', )], ['Christmas'])
df.show(1)

In [0]:
# Parse the same string once as a date and once as a timestamp (24-hour clock).
df.select(
    to_date(col('Christmas'), 'yyyy-MM-dd HH:mm:ss'),
    to_timestamp(col('Christmas'), 'yyyy-MM-dd HH:mm:ss'),
).show(1)

In [0]:
df = spark.createDataFrame([('25/Dec/2020 13:30:00', )], ['Christmas'])
df.show(1)

In [0]:
# Day-first layout with an abbreviated month name ('MMM') and a 24-hour clock.
df.select(
    to_date(col('Christmas'), 'dd/MMM/yyyy HH:mm:ss'),
    to_timestamp(col('Christmas'), 'dd/MMM/yyyy HH:mm:ss'),
).show(1)

In [0]:
# One-row frame whose string uses a day/abbreviated-month/year layout with a
# 12-hour clock and an AM/PM marker; truncate=False shows the full value.
df = spark.createDataFrame([('25/Dec/2020 01:30:00 PM', )], ['Christmas'])
df.show(1, truncate=False)

In [0]:
# The sample value is '25/Dec/2020 01:30:00 PM': day first, abbreviated month name,
# 12-hour clock with an AM/PM marker. The previous pattern ('MM/dd/yyyy ...') expected
# a numeric month first, so both parses returned null. A single 'a' is used for the
# AM/PM marker, matching the pattern used when loading the crime data.
df.select(
    to_date(col('Christmas'), 'dd/MMM/yyyy hh:mm:ss a'),
    to_timestamp(col('Christmas'), 'dd/MMM/yyyy hh:mm:ss a'),
).show(1)

In [0]:
# Raw re-read of the crime file with no timestamp conversion, so 'Date' is shown
# exactly as it is stored in the CSV.
n = spark.read.csv('reported-crimes.csv', header=True)
n.show(5, truncate=False)

# Joins

**Download police stations data from Chicago data portal**

In [0]:
!wget -O police-station.csv https://data.cityofchicago.org/api/views/z8bn-74gv/rows.csv?accessType=DOWNLOAD
!ls -l

In [0]:
# Police station table; no schema is supplied, so every column is read as a string.
ps = spark.read.csv('police-station.csv', header=True)
ps.show(5)

**The reported crimes dataset has only the district number. Add the district name by joining it with the police station dataset.**

In [0]:
# Mark rc for caching; cache() is lazy, so the count() action is what actually
# materialises the data before the joins below.
rc.cache()
rc.count()

In [0]:
ps.select(col('DISTRICT')).distinct().show(30)

In [0]:
rc.select(col('District')).distinct().show(30)

In [0]:
from pyspark.sql.functions import lpad

In [0]:
help(lpad)

*Left-pad the police station district numbers with zeros so that they match the district codes used in the crime dataset*

In [0]:
# Preview the station district ids left-padded with '0' to width 3.
# NOTE(review): lpad presumably also cuts values longer than 3 characters down to 3 — confirm.
ps.select(lpad(col('DISTRICT'), 3, '0')).show()

In [0]:
# Keep the padded id as a new column 'Format_district'; it is the join key used
# against rc.District in the cells below.
ps = ps.withColumn('Format_district', lpad(col('DISTRICT'), 3, '0'))
ps.show(5)

In [0]:
# Left outer join: every crime row is kept, with station columns attached where
# the district codes match (null otherwise).
rc.join(ps, rc.District == ps.Format_district, 'left_outer').show(5)

In [0]:
ps.columns

In [0]:
# Same left outer join, followed by dropping the listed columns to slim the output.
# NOTE(review): drop() takes plain names and resolves them across the whole joined
# frame. Column-name matching is presumably case-insensitive by default, so names
# such as 'LATITUDE', 'LONGITUDE', 'LOCATION', 'X COORDINATE' and 'Y COORDINATE' may
# remove the crime table's similarly named columns as well — confirm this is intended.
rc.join(ps, rc.District == ps.Format_district, 'left_outer').drop(
  'ADDRESS',
  'CITY',
  'STATE',
  'ZIP',
  'WEBSITE',
  'PHONE',
  'FAX',
  'TTY',
  'X COORDINATE',
  'Y COORDINATE',
  'LATITUDE',
  'LONGITUDE',
  'LOCATION',
  'Format_district'
  ).show(5)

**What is the most frequently reported non-criminal activity?**

In [0]:
# rc was already cached in the joins section above, so this call is redundant
# when the notebook is run top to bottom; count() simply reports the row count again.
rc.cache()
rc.count()

In [0]:
rc.show(5)

In [0]:
rc.select(col('Primary Type')).distinct().count()

In [0]:
# Alphabetical list of distinct primary types (up to 35 rows, untruncated), used to
# spot the different spellings of the non-criminal category filtered in the next cell.
rc.select(col('Primary Type')).distinct().orderBy(col('Primary Type')).show(35, truncate=False)

In [0]:
# Keep the rows whose primary type is any of the three spellings of the
# non-criminal category.
nc = rc.filter(
    col('Primary Type').isin(
        'NON - CRIMINAL',
        'NON-CRIMINAL',
        'NON-CRIMINAL (SUBJECT SPECIFIED)',
    )
)
nc.show(50)

In [0]:
# Non-criminal reports per description, most frequent first, shown in full.
(nc.groupBy('Description')
   .count()
   .orderBy(col('count').desc())
   .show(truncate=False))

**Which day of the week has the highest number of reported crimes?**

In [0]:
from pyspark.sql.functions import dayofweek

In [0]:
help(dayofweek)

In [0]:
rc.show(5)

In [0]:
rc.select(col('Date'), dayofweek(col('Date'))).show(5)

In [0]:
from pyspark.sql.functions import date_format

In [0]:
help(date_format)

In [0]:
# Show the timestamp next to its numeric day-of-week and its abbreviated
# weekday name (pattern 'E').
rc.select(col('Date'), dayofweek(col('Date')), date_format(col('Date'), 'E')).show(5)

In [0]:
rc.groupBy(date_format(col('Date'), 'E')).count().show()

In [0]:
# Reports per weekday name, busiest day first.
(rc.groupBy(date_format(col('Date'), 'E'))
   .count()
   .orderBy(col('count').desc())
   .show())

In [0]:
rc.groupBy(date_format(col('Date'), 'E')).count().collect()

In [0]:
# First field of every collected (weekday, count) row: the weekday labels.
day_of_week = [
    row[0]
    for row in rc.groupBy(date_format(col('Date'), 'E')).count().collect()
]
day_of_week

In [0]:
# Collect the aggregation once and derive both lists from the same rows, so each
# count is guaranteed to line up with its weekday label. Two separate collect()
# calls run two jobs, and Spark does not promise the same row order for an
# unordered groupBy. `day_of_week` is rebuilt here on purpose so the pairing holds
# even if the previous cell's result came from a different run.
weekday_rows = rc.groupBy(date_format(col('Date'), 'E')).count().collect()
day_of_week = [row[0] for row in weekday_rows]
counts = [row[1] for row in weekday_rows]
counts

**Show graph**

In [0]:
import pandas as pd
import matplotlib.pyplot as plt

In [0]:
# Pair the weekday labels with their counts in a small pandas frame for plotting.
# The two lists are matched by position, so they must come from the same row order.
cp = pd.DataFrame({'Day_of_Week': day_of_week, 'Count': counts})
cp.head(7)

In [0]:
# Bar chart of reports per weekday, tallest bar first; labels are set on the
# axes object returned by pandas rather than through pyplot's current axes.
ax = cp.sort_values('Count', ascending=False).plot(
    kind='bar', color='green', x='Day_of_Week', y='Count'
)
ax.set_xlabel('Day of the week')
ax.set_ylabel('Count')
ax.set_title('The number of reported crimes per day of the week from 2001 to present')

# RDDs setup

In [0]:
# Load the station file as an RDD of raw text lines; first() shows the first line
# (the CSV header).
psrdd = sc.textFile('police-station.csv')
psrdd.first()

In [0]:
# Remember the first line so it can be filtered out of the data below.
ps_header = psrdd.first()

In [0]:
# Every line that differs from the header text, i.e. the data rows. A data line
# identical to the header would be removed too.
ps_rest = psrdd.filter(lambda line: line!=ps_header)
ps_rest.first()

**How many police stations are there?**

In [0]:
# Naive comma split of each line, brought back to the driver as a list of lists.
# NOTE(review): a plain split(',') also splits inside quoted fields that contain
# commas, so trailing columns may be broken up — confirm against the file.
ps_rest.map(lambda line: line.split(',')).collect()

In [0]:
# One element per data line, so the count is the number of station rows
# (the split does not change the number of elements).
ps_rest.map(lambda line: line.split(',')).count()

**Display the District ID, District name, Address and Zip for the police station with District ID 7**

In [0]:
# Split each line once, keep the row whose first field is '7', and project
# fields 0, 1, 2 and 5 (district id, name, address and zip per the heading above).
(ps_rest.map(lambda line: line.split(','))
 .filter(lambda fields: fields[0] == '7')
 .map(lambda fields: (fields[0], fields[1], fields[2], fields[5]))
 .collect())

Police stations 10 and 11 are geographically close to each other. Display the District ID, District name, Address and Zip for the police stations with District ID 10 and 11.

In [0]:
# Split each line once, keep the rows whose first field is '10' or '11', and
# project fields 0, 1, 2 and 5 of each.
(ps_rest.map(lambda line: line.split(','))
 .filter(lambda fields: fields[0] in ['10','11'])
 .map(lambda fields: (fields[0], fields[1], fields[2], fields[5]))
 .collect())