# Final Project Clean Data

In [2]:
# import spark
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import *

# Reuse the active SparkSession if there is one; otherwise start a new one for this app
spark = (
    SparkSession.builder
    .appName("Clean Data")
    .getOrCreate()
)

### Read Dataset, Drop Columns, and Rename Columns

In [3]:
# Load the raw complaint data from "NYPD_Complaint_Data_Historic.csv".
# The first row supplies the column names and Spark infers each column's type;
# the result is cached because the following cells all build on it.
df = (
    spark.read
    .csv("NYPD_Complaint_Data_Historic.csv", header=True, inferSchema=True)
    .cache()
)

In [4]:
# Show each column's name, inferred type and nullability for the freshly loaded data
df.printSchema()

root
 |-- CMPLNT_NUM: integer (nullable = true)
 |-- CMPLNT_FR_DT: string (nullable = true)
 |-- CMPLNT_FR_TM: string (nullable = true)
 |-- CMPLNT_TO_DT: string (nullable = true)
 |-- CMPLNT_TO_TM: string (nullable = true)
 |-- ADDR_PCT_CD: integer (nullable = true)
 |-- RPT_DT: string (nullable = true)
 |-- KY_CD: integer (nullable = true)
 |-- OFNS_DESC: string (nullable = true)
 |-- PD_CD: integer (nullable = true)
 |-- PD_DESC: string (nullable = true)
 |-- CRM_ATPT_CPTD_CD: string (nullable = true)
 |-- LAW_CAT_CD: string (nullable = true)
 |-- BORO_NM: string (nullable = true)
 |-- LOC_OF_OCCUR_DESC: string (nullable = true)
 |-- PREM_TYP_DESC: string (nullable = true)
 |-- JURIS_DESC: string (nullable = true)
 |-- JURISDICTION_CODE: integer (nullable = true)
 |-- PARKS_NM: string (nullable = true)
 |-- HADEVELOPT: string (nullable = true)
 |-- HOUSING_PSA: string (nullable = true)
 |-- X_COORD_CD: integer (nullable = true)
 |-- Y_COORD_CD: integer (nullable = true)
 |-- SUSP_AG

In [3]:
# Drop the columns that are not carried into the cleaned dataset.
# Each column is listed exactly once. DataFrame.drop() ignores names that are not
# present in the frame, so a misspelled name here would go unnoticed.
COLUMNS_TO_DROP = [
    "PARKS_NM",
    "VIC_AGE_GROUP", "VIC_RACE", "VIC_SEX",
    "STATION_NAME", "PATROL_BORO", "TRANSIT_DISTRICT",
    "SUSP_SEX", "SUSP_RACE", "SUSP_AGE_GROUP",
    "X_COORD_CD", "Y_COORD_CD",
    "HOUSING_PSA", "HADEVELOPT",
    "JURISDICTION_CODE", "JURIS_DESC",
    "RPT_DT", "PREM_TYP_DESC",
    "PD_CD", "PD_DESC",
    "LOC_OF_OCCUR_DESC",
    "CMPLNT_TO_TM", "CMPLNT_TO_DT",
]
df = df.drop(*COLUMNS_TO_DROP)

In [4]:
# Rename the remaining raw columns to readable names.
# The mapping is spelled out as raw name -> new name so the result does not depend
# on column position and the origin of every renamed column is visible.
COLUMN_RENAMES = {
    'CMPLNT_NUM': 'Complaint_ID',
    'CMPLNT_FR_DT': 'Start_Date',
    'CMPLNT_FR_TM': 'Start_Time',
    'ADDR_PCT_CD': 'Neighborhood',
    'KY_CD': 'Offence_Code',
    'OFNS_DESC': 'Offence_Type',
    'CRM_ATPT_CPTD_CD': 'Status',
    'LAW_CAT_CD': 'Offence_Level',
    'BORO_NM': 'Borough',
}

# Column layout the rest of the notebook relies on.
# NOTE(review): assumes the three columns after the nine renamed ones are already
# called Latitude, Longitude and Lat_Lon in the raw file - confirm against the full schema.
EXPECTED_COLUMNS = list(COLUMN_RENAMES.values()) + ['Latitude', 'Longitude', 'Lat_Lon']

# withColumnRenamed keeps column positions and does nothing when the old name is
# absent, so re-running this cell is harmless.
for old_name, new_name in COLUMN_RENAMES.items():
    df = df.withColumnRenamed(old_name, new_name)

# Fail loudly instead of carrying mislabelled or unexpected columns forward.
if df.columns != EXPECTED_COLUMNS:
    raise ValueError(
        "Unexpected columns after rename: expected {}, got {}".format(EXPECTED_COLUMNS, df.columns)
    )

In [5]:
# Convert Start_Date from an 'MM/dd/yyyy' string to a timestamp, then keep only
# complaints whose start date lies between 2006-01-01 and 2018-01-01.
# NOTE(review): both bounds are inclusive, so rows dated 2018-01-01 are kept - confirm
# that is intended rather than "through the end of 2017".
# NOTE(review): presumably not safe to re-run on its own, because Start_Date is already
# a timestamp after the first run - re-run from the load cell instead; verify.
parsed_start_date = to_timestamp('Start_Date', 'MM/dd/yyyy')
df = df.withColumn('Start_Date', parsed_start_date)

within_date_range = df['Start_Date'].between(
    lit('2006-01-01 00:00:00'),
    lit('2018-01-01 00:00:00'),
)
df = df.filter(within_date_range)

### Save the DataFrame

In [7]:
# Write the cleaned DataFrame as CSV with a header row.
# Spark creates a directory named data/NYC_Crime_Cleaned.csv/; coalesce(1) reduces the
# data to one partition so that the directory holds a single part file.
# mode("overwrite") replaces a previous export so the notebook can be re-run from the
# top; with the default save mode the write fails if the path already exists.
(
    df.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv('data/NYC_Crime_Cleaned.csv')
)