# Feature Engineering

In [1]:
%%configure -f
{
  "conf": {
    "spark.pyspark.python": "python3",
    "spark.pyspark.virtualenv.enabled": "true",
    "spark.pyspark.virtualenv.type": "native",
    "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv"
  }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1716473697126_0002,pyspark,idle,Link,Link,
1,application_1716473697126_0003,pyspark,idle,Link,Link,


In [2]:
# Install the pinned third-party Python packages into the Spark session's
# environment via sc.install_pypi_package, pulling them from the public PyPI index.
PYPI_INDEX_URL = "https://pypi.org/simple"
for pinned_package in (
    "matplotlib==3.2.1",
    "pandas==1.0.5",
    "scipy==1.4.1",
    "seaborn==0.11.2",
):
    sc.install_pypi_package(pinned_package, PYPI_INDEX_URL)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1716473697126_0004,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting matplotlib==3.2.1
  Using cached https://files.pythonhosted.org/packages/b2/c2/71fcf957710f3ba1f09088b35776a799ba7dd95f7c2b195ec800933b276b/matplotlib-3.2.1-cp37-cp37m-manylinux1_x86_64.whl
Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1 (from matplotlib==3.2.1)
  Using cached https://files.pythonhosted.org/packages/9d/ea/6d76df31432a0e6fdf81681a895f009a4bb47b3c39036db3e1b528191d52/pyparsing-3.1.2-py3-none-any.whl
Collecting python-dateutil>=2.1 (from matplotlib==3.2.1)
  Using cached https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl
Collecting cycler>=0.10 (from matplotlib==3.2.1)
  Using cached https://files.pythonhosted.org/packages/5c/f9/695d6bedebd747e5eb0fe8fad57b72fdf25411273a39791cde838d5a8f51/cycler-0.11.0-py3-none-any.whl
Collecting kiwisolver>=1.0.1 (from matplotlib==3.2.1)
  Using cached https://files.pythonhosted.org/packages/f9/77/e3046bf19720b22e3e0b7c

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import year, month
import matplotlib.pyplot as plt
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
import matplotlib.pyplot as plt

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Load the cleaned output of the EDA step. No schema is supplied and
# inferSchema is not set, so every column is read as a string and is cast
# explicitly in later cells.
crimes = spark.read.csv(
    "s3://hvpachisia-chicago-crime/data_after_eda/",
    header=True,
    quote='"',
    escape='"',  # escape == quote: a doubled "" inside a quoted field is a literal quote
    ignoreLeadingWhiteSpace=True,
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Adding features

In [5]:
# Hour of the day (0-23) extracted from `Date`.
# NOTE(review): `Date` is a string column; F.hour implicitly casts it to a
# timestamp, which typically yields null for strings Spark cannot parse
# (e.g. "MM/dd/yyyy hh:mm:ss a"). Confirm the format written by the EDA step.
crimes = crimes.withColumn("HourOfDay", F.hour(F.col("Date")))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Day of the week of `Date` as an integer; Spark's dayofweek numbers the
# days 1 = Sunday through 7 = Saturday.
crimes = crimes.withColumn("DayOfWeek", F.dayofweek(F.col("Date")))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# Meteorological season derived from the calendar month of `Date`:
# Dec-Feb = Winter, Mar-May = Spring, Jun-Aug = Summer, Sep-Nov = Fall.
# Every season is matched explicitly and there is no catch-all `.otherwise`.
# Rows whose month is null (null or unparseable `Date`) therefore get a null
# Season instead of being silently labelled "Winter".
crime_month = F.month(crimes["Date"])
crimes = crimes.withColumn(
    "Season",
    F.when(crime_month.isin(12, 1, 2), "Winter")
     .when(crime_month.isin(3, 4, 5), "Spring")
     .when(crime_month.isin(6, 7, 8), "Summer")
     .when(crime_month.isin(9, 10, 11), "Fall"),
)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# 1 if the crime fell on a Sunday (1) or Saturday (7) per Spark's dayofweek,
# 0 otherwise; null when `Date` is null.
is_weekend = F.dayofweek(F.col("Date")).isin(1, 7)
crimes = crimes.withColumn("IsWeekend", is_weekend.cast("int"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Total number of crimes per community area, attached to every row.
# NOTE(review): computed over the whole dataset. If a train/test split happens
# later, this feature includes counts from test rows.
crime_density = (
    crimes
    .groupBy("community_area")
    .agg(F.count(F.lit(1)).alias("AreaCrimeCount"))
)
# The left join keeps every crime. Null join keys never match, so rows with a
# null community_area end up with a null AreaCrimeCount.
crimes = crimes.join(crime_density, on="community_area", how="left")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Number of earlier crimes recorded on the same Block.
# rank() - 1 counts only the rows in the Block whose Date sorts strictly
# before this one. A count over this ordered window would use the default
# RANGE frame (unbounded preceding .. current row). That frame also includes
# every crime sharing the exact same Date, overcounting simultaneous incidents.
# The cast keeps the LongType a count() produced.
# NOTE(review): `Date` is read as a string, so the ordering is lexicographic.
# It is chronological only if the strings use a sortable format such as
# ISO-8601; confirm against data_after_eda.
windowSpec = Window.partitionBy("Block").orderBy("Date")
crimes = crimes.withColumn(
    "PrevCrimesAtLocation",
    (F.rank().over(windowSpec) - 1).cast("long"),
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Show the current column names and types of `crimes`.
crimes.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- community_area: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- community_name: string (nullable = true)
 |-- Birth Rate: string (nullable = true)
 |-- Assault (Homicide): string (nullable = true)
 |-- Below Poverty Level: string (nullable = true)

One-hot encoding of categorical features

In [14]:
# One-hot encode the categorical features. Each column is first mapped to a
# numeric index by StringIndexer (`<col>_Index`), then expanded to an
# indicator vector by OneHotEncoder (`<col>_Vec`).
# NOTE(review): StringIndexer's default handleInvalid is "error", so a null in
# any of these columns (e.g. from an unparseable Date) is expected to make the
# transform fail. Confirm the inputs are null-free or set handleInvalid.
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

categorical_columns = ['Primary Type', 'Season', 'DayOfWeek', 'Month']
index_columns = [f"{name}_Index" for name in categorical_columns]
vector_columns = [f"{name}_Vec" for name in categorical_columns]

indexers = [StringIndexer(inputCol=src, outputCol=dst)
            for src, dst in zip(categorical_columns, index_columns)]
encoders = [OneHotEncoder(inputCol=src, outputCol=dst)
            for src, dst in zip(index_columns, vector_columns)]

# All indexers run before any encoder, so the appended columns are every
# *_Index column followed by every *_Vec column.
pipeline = Pipeline(stages=[*indexers, *encoders])
model = pipeline.fit(crimes)
crimes = model.transform(crimes)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Show the schema again to check the columns added by the encoding pipeline.
crimes.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- community_area: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- community_name: string (nullable = true)
 |-- Birth Rate: string (nullable = true)
 |-- Assault (Homicide): string (nullable = true)
 |-- Below Poverty Level: string (nullable = true)

The columns fall into the following categories:

Time-related columns: Year, Month, DayOfMonth, DayOfWeek, HourOfDay

Location-related columns: Block, Location Description, Beat, District, Police District Name, Ward, Community Area, Community Area Name, Community Area Side, Latitude, Longitude

Crime-related columns: ID, Case Number, Primary Type, Description, Arrest, Domestic, IUCR, Index Code, FBI Code

Socio-economic factor columns: Below Poverty Level, Crowded Housing, No High School Diploma, Per Capita Income, Unemployment

In [19]:
from pyspark.sql.functions import col

# Normalise every column name: spaces become underscores and everything is
# lower-cased, e.g. "Primary Type_Vec" -> "primary_type_vec".
column_names = crimes.schema.names
new_column_names = [name.replace(" ", "_").lower() for name in column_names]

# toDF renames the columns positionally in a single projection. Chaining one
# withColumnRenamed call per column adds a projection to the plan for every
# column.
# NOTE(review): only spaces are replaced. Characters such as "(" and ")"
# (e.g. in "assault_(homicide)") survive, and some Spark versions' Parquet
# writer rejects them in field names.
crimes = crimes.toDF(*new_column_names)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Confirm the normalised (underscore, lower-case) column names.
crimes.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- community_area: string (nullable = true)
 |-- id: string (nullable = true)
 |-- case_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- block: string (nullable = true)
 |-- primary_type: string (nullable = true)
 |-- description: string (nullable = true)
 |-- location_description: string (nullable = true)
 |-- arrest: string (nullable = true)
 |-- domestic: string (nullable = true)
 |-- beat: string (nullable = true)
 |-- district: string (nullable = true)
 |-- ward: string (nullable = true)
 |-- x_coordinate: string (nullable = true)
 |-- y_coordinate: string (nullable = true)
 |-- year: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- location: string (nullable = true)
 |-- month: string (nullable = true)
 |-- community_name: string (nullable = true)
 |-- birth_rate: string (nullable = true)
 |-- assault_(homicide): string (nullable = true)
 |-- below_poverty_level: string (nullable = true)

In [23]:
# Remove the `assault_(homicide)` column (name as produced by the rename step).
# The original line had a stray extra quote and was a SyntaxError.
# NOTE(review): older Spark versions' Parquet writer rejects column names
# containing any of " ,;{}()\n\t=", so this column could not be written as-is.
# Presumably that, or it not being a wanted feature, is why it is dropped.
crimes = crimes.drop('assault_(homicide)')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
# Cast numeric columns that were read as strings: coordinates first, then the
# socio-economic indicators. per_capita_income becomes an integer; the rest
# become doubles. The casts are applied in the listed order. A name that is
# not a column of `crimes` fails to resolve in col(...) and raises an
# analysis error rather than being skipped.
# NOTE(review): crowded_housing, no_high_school_diploma, per_capita_income and
# unemployment do not appear in the printed schemas above. Confirm they exist
# in the input data.
numeric_casts = [
    ("latitude", "double"),
    ("longitude", "double"),
    ("below_poverty_level", "double"),
    ("crowded_housing", "double"),
    ("no_high_school_diploma", "double"),
    ("per_capita_income", "integer"),
    ("unemployment", "double"),
]
for column_name, spark_type in numeric_casts:
    crimes = crimes.withColumn(column_name, col(column_name).cast(spark_type))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
# S3 prefix for the engineered feature table.
output_path = "s3://hvpachisia-chicago-crime/processed_data/"

# Write the DataFrame to S3 as Parquet, replacing any existing output at this
# prefix. Parquet stores column names and types inside the files, so the
# CSV-only "header" option is not needed.
crimes.write.mode("overwrite").parquet(output_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…