**For this notebook to work, start it with the `pyspark` command instead of `jupyter notebook`.**

**Documentation:**

* PySpark RDD: https://spark.apache.org/docs/2.2.0/api/python/pyspark.html#pyspark.RDD

* PySpark SQL: https://spark.apache.org/docs/latest/sql-programming-guide.html

* PySpark SQL DataFrame: https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame

In [1]:
# Standard library: json serializes the Firebase payloads, os reads config overrides
import json
import os

# Import pyspark.sql modules
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import FloatType

# HTTP client used to upload to firebase
import requests

In [2]:
# URL for the Firebase database
# URL for the Firebase database (root `.json` REST endpoint).
# Overridable through the FIREBASE_DATABASE_URL environment variable (e.g. to point
# at a scratch database), because the next cell erases everything stored at this URL.
database_url = os.environ.get(
    'FIREBASE_DATABASE_URL',
    'https://neighborhood-score-la.firebaseio.com/.json',
)

In [3]:
# Clean Firebase database
# Clean Firebase database: PUT a JSON `null` at the root URL so the uploads below start from scratch.
# The timeout bounds the wait so a stalled connection cannot hang the notebook.
clean_response = requests.put(database_url, data=json.dumps(None), timeout=30)
# Stop here on an HTTP error instead of continuing against a database that was not cleaned
clean_response.raise_for_status()
clean_response

<Response [200]>

In [4]:
# Initiate the sqlSession, sqlContext and get the link for the Spark UI
# Initiate the sqlSession, sqlContext and get the link for the Spark UI.
# The SQLContext is built from the session's own SparkContext rather than the
# global `sc`, which only exists when the notebook is launched through `pyspark`.
sqlSession = SparkSession.builder.master("local").appName("Neighborhood-Scorer").getOrCreate()
sqlContext = SQLContext(sqlSession.sparkContext)
sqlContext.sparkSession

### Load the Crime Dataset to PySpark and MapReduce to Aggregate Data

In [5]:
# Read the csv with neighborhood crime data
# Load the neighborhood crime CSV (first row = column names) and preview the first rows
df1 = sqlContext.read.csv(
    path='Crime_Data_2019_Neighborhoods_v5.csv',
    header=True,
)
df1.show()

+---------+----------+--------------------+-------------------+
|    DR_NO|  DATE OCC|        Neighborhood|Crime_Weighted_Norm|
+---------+----------+--------------------+-------------------+
|191907191|  3/8/2019|              Sylmar|        0.113706949|
|190125334|10/17/2019|Downtown Los Angeles|        0.246650906|
|191920961|12/22/2019|    North Hills East|        0.113706949|
|190604395|  1/9/2019|          Central LA|        0.113706949|
|191310615| 4/30/2019|   South Los Angeles|        0.113706949|
|191419522|  9/3/2019|         Westchester|        0.113706949|
|190123550| 9/21/2019|Downtown Los Angeles|        0.246650906|
|191915118| 8/18/2019|              Sylmar|        0.113706949|
|190811511| 6/20/2019|           Brentwood|        0.113706949|
|191605013| 1/29/2019|          Sun Valley|        0.113706949|
|190129579|12/18/2019|Downtown Los Angeles|        0.113706949|
|191116816| 9/18/2019|Northeast Los Ang...|        0.057744638|
|190220657|10/25/2019|            Westla

In [6]:
# MapReduce to get the count of crimes per neighborhood
# MapReduce to get the count of crimes per neighborhood:
# one row per Neighborhood, with its number of crime records in CrimeCount
df2 = (
    df1.groupBy('Neighborhood')
       .count()
       .withColumnRenamed('count', 'CrimeCount')
)
df2.show()

+--------------------+----------+
|        Neighborhood|CrimeCount|
+--------------------+----------+
|           Mar Vista|      1364|
|          West Hills|       793|
|           Hollywood|       165|
|       Glassell Park|       232|
|    Pico - Robertson|       844|
|              Harbor|       902|
|            Mid City|      8345|
|Downtown Los Angeles|     13409|
|              Reseda|      2511|
| Crescenta Highlands|         4|
|    North of Montana|         2|
|          Central LA|     24753|
|          McLaughlin|       231|
|       Culver - West|       303|
|       Baldwin Hills|       142|
|      Elysian Valley|        67|
|          Eagle Rock|        10|
|        Sunset Strip|        14|
|        North Arroyo|         1|
|        Century City|       465|
+--------------------+----------+
only showing top 20 rows



In [7]:
# MapReduce to the the average crime score per neighborhood
# MapReduce to get the average crime score per neighborhood:
# mean of Crime_Weighted_Norm for each Neighborhood, exposed as CrimeScore
df3 = (
    df1.groupBy('Neighborhood')
       .agg({'Crime_Weighted_Norm': 'avg'})
       .withColumnRenamed('avg(Crime_Weighted_Norm)', 'CrimeScore')
)
df3.show()

+--------------------+--------------------+
|        Neighborhood|          CrimeScore|
+--------------------+--------------------+
|           Mar Vista|  0.1655460595469206|
|          West Hills|  0.1577881623682219|
|           Hollywood| 0.17064889600606062|
|       Glassell Park| 0.13016096131896554|
|    Pico - Robertson| 0.16039390652369653|
|              Harbor| 0.16315045023059846|
|            Mid City| 0.15981678633337423|
|Downtown Los Angeles| 0.16958895540107172|
|              Reseda| 0.14990767685384304|
| Crescenta Highlands|        0.1956636565|
|    North of Montana|0.022944957999999998|
|          Central LA| 0.16590776860829162|
|          McLaughlin| 0.13280403066666668|
|       Culver - West|  0.1599755282871288|
|       Baldwin Hills| 0.15898202638732392|
|      Elysian Valley| 0.22014647929850756|
|          Eagle Rock|        0.1659616953|
|        Sunset Strip| 0.18806723907142858|
|        North Arroyo|         0.037952846|
|        Century City|  0.171093

### Load the House Dataset to PySpark and MapReduce to Aggregate Data

In [8]:
# Read the csv with neighborhood housing data
# Load the neighborhood housing CSV and rename its key column RegionName -> Neighborhood
# so it can be joined with the crime dataframes later
df4 = (
    sqlContext.read.csv(path='neighborhood_housing_score.csv', header=True)
    .withColumnRenamed('RegionName', 'Neighborhood')
)
df4.show()

+---+--------------------+--------+------------------+
|_c0|        Neighborhood|SizeRank|     Housing_Score|
+---+--------------------+--------+------------------+
|  0|   South Los Angeles|       4|442013.57352941175|
|  1|Southeast Los Ang...|       8| 367733.6911764706|
|  2|           Hollywood|      18| 847758.7058823529|
|  3|            Mid City|      62| 986030.0147058824|
|  4|            Van Nuys|      64| 535259.6764705882|
|  5|          Sun Valley|      68|513386.48529411765|
|  6|          Northridge|      69| 670616.4705882353|
|  7|              Sylmar|      74| 483452.1617647059|
|  8|       Boyle Heights|      75| 424736.1617647059|
|  9|     North Hollywood|      78| 613276.5441176471|
| 10|           San Pedro|      79| 583256.8382352941|
| 11|           Koreatown|      82| 629124.5441176471|
| 12|      Woodland Hills|      83| 780429.2647058824|
| 13|        Sherman Oaks|      85| 951315.7647058824|
| 14|             Pacoima|      86|434265.85294117645|
| 15|     

In [9]:
# MapReduce to the the average size rank per neighborhood
# MapReduce to get the average size rank per neighborhood (column keeps the name SizeRank)
df5 = (
    df4.groupBy('Neighborhood')
       .agg({'SizeRank': 'avg'})
       .withColumnRenamed('avg(SizeRank)', 'SizeRank')
)
df5.show()

+----------------+--------+
|    Neighborhood|SizeRank|
+----------------+--------+
|       Mar Vista|   238.0|
| Harvard Heights|   413.0|
|      West Hills|   288.0|
|       Hollywood|    18.0|
|   Glassell Park|   540.0|
|        Mid City|    62.0|
|          Reseda|   100.0|
|         Del Rey|   331.0|
|   Playa Del Rey|   886.0|
|         Sunland|   866.0|
|    Shadow Hills|  4441.0|
|  Elysian Valley|  2819.0|
|   Mid City West|   208.0|
|      Eagle Rock|   382.0|
|    Elysian Park| 11212.0|
|    Century City|  3269.0|
|      Pico-Union|   770.0|
|       Brentwood|   360.0|
|   Cheviot Hills|   988.0|
|West Los Angeles|  1179.0|
+----------------+--------+
only showing top 20 rows



In [10]:
# MapReduce to the the average size rank per neighborhood
# MapReduce to get the average housing score per neighborhood (exposed as HousingScore)
df6 = df4.groupBy('Neighborhood').agg({'Housing_Score': 'avg'})
df6 = df6.withColumnRenamed('avg(Housing_Score)', 'HousingScore')
df6.show()

+----------------+------------------+
|    Neighborhood|      HousingScore|
+----------------+------------------+
|       Mar Vista|1330032.9411764706|
| Harvard Heights| 681784.0735294118|
|      West Hills| 662252.3382352941|
|       Hollywood| 847758.7058823529|
|   Glassell Park| 735981.5588235294|
|        Mid City| 986030.0147058824|
|          Reseda|501339.04411764705|
|         Del Rey|1011292.3970588235|
|   Playa Del Rey| 972849.4558823529|
|         Sunland| 556602.8088235294|
|    Shadow Hills| 715190.3529411765|
|  Elysian Valley| 687034.6764705882|
|   Mid City West|1532142.4117647058|
|      Eagle Rock|         796614.25|
|    Elysian Park| 590152.0735294118|
|    Century City|1099622.0735294118|
|      Pico-Union| 549041.7647058824|
|       Brentwood|2223105.0588235296|
|   Cheviot Hills|1664359.4117647058|
|West Los Angeles| 1257659.455882353|
+----------------+------------------+
only showing top 20 rows



### Load the School Dataset to PySpark and MapReduce to Aggregate Data

In [11]:
# Read the csv with neighborhood school data
# Read the csv with neighborhood school data and align column names with the other
# datasets ('RegionName' -> 'Neighborhood', 'AVERAGE' -> 'AvgACT').
# NOTE(review): 'socre' looks like a typo, but it must match the file name on disk — confirm before renaming.
df7 = sqlContext.read.csv('ACT19_LA_neighborhood_socre.csv', header=True)
df7 = df7.withColumnRenamed('RegionName', 'Neighborhood')
df7 = df7.withColumnRenamed('AVERAGE', 'AvgACT')
# The CSV is read without schema inference, so the score columns arrive as strings;
# cast each of them to float exactly once.
for score_col in ['AvgScrRead', 'AvgScrEng', 'AvgScrMath', 'AvgScrSci', 'AvgACT']:
    df7 = df7.withColumn(score_col, df7[score_col].cast(FloatType()))
df7.show()

+--------------+----------+---------+----------+---------+---------+
|  Neighborhood|AvgScrRead|AvgScrEng|AvgScrMath|AvgScrSci|   AvgACT|
+--------------+----------+---------+----------+---------+---------+
|  Agoura Hills|      27.0|     26.0|      25.0|     24.0|     25.5|
|      Alhambra|      25.5|     26.0|      25.5|     24.5|   25.375|
|     Allentown|      25.0|     24.0|      22.0|     23.0|     23.5|
|       Arcadia|      28.0|     29.0|      28.0|     27.0|     28.0|
|         Azusa|      17.0|     16.0|      18.0|     19.0|     17.5|
|  Baldwin Park|      21.0|     19.0|      20.0|     20.0|     20.0|
|       Bassett|      23.0|     22.0|      22.0|     22.0|    22.25|
|          Bell|      18.0|     17.0|      18.0|     18.0|    17.75|
| Beverly Hills|      28.0|     29.0|      27.0|     26.0|     27.5|
|       Burbank|      25.5|     25.5|      24.5|     24.0|   24.875|
|    California| 18.666666|16.666666| 17.666666|17.666666|17.666666|
|   Canoga Park|      19.0|     18

### Merge All Datasets Indexing by Neighborhood

In [12]:
# Join all dataframes by neighborhood
# Join all dataframes by neighborhood. Full outer joins keep neighborhoods that appear
# in only some datasets; the features they lack show up as null.
df_merged = (
    df2.join(df3, 'Neighborhood', how='fullouter')
       .join(df5, 'Neighborhood', how='fullouter')
       .join(df6, 'Neighborhood', how='fullouter')
       .join(df7, 'Neighborhood', how='fullouter')
)
df_merged.show()

+--------------------+----------+--------------------+--------+------------------+----------+---------+----------+---------+------+
|        Neighborhood|CrimeCount|          CrimeScore|SizeRank|      HousingScore|AvgScrRead|AvgScrEng|AvgScrMath|AvgScrSci|AvgACT|
+--------------------+----------+--------------------+--------+------------------+----------+---------+----------+---------+------+
|     Harvard Heights|      null|                null|   413.0| 681784.0735294118|      null|     null|      null|     null|  null|
|           Mar Vista|      1364|  0.1655460595469206|   238.0|1330032.9411764706|      null|     null|      null|     null|  null|
|          West Hills|       793|  0.1577881623682219|   288.0| 662252.3382352941|      null|     null|      null|     null|  null|
|       Glassell Park|       232| 0.13016096131896554|   540.0| 735981.5588235294|      null|     null|      null|     null|  null|
|           Hollywood|       165| 0.17064889600606062|    18.0| 847758.70588

In [54]:
# Report the row count of the merged dataframe
print('There are {} neighborhoods in the dataset'.format(df_merged.count()))

There are 224 neighborhoods in the dataset


### MapReduce the PySpark DataFrame into a Dictionary

This dictionary is the data source for building the JSON payloads that are uploaded to Firebase.

In [14]:
# Convert from PySpark dataframe to json
# Serialize every row of the merged dataframe to a JSON string and gather them on the driver
json_data = list(df_merged.toJSON().collect())

In [15]:
# Read the json creating elements by neighborhood
# Parse each JSON row string into a dict (one dict per merged row).
# Spark's JSON serialization drops null-valued fields by default, so a dict may lack some feature keys.
list_data = [json.loads(row_json) for row_json in json_data]

In [16]:
# Extract the names of all neighborhoods and the features created
# Extract the names of all neighborhoods and the features created.
# Features are every merged column except the join key; filtering by name does not
# depend on 'Neighborhood' being the first column.
all_neighborhoods = [row.Neighborhood for row in df_merged.select('Neighborhood').distinct().collect()]
all_features = [col for col in df_merged.columns if col != 'Neighborhood']

In [86]:
# First create a dict with index 1 being neighborhood and index 2 being each feature, with the values as None
# This is necessary to ensure all Neihgborhoods have all keys, which prevents problems down the line
# First create a dict keyed by neighborhood whose value is a dict mapping every feature to None.
# Pre-filling all keys ensures every neighborhood has every feature key, so the
# per-feature lookups further down cannot raise KeyError.
# dict.fromkeys creates a fresh inner dict per neighborhood and follows all_features,
# so adding or removing a feature column needs no change here.
dict_by_neighborhood = {neigh: dict.fromkeys(all_features) for neigh in all_neighborhoods}

In [87]:
# Now fill each [Neighborhood][Feature] with the values existing in the data
# Missing values will remain as 'None'
# Now fill each [Neighborhood][Feature] with the values existing in the data.
# Missing values will remain as None: a feature absent from a record (Spark omits
# null fields when serializing to JSON) is simply not copied.
# Records without a Neighborhood, or with one not pre-registered above, are skipped.
for record in list_data:
    if 'Neighborhood' not in record:
        continue
    neigh_features = dict_by_neighborhood.get(record['Neighborhood'])
    if neigh_features is None:
        continue
    for feat in all_features:
        if feat in record:
            neigh_features[feat] = record[feat]

In [88]:
# Display the assembled {neighborhood: {feature: value}} dict (last expression is rendered as the cell output)
dict_by_neighborhood

{'Harvard Heights': {'CrimeCount': None,
  'CrimeScore': None,
  'SizeRank': 413.0,
  'HousingScore': 681784.0735294118,
  'AvgScrRead': None,
  'AvgScrEng': None,
  'AvgScrMath': None,
  'AvgScrSci': None,
  'AvgACT': None},
 'Mar Vista': {'CrimeCount': 1364,
  'CrimeScore': 0.1655460595469206,
  'SizeRank': 238.0,
  'HousingScore': 1330032.9411764706,
  'AvgScrRead': None,
  'AvgScrEng': None,
  'AvgScrMath': None,
  'AvgScrSci': None,
  'AvgACT': None},
 'West Hills': {'CrimeCount': 793,
  'CrimeScore': 0.1577881623682219,
  'SizeRank': 288.0,
  'HousingScore': 662252.3382352941,
  'AvgScrRead': None,
  'AvgScrEng': None,
  'AvgScrMath': None,
  'AvgScrSci': None,
  'AvgACT': None},
 'Glassell Park': {'CrimeCount': 232,
  'CrimeScore': 0.13016096131896554,
  'SizeRank': 540.0,
  'HousingScore': 735981.5588235294,
  'AvgScrRead': None,
  'AvgScrEng': None,
  'AvgScrMath': None,
  'AvgScrSci': None,
  'AvgACT': None},
 'Hollywood': {'CrimeCount': 165,
  'CrimeScore': 0.170648896006060

# Alternative: build the neighborhood -> features dict directly from the PySpark dataframe's RDD.
# Each Row becomes a (neighborhood, {feature: value}) pair, and collectAsMap() gathers the
# pairs into a single dict. collect() would instead return a list of single-key dicts,
# which the per-feature cells below could not index by neighborhood name.
# Null values arrive as None because Row fields keep nulls.
dict_by_neighborhood = df_merged.rdd.map(
    lambda row: (row['Neighborhood'], {'CrimeCount': row['CrimeCount'],
                                       'CrimeScore': row['CrimeScore'],
                                       'SizeRank': row['SizeRank'],
                                       'HousingScore': row['HousingScore'],
                                       'AvgScrRead': row['AvgScrRead'],
                                       'AvgScrEng': row['AvgScrEng'],
                                       'AvgScrMath': row['AvgScrMath'],
                                       'AvgScrSci': row['AvgScrSci'],
                                       'AvgACT': row['AvgACT']})
).collectAsMap()

### Create the Dictionaries to be Uploaded to Firebase

In [65]:
# Create dataset with all data for each neighborhood 
# Wrap the full per-neighborhood data under a single 'NeighborhoodData' top-level key
final_neighborhood = dict(NeighborhoodData=dict_by_neighborhood)

In [66]:
# Create a dictionary with CrimeCount per Neighborhood
# Map each neighborhood to its CrimeCount (None when missing), wrapped under a 'CrimeCount' key
CrimeCount_dict = {neigh: features['CrimeCount'] for neigh, features in dict_by_neighborhood.items()}
final_CrimeCount = dict(CrimeCount=CrimeCount_dict)

In [67]:
# Create a dictionary with CrimeScore per Neighborhood
# Map each neighborhood to its CrimeScore (None when missing), wrapped under a 'CrimeScore' key
CrimeScore_dict = {neigh: features['CrimeScore'] for neigh, features in dict_by_neighborhood.items()}
final_CrimeScore = dict(CrimeScore=CrimeScore_dict)

In [68]:
# Create a dictionary with SizeRank per Neighborhood
# Map each neighborhood to its SizeRank (None when missing), wrapped under a 'SizeRank' key
SizeRank_dict = {neigh: features['SizeRank'] for neigh, features in dict_by_neighborhood.items()}
final_SizeRank = dict(SizeRank=SizeRank_dict)

In [69]:
# Create a dictionary with HousingScore per Neighborhood
# Map each neighborhood to its HousingScore (None when missing), wrapped under a 'HousingScore' key
HousingScore_dict = {neigh: features['HousingScore'] for neigh, features in dict_by_neighborhood.items()}
final_HousingScore = dict(HousingScore=HousingScore_dict)

In [70]:
# Create a dictionary with AvgScrRead per Neighborhood
# Map each neighborhood to its AvgScrRead (None when missing), wrapped under an 'AvgScrRead' key
AvgScrRead_dict = {neigh: features['AvgScrRead'] for neigh, features in dict_by_neighborhood.items()}
final_AvgScrRead = dict(AvgScrRead=AvgScrRead_dict)

In [71]:
# Create a dictionary with AvgScrEng per Neighborhood
# Map each neighborhood to its AvgScrEng (None when missing), wrapped under an 'AvgScrEng' key
AvgScrEng_dict = {neigh: features['AvgScrEng'] for neigh, features in dict_by_neighborhood.items()}
final_AvgScrEng = dict(AvgScrEng=AvgScrEng_dict)

In [72]:
# Create a dictionary with AvgScrMath per Neighborhood
# Map each neighborhood to its AvgScrMath (None when missing), wrapped under an 'AvgScrMath' key
AvgScrMath_dict = {neigh: features['AvgScrMath'] for neigh, features in dict_by_neighborhood.items()}
final_AvgScrMath = dict(AvgScrMath=AvgScrMath_dict)

In [73]:
# Create a dictionary with AvgScrSci per Neighborhood
# Map each neighborhood to its AvgScrSci (None when missing), wrapped under an 'AvgScrSci' key
AvgScrSci_dict = {neigh: features['AvgScrSci'] for neigh, features in dict_by_neighborhood.items()}
final_AvgScrSci = dict(AvgScrSci=AvgScrSci_dict)

In [76]:
# Create a dictionary with AvgACT per Neighborhood
# Map each neighborhood to its AvgACT (None when missing), wrapped under an 'AvgACT' key
AvgACT_dict = {neigh: features['AvgACT'] for neigh, features in dict_by_neighborhood.items()}
final_AvgACT = dict(AvgACT=AvgACT_dict)

### Upload Data to Firebase

In [29]:
# Patch the Neighborhood Data
# Patch the Neighborhood Data (timeout keeps a stalled connection from hanging the notebook)
patch_neighborhood = requests.patch(database_url, data=json.dumps(final_neighborhood), timeout=30)
print(f'Patching Neighborhood Data: {patch_neighborhood.reason}')
# Stop on an HTTP error instead of silently continuing with a partial upload
patch_neighborhood.raise_for_status()

Patching Neighborhood Data: OK


In [30]:
# Patch the CrimeCount Dictionary
# Patch the CrimeCount Dictionary (timeout keeps a stalled connection from hanging the notebook)
patch_crimecount = requests.patch(database_url, data=json.dumps(final_CrimeCount), timeout=30)
print(f'Patching CrimeCount Data: {patch_crimecount.reason}')
# Stop on an HTTP error instead of silently continuing with a partial upload
patch_crimecount.raise_for_status()

Patching CrimeCount Data: OK


In [31]:
# Patch the CrimeScore Dictionary
# Patch the CrimeScore Dictionary (timeout keeps a stalled connection from hanging the notebook)
patch_crimescore = requests.patch(database_url, data=json.dumps(final_CrimeScore), timeout=30)
print(f'Patching CrimeScore Data: {patch_crimescore.reason}')
# Stop on an HTTP error instead of silently continuing with a partial upload
patch_crimescore.raise_for_status()

Patching CrimeScore Data: OK


In [32]:
# Patch the SizeRank Data
# Patch the SizeRank Dictionary (timeout keeps a stalled connection from hanging the notebook)
patch_sizerank = requests.patch(database_url, data=json.dumps(final_SizeRank), timeout=30)
print(f'Patching SizeRank Data: {patch_sizerank.reason}')
# Stop on an HTTP error instead of silently continuing with a partial upload
patch_sizerank.raise_for_status()

Patching SizeRank Data: OK


In [33]:
# Patch the HousingScore Dictionary
# Patch the HousingScore Dictionary (timeout keeps a stalled connection from hanging the notebook)
patch_housingscore = requests.patch(database_url, data=json.dumps(final_HousingScore), timeout=30)
print(f'Patching HousingScore Data: {patch_housingscore.reason}')
# Stop on an HTTP error instead of silently continuing with a partial upload
patch_housingscore.raise_for_status()

Patching HousingScore Data: OK


In [34]:
# Patch the AvgScrRead Dictionary
# Patch the AvgScrRead Dictionary (timeout keeps a stalled connection from hanging the notebook)
patch_avgscrread = requests.patch(database_url, data=json.dumps(final_AvgScrRead), timeout=30)
print(f'Patching AvgScrRead Data: {patch_avgscrread.reason}')
# Stop on an HTTP error instead of silently continuing with a partial upload
patch_avgscrread.raise_for_status()

Patching AvgScrRead Data: OK


In [35]:
# Patch the AvgScrEng Dictionary
# Patch the AvgScrEng Dictionary (timeout keeps a stalled connection from hanging the notebook)
patch_avgscreng = requests.patch(database_url, data=json.dumps(final_AvgScrEng), timeout=30)
print(f'Patching AvgScrEng Data: {patch_avgscreng.reason}')
# Stop on an HTTP error instead of silently continuing with a partial upload
patch_avgscreng.raise_for_status()

Patching AvgScrEng Data: OK


In [36]:
# Patch the AvgScrMath Dictionary
# Patch the AvgScrMath Dictionary (timeout keeps a stalled connection from hanging the notebook)
patch_avgscrmath = requests.patch(database_url, data=json.dumps(final_AvgScrMath), timeout=30)
print(f'Patching AvgScrMath Data: {patch_avgscrmath.reason}')
# Stop on an HTTP error instead of silently continuing with a partial upload
patch_avgscrmath.raise_for_status()

Patching AvgScrMath Data: OK


In [37]:
# Patch the AvgScrSci Dictionary
# Patch the AvgScrSci Dictionary (timeout keeps a stalled connection from hanging the notebook)
patch_avgscrsci = requests.patch(database_url, data=json.dumps(final_AvgScrSci), timeout=30)
print(f'Patching AvgScrSci Data: {patch_avgscrsci.reason}')
# Stop on an HTTP error instead of silently continuing with a partial upload
patch_avgscrsci.raise_for_status()

Patching AvgScrSci Data: OK


In [38]:
# Patch the AvgACT Dictionary
# Patch the AvgACT Dictionary (timeout keeps a stalled connection from hanging the notebook)
patch_avgact = requests.patch(database_url, data=json.dumps(final_AvgACT), timeout=30)
print(f'Patching AvgACT Data: {patch_avgact.reason}')
# Stop on an HTTP error instead of silently continuing with a partial upload
patch_avgact.raise_for_status()

Patching AvgACT Data: OK
