### Cleaning using PySpark - Working in Progress
#### Tiancheng Zhang

In [81]:
# This is a data cleaning script without "import pandas as pd"!

In [119]:
import pyspark
from pyspark.sql import *
from pyspark.sql import SparkSession
import pyspark.sql.types
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import isnan, when, count, col

from datetime import datetime as Date
from pyspark.sql.dataframe import DataFrame

In [110]:
#Create a SparkSession
#.master('local[*]') specifies to use all cores on local machine
spark = SparkSession.builder \
                    .master('local[*]') \
                    .config("spark-master", "local") \
                    .appName('INSY695') \
                    .getOrCreate()

In [158]:
#we specify the schema before loading the data, in this case it doesn't need to
#detect the data type which makes the loading faster

#Note that we load the date and time as strings and transform them
#back to date and time once we dealt with the NUlls
#since isnan() and col().isNull() doesn't seem to work with date
bpd_schema = StructType([
    # Define the name field
    StructField('CrimeDate', StringType(), True), # Add the CrimeDate field
    StructField('CrimeTime', StringType(), False), # Add the CrimeTime field
    StructField('CrimeCode', StringType(), False), # Add the CrimeCode field
    StructField('Location', StringType(), False), # Add the Location field
    StructField('Description', StringType(), False), # Add the Description field
    StructField('Inside/Outside', StringType(), False), # Add the Inside/Outside field
    StructField('Weapon', StringType(), False), # Add the Weapon field
    StructField('Post', FloatType(), False), # Add the Post field
    StructField('District', StringType(), False), # Add the District field
    StructField('Neighborhood', StringType(), False), # Add the Neighborhood field
    StructField('Longitude', FloatType(), False), # Add the Longitude field
    StructField('Latitude', FloatType(), False), # Add the Latitude field
    StructField('Location 1', StringType(), False), # Add the Location 1 field
    StructField('Premise', StringType(), False), # Add the Premise field
    StructField('Total Incidents', IntegerType(), False) # Add the Total Incidents field
])

In [200]:
bpd_df = spark.read.csv('./DATA/BPD_CRIME_DATA.csv', header=True, dateFormat="dd/MM/yyyy", schema=bpd_schema)

In [201]:
#This line allows to display the Dataframe
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
bpd_df

CrimeDate,CrimeTime,CrimeCode,Location,Description,Inside/Outside,Weapon,Post,District,Neighborhood,Longitude,Latitude,Location 1,Premise,Total Incidents
09/02/2017,23:30:00,3JK,4200 AUDREY AVE,ROBBERY - RESIDENCE,I,KNIFE,913.0,SOUTHERN,Brooklyn,-76.60541,39.22951,"(39.2295100000, -...",ROW/TOWNHO,1
09/02/2017,23:00:00,7A,800 NEWINGTON AVE,AUTO THEFT,O,,133.0,CENTRAL,Reservoir Hill,-76.63217,39.3136,"(39.3136000000, -...",STREET,1
09/02/2017,22:53:00,9S,600 RADNOR AV,SHOOTING,Outside,FIREARM,524.0,NORTHERN,Winston-Govans,-76.60697,39.34768,"(39.3476800000, -...",Street,1
09/02/2017,22:50:00,4C,1800 RAMSAY ST,AGG. ASSAULT,I,OTHER,934.0,SOUTHERN,Carrollton Ridge,-76.64526,39.28315,"(39.2831500000, -...",ROW/TOWNHO,1
09/02/2017,22:31:00,4E,100 LIGHT ST,COMMON ASSAULT,O,HANDS,113.0,CENTRAL,Downtown West,-76.61365,39.28756,"(39.2875600000, -...",STREET,1
09/02/2017,22:00:00,5A,CHERRYCREST RD,BURGLARY,I,,922.0,SOUTHERN,Cherry Hill,-76.62131,39.24867,"(39.2486700000, -...",ROW/TOWNHO,1
09/02/2017,21:15:00,1F,3400 HARMONY CT,HOMICIDE,Outside,FIREARM,232.0,SOUTHEASTERN,Canton,-76.56827,39.28202,"(39.2820200000, -...",Street,1
09/02/2017,21:35:00,3B,400 W LANVALE ST,ROBBERY - STREET,O,,123.0,CENTRAL,Upton,-76.62789,39.30254,"(39.3025400000, -...",STREET,1
09/02/2017,21:00:00,4C,2300 LYNDHURST AVE,AGG. ASSAULT,O,OTHER,641.0,NORTHWESTERN,Windsor Hills,-76.68365,39.3137,"(39.3137000000, -...",STREET,1
09/02/2017,21:00:00,4E,1200 N ELLWOOD AVE,COMMON ASSAULT,I,HANDS,332.0,EASTERN,Berea,-76.57419,39.30551,"(39.3055100000, -...",ROW/TOWNHO,1


#### We follow the same steps as we did in Cleaning_Part1.ipynb
#### First, we count the NULL values in the dataset.

In [202]:
bpd_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in bpd_df.columns])

CrimeDate,CrimeTime,CrimeCode,Location,Description,Inside/Outside,Weapon,Post,District,Neighborhood,Longitude,Latitude,Location 1,Premise,Total Incidents
0,0,0,2207,0,10279,180952,224,80,2740,2204,2204,2204,10757,0


#### We drop the columns with repeated information.

In [203]:
columns_to_drop = ['CrimeCode', 'Location', 'Post', 'District', 'Location 1', 'Total Incidents']
bpd_df = bpd_df.drop(*columns_to_drop)

In [204]:
#Modify the data type of CrimeDate and rename to Crime_Date
bpd_df = bpd_df.withColumn('Crime_Date', F.from_unixtime(F.unix_timestamp('CrimeDate', 'dd/MM/yyyy')).cast(DateType()))
bpd_df = bpd_df.drop(bpd_df.CrimeDate)

#Modify the data type of CrimeTime and rename to Crime_Time
bpd_df = bpd_df.withColumn('Crime_Time', F.from_unixtime(F.unix_timestamp('CrimeTime', 'HH:mm:ss')).cast(TimeType()))
bpd_df = bpd_df.drop(bpd_df.CrimeTime)

NameError: name 'TimeType' is not defined

In [None]:
bpd_df.schema

In [153]:
func =  udf(lambda x: datetime.strptime(x, 'dd/MM/yyyy'), DateType())

bpd_df = bpd_df.withColumn('test', func(col('CrimeDate')))

In [157]:
bpd_df.withColumn("CrimeDate",bpd_df['CrimeDate'].cast(DateType()))

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 88, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-153-bb47505e6338>", line 1, in <lambda>
NameError: name 'datetime' is not defined


PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 88, in <lambda>
    return lambda *a: toInternal(f(*a))
  File "/Users/tuliprichard/anaconda3/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-153-bb47505e6338>", line 1, in <lambda>
NameError: name 'datetime' is not defined


In [156]:
bpd_df.CrimeDate = bpd_df.select(F.to_timestamp(bpd_df.CrimeDate, 'dd/MM/yyyy').alias('CrimeDate'))
bpd_df

TypeError: Invalid argument, not a string or column: +-------------------+
|          CrimeDate|
+-------------------+
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
|2017-02-09 00:00:00|
+-------------------+
only showing top 20 rows
 of type <class 'pyspark.sql.dataframe.DataFrame'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

In [None]:
df.write.parquet('filename.parquet')