In [2]:
crimes = spark.read.csv("../data/crimes/Crimes_-_One_year_prior_to_present.csv", header=True, inferSchema=True)
# try the above without the header and inferSchema option. see what happens!

In [3]:
# Let's rename the improperly formatted column names
columnNames = crimes.columns
for col in columnNames:
    crimes = crimes.withColumnRenamed(col, col.strip())
    
crimes.columns

['CASE#',
 'DATE  OF OCCURRENCE',
 'BLOCK',
 'IUCR',
 'PRIMARY DESCRIPTION',
 'SECONDARY DESCRIPTION',
 'LOCATION DESCRIPTION',
 'ARREST',
 'DOMESTIC',
 'BEAT',
 'WARD',
 'FBI CD',
 'X COORDINATE',
 'Y COORDINATE',
 'LATITUDE',
 'LONGITUDE',
 'LOCATION']

In [5]:
# Creating a user-defined function to parse an existing column and create a new column
import pyspark.sql.functions as F
import pyspark.sql.types as T

def get_date_in_DD_MM_YYYY(col):
    return col.split(' ')[0]

my_udf = F.UserDefinedFunction(get_date_in_DD_MM_YYYY, T.StringType())

crimes = crimes.withColumn('DATE_AS_STRING', my_udf('DATE  OF OCCURRENCE'))
crimes.take(1)

[Row(CASE#='JB241987', DATE  OF OCCURRENCE='04/28/2018 10:05:00 PM', BLOCK='009XX N LONG AVE', IUCR='2092', PRIMARY DESCRIPTION='NARCOTICS', SECONDARY DESCRIPTION='SOLICIT NARCOTICS ON PUBLICWAY', LOCATION DESCRIPTION='SIDEWALK', ARREST='Y', DOMESTIC='N', BEAT=1524, WARD=37, FBI CD='18', X COORDINATE=1140136, Y COORDINATE=1905903, LATITUDE=41.897894893, LONGITUDE=-87.760743714, LOCATION='(41.897894893, -87.760743714)', DATE_AS_STRING='04/28/2018')]

In [6]:
# Verify that we can convert a column of type string to type datetime
from pyspark.sql.functions import to_date

crimes.select(to_date("DATE_AS_STRING", 'MM/dd/yyyy').alias('DATE')).take(1)

[Row(DATE=datetime.date(2018, 4, 28))]

In [7]:
# Add a new DATE column in the crimes dataframe
crimes = crimes.withColumn("DATE", to_date("DATE_AS_STRING", 'MM/dd/yyyy'))
crimes.take(1)

[Row(CASE#='JB241987', DATE  OF OCCURRENCE='04/28/2018 10:05:00 PM', BLOCK='009XX N LONG AVE', IUCR='2092', PRIMARY DESCRIPTION='NARCOTICS', SECONDARY DESCRIPTION='SOLICIT NARCOTICS ON PUBLICWAY', LOCATION DESCRIPTION='SIDEWALK', ARREST='Y', DOMESTIC='N', BEAT=1524, WARD=37, FBI CD='18', X COORDINATE=1140136, Y COORDINATE=1905903, LATITUDE=41.897894893, LONGITUDE=-87.760743714, LOCATION='(41.897894893, -87.760743714)', DATE_AS_STRING='04/28/2018', DATE=datetime.date(2018, 4, 28))]

In [9]:
# Add a new MONTH column in the crimes dataframe
from pyspark.sql.functions import month

crimes = crimes.withColumn('MONTH', month('DATE'))
crimes.take(1)

[Row(CASE#='JB241987', DATE  OF OCCURRENCE='04/28/2018 10:05:00 PM', BLOCK='009XX N LONG AVE', IUCR='2092', PRIMARY DESCRIPTION='NARCOTICS', SECONDARY DESCRIPTION='SOLICIT NARCOTICS ON PUBLICWAY', LOCATION DESCRIPTION='SIDEWALK', ARREST='Y', DOMESTIC='N', BEAT=1524, WARD=37, FBI CD='18', X COORDINATE=1140136, Y COORDINATE=1905903, LATITUDE=41.897894893, LONGITUDE=-87.760743714, LOCATION='(41.897894893, -87.760743714)', DATE_AS_STRING='04/28/2018', DATE=datetime.date(2018, 4, 28), MONTH=4)]

In [11]:
# Time to Profit! Let's see which months have the highest number of crimes!
from pyspark.sql.functions import desc

crimes.groupBy('MONTH').count().sort(desc("count")).collect()

[Row(MONTH=8, count=24619),
 Row(MONTH=7, count=24322),
 Row(MONTH=5, count=24125),
 Row(MONTH=6, count=23459),
 Row(MONTH=10, count=22769),
 Row(MONTH=9, count=22726),
 Row(MONTH=11, count=21366),
 Row(MONTH=12, count=20937),
 Row(MONTH=3, count=20899),
 Row(MONTH=4, count=20728),
 Row(MONTH=1, count=20126),
 Row(MONTH=2, count=17115)]