# Combined Data

This notebook was loaded with:

```bash
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./dse/bin/dse pyspark --num-executors 5 --driver-memory 8g --executor-memory 8g
```

At this point, we've got several sets of data processed and cleaned. We have also discovered several fields we can use for joining:

- license_id
- longitude, latitude

Longitude and latitude are great candidates for joining crime, sanitation, weather, and inspections. The problem is that records from different datasets can't reasonably be expected to fall on exactly the same coordinates.

Suppose we divided the city up into a grid and determined the coordinates for the center of each cell. Then, we could determine which sanitation complaints and crimes fall inside each cell, and connect those to the inspections in the same cell.
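One way to picture the grid idea is the minimal sketch below. The bounding box, the 100 x 100 grid size, and the `grid_cell` and `cell_center` helpers are all illustrative assumptions, not the values or code used to build the grid in this project.

```python
# Hypothetical bounding box roughly covering Chicago (assumed values, for illustration only).
LAT_MIN, LAT_MAX = 41.60, 42.10
LON_MIN, LON_MAX = -87.95, -87.50
GRID_SIZE = 100  # assumed number of cells along each axis


def grid_cell(latitude, longitude, size=GRID_SIZE):
    """Return a single integer id for the grid cell containing the point."""
    # Fraction of the way across the bounding box along each axis.
    lat_frac = (latitude - LAT_MIN) / (LAT_MAX - LAT_MIN)
    lon_frac = (longitude - LON_MIN) / (LON_MAX - LON_MIN)
    # Clamp so points outside the box fall into the nearest edge cell.
    row = min(max(int(lat_frac * size), 0), size - 1)
    col = min(max(int(lon_frac * size), 0), size - 1)
    return row * size + col


def cell_center(cell_id, size=GRID_SIZE):
    """Return the (latitude, longitude) of the center of a cell."""
    row, col = divmod(cell_id, size)
    lat = LAT_MIN + (row + 0.5) * (LAT_MAX - LAT_MIN) / size
    lon = LON_MIN + (col + 0.5) * (LON_MAX - LON_MIN) / size
    return lat, lon


# Two nearby points with different coordinates share a cell, so they can be joined on it.
inspection_cell = grid_cell(41.8781, -87.6298)
crime_cell = grid_cell(41.8790, -87.6290)
```

Joining on the cell id rather than on raw coordinates trades precision for matchability: the coarser the grid, the more records share a cell.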

In [1]:
%pylab inline 

Populating the interactive namespace from numpy and matplotlib


In [2]:
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import from_unixtime, count, datediff, lag, sum, coalesce, rank, lit, when,col, udf, to_date, year, mean, month, date_format, array
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from datetime import datetime
from pyspark.sql.window import Window
import pyspark
import matplotlib
import matplotlib.pyplot as plt

# Categorical Feature Extraction

We're finally at the point where we're ready to encode our final set for use in predictive models. Before we go any further, it's important to make some observations about our data. We're going to combine crime, weather, sanitation complaints, license information, and inspections to form a complete model. Weather, crime, and sanitation complaints are all continuous data, meaning that they represent numerical values (like temperatures, proportions, and counts). The license and inspection data contain a few categorical columns (such as license type, month of inspection, etc.). Computationally speaking, the categorical data isn't processable in its current form. We'll need to change that.

Now, let's consider the cost of what we're going to do. The continuous data simply needs to be joined to the set, since we've already pre-computed the aggregations. The categorical data will need to be recoded (think shuffling and mapping in terms of Spark). Encoding will trigger Spark actions, so let's do the categorical mapping on the licenses and inspections before we join all the data. That way there is less data to shuffle around.

OK, so what do we need to do to the categorical values? First, we'll encode the categorical features with a method called one-hot encoding. The idea is that we'll take a column that looks like this:

| month |
|---|
| 1 |
| 2 |
| 3 |

...and represent it like this:

| month_1 | month_2 | month_3 |
|---|---|---|
| 1 | 0 | 0 |
| 0 | 1 | 0 |
| 0 | 0 | 1 |

The advantage is that a model can now process the data mathematically. For example, let's use $\beta$ to represent the variable weights of, say, a regression model. Our formula could look like this:

$$\beta_0 + \beta_1 \times month_1 + \beta_2 \times month_2 + \beta_3 \times month_3$$
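To make the formula concrete, here is a minimal plain-Python sketch that one-hot encodes the three months from the table above and evaluates the linear expression for each row. The `one_hot` and `linear_predictor` helpers and the weight values are hypothetical, chosen only for illustration.

```python
def one_hot(value, categories):
    """Return a list with a 1 in the position of `value` and 0 elsewhere."""
    return [1 if value == category else 0 for category in categories]


months = [1, 2, 3]  # the three categories in the example column
encoded = [one_hot(m, months) for m in [1, 2, 3]]

# Hypothetical weights: beta_0 is the intercept, the rest pair up with month_1..month_3.
beta_0 = 0.5
betas = [0.1, -0.2, 0.3]


def linear_predictor(row):
    """Evaluate beta_0 + beta_1 * month_1 + beta_2 * month_2 + beta_3 * month_3."""
    return beta_0 + sum(b * x for b, x in zip(betas, row))


scores = [linear_predictor(row) for row in encoded]
```

Because exactly one indicator is 1 in each row, each score reduces to the intercept plus the weight of that row's month.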

Keep in mind that categorical features that are already binary (for example `reinspection`, `recent_inspection`, etc.) are already where we want them. One-hot encoding only applies to columns that contain multiple categories (like `inspection_type`). Continuous features are fine the way they are because they represent actual numbers (proportion of failures, the various counts, temperatures, etc.). So, we need to encode `ward`, `police_district`, `inspection_type`, `month`, and `weekday`.

Why not encode those before we stored the data in the database? We certainly could have, but conceptually, this kind of data modeling belongs with the model, not the data. It's fine to create indexes that represent the categories in the database, because we can use them to look up values or to attach relationships and other generally useful attributes to a category. One-hot encoded features, on the other hand, mean nothing to anyone except the machine learning models.

# Fetching and Joining Features

## Inspections

In [3]:
df_inspections = sqlContext.sql("select inspection_dt, license_id, city_grid, y, y_fail, canvass,\
complaint, cumulative_failures, cumulative_inspections, days_since_last_inspection, ever_failed, inspection_type,\
license_related,liquor,month,prev_fail,proportion_past_failures,recent_inspection,reinspection,risk,special_event,\
task_force, weekday, ward, police_district from chicago_data.inspections_by_city_grid")

In [4]:
df_inspections.count()

78136

We need to encode `month`, `weekday`, `risk`, `ward`, and `police_district`.

In [5]:
encoder = OneHotEncoder(inputCol="month", outputCol="monthVec")
df_inspections = encoder.transform(df_inspections)

In [6]:
encoder = OneHotEncoder(inputCol="weekday", outputCol="weekdayVec")
df_inspections = encoder.transform(df_inspections)

In [7]:
encoder = OneHotEncoder(inputCol="risk", outputCol="riskVec")
df_inspections = encoder.transform(df_inspections)

In [8]:
encoder = OneHotEncoder(inputCol="ward", outputCol="wardVec")
df_inspections = encoder.transform(df_inspections)

In [9]:
encoder = OneHotEncoder(inputCol="police_district", outputCol="police_districtVec")
df_inspections = encoder.transform(df_inspections)

In [10]:
#we can now drop the original categorical columns
df_inspections = df_inspections.drop("month").drop("weekday").drop("risk").drop("ward").drop("police_district")

In [11]:
df_inspections.dtypes

[('inspection_dt', 'date'),
 ('license_id', 'string'),
 ('city_grid', 'int'),
 ('y', 'int'),
 ('y_fail', 'int'),
 ('canvass', 'int'),
 ('complaint', 'int'),
 ('cumulative_failures', 'int'),
 ('cumulative_inspections', 'int'),
 ('days_since_last_inspection', 'int'),
 ('ever_failed', 'int'),
 ('inspection_type', 'int'),
 ('license_related', 'int'),
 ('liquor', 'int'),
 ('prev_fail', 'int'),
 ('proportion_past_failures', 'double'),
 ('recent_inspection', 'int'),
 ('reinspection', 'int'),
 ('special_event', 'int'),
 ('task_force', 'int'),
 ('monthVec', 'vector'),
 ('weekdayVec', 'vector'),
 ('riskVec', 'vector'),
 ('wardVec', 'vector'),
 ('police_districtVec', 'vector')]
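It may help to see what those `vector` columns hold. Spark's `OneHotEncoder` drops the last category by default (`dropLast=True`), so a column with n categories becomes a vector of length n - 1 and the last category is encoded as all zeros. The sketch below mimics that layout in plain Python; `one_hot_drop_last` is a hypothetical helper written for illustration, not part of Spark, and it uses dense lists where Spark would typically store sparse vectors.

```python
def one_hot_drop_last(index, num_categories):
    """Mimic the dropLast=True layout: n categories -> vector of length n - 1."""
    if not 0 <= index < num_categories:
        raise ValueError("index out of range")
    vector = [0.0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1.0  # the last category stays all zeros
    return vector


# A column with category indices 0..3 (four categories) becomes a length-3 vector.
vectors = [one_hot_drop_last(i, 4) for i in range(4)]
```

Dropping one category keeps the indicator columns from always summing to one, which would make them linearly dependent on an intercept term.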

In [12]:
#df_inspections = df_inspections.cache()

## Business Licenses

Now, get the business licenses. You'll see why we cast the districts and wards to `int` in a minute, but keep in mind they're already numerical codes. That's perfect for model encoding: we don't need to run a `StringIndexer` over them. It's also not a bad idea to push that cast down to the SQL layer.

In [13]:
df_licenses = sqlContext.sql("select license_id, conditional_approval from chicago_data.licenses")

In [14]:
df_licenses = df_licenses.withColumn("conditional_approval", when(col("conditional_approval") == "Y", 1).otherwise(0))

In [15]:
df_licenses.select(col("conditional_approval")).distinct().collect()

[Row(conditional_approval=1), Row(conditional_approval=0)]

In [16]:
df_licenses.dtypes

[('license_id', 'string'), ('conditional_approval', 'int')]

As it turns out, not all inspection licenses are in the license database. However, the crime data is more comprehensive. We'll take the ward and district from the crime data instead. That leaves only conditional approval for us to use from licenses.

In [17]:
#df_licenses = df_licenses.cache()

In [18]:
df_licenses.count()

63524

### Join

In [19]:
df_full = df_inspections.join(df_licenses, on="license_id", how="left_outer")

In [20]:
df_full.select(col("conditional_approval")).distinct().collect()

[Row(conditional_approval=None),
 Row(conditional_approval=1),
 Row(conditional_approval=0)]

In [21]:
df_full = df_full.withColumn("conditional_approval", coalesce(col("conditional_approval"), lit(0)))

In [22]:
df_full.select(col("conditional_approval")).distinct().collect()

[Row(conditional_approval=1), Row(conditional_approval=0)]
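The left outer join followed by `coalesce` in the cells above can be pictured with ordinary Python dictionaries. This is only a sketch on made-up rows; the sample license ids and values are hypothetical.

```python
# Hypothetical sample rows; license "C" has no entry in the license table.
inspections = [
    {"license_id": "A", "y": 1},
    {"license_id": "B", "y": 0},
    {"license_id": "C", "y": 0},
]
licenses = {"A": 1, "B": 0}  # license_id -> conditional_approval

joined = []
for row in inspections:
    # Left outer join: every inspection is kept; unmatched ones get None.
    approval = licenses.get(row["license_id"])
    # coalesce(conditional_approval, 0): replace the missing value with 0.
    if approval is None:
        approval = 0
    joined.append(dict(row, conditional_approval=approval))
```

Filling with 0 is a modeling choice: it treats a license that is missing from the license table the same as one without conditional approval.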

In [23]:
df_full.dtypes

[('license_id', 'string'),
 ('inspection_dt', 'date'),
 ('city_grid', 'int'),
 ('y', 'int'),
 ('y_fail', 'int'),
 ('canvass', 'int'),
 ('complaint', 'int'),
 ('cumulative_failures', 'int'),
 ('cumulative_inspections', 'int'),
 ('days_since_last_inspection', 'int'),
 ('ever_failed', 'int'),
 ('inspection_type', 'int'),
 ('license_related', 'int'),
 ('liquor', 'int'),
 ('prev_fail', 'int'),
 ('proportion_past_failures', 'double'),
 ('recent_inspection', 'int'),
 ('reinspection', 'int'),
 ('special_event', 'int'),
 ('task_force', 'int'),
 ('monthVec', 'vector'),
 ('weekdayVec', 'vector'),
 ('riskVec', 'vector'),
 ('wardVec', 'vector'),
 ('police_districtVec', 'vector'),
 ('conditional_approval', 'int')]

In [24]:
#df_full = df_full.cache()

|RDD Name|Storage Level|Cached Partitions|Fraction Cached|Size in Memory|Size on Disk|
|---|---|---|---|---|---|
|*Project [license_id#979, cast(cast(ward#1004 as decimal(20,0)) as int) AS ward#1012, cast(cast(police_district#1000 as decimal(20,0)) as int) AS police_district#1013, C...|Memory Deserialized 1x Replicated|13|100%|869.8 KB|0.0 B|
|*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@42aa2539 chicago_data.inspections_by_city_grid[inspection_dt#2,license_id#1,...|Memory Deserialized 1x Replicated|13|100%|2.3 MB|0.0 B|

## Crime

In [25]:
df_theft = sqlContext.sql("select id, city_grid from chicago_data.crime_by_type where primary_type = 'THEFT'")
df_burglary = sqlContext.sql("select id, city_grid from chicago_data.crime_by_type where primary_type = 'BURGLARY'")
df_other = sqlContext.sql("select id, city_grid from chicago_data.crime_by_type where primary_type in ('BATTERY', 'CRIMINAL DAMAGE', 'NARCOTICS', 'OTHER OFFENSE', 'ASSAULT', 'DECEPTIVE PRACTICE', 'MOTOR VEHICLE THEFT')")

In [26]:
df_theft.count()

460014

In [27]:
df_burglary.count()

133715

In [28]:
df_other.count()

992913

Now, we'll aggregate them by `city_grid`.

In [29]:
df_theft = df_theft.groupby("city_grid").count()\
                         .select(col("city_grid"), col("count").alias("crime_count_theft"))

In [30]:
df_burglary = df_burglary.groupby("city_grid").count()\
                         .select(col("city_grid"), col("count").alias("crime_count_burglary"))

In [31]:
df_other = df_other.groupby("city_grid").count()\
                         .select(col("city_grid"), col("count").alias("crime_count_other"))

In [32]:
df_crime = df_theft.join(df_burglary, on="city_grid", how="left_outer")\
                .join(df_other, on="city_grid", how="left_outer")

In [33]:
df_crime.columns

['city_grid', 'crime_count_theft', 'crime_count_burglary', 'crime_count_other']

In [34]:
df_crime.head(5)

[Row(city_grid=9597, crime_count_theft=21175, crime_count_burglary=9714, crime_count_other=63347),
 Row(city_grid=9898, crime_count_theft=18142, crime_count_burglary=429, crime_count_other=10856),
 Row(city_grid=9498, crime_count_theft=9376, crime_count_burglary=4071, crime_count_other=29065),
 Row(city_grid=9399, crime_count_theft=868, crime_count_burglary=315, crime_count_other=2118),
 Row(city_grid=9695, crime_count_theft=6259, crime_count_burglary=2774, crime_count_other=13822)]
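The aggregation and join above follow a simple pattern that can be sketched in plain Python with `collections.Counter`. The records below are hypothetical stand-ins for rows of `crime_by_type`, not real data.

```python
from collections import Counter

# Hypothetical (id, city_grid) records standing in for rows of one crime type.
thefts = [(1, 9597), (2, 9597), (3, 9898)]
burglaries = [(4, 9597)]

# groupby("city_grid").count(): one count per grid cell.
theft_counts = Counter(grid for _, grid in thefts)
burglary_counts = Counter(grid for _, grid in burglaries)

# Left outer join starting from the theft counts: cells without burglaries get None.
crime = {
    grid: {
        "crime_count_theft": n,
        "crime_count_burglary": burglary_counts.get(grid),
    }
    for grid, n in theft_counts.items()
}
```

Because the join starts from the theft counts, a cell with burglaries but no thefts would not appear in the result at all, and a cell with thefts but no burglaries carries a missing burglary count until it is filled in.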

In [35]:
df_full = df_full.join(df_crime, on="city_grid", how="left_outer")

In [36]:
df_full.dtypes

[('city_grid', 'int'),
 ('license_id', 'string'),
 ('inspection_dt', 'date'),
 ('y', 'int'),
 ('y_fail', 'int'),
 ('canvass', 'int'),
 ('complaint', 'int'),
 ('cumulative_failures', 'int'),
 ('cumulative_inspections', 'int'),
 ('days_since_last_inspection', 'int'),
 ('ever_failed', 'int'),
 ('inspection_type', 'int'),
 ('license_related', 'int'),
 ('liquor', 'int'),
 ('prev_fail', 'int'),
 ('proportion_past_failures', 'double'),
 ('recent_inspection', 'int'),
 ('reinspection', 'int'),
 ('special_event', 'int'),
 ('task_force', 'int'),
 ('monthVec', 'vector'),
 ('weekdayVec', 'vector'),
 ('riskVec', 'vector'),
 ('wardVec', 'vector'),
 ('police_districtVec', 'vector'),
 ('conditional_approval', 'int'),
 ('crime_count_theft', 'bigint'),
 ('crime_count_burglary', 'bigint'),
 ('crime_count_other', 'bigint')]

In [37]:
#df_full = df_full.cache()

In [38]:
df_full.filter(col("crime_count_theft").isNull()).count()

0

In [39]:
df_full.filter(col("crime_count_burglary").isNull()).count()

7

In [40]:
df_full = df_full.withColumn("crime_count_burglary", coalesce(col("crime_count_burglary"), lit(0)))

In [41]:
df_full.filter(col("crime_count_other").isNull()).count()

0

## Sanitation

In [42]:
df_sanitation = sqlContext.sql("select service_request_number, city_grid from chicago_data.sanitation_by_city_grid ")

In [43]:
df_sanitation_counts = df_sanitation.groupby("city_grid").count()\
                         .select(col("city_grid"), col("count").alias("sanitation_violation_count"))
df_full = df_full.join(df_sanitation_counts, on="city_grid", how="left_outer")

In [44]:
df_full.dtypes

[('city_grid', 'int'),
 ('license_id', 'string'),
 ('inspection_dt', 'date'),
 ('y', 'int'),
 ('y_fail', 'int'),
 ('canvass', 'int'),
 ('complaint', 'int'),
 ('cumulative_failures', 'int'),
 ('cumulative_inspections', 'int'),
 ('days_since_last_inspection', 'int'),
 ('ever_failed', 'int'),
 ('inspection_type', 'int'),
 ('license_related', 'int'),
 ('liquor', 'int'),
 ('prev_fail', 'int'),
 ('proportion_past_failures', 'double'),
 ('recent_inspection', 'int'),
 ('reinspection', 'int'),
 ('special_event', 'int'),
 ('task_force', 'int'),
 ('monthVec', 'vector'),
 ('weekdayVec', 'vector'),
 ('riskVec', 'vector'),
 ('wardVec', 'vector'),
 ('police_districtVec', 'vector'),
 ('conditional_approval', 'int'),
 ('crime_count_theft', 'bigint'),
 ('crime_count_burglary', 'bigint'),
 ('crime_count_other', 'bigint'),
 ('sanitation_violation_count', 'bigint')]

In [45]:
df_full.filter(col("sanitation_violation_count").isNull()).count()

7

In [46]:
df_full = df_full.withColumn("sanitation_violation_count", coalesce(col("sanitation_violation_count"), lit(0)))

In [47]:
df_full.filter(col("sanitation_violation_count").isNull()).count()

0

## Weather

In [48]:
df_temp = sqlContext.read.format("org.apache.spark.sql.cassandra").\
               load(keyspace="chicago_data", table="temperature")

In [49]:
df_temp.dtypes

[('DATE', 'date'),
 ('TMAX', 'int'),
 ('TMAX_3', 'double'),
 ('TMAX_5', 'double'),
 ('TMIN', 'int')]

In [50]:
df_temp = df_temp.filter(col("TMAX_3").isNotNull() & col("TMAX_5").isNotNull())

In [51]:
df_full = df_full.join(
    df_temp.select(col("DATE").alias("inspection_dt"), col("TMAX"), col("TMIN"), col("TMAX_3"), col("TMAX_5")),
    on="inspection_dt", how="left_outer")

In [52]:
df_full.filter(col("TMAX").isNull()).count()

16

In [53]:
df_full = df_full.filter(col("TMAX").isNotNull())

In [54]:
df_full.filter(col("TMAX_3").isNull()).count()

0

In [55]:
df_full.filter(col("TMAX_5").isNull()).count()

0

In [56]:
df_full.filter(col("TMIN").isNull()).count()

0

In [57]:
df_full.count()

78120

# Saving our Data

In [58]:
df_full = df_full.drop("license_id")

In [59]:
df_full.dtypes

[('inspection_dt', 'date'),
 ('city_grid', 'int'),
 ('y', 'int'),
 ('y_fail', 'int'),
 ('canvass', 'int'),
 ('complaint', 'int'),
 ('cumulative_failures', 'int'),
 ('cumulative_inspections', 'int'),
 ('days_since_last_inspection', 'int'),
 ('ever_failed', 'int'),
 ('inspection_type', 'int'),
 ('license_related', 'int'),
 ('liquor', 'int'),
 ('prev_fail', 'int'),
 ('proportion_past_failures', 'double'),
 ('recent_inspection', 'int'),
 ('reinspection', 'int'),
 ('special_event', 'int'),
 ('task_force', 'int'),
 ('monthVec', 'vector'),
 ('weekdayVec', 'vector'),
 ('riskVec', 'vector'),
 ('wardVec', 'vector'),
 ('police_districtVec', 'vector'),
 ('conditional_approval', 'int'),
 ('crime_count_theft', 'bigint'),
 ('crime_count_burglary', 'bigint'),
 ('crime_count_other', 'bigint'),
 ('sanitation_violation_count', 'bigint'),
 ('TMAX', 'int'),
 ('TMIN', 'int'),
 ('TMAX_3', 'double'),
 ('TMAX_5', 'double')]

In [60]:
df_last_60 = df_full.filter(date_format(col("inspection_dt"), "yyyy-MM-dd").between("2016-09-01", "2017-01-01"))                       

In [61]:
df_last_60.count()

1265

In [62]:
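# Note: between() is inclusive on both ends, so rows dated 2016-09-01 land in both df_full and df_last_60.
df_full = df_full.filter(date_format(col("inspection_dt"), "yyyy-MM-dd").between("2009-01-01", "2016-09-01"))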

In [63]:
df_full.count()

76899
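Since `between` includes both endpoints, two ranges that share a boundary date will both contain the rows for that date. The sketch below shows the effect on a few hypothetical dates, along with a half-open split as one possible alternative; it is an illustration, not a change to the sets saved by this notebook.

```python
from datetime import date

# Hypothetical inspection dates around the split point.
dates = [date(2016, 8, 31), date(2016, 9, 1), date(2016, 9, 2)]
cutoff = date(2016, 9, 1)

# Inclusive on both sides, like between(): the cutoff date lands in both sets.
train_inclusive = [d for d in dates if date(2009, 1, 1) <= d <= cutoff]
holdout_inclusive = [d for d in dates if cutoff <= d <= date(2017, 1, 1)]
overlap = set(train_inclusive) & set(holdout_inclusive)

# Half-open alternative: the training set stops just before the cutoff.
train = [d for d in dates if date(2009, 1, 1) <= d < cutoff]
holdout = [d for d in dates if cutoff <= d <= date(2017, 1, 1)]
```

With the half-open version every date belongs to exactly one set, which keeps held-out rows from leaking into the training data.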

In [64]:
df_full = df_full.drop("inspection_dt")
#df_last_60 = df_last_60.drop("inspection_dt")

In [65]:
df_last_60.write.save('last_60_set', format='parquet', mode='overwrite')

In [73]:
df_full.write.save('full_set', format='parquet', mode='overwrite')

In [None]:
#from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
#sqlContext.read.format('parquet').load('/path/to/file') 