## Extraction and Preprocessing of Data for Visualization 

#### Requirements
This notebook was developed and tested on local machine within the following environment. 

Software:
1. Visual Studio Code 1.66.2
2. Python 3.7.4
3. jupyter-core 4.9.2
4. pyspark 3.2.1

Hardware:
1. Windows 10 64-bit
2. 16 GB RAM, 1 TB SSD Disk

### 1. Load libraries and initialize pyspark

In [1]:
#import pandas as pd
#import numpy as np
import pyspark
from pyspark.sql import SQLContext
from pyspark.sql import *
from pyspark.sql.functions import *

In [None]:
# initial pyspark environment
sc = pyspark.context.SparkContext('local')
spark = SparkSession(sc)

#### 2. Utilities functions

In [14]:
"""
functions: load csv file from given path
param: String path
return: pyspark.sql.Dataframe
"""
def load_data(path):
    df = spark.read.csv(path, header=True,inferSchema=True)

    print("Record Count: ", df.count())
    df.printSchema()

    return df

In [15]:
"""
functions: output dataframe to csv file at given path
param: pyspark.sql.Dataframe df, String path
return: void
"""
def output_data(df, path):
    df.toPandas().to_csv(path, index=False)

#### 3. Crime Data
Historical crime data of the Cities of Los Angeles for all crimes recorded from 2010 to 2020 should be downloaded from [State of California Department of Justice Office of the Attorney General](https://openjustice.doj.ca.gov/data-stories/crimes-clearances-arson) as CSV into the working directory and renamed to `crimes_original.csv`.  

In [16]:
# load crime data
crime_input_path = "crimes_original.csv"
crime_df = load_data(crime_input_path)

# Select wanted colunmns
crime_input = crime_df.select("Year", "County", "NCICCode", "Violent_sum", "Property_sum")

# Select time range and Los Angeles County
crime_input = crime_input.filter((crime_input["Year"]>=2010) & (crime_input["Year"]<=2020))\
                    .filter(crime_input["County"]=="Los Angeles County")

print("After selected, data count: ", crime_input.count())

Record Count:  26411
root
 |-- Year: integer (nullable = true)
 |-- County: string (nullable = true)
 |-- NCICCode: string (nullable = true)
 |-- Violent_sum: integer (nullable = true)
 |-- Homicide_sum: integer (nullable = true)
 |-- ForRape_sum: integer (nullable = true)
 |-- Robbery_sum: integer (nullable = true)
 |-- AggAssault_sum: integer (nullable = true)
 |-- Property_sum: integer (nullable = true)
 |-- Burglary_sum: integer (nullable = true)
 |-- VehicleTheft_sum: integer (nullable = true)
 |-- LTtotal_sum: integer (nullable = true)
 |-- ViolentClr_sum: integer (nullable = true)
 |-- HomicideClr_sum: integer (nullable = true)
 |-- ForRapeClr_sum: integer (nullable = true)
 |-- RobberyClr_sum: integer (nullable = true)
 |-- AggAssaultClr_sum: integer (nullable = true)
 |-- PropertyClr_sum: integer (nullable = true)
 |-- BurglaryClr_sum: integer (nullable = true)
 |-- VehicleTheftClr_sum: integer (nullable = true)
 |-- LTtotalClr_sum: integer (nullable = true)
 |-- TotalStructur

In [17]:
# Include the cities list in Los Angeles
cities_path = "cities.csv"
cities_df = load_data(cities_path)

Record Count:  88
root
 |-- City: string (nullable = true)



In [18]:
# Only keep the data by cities
crime_clean = crime_input.join(cities_df, crime_input["NCICCode"]==cities_df["City"]).drop("NCICCode", "County")

print("After joined, data count: ", crime_clean.count())
crime_clean.show(10)

After joined, data count:  968
+----+-----------+------------+----------------+
|Year|Violent_sum|Property_sum|            City|
+----+-----------+------------+----------------+
|2010|        185|        2075|        Alhambra|
|2010|         88|        1737|         Arcadia|
|2010|         64|         265|         Artesia|
|2010|         24|          75|          Avalon|
|2010|        226|        1136|           Azusa|
|2010|        253|        1752|    Baldwin Park|
|2010|        241|         601|            Bell|
|2010|        415|        1776|      Bellflower|
|2010|        210|         940|    Bell Gardens|
|2010|          7|          81|La Habra Heights|
+----+-----------+------------+----------------+
only showing top 10 rows



In [13]:
# write cleaned data to csv
crime_output_path = "crime_clean.csv"
output_data(df=crime_clean, path=crime_output_path)

#### 4. Unemployment Data
Historical unemployment data of the Cities of Los Angeles from 2010 to 2020 should be downloaded from [Employment Development Department of the State of California](https://data.edd.ca.gov/Labor-Force-and-Unemployment-Rates/Local-Area-Unemployment-Statistics-LAUS-Annual-Ave/7jbb-3rb8) as CSV into the working directory and renamed to `unemployment_original.csv`.  

In [67]:
# load unemployment data
unemployment_input_path = "unemployment_original.csv"
unemployment_df = load_data(unemployment_input_path)

# Select wanted colunmns
unemployment_input = unemployment_df.select("Location", "Year", "Unemployment Rate")

# Select time range and Area Type
unemployment_input = unemployment_input.filter((unemployment_input["Year"]>=2010) & (unemployment_input["Year"]<=2020))\


Record Count:  4972
root
 |-- Area Type: string (nullable = true)
 |-- Area Name: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Period: string (nullable = true)
 |-- Labor Force: integer (nullable = true)
 |-- Employment: integer (nullable = true)
 |-- Unemployment: integer (nullable = true)
 |-- Unemployment Rate: double (nullable = true)
 |-- Seasonally Adjusted (Y N): string (nullable = true)
 |-- Status: string (nullable = true)



In [64]:
# Include the cities list in Los Angeles
# The difference between the two city lists is La Canada Flintridge, some dataset uses its French name 
cities_path_2 = "cities_2.csv"
cities_df_2 = load_data(cities_path_2)

Record Count:  88
root
 |-- City: string (nullable = true)



In [70]:
# Only keep the data of cities in Los Angeles
unemployment_clean = unemployment_input.join(cities_df_2, unemployment_input["Location"]==cities_df_2["City"]).drop("Location")

print("After joined, data count: ", unemployment_clean.count())
unemployment_clean.show(10)

After joined, data count:  968
+----+-----------------+------------+
|Year|Unemployment Rate|        City|
+----+-----------------+------------+
|2010|             10.5|Agoura Hills|
|2011|             10.2|Agoura Hills|
|2012|              9.2|Agoura Hills|
|2013|              8.1|Agoura Hills|
|2014|              6.8|Agoura Hills|
|2015|              5.5|Agoura Hills|
|2016|              4.8|Agoura Hills|
|2017|              4.1|Agoura Hills|
|2018|              4.3|Agoura Hills|
|2019|              4.0|Agoura Hills|
+----+-----------------+------------+
only showing top 10 rows



In [71]:
# write cleaned data to csv
unemployment_output_path = "unemployment_clean.csv"
output_data(df=unemployment_clean, path=unemployment_output_path)

#### 5. Wage Data
Historical unemployment data of the Cities of Los Angeles from 2010 to 2020 should be downloaded from [Employment Development Department of the State of California](https://data.edd.ca.gov/Wages/Occupational-Employment-and-Wage-Statistics-OEWS-/pwxn-y2g5) as CSV into the working directory and renamed to `wage_original.csv`.  

#### 6. Education Data
Historical unemployment data of the Cities of Los Angeles from 2010 to 2020 should be downloaded from [Employment Development Department of the State of California](https://data.edd.ca.gov/Labor-Force-and-Unemployment-Rates/Local-Area-Unemployment-Statistics-LAUS-Annual-Ave/7jbb-3rb8) as CSV into the working directory and renamed to `Unemployment_original.csv`.  
Data is loaded as a Spark Dataframe for further cleaning.

#### 7. House Price Data
Historical house price data for the Cities of Los Angeles from 2010 to 2020 should be downloaded from [Zillow](https://www.zillow.com/research/data/) as CSV into the working directory and renamed to `house_price_original.csv`.  

In [74]:
house_data_path = "house_price_original.csv"
house_df = load_data(house_data_path)

Record Count:  117
root
 |-- RegionName: string (nullable = true)
 |-- CountyName: string (nullable = true)
 |-- 2010: integer (nullable = true)
 |-- 2011: integer (nullable = true)
 |-- 2012: integer (nullable = true)
 |-- 2013: integer (nullable = true)
 |-- 2014: integer (nullable = true)
 |-- 2015: integer (nullable = true)
 |-- 2016: integer (nullable = true)
 |-- 2017: integer (nullable = true)
 |-- 2018: integer (nullable = true)
 |-- 2019: integer (nullable = true)
 |-- 2020: integer (nullable = true)
 |-- 2021: integer (nullable = true)



In [75]:
# Keep wanted colunmns but drop unwanted columns
house_clean = house_df.join(cities_df_2, house_df["RegionName"]==cities_df_2["City"]).drop("RegionName", "CountyName")

print("After joined, data count: ", house_clean.count())
house_clean.show(10)

After joined, data count:  85
+------+------+------+------+------+------+------+------+------+------+------+-------+-------------+
|  2010|  2011|  2012|  2013|  2014|  2015|  2016|  2017|  2018|  2019|  2020|   2021|         City|
+------+------+------+------+------+------+------+------+------+------+------+-------+-------------+
|443598|417794|412555|487311|539526|569032|612958|655440|709127|721705|768981| 879316|  Los Angeles|
|392446|367406|360559|422371|460511|481832|516276|549841|591807|602065|639413| 735487|   Long Beach|
|387078|361097|344866|399787|450558|471165|497903|527846|562726|566738|597246| 691957|Santa Clarita|
|539168|507771|499262|581092|635879|678545|731557|780031|847227|860375|913777|1046715|     Glendale|
|152561|145804|143409|174286|202131|213939|233481|257659|288135|301506|324275| 391338|    Lancaster|
|164723|158992|155646|187200|221500|240177|260545|285259|316312|328997|351799| 421161|     Palmdale|
|249878|242974|241167|291516|335086|355523|376839|404012|4371

In [None]:
# write cleaned data to csv
house_output_path = "house_price_clean.csv"
output_data(df=house_clean, path=house_output_path)