### Missing Observations
The simplest way to deal with missing values, if your data can afford it, is to drop the whole observation whenever any value in it is missing. Be careful not to drop too many: depending on how the missing values are distributed across your dataset, this can severely reduce its usability. If, after dropping the rows, I end up with a very small dataset, or find that the data has shrunk by more than 50%, I start checking which features have the most holes in them and consider excluding those altogether; if most of a feature's values are missing (unless the absence itself carries meaning), that feature is fairly useless from a modeling point of view.
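A minimal plain-Python sketch of that check, assuming rows are dicts with None marking a missing value (the helpers drop_incomplete and missing_by_column are hypothetical, not PySpark API, and the rows are toy data): it measures how much data listwise deletion would cost and, if more than half the rows would go, ranks the features by how many holes they have.

```python
def drop_incomplete(rows):
    """Keep only rows with no missing (None) values, like df.na.drop()."""
    return [r for r in rows if all(v is not None for v in r.values())]


def missing_by_column(rows):
    """Fraction of missing values per column, highest first."""
    columns = rows[0].keys()
    fractions = {c: sum(r[c] is None for r in rows) / len(rows) for c in columns}
    return sorted(fractions.items(), key=lambda kv: kv[1], reverse=True)


rows = [  # toy records, not the real dataset
    {"Zip": 11432, "closeTime": 53.0, "Families with Children": 55290.0},
    {"Zip": 10460, "closeTime": None, "Families with Children": None},
    {"Zip": 11216, "closeTime": 10.0, "Families with Children": None},
    {"Zip": 10467, "closeTime": 7.0, "Families with Children": None},
]

kept = drop_incomplete(rows)
loss = 1 - len(kept) / len(rows)
print(f"rows lost: {loss:.0%}")

if loss > 0.5:
    # Too much data would be lost: look at which features have the most holes.
    for column, frac in missing_by_column(rows):
        print(column, frac)
```

Here the single sparse feature accounts for most of the loss, so excluding it would keep far more rows than dropping every incomplete observation.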


In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("comm") \
        .getOrCreate()

In [12]:
df = spark.read.csv("join_df.csv",inferSchema=True, header=True)

In [13]:
df.printSchema()

root
 |-- Zip: integer (nullable = true)
 |-- ComplaintID: integer (nullable = true)
 |-- ProblemID: integer (nullable = true)
 |-- SpaceType: string (nullable = true)
 |-- TypeID: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- MajorCategoryID: integer (nullable = true)
 |-- MajorCategory: string (nullable = true)
 |-- MinorCategoryID: integer (nullable = true)
 |-- MinorCategory: string (nullable = true)
 |-- CodeID: integer (nullable = true)
 |-- Code: string (nullable = true)
 |-- StatusDate: string (nullable = true)
 |-- ReceivedDate: string (nullable = true)
 |-- closeTime: double (nullable = true)
 |-- All Households: double (nullable = true)
 |-- Families: double (nullable = true)
 |-- Families with Children: double (nullable = true)
 |-- Families without Children: double (nullable = true)



In [14]:
df.show(3)

+-----+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|  Zip|ComplaintID|ProblemID|       SpaceType|TypeID|     Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|
+-----+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|11432|    2397487|  3768602|ENTIRE APARTMENT|     2|HAZARDOUS|             13|     NONCONST|            106|       VERMIN|   886|             ROACHES|08/12/2004|  06/20/2004|     53.0|       62148.0| 67198.0|              

# To count the missing values in each row, we can use the following snippet: 
df_miss.rdd.map( lambda row: (row['id'], sum([c is None for c in row])) ).collect()

In [15]:
df.rdd.map(lambda row: (row['ComplaintID'], sum([c is None for c in row]))).take(5) # .count()

[(2397487, 0), (2397487, 0), (2397487, 0), (2397487, 0), (2397487, 0)]

In [16]:
# register the dataframe as a temporary SQL view named 'zip'
df.createOrReplaceTempView('zip')

# SELECT * returns every column of the view, not only 'Zip'
zipc = spark.sql("SELECT * FROM zip") #.show(1)

# .select() then keeps only the Zip column
zipcode = zipc.select('zip.Zip') #.show()

# keep only the complaints from zip code 11216 (Bedford-Stuyvesant, Brooklyn) and count them;
# Zip is an integer column, so compare it with a number rather than the string '11216'
bedstuy = spark.sql("SELECT * FROM zip WHERE Zip = 11216") #.show()
bedstuy.count()

3793

In [17]:
df.filter(df.ComplaintID == 2397487).show()

+-----+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|  Zip|ComplaintID|ProblemID|       SpaceType|TypeID|     Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|
+-----+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|11432|    2397487|  3768602|ENTIRE APARTMENT|     2|HAZARDOUS|             13|     NONCONST|            106|       VERMIN|   886|             ROACHES|08/12/2004|  06/20/2004|     53.0|       62148.0| 67198.0|              

# Let's now check what percentage of observations is missing in each column: 

In [18]:
import pyspark.sql.functions as fn


df.agg(*[ (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') for c in df.columns ]).show()


+-----------+-------------------+-----------------+-----------------+--------------+------------+-----------------------+---------------------+-----------------------+---------------------+--------------+------------+--------------------+--------------------+--------------------+----------------------+----------------+------------------------------+---------------------------------+
|Zip_missing|ComplaintID_missing|ProblemID_missing|SpaceType_missing|TypeID_missing|Type_missing|MajorCategoryID_missing|MajorCategory_missing|MinorCategoryID_missing|MinorCategory_missing|CodeID_missing|Code_missing|  StatusDate_missing|ReceivedDate_missing|   closeTime_missing|All Households_missing|Families_missing|Families with Children_missing|Families without Children_missing|
+-----------+-------------------+-----------------+-----------------+--------------+------------+-----------------------+---------------------+-----------------------+---------------------+--------------+------------+-----------

The * argument to the .count(...) method (in place of a column name) instructs the method to count all rows. On the other hand, the * preceding the list declaration instructs the .agg(...) method to treat the list as a set of separate parameters passed to the function.
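Both uses of * can be mimicked in plain Python. In the sketch below, agg is a hypothetical stand-in for DataFrame.agg(...) that just reports how many arguments it received, and the toy list zips plays the role of a column with nulls:

```python
def agg(*exprs):
    """Stand-in for DataFrame.agg: report how many arguments it received."""
    return len(exprs)


exprs = [f"{c}_missing" for c in ["Zip", "closeTime", "StatusDate"]]
print(agg(exprs))   # 1: the whole list arrives as a single argument
print(agg(*exprs))  # 3: * spreads the list into separate arguments

# fn.count(c) skips nulls while fn.count('*') counts every row, so
# 1 - count(c) / count('*') is the fraction of missing values in column c.
zips = [11432, None, 10460, None]            # toy column with two nulls
count_c = sum(v is not None for v in zips)   # like fn.count('Zip')
count_star = len(zips)                       # like fn.count('*')
print(1 - count_c / count_star)              # 0.5
```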

### Only a few columns have missing data:
StatusDate_missing <br>
closeTime_missing <br>
Families with Children_missing

### Idea #1
This tells us that some of the income data for Families with Children is missing. <br>
One solution could be to fill each missing value with the average of the "All Households", "Families", and "Families without Children" columns for the same row.
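A minimal sketch of Idea #1 in plain Python, assuming each row is a dict keyed by the column names above with None marking a missing value. impute_with_children is a hypothetical helper, the sample row is the first record with its "Families with Children" value blanked out, and whether the mean of the other three income columns is a good proxy is an open modeling question. In PySpark the same row-wise fill could be written with F.coalesce over the column arithmetic.

```python
OTHER_INCOME = ["All Households", "Families", "Families without Children"]
TARGET = "Families with Children"


def impute_with_children(row):
    """Fill a missing 'Families with Children' income with the mean of the
    other three income columns in the same row; other rows pass through."""
    if row[TARGET] is not None:
        return row
    others = [row[c] for c in OTHER_INCOME if row[c] is not None]
    filled = dict(row)  # copy, leave the input row unchanged
    filled[TARGET] = sum(others) / len(others) if others else None
    return filled


row = {"All Households": 62148.0, "Families": 67198.0,
       "Families without Children": 79468.0, "Families with Children": None}
print(impute_with_children(row)[TARGET])
```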


### Idea #2 
We also have some StatusDate values missing, which leaves the corresponding closeTime values missing too (since closeTime is computed from StatusDate). An empty StatusDate means that the case has yet to be closed. <br>
One solution could be to replace every missing StatusDate with the current date, and to repeat that step every time the dataframe is read in, so that a complaint that is still open isn't stored with whatever date the dataset happened to be last read in. 
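A sketch of Idea #2 in plain Python, assuming None marks an open complaint's StatusDate. The fill happens at read time (so "today" is recomputed on every load), and a hypothetical isOpen flag keeps open complaints distinguishable from ones that genuinely closed today. fill_open_status and the toy records are illustrative only; in PySpark the fill itself could be expressed as F.coalesce(F.col('StatusDate'), F.date_format(F.current_date(), 'MM/dd/yyyy')).

```python
import datetime as dt


def fill_open_status(rows, today=None):
    """Return copies of rows in which a missing StatusDate is replaced by
    today's date (MM/DD/YYYY), plus an isOpen flag marking those rows."""
    stamp = (today or dt.date.today()).strftime("%m/%d/%Y")
    filled = []
    for row in rows:
        new = dict(row)  # copy, so the data as read stays untouched
        new["isOpen"] = row["StatusDate"] is None
        if new["isOpen"]:
            new["StatusDate"] = stamp
        filled.append(new)
    return filled


rows = [  # toy records
    {"ComplaintID": 1, "StatusDate": "08/12/2004"},
    {"ComplaintID": 2, "StatusDate": None},  # still open
]
print(fill_open_status(rows, today=dt.date(2021, 4, 16)))
```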

... 

However, since the percentages are so small, it might be best to just drop these specific rows from the dataset.



In [19]:
import pandas as pd 

df_pd = df.toPandas()
df_pd

Unnamed: 0,Zip,ComplaintID,ProblemID,SpaceType,TypeID,Type,MajorCategoryID,MajorCategory,MinorCategoryID,MinorCategory,CodeID,Code,StatusDate,ReceivedDate,closeTime,All Households,Families,Families with Children,Families without Children
0,11432,2397487,3768602,ENTIRE APARTMENT,2,HAZARDOUS,13,NONCONST,106,VERMIN,886,ROACHES,08/12/2004,06/20/2004,53.0,62148.0,67198.0,55290.0,79468.0
1,11432,2397487,3768603,OTHER,2,HAZARDOUS,13,NONCONST,106,VERMIN,884,MICE,08/12/2004,06/20/2004,53.0,62148.0,67198.0,55290.0,79468.0
2,11432,2397487,3768604,ENTIRE BUILDING,2,HAZARDOUS,28,PAINT/PLASTER,198,WALL,1400,PAINT DIRTY AND UNSANITARY,08/12/2004,06/20/2004,53.0,62148.0,67198.0,55290.0,79468.0
3,11432,2397487,3768605,OTHER,1,EMERGENCY,13,NONCONST,101,RUBBISH,1309,OTHER,08/12/2004,06/20/2004,53.0,62148.0,67198.0,55290.0,79468.0
4,11432,2397487,3768606,BATHROOM,1,EMERGENCY,9,PLUMBING,68,WATER SUPPLY,1282,OTHER,08/12/2004,06/20/2004,53.0,62148.0,67198.0,55290.0,79468.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
275523,10460,9076230,18593101,ENTRANCE/FOYER,1,EMERGENCY,56,DOOR/WINDOW,333,DOOR,2666,NEEDS KEY FOR EXIT,06/10/2018,05/31/2018,10.0,27545.0,33282.0,25665.0,44649.0
275524,10460,9076230,18593102,ENTIRE APARTMENT,1,EMERGENCY,10,ELECTRIC,71,WIRING,2461,ILLEGAL,06/10/2018,05/31/2018,10.0,27545.0,33282.0,25665.0,44649.0
275525,10460,9076230,18593103,KITCHEN,3,NON EMERGENCY,8,APPLIANCE,59,ELECTRIC/GAS RANGE,2447,PILOT LIGHT OUT,06/10/2018,05/31/2018,10.0,27545.0,33282.0,25665.0,44649.0
275526,10460,9076230,18593104,ENTIRE APARTMENT,3,NON EMERGENCY,56,DOOR/WINDOW,337,WINDOW FRAME,2672,WINDOW STUCK CLOSED OR OPEN,06/10/2018,05/31/2018,10.0,27545.0,33282.0,25665.0,44649.0


In [20]:
# today's date

import datetime as dt     

today = dt.datetime.today().strftime("%m/%d/%Y")
today

'04/16/2021'

In [22]:
# list comprehension to return the complaints that have not been closed:
# their StatusDate holds no date string, just NaN (a float)
# [x for x in join_df['StatusDate'] if type(x) != str]


# then put today's date in place of each of those missing entries
[today for x in df_pd['StatusDate'] if type(x) != str]

['04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021',
 '04/16/2021']

In [23]:
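for row in df_pd['StatusDate']:
    if type(row) != str:
        row = today  # same value as str(row).replace(str(row), today)
        print(row)
        # BUG: `row` now holds the date string, so .loc[row, ...] treats '04/16/2021' as a
        # row label: pandas appends a new, otherwise empty row with that label instead of
        # filling the missing entries (this is the all-null row that shows up later)
        df_pd.loc[row,'StatusDate'] = row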

04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021
04/16/2021


In [24]:
df_pd['StatusDate'][0]

'08/12/2004'

In [35]:
# Replace each missing StatusDate (NaN, i.e. not a string) with today's date.
# .iloc only accepts integer positions, so .iloc[row, 'StatusDate'] raises an error;
# the label-based .at accessor takes the row label and the column name. Looping over
# df_pd.index (rather than range(len(...))) works whatever the index labels are.
for idx in df_pd.index:
    if type(df_pd.at[idx, 'StatusDate']) != str:
        df_pd.at[idx, 'StatusDate'] = today

# vectorised equivalent: df_pd['StatusDate'] = df_pd['StatusDate'].fillna(today)

In [62]:
# join_df.iloc[5:25,:]

In [38]:
[x for x in df_pd['StatusDate'] if type(x) != str]

# nice, we did it 

[]

In [42]:
# now export it 
df_pd.to_csv("join_df_Mod4_April_16.csv", index=False)

# Now read it back in and move on 

In [71]:

# now bring it back thru pyspark
df = spark.read.csv("join_df_Mod4_April_16.csv",inferSchema=True, header=True)

In [72]:
df.show(5)

+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|    Zip|ComplaintID|ProblemID|       SpaceType|TypeID|     Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|
+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|11432.0|  2397487.0|3768602.0|ENTIRE APARTMENT|   2.0|HAZARDOUS|           13.0|     NONCONST|          106.0|       VERMIN| 886.0|             ROACHES|08/12/2004|  06/20/2004|     53.0|       62148.0| 67198.0|      

# Editing the time stamps 

In [73]:
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.types import IntegerType

# preprocessing: parse the MM/dd/yyyy date strings into Unix timestamps (in seconds)
# and compute the response variable, closing time, in whole days
timeDiff = (unix_timestamp('StatusDate', 'MM/dd/yyyy') - unix_timestamp('ReceivedDate', 'MM/dd/yyyy')) / 86400 #seconds per day
timeDiff = timeDiff.cast(IntegerType())

# overwrite closeTime (days for the complaint to be resolved) with the recomputed values
df = df.withColumn("closeTime", timeDiff)
df.show(5)
#Reference: https://stackoverflow.com/questions/47701339/subtracting-two-date-columns-in-pyspark-python

+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|    Zip|ComplaintID|ProblemID|       SpaceType|TypeID|     Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|
+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|11432.0|  2397487.0|3768602.0|ENTIRE APARTMENT|   2.0|HAZARDOUS|           13.0|     NONCONST|          106.0|       VERMIN| 886.0|             ROACHES|08/12/2004|  06/20/2004|       53|       62148.0| 67198.0|      
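The same closing-time calculation as a calendar-day difference in plain Python can serve as a spot check against the closeTime column: the first record (received 06/20/2004, closed 08/12/2004) should give 53. One hedge on the Spark version: unix_timestamp parses each date as midnight in the session time zone, so if that zone observes daylight saving time, an interval spanning the spring clock change is an hour short and .cast(IntegerType()) truncates it to one day less. F.datediff(F.to_date('StatusDate', 'MM/dd/yyyy'), F.to_date('ReceivedDate', 'MM/dd/yyyy')) counts calendar days directly and sidesteps this. close_time_days below is a hypothetical helper.

```python
from datetime import datetime


def close_time_days(status_date, received_date, fmt="%m/%d/%Y"):
    """Whole calendar days between ReceivedDate and StatusDate."""
    closed = datetime.strptime(status_date, fmt).date()
    received = datetime.strptime(received_date, fmt).date()
    return (closed - received).days


print(close_time_days("08/12/2004", "06/20/2004"))  # 53, matching the first row
```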

In [74]:
# I also need to change the contents of the Zip column in 'df' back to integers instead of floating-point numbers;
# they came back as floats after the pandas round trip above (most likely because NaN values forced pandas to store these integer columns as float64)

# https://sparkbyexamples.com/pyspark/pyspark-cast-column-type/
# https://sparkbyexamples.com/pyspark/pyspark-rename-dataframe-column/

from pyspark.sql.types import IntegerType

df = df.withColumn("Zip", df.Zip.cast(IntegerType()))

In [75]:
df.show(5)

+-----+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|  Zip|ComplaintID|ProblemID|       SpaceType|TypeID|     Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|
+-----+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|11432|  2397487.0|3768602.0|ENTIRE APARTMENT|   2.0|HAZARDOUS|           13.0|     NONCONST|          106.0|       VERMIN| 886.0|             ROACHES|08/12/2004|  06/20/2004|       53|       62148.0| 67198.0|              

In [76]:
# double check to make sure nothing is missing 

import pyspark.sql.functions as fn

df.agg(*[ (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') for c in df.columns ]).show()


+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------------+---------------------+-----------------------+---------------------+--------------------+--------------------+------------------+--------------------+--------------------+----------------------+--------------------+------------------------------+---------------------------------+
|         Zip_missing| ComplaintID_missing|   ProblemID_missing|   SpaceType_missing|      TypeID_missing|        Type_missing|MajorCategoryID_missing|MajorCategory_missing|MinorCategoryID_missing|MinorCategory_missing|      CodeID_missing|        Code_missing|StatusDate_missing|ReceivedDate_missing|   closeTime_missing|All Households_missing|    Families_missing|Families with Children_missing|Families without Children_missing|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------

In [54]:
df.agg(*[ (1 - (fn.count(c) / fn.count('*'))).alias(c) for c in df.columns ]).show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------------------+
|                 Zip|         ComplaintID|           ProblemID|           SpaceType|              TypeID|                Type|     MajorCategoryID|       MajorCategory|     MinorCategoryID|       MinorCategory|              CodeID|                Code|StatusDate|        ReceivedDate|           closeTime|      All Households|            Families|Families with Children|Families without Children|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------

# oh god what have i done

Can I just drop the nulls and move on?

In [95]:
# Create view so we can query using SQL
df.createOrReplaceTempView("df_view")

spark.sql("select * from df_view where MajorCategory is Null").show()

+----+-----------+---------+---------+------+----+---------------+-------------+---------------+-------------+------+----+----------+------------+---------+--------------+--------+----------------------+-------------------------+
| Zip|ComplaintID|ProblemID|SpaceType|TypeID|Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|
+----+-----------+---------+---------+------+----+---------------+-------------+---------------+-------------+------+----+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|null|       null|     null|     null|  null|null|           null|         null|           null|         null|  null|null|04/16/2021|        null|     null|          null|    null|                  null|                     null|
+----+-----------+---------+---------+------+----+---------------+-------------+

In [104]:
# https://sparkbyexamples.com/pyspark/pyspark-drop-rows-with-null-values/#:~:text=In%20order%20to%20remove%20Rows,NULL%20values%20to%20delete%20rows.

df = df.na.drop()

In [105]:
# Create view so we can query using SQL
df.createOrReplaceTempView("df_view")

spark.sql("select * from df_view where MajorCategory is Null").show()

+---+-----------+---------+---------+------+----+---------------+-------------+---------------+-------------+------+----+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|Zip|ComplaintID|ProblemID|SpaceType|TypeID|Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|
+---+-----------+---------+---------+------+----+---------------+-------------+---------------+-------------+------+----+----------+------------+---------+--------------+--------+----------------------+-------------------------+
+---+-----------+---------+---------+------+----+---------------+-------------+---------------+-------------+------+----+----------+------------+---------+--------------+--------+----------------------+-------------------------+



In [106]:
df.agg(*[ (1 - (fn.count(c) / fn.count('*'))).alias(c) for c in df.columns ]).show()

+---+-----------+---------+---------+------+----+---------------+-------------+---------------+-------------+------+----+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|Zip|ComplaintID|ProblemID|SpaceType|TypeID|Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|
+---+-----------+---------+---------+------+----+---------------+-------------+---------------+-------------+------+----+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|0.0|        0.0|      0.0|      0.0|   0.0| 0.0|            0.0|          0.0|            0.0|          0.0|   0.0| 0.0|       0.0|         0.0|      0.0|           0.0|     0.0|                   0.0|                      0.0|
+---+-----------+---------+---------+------+----+---------------+-------------+-----

# YES! FINALLY!

In [107]:
# OK NOW EXPORT AND MOVE ON 
df.toPandas().to_csv("join_df_Mod4_April_16_edit1.csv", index=False)

# More exploratory data analysis (specifically for the numerical columns)

In [108]:
numerical = ['closeTime','All Households','Families','Families with Children','Families without Children']
desc = df.describe(numerical)
desc.show()

+-------+------------------+-----------------+-----------------+----------------------+-------------------------+
|summary|         closeTime|   All Households|         Families|Families with Children|Families without Children|
+-------+------------------+-----------------+-----------------+----------------------+-------------------------+
|  count|            275323|           275323|           275323|                275323|                   275323|
|   mean|13.838909208456975|54599.59264936093|66023.01643160942|    62668.236536722325|        70307.02262070368|
| stddev| 42.05641735917821| 24634.0091019443|39975.16494195123|     48919.94228814118|        33756.22433021968|
|    min|            -168.0|          21447.0|          25429.0|               20114.0|                  29907.0|
|    max|            3372.0|         250001.0|         250001.0|              250001.0|                 250001.0|
+-------+------------------+-----------------+-----------------+----------------------+-

# Note:
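Ah. describe() returns its summary table with every column as a string, so the income statistics above are strings representing the values, not numbers (the income columns of df itself are double). We should cast them back to numbers if we want to run further analysis on these summaries.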

# Correlations
Calculating correlations in PySpark is very easy once your data is in DataFrame form. The only limitations are that the .corr(...) method currently supports only the Pearson correlation coefficient, and that it can only calculate pairwise correlations, such as the following: df.corr('closeTime', 'All Households') 

In order to create a correlations matrix, you can use the following script: 
n_numerical = len(numerical)
corr = []
for i in range(0, n_numerical):
    temp = [None] * i
    for j in range(i, n_numerical):
        temp.append(df.corr(numerical[i], numerical[j]))
    corr.append(temp)

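To see what that loop builds, here is a plain-Python sketch that swaps in a hand-written Pearson coefficient (pearson, a hypothetical helper) for df.corr(...) and uses a few toy columns in place of the real data. Only the upper triangle (j >= i) is computed; the lower triangle is padded with None, since the matrix is symmetric.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sqrt(sum((x - mx) ** 2 for x in xs))
    sy = sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)


data = {  # toy columns, not the real dataset
    "a": [1.0, 2.0, 3.0, 4.0],
    "b": [2.0, 4.0, 6.0, 8.0],
    "c": [4.0, 3.0, 2.0, 1.0],
}
numerical = list(data)
n_numerical = len(numerical)

corr = []
for i in range(n_numerical):
    temp = [None] * i  # lower triangle left empty
    for j in range(i, n_numerical):
        temp.append(pearson(data[numerical[i]], data[numerical[j]]))
    corr.append(temp)

for name, row in zip(numerical, corr):
    print(name, [None if v is None else round(v, 3) for v in row])
```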

# Histograms
Histograms are by far the easiest way to visually gauge the distribution of your features. There are three ways you can generate histograms in PySpark (or a Jupyter notebook):
1. Aggregate the data in workers and return an aggregated list of bins and counts in each bin of the histogram to the driver
2. Return all the data points to the driver and allow the plotting libraries' methods to do the job for you
3. Sample your data and then return them to the driver for plotting.

hists = fraud_df.select('balance').rdd.flatMap(
    lambda row: row
).histogram(20)


In a similar manner, a histogram can be created with Bokeh:
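data = {
    'bins': hists[0][:-1],  # left edge of each bin
    'freq': hists[1]        # number of values in each bin
}
b_hist = chrt.Bar(
    data,
    values='freq', label='bins',
    title='Histogram of \'balance\'')
chrt.show(b_hist)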

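Since Bokeh renders its charts in the browser with its own JavaScript library (BokehJS), the resulting chart is interactive. Note that the chrt module above is the old bokeh.charts interface, which has since been removed from Bokeh; in current versions a bar chart is built with bokeh.plotting.figure(...).vbar(...).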
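Option 1 in the list above is what RDD.histogram(20) does: it returns a pair (buckets, counts), where buckets holds the 21 evenly spaced edges between the minimum and maximum and counts the 20 per-bin counts, with every bin open on the right except the last, which also includes the maximum. A plain-Python sketch of that bucketing rule (even_histogram is a hypothetical name, and unlike Spark it runs on a single machine rather than aggregating across workers):

```python
def even_histogram(values, n_bins):
    """Evenly spaced histogram between min and max; bins are [lo, hi)
    except the last, which is [lo, hi]. Returns (edges, counts)."""
    lo, hi = min(values), max(values)
    width = (hi - lo) / n_bins
    edges = [lo + i * width for i in range(n_bins)] + [float(hi)]
    counts = [0] * n_bins
    for v in values:
        # bin index for v; the maximum itself is folded into the last bin
        idx = min(int((v - lo) / width), n_bins - 1) if width else 0
        counts[idx] += 1
    return edges, counts


edges, counts = even_histogram([1, 2, 3, 4, 5], 2)
print(edges, counts)  # [1.0, 3.0, 5.0] [2, 3]
```

Only the edges and counts travel back to the driver, which is why this approach scales to large datasets.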


In [109]:
# hists = df.select('closeTime').rdd.flatMap(lambda row: row).histogram(20)

In [110]:
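# import numpy as np
# import matplotlib.pyplot as plt

# # df['closeTime'] is a Spark Column expression, not the data itself, so plt.hist()
# # has nothing to plot; collect the column to the driver as a NumPy array first
# x = df.select('closeTime').toPandas()['closeTime'].to_numpy()
# bins = np.arange(0, 100, 5.0)

# fig = plt.figure(figsize=(10,8))
# # the histogram of the data, weighted so each bar shows a percentage of all rows
# plt.hist(x, bins, alpha=0.8, histtype='bar', color='gold',ec='black',weights=np.zeros_like(x) + 100. / x.size)

# plt.xlabel('closeTime')
# plt.ylabel('percentage')
# plt.xticks(bins)

# # save before plt.show(), which may close the figure
# fig.savefig('closeTime'+".pdf", bbox_inches='tight')
# plt.show()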

well that didn't go as planned...

# Now going through data_preprocessing.ipynb

In [111]:
import os
import pyspark.sql.types as typ
import pyspark.sql.functions as F

In [112]:
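from pyspark.sql import SparkSession

# NOTE: this notebook already created a SparkSession ("comm") above, so getOrCreate()
# returns that session; settings of the running SparkContext (master, app name,
# memory, cores) are not changed here, as the configuration printed below shows
spark = SparkSession \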
    .builder \
    .master("local") \
    .appName("data preprocessing") \
    .config("spark.executor.memory", '8g') \
    .config('spark.executor.cores', '4') \
    .config('spark.cores.max', '4') \
    .config("spark.driver.memory",'8g') \
    .getOrCreate()

sc = spark.sparkContext

In [113]:
sc.getConf().getAll()

[('spark.driver.host', 'udc-aw29-24a'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1618569865754'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.name', 'comm'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '38694')]

In [114]:
data_file = "join_df_Mod4_April_16_edit1.csv"

## NOTE:
You can read this data into a Spark Dataframe by issuing: <br>
fraud = spark.read.csv(data_file, header='true', inferSchema='true')

You can also read it in as an RDD with sc.textFile() and then convert it to a Spark Dataframe.

The first approach is recommended here since it's easier, but I walk you through the second approach as it illustrates more functionality. <br>
There may be times when it makes sense to work with the RDD first and then convert to Dataframe.

### Read in data as RDD:

In [115]:
# read in data
df = sc.textFile(data_file)

In [116]:
# check the data type...it's pyspark.rdd.RDD, and we know that because we called textFile() to read the csv in as an RDD
type(df)

pyspark.rdd.RDD

In [117]:
# take the first 5 records
df.take(5)

['Zip,ComplaintID,ProblemID,SpaceType,TypeID,Type,MajorCategoryID,MajorCategory,MinorCategoryID,MinorCategory,CodeID,Code,StatusDate,ReceivedDate,closeTime,All Households,Families,Families with Children,Families without Children',
 '11432.0,2397487.0,3768602.0,ENTIRE APARTMENT,2.0,HAZARDOUS,13.0,NONCONST,106.0,VERMIN,886.0,ROACHES,08/12/2004,06/20/2004,53.0,62148.0,67198.0,55290.0,79468.0',
 '11432.0,2397487.0,3768603.0,OTHER,2.0,HAZARDOUS,13.0,NONCONST,106.0,VERMIN,884.0,MICE,08/12/2004,06/20/2004,53.0,62148.0,67198.0,55290.0,79468.0',
 '11432.0,2397487.0,3768604.0,ENTIRE BUILDING,2.0,HAZARDOUS,28.0,PAINT/PLASTER,198.0,WALL,1400.0,PAINT DIRTY AND UNSANITARY,08/12/2004,06/20/2004,53.0,62148.0,67198.0,55290.0,79468.0',
 '11432.0,2397487.0,3768605.0,OTHER,1.0,EMERGENCY,13.0,NONCONST,101.0,RUBBISH,1309.0,OTHER,08/12/2004,06/20/2004,53.0,62148.0,67198.0,55290.0,79468.0']

In [118]:
# save the first record, which contains the header
header = df.first()

In [119]:
header

'Zip,ComplaintID,ProblemID,SpaceType,TypeID,Type,MajorCategoryID,MajorCategory,MinorCategoryID,MinorCategory,CodeID,Code,StatusDate,ReceivedDate,closeTime,All Households,Families,Families with Children,Families without Children'

In [120]:
# get a record count (the RDD holds the raw lines of the file, so this includes the header line)
n_rec = df.count()
n_rec

275324

In [121]:
# store the comma-separated data in an RDD, skipping the header row. 
df = df.filter(lambda row: row != header).map(lambda row: row.split(','))

In [123]:
df.take(2)

[['11432.0',
  '2397487.0',
  '3768602.0',
  'ENTIRE APARTMENT',
  '2.0',
  'HAZARDOUS',
  '13.0',
  'NONCONST',
  '106.0',
  'VERMIN',
  '886.0',
  'ROACHES',
  '08/12/2004',
  '06/20/2004',
  '53.0',
  '62148.0',
  '67198.0',
  '55290.0',
  '79468.0'],
 ['11432.0',
  '2397487.0',
  '3768603.0',
  'OTHER',
  '2.0',
  'HAZARDOUS',
  '13.0',
  'NONCONST',
  '106.0',
  'VERMIN',
  '884.0',
  'MICE',
  '08/12/2004',
  '06/20/2004',
  '53.0',
  '62148.0',
  '67198.0',
  '55290.0',
  '79468.0']]
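One caveat with row.split(','): it also splits inside quoted fields, so a value containing a comma would be broken across two columns. Whether this file contains such values isn't checked here. Python's csv module respects quoting; a small stdlib sketch with a made-up sample line follows. Inside Spark the same parser could be applied per partition, e.g. rdd.mapPartitions(lambda lines: csv.reader(lines)).

```python
import csv

line = '11432.0,2397487.0,"CRACKED, LOOSE",08/12/2004'  # made-up sample row

naive = line.split(",")
parsed = next(csv.reader([line]))

print(len(naive), naive)    # 5 fields: the quoted value was split in two
print(len(parsed), parsed)  # 4 fields: quoting respected, quotes stripped
```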

# RDDs don't make sense here...I'm just going to import as DataFrame and move on with my life 

In [124]:
df = spark.read.csv("join_df_Mod4_April_16_edit1.csv",inferSchema=True, header=True)

In [125]:
df.printSchema()

root
 |-- Zip: double (nullable = true)
 |-- ComplaintID: double (nullable = true)
 |-- ProblemID: double (nullable = true)
 |-- SpaceType: string (nullable = true)
 |-- TypeID: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- MajorCategoryID: double (nullable = true)
 |-- MajorCategory: string (nullable = true)
 |-- MinorCategoryID: double (nullable = true)
 |-- MinorCategory: string (nullable = true)
 |-- CodeID: double (nullable = true)
 |-- Code: string (nullable = true)
 |-- StatusDate: string (nullable = true)
 |-- ReceivedDate: string (nullable = true)
 |-- closeTime: double (nullable = true)
 |-- All Households: double (nullable = true)
 |-- Families: double (nullable = true)
 |-- Families with Children: double (nullable = true)
 |-- Families without Children: double (nullable = true)



In [126]:
df.show(5)

+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|    Zip|ComplaintID|ProblemID|       SpaceType|TypeID|     Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|
+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|11432.0|  2397487.0|3768602.0|ENTIRE APARTMENT|   2.0|HAZARDOUS|           13.0|     NONCONST|          106.0|       VERMIN| 886.0|             ROACHES|08/12/2004|  06/20/2004|     53.0|       62148.0| 67198.0|      

In [127]:
# Create view so we can query using SQL
df.createOrReplaceTempView("df_view")

In [128]:
# Example of using SQL to count records
spark.sql("select count(*) from df_view").show()

+--------+
|count(1)|
+--------+
|  275323|
+--------+



In [129]:
spark.sql("select * from df_view where closeTime > 3000").show()

+-------+-----------+-----------+----------------+------+-------------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|    Zip|ComplaintID|  ProblemID|       SpaceType|TypeID|         Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|
+-------+-----------+-----------+----------------+------+-------------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|10467.0|  3976078.0|  7481721.0|ENTIRE APARTMENT|   3.0|NON EMERGENCY|           28.0|PAINT/PLASTER|          198.0|         WALL|2527.0|             CRACKED|03/17/2016|  11/15/2007|   3045.0|      

### Groupby MajorCategory

In [130]:
df.groupby('MajorCategory').count().show()

+--------------------+-----+
|       MajorCategory|count|
+--------------------+-----+
|              SAFETY| 6963|
|            ELECTRIC|12613|
|         DOOR/WINDOW|19180|
|     FLOORING/STAIRS|12372|
|    OUTSIDE BUILDING|  872|
|UNSANITARY CONDITION|39395|
|           APPLIANCE| 6975|
|        CONSTRUCTION|    3|
|            ELEVATOR|  582|
|            NONCONST|   24|
|      HEAT/HOT WATER|92654|
|             GENERAL|12355|
|       PAINT/PLASTER|29641|
|            PLUMBING|25494|
|             HEATING|  244|
|          WATER LEAK|15956|
+--------------------+-----+



### Groupby Zipcode
https://hendra-herviawan.github.io/pyspark-groupby-and-aggregate-functions.html

In [132]:
# group by zipcode
df_zip = df.groupby('Zip').count()

# then sort by count in descending order, so the zip codes with the most complaints come first
df_zip.orderBy(df_zip['count'].desc()).show()

+-------+-----+
|    Zip|count|
+-------+-----+
|11226.0|10137|
|10467.0| 8093|
|10458.0| 8036|
|10453.0| 7849|
|10457.0| 7430|
|10452.0| 7147|
|10468.0| 6939|
|10456.0| 6569|
|11213.0| 5839|
|10031.0| 5797|
|11225.0| 5488|
|10032.0| 5158|
|11212.0| 5091|
|11207.0| 4703|
|10460.0| 4603|
|11233.0| 4549|
|10472.0| 4162|
|10462.0| 4043|
|10033.0| 3989|
|11203.0| 3906|
+-------+-----+
only showing top 20 rows



In [133]:
# distinct row count
print('rows={}'.format(df.distinct().count()))

rows=275323


In [134]:
# total row count (equal to the distinct count above, so there are no exact duplicate rows)
print('rows={}'.format(df.count()))

rows=275323


In [135]:
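# for each column, compute the fraction (0.0-1.0) of missing values;
# F.count(c) counts only non-null values, while F.count('*') counts every row
from pyspark.sql import functions as F

df.agg(*[
    (1 - F.count(c) / F.count('*')).alias(c + '_miss')
    for c in df.columns
]).show()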

+--------+----------------+--------------+--------------+-----------+---------+--------------------+------------------+--------------------+------------------+-----------+---------+---------------+-----------------+--------------+-------------------+-------------+---------------------------+------------------------------+
|Zip_miss|ComplaintID_miss|ProblemID_miss|SpaceType_miss|TypeID_miss|Type_miss|MajorCategoryID_miss|MajorCategory_miss|MinorCategoryID_miss|MinorCategory_miss|CodeID_miss|Code_miss|StatusDate_miss|ReceivedDate_miss|closeTime_miss|All Households_miss|Families_miss|Families with Children_miss|Families without Children_miss|
+--------+----------------+--------------+--------------+-----------+---------+--------------------+------------------+--------------------+------------------+-----------+---------+---------------+-----------------+--------------+-------------------+-------------+---------------------------+------------------------------+
|     0.0|             0.0| 
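The expression above works because `F.count(c)` skips nulls while `F.count('*')` counts every row. As a minimal pure-Python sketch of the same arithmetic (assuming rows are plain dicts with `None` for a missing value; `missing_fraction` is a hypothetical helper, not a Spark API):

```python
def missing_fraction(rows, columns):
    """Fraction of missing (None) values per column, like 1 - count(c) / count('*')."""
    total = len(rows)
    result = {}
    for c in columns:
        # what F.count(c) returns: the number of non-null values in column c
        non_null = sum(1 for r in rows if r.get(c) is not None)
        result[c + "_miss"] = 1 - non_null / total if total else 0.0
    return result


# toy rows for illustration only
rows = [
    {"Zip": 11432, "closeTime": None},
    {"Zip": 10467, "closeTime": None},
    {"Zip": None, "closeTime": 84.0},
    {"Zip": 10456, "closeTime": 52.0},
]
print(missing_fraction(rows, ["Zip", "closeTime"]))
```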

In [136]:
# mint

# Now working through classification_wisc_breast_cancer.ipynb

In [137]:
# load modules
from pyspark.sql import SparkSession
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.feature import VectorAssembler 
from pyspark.mllib.linalg import Vectors
from pyspark.sql.functions import col 
from pyspark.mllib.evaluation import MulticlassMetrics

import os

In [140]:
# param init (you will need to update infile to point at your copy of the data)
infile = 'join_df_Mod4_April_16_edit1.csv'

spark = SparkSession \
    .builder \
    .appName("Mod4") \
    .getOrCreate()

In [141]:
# read in data
df = spark.read.csv(infile, inferSchema=True, header = True)

In [142]:
df.show(5)

+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|    Zip|ComplaintID|ProblemID|       SpaceType|TypeID|     Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|
+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|11432.0|  2397487.0|3768602.0|ENTIRE APARTMENT|   2.0|HAZARDOUS|           13.0|     NONCONST|          106.0|       VERMIN| 886.0|             ROACHES|08/12/2004|  06/20/2004|     53.0|       62148.0| 67198.0|      

# Option 1: Using StringIndexer on MajorCategory
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StringIndexer.html
### StringIndexer
StringIndexer encodes a string column of labels to a column of label indices. StringIndexer can encode multiple columns. The indices are in [0, numLabels), and four ordering options are supported: “frequencyDesc”: descending order by label frequency (most frequent label assigned 0), “frequencyAsc”: ascending order by label frequency (least frequent label assigned 0), “alphabetDesc”: descending alphabetical order, and “alphabetAsc”: ascending alphabetical order (default = “frequencyDesc”). Note that in case of equal frequency under “frequencyDesc”/“frequencyAsc”, the strings are further sorted by alphabet.

Unseen labels will be put at index numLabels if the user chooses to keep them (handleInvalid="keep"). If the input column is numeric, we cast it to string and index the string values. When downstream pipeline components such as Estimator or Transformer make use of this string-indexed label, you must set the input column of the component to this string-indexed column name. In many cases, you can set the input column with setInputCol.
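To make the ordering rules concrete, here is a minimal pure-Python sketch of how labels end up with indices under each `stringOrderType`, and how an unseen label lands at `numLabels` when kept. The helpers `string_index` and `apply_index` are hypothetical illustrations of the rules quoted above, not Spark's implementation:

```python
from collections import Counter


def string_index(values, order="frequencyDesc"):
    """Build a label -> index mapping following the ordering rules quoted above."""
    labels = [str(v) for v in values]  # numeric input is indexed by its string form
    counts = Counter(labels)
    if order == "frequencyDesc":
        ordered = sorted(counts, key=lambda s: (-counts[s], s))  # ties broken alphabetically
    elif order == "frequencyAsc":
        ordered = sorted(counts, key=lambda s: (counts[s], s))
    elif order == "alphabetDesc":
        ordered = sorted(counts, reverse=True)
    elif order == "alphabetAsc":
        ordered = sorted(counts)
    else:
        raise ValueError("unknown order: " + order)
    return {label: float(i) for i, label in enumerate(ordered)}


def apply_index(values, mapping, keep_unseen=True):
    """Map values to indices; unseen labels go to index numLabels when kept."""
    num_labels = len(mapping)
    out = []
    for v in values:
        label = str(v)
        if label in mapping:
            out.append(mapping[label])
        elif keep_unseen:
            out.append(float(num_labels))
        else:
            raise KeyError("unseen label: " + label)
    return out


labels = ["HEAT/HOT WATER", "PLUMBING", "HEAT/HOT WATER", "ELECTRIC", "PLUMBING", "HEAT/HOT WATER"]
mapping = string_index(labels)
print(mapping)                                         # most frequent label gets 0.0
print(apply_index(["ELECTRIC", "ELEVATOR"], mapping))  # unseen ELEVATOR -> 3.0
```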

In [143]:
#https://spark.apache.org/docs/latest/ml-features.html#stringindexer

from pyspark.ml.feature import StringIndexer

stringIndexer = StringIndexer(inputCol="MajorCategory", outputCol="MajorCategory_indexed") #, stringOrderType="frequencyDesc")

indexed = stringIndexer.fit(df).transform(df)

indexed.show(5)

+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+---------------------+
|    Zip|ComplaintID|ProblemID|       SpaceType|TypeID|     Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|MajorCategory_indexed|
+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+---------------------+
|11432.0|  2397487.0|3768602.0|ENTIRE APARTMENT|   2.0|HAZARDOUS|           13.0|     NONCONST|          106.0|       VERMIN| 886.0|             ROACHE

### Note: I'm also going to do this StringIndexer on Zip

In [144]:
#https://spark.apache.org/docs/latest/ml-features.html#stringindexer

from pyspark.ml.feature import StringIndexer

stringIndexer = StringIndexer(inputCol="Zip", outputCol="Zip_indexed") #, stringOrderType="frequencyDesc")

indexed = stringIndexer.fit(df).transform(df)

indexed.show(10)

+-------+-----------+---------+----------------+------+-------------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+-----------+
|    Zip|ComplaintID|ProblemID|       SpaceType|TypeID|         Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|Zip_indexed|
+-------+-----------+---------+----------------+------+-------------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+-----------+
|11432.0|  2397487.0|3768602.0|ENTIRE APARTMENT|   2.0|    HAZARDOUS|           13.0|     NONCONST|          106.0|       VERMIN| 886.0|             ROACHES|08/12/2004| 

# Option 2: Using MajorCategoryID and passing that into OneHotEncoder

https://spark.apache.org/docs/latest/ml-features.html#onehotencoder

### OneHotEncoder
One-hot encoding maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features. For string type input data, it is common to encode categorical features using StringIndexer first.

OneHotEncoder can transform multiple columns, returning a one-hot-encoded output vector column for each input column. It is common to merge these vectors into a single feature vector using VectorAssembler.

OneHotEncoder supports the handleInvalid parameter to choose how to handle invalid input during transforming data. Available options include ‘keep’ (any invalid inputs are assigned to an extra categorical index) and ‘error’ (throw an error).
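A pure-Python sketch of the encoding, assuming Spark's default `dropLast=True` (the last category becomes an all-zeros vector) and my understanding that, for a plain numeric column without category metadata, the encoder takes the number of categories as the largest index plus one. `one_hot` and `infer_num_categories` are hypothetical helpers:

```python
def infer_num_categories(indices):
    """Without category metadata, assume the number of categories is max index + 1."""
    return int(max(indices)) + 1


def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the final category maps to all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    i = int(index)
    if not 0 <= i < num_categories:
        raise ValueError("index out of range")  # roughly what handleInvalid='error' does
    vec = [0.0] * size
    if i < size:
        vec[i] = 1.0
    return vec


# MajorCategoryID values run from 8 to 65 in this data, used directly as indices
n = infer_num_categories([8.0, 13.0, 65.0])
print(n, len(one_hot(13.0, n)))
```

This is why passing MajorCategoryID straight into OneHotEncoder yields 65 slots rather than one slot per category actually present.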

In [145]:
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCols=["MajorCategoryID","Zip_indexed"], outputCols=["MajorCategoryID_Vec", "Zip_Vec"])

model = encoder.fit(indexed)

encoded = model.transform(indexed)

encoded.show(10)

+-------+-----------+---------+----------------+------+-------------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+-----------+-------------------+----------------+
|    Zip|ComplaintID|ProblemID|       SpaceType|TypeID|         Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|Zip_indexed|MajorCategoryID_Vec|         Zip_Vec|
+-------+-----------+---------+----------------+------+-------------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+-----------+-------------------+----------------+
|11432.0|  2397487.0|3768602.0|ENTIRE APARTMENT|   2.0|   

# Note: I think it makes more sense to go with Option 2. The last two columns of the DataFrame 'encoded' (MajorCategoryID_Vec and Zip_Vec) contain the one-hot-encoded vectors for each row


# Next step: run the VectorAssembler and transform the data (following along from classification_wisc_breast_cancer.ipynb)

In [146]:
# use some of the fields as features
# VectorAssembler() takes the input columns (here the two one-hot vectors) and concatenates them
# into a new vector column called 'features', appended to the end of the DataFrame

#assembler = VectorAssembler(inputCols=["f1", "f2", "f3"], outputCol="features")
assembler = VectorAssembler(inputCols=["MajorCategoryID_Vec", "Zip_Vec"], outputCol="features")
transformed = assembler.transform(encoded)

# .transform() does not tokenize anything: it returns a new DataFrame, 'transformed',
# which is 'encoded' with the assembled 'features' column added

In [147]:
assembler,transformed

(VectorAssembler_ae73c3781590,
 DataFrame[Zip: double, ComplaintID: double, ProblemID: double, SpaceType: string, TypeID: double, Type: string, MajorCategoryID: double, MajorCategory: string, MinorCategoryID: double, MinorCategory: string, CodeID: double, Code: string, StatusDate: string, ReceivedDate: string, closeTime: double, All Households: double, Families: double, Families with Children: double, Families without Children: double, Zip_indexed: double, MajorCategoryID_Vec: vector, Zip_Vec: vector, features: vector])

In [148]:
# convert to RDD
# 'transformed' is the DataFrame that has the 'features' column appended to the end of it,
# so this selects the response variable closeTime and the assembled 'features' vector,
# then maps each row to a tuple
dataRdd = transformed.select(col("closeTime"), col("features")).rdd.map(tuple)

In [149]:
# look at some data
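dataRdd.take(15)
# dataRdd holds tuples with closeTime in the first place (index=0) and the SparseVector of
# 'features' (MajorCategoryID_Vec followed by Zip_Vec) in the second place (index=1)

# The vectors have size 242 (not 244 = 65 + 179): 242 = 65 + 177
# - MajorCategoryID is passed to OneHotEncoder as a raw index (values 8-65), so the encoder sizes it as
#   66 categories (0-65) and, with the default dropLast=True, produces 65 slots, even though far fewer
#   major categories actually occur (the groupby above lists 16)
# - Zip_Vec gets the remaining 177 slots, one fewer than the number of distinct zip codes (dropLast again)
# e.g. in the first tuple, index 13 is MajorCategoryID 13 and index 128 is Zip_indexed 63 (128 - 65)

# and the first number in each () is the closeTime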

[(53.0, SparseVector(242, {13: 1.0, 128: 1.0})),
 (53.0, SparseVector(242, {13: 1.0, 128: 1.0})),
 (53.0, SparseVector(242, {28: 1.0, 128: 1.0})),
 (53.0, SparseVector(242, {13: 1.0, 128: 1.0})),
 (53.0, SparseVector(242, {9: 1.0, 128: 1.0})),
 (1179.0, SparseVector(242, {28: 1.0, 128: 1.0})),
 (49.0, SparseVector(242, {12: 1.0, 95: 1.0})),
 (3045.0, SparseVector(242, {28: 1.0, 66: 1.0})),
 (152.0, SparseVector(242, {11: 1.0, 66: 1.0})),
 (3045.0, SparseVector(242, {28: 1.0, 66: 1.0})),
 (84.0, SparseVector(242, {10: 1.0, 66: 1.0})),
 (84.0, SparseVector(242, {9: 1.0, 66: 1.0})),
 (3045.0, SparseVector(242, {11: 1.0, 66: 1.0})),
 (52.0, SparseVector(242, {12: 1.0, 72: 1.0})),
 (2362.0, SparseVector(242, {28: 1.0, 118: 1.0}))]
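VectorAssembler lays the input vectors end to end, so every `Zip_Vec` index is shifted by the size of `MajorCategoryID_Vec`. A small pure-Python sketch of that offset arithmetic, using a hypothetical `(size, {index: value})` pair in place of Spark's `SparseVector` and assuming the 65 + 177 split described in the comments above, reproduces the first tuple:

```python
def concat_sparse(vectors):
    """Concatenate (size, {index: value}) sparse vectors, shifting indices by the running offset."""
    offset = 0
    merged = {}
    for size, entries in vectors:
        for i, v in entries.items():
            merged[offset + i] = v
        offset += size
    return offset, merged


major_vec = (65, {13: 1.0})  # MajorCategoryID 13
zip_vec = (177, {63: 1.0})   # Zip_indexed 63 (assumed for zip 11432, from 128 - 65)
print(concat_sparse([major_vec, zip_vec]))  # (242, {13: 1.0, 128: 1.0})
```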

In [152]:
# ...

### Ugh, this is logistic regression (a classifier), but closeTime is a continuous value...stopping now and moving to linear regression

# Trying this now with linear regression
https://towardsdatascience.com/building-a-linear-regression-with-pyspark-and-mllib-d065c3ba246a

In [153]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("comm") \
        .getOrCreate()

df2 = spark.read.csv("join_df_Mod4_April_16_edit1.csv",inferSchema=True, header=True)

In [82]:
# df2 = df.select('*').limit(10)

In [154]:
df2.show()

+-------+-----------+-----------+----------------+------+-------------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|    Zip|ComplaintID|  ProblemID|       SpaceType|TypeID|         Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|
+-------+-----------+-----------+----------------+------+-------------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+
|11432.0|  2397487.0|  3768602.0|ENTIRE APARTMENT|   2.0|    HAZARDOUS|           13.0|     NONCONST|          106.0|       VERMIN| 886.0|             ROACHES|08/12/2004|  06/20/2004|     53.0|      

In [155]:
df.cache()
df.printSchema()

root
 |-- Zip: double (nullable = true)
 |-- ComplaintID: double (nullable = true)
 |-- ProblemID: double (nullable = true)
 |-- SpaceType: string (nullable = true)
 |-- TypeID: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- MajorCategoryID: double (nullable = true)
 |-- MajorCategory: string (nullable = true)
 |-- MinorCategoryID: double (nullable = true)
 |-- MinorCategory: string (nullable = true)
 |-- CodeID: double (nullable = true)
 |-- Code: string (nullable = true)
 |-- StatusDate: string (nullable = true)
 |-- ReceivedDate: string (nullable = true)
 |-- closeTime: double (nullable = true)
 |-- All Households: double (nullable = true)
 |-- Families: double (nullable = true)
 |-- Families with Children: double (nullable = true)
 |-- Families without Children: double (nullable = true)



In [156]:
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
Zip,275323,10732.27465195425,512.8310972379447,10001.0,11697.0
ComplaintID,275323,8640507.667964537,999659.684692623,1936914.0,1.0410825E7
ProblemID,275323,1.7764211552580062E7,1926228.4741469913,2849265.0,2.1143445E7
SpaceType,275323,,,AIR SHAFT,YARD
TypeID,275323,1.7902209404953455,1.0301193256644046,1.0,4.0
Type,275323,,,EMERGENCY,NON EMERGENCY
MajorCategoryID,275323,46.04006566832411,21.300742681710446,8.0,65.0
MajorCategory,275323,,,APPLIANCE,WATER LEAK
MinorCategoryID,275323,295.0284102672134,107.02721849161907,59.0,400.0


In [30]:
# # Scatter matrix is a great way to roughly determine if we have a linear correlation between multiple independent variables.

# from pandas.plotting import scatter_matrix 
# #https://stackoverflow.com/questions/55394041/how-can-i-solve-module-pandas-has-no-attribute-scatter-matrix-error

# numeric_features = [t[0] for t in df.dtypes if t[1] == 'int' or t[1] == 'double']
# sampled_data = df.select(numeric_features).sample(False, 0.8).toPandas()
# axs = scatter_matrix(sampled_data, figsize=(10, 10))
# n = len(sampled_data.columns)
# for i in range(n):
#     v = axs[i, 0]
#     v.yaxis.label.set_rotation(0)
#     v.yaxis.label.set_ha('right')
#     v.set_yticks(())
#     h = axs[n-1, i]
#     h.xaxis.label.set_rotation(90)
#     h.set_xticks(())

In [None]:
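# Pearson correlation between closeTime and every numeric column
# (df.dtypes avoids launching a Spark job per column, and works even if the first value is null)
numeric_cols = [c for c, t in df.dtypes if t in ('int', 'double')]
for c in numeric_cols:
    print("Correlation to closeTime for", c, df.stat.corr('closeTime', c))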

# Note:
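That's actually interesting to see. These correlations need care: columns like Zip and MajorCategoryID are category codes, so a Pearson correlation on their raw numeric values depends on how the categories happen to be numbered (one-hot encoding them would make more sense). Even so, TypeID showing a correlation of about 0.1 with closeTime (a coefficient of 0.1, not "10%") is interesting.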
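`df.stat.corr` computes the Pearson coefficient, a unitless number between -1 and 1, so 0.1 is a weak positive relationship rather than a percentage. The pure-Python sketch below (`pearson` is a hypothetical helper; the codes and times are made up for illustration) also shows why category codes are a poor fit: renumbering the same three categories, with the same closeTime values, changes the coefficient.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient between two equal-length sequences."""
    n = len(xs)
    mx = sum(xs) / n
    my = sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)


close_time = [10, 12, 30, 28, 5, 7]
codes = [1, 1, 2, 2, 3, 3]
relabel = {1: 3, 2: 1, 3: 2}  # same categories, different arbitrary numbering
relabeled = [relabel[c] for c in codes]

print(pearson(codes, close_time), pearson(relabeled, close_time))
```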

In [157]:
#https://spark.apache.org/docs/latest/ml-features.html#stringindexer

from pyspark.ml.feature import StringIndexer

stringIndexer = StringIndexer(inputCol="MajorCategory", outputCol="MajorCategory_indexed") #, stringOrderType="frequencyDesc")

indexed = stringIndexer.fit(df2).transform(df2)

indexed.show(3)

+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+---------------------+
|    Zip|ComplaintID|ProblemID|       SpaceType|TypeID|     Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|MajorCategory_indexed|
+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+---------------------+
|11432.0|  2397487.0|3768602.0|ENTIRE APARTMENT|   2.0|HAZARDOUS|           13.0|     NONCONST|          106.0|       VERMIN| 886.0|             ROACHE

In [158]:
#https://spark.apache.org/docs/latest/ml-features.html#stringindexer

from pyspark.ml.feature import StringIndexer

stringIndexer = StringIndexer(inputCol="Zip", outputCol="Zip_indexed") #, stringOrderType="frequencyDesc")

indexed = stringIndexer.fit(df2).transform(df2)

indexed.show(3)

+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+-----------+
|    Zip|ComplaintID|ProblemID|       SpaceType|TypeID|     Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|Zip_indexed|
+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+-----------+
|11432.0|  2397487.0|3768602.0|ENTIRE APARTMENT|   2.0|HAZARDOUS|           13.0|     NONCONST|          106.0|       VERMIN| 886.0|             ROACHES|08/12/2004|  06/20/2004|    

In [159]:
from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCols=["MajorCategoryID","Zip_indexed"], outputCols=["MajorCategoryID_Vec", "Zip_Vec"])

model = encoder.fit(indexed)

encoded = model.transform(indexed)

encoded.show(3)

+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+-----------+-------------------+----------------+
|    Zip|ComplaintID|ProblemID|       SpaceType|TypeID|     Type|MajorCategoryID|MajorCategory|MinorCategoryID|MinorCategory|CodeID|                Code|StatusDate|ReceivedDate|closeTime|All Households|Families|Families with Children|Families without Children|Zip_indexed|MajorCategoryID_Vec|         Zip_Vec|
+-------+-----------+---------+----------------+------+---------+---------------+-------------+---------------+-------------+------+--------------------+----------+------------+---------+--------------+--------+----------------------+-------------------------+-----------+-------------------+----------------+
|11432.0|  2397487.0|3768602.0|ENTIRE APARTMENT|   2.0|HAZARDOUS|     

In [160]:
from pyspark.ml.feature import VectorAssembler

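# Note: this assembles the raw numeric columns of df2 (including the identifiers ComplaintID and ProblemID
# and category codes such as Zip and MajorCategoryID), not the one-hot vectors built in `encoded` above
vectorAssembler = VectorAssembler(inputCols = ['Zip', 'ComplaintID', 'ProblemID', 'TypeID', 'MajorCategoryID', 'MinorCategoryID', 'CodeID', 'All Households', 'Families', 'Families with Children', 'Families without Children'], outputCol = 'features')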

vhouse_df = vectorAssembler.transform(df2)
vhouse_df = vhouse_df.select(['features', 'closeTime'])

vhouse_df.show()

+--------------------+---------+
|            features|closeTime|
+--------------------+---------+
|[11432.0,2397487....|     53.0|
|[11432.0,2397487....|     53.0|
|[11432.0,2397487....|     53.0|
|[11432.0,2397487....|     53.0|
|[11432.0,2397487....|     53.0|
|[11432.0,2397487....|   1179.0|
|[11230.0,2784991....|     49.0|
|[10467.0,3976078....|   3045.0|
|[10467.0,3976078....|    152.0|
|[10467.0,3976078....|   3045.0|
|[10467.0,3976078....|     84.0|
|[10467.0,3976078....|     84.0|
|[10467.0,3976078....|   3045.0|
|[10456.0,4249936....|     52.0|
|[11211.0,4827199....|   2362.0|
|[10035.0,5299242....|     16.0|
|[10035.0,5446548....|     12.0|
|[11209.0,6020766....|    100.0|
|[11220.0,6102817....|     50.0|
|[10029.0,6156209....|    621.0|
+--------------------+---------+
only showing top 20 rows



In [161]:
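# 70/30 train/test split; no seed is set, so the split (and every metric below) changes from run to run
splits = vhouse_df.randomSplit([0.7, 0.3])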

train_df = splits[0]
test_df = splits[1]
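`randomSplit` normalizes the weights and assigns rows at random, so the split sizes are only approximately 70/30 (the training set below has 192,824 of 275,323 rows, about 70.0%), and they change on every run unless a seed is passed, e.g. `vhouse_df.randomSplit([0.7, 0.3], seed=42)` (the value 42 is arbitrary). The sketch below is a simplified pure-Python illustration of that idea, not Spark's implementation (Spark samples within each partition); `random_split` is a hypothetical helper:

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to a split with one uniform draw per row; sizes are only approximate."""
    total = sum(weights)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total  # normalize, so the weights need not sum to 1
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for k, b in enumerate(bounds):
            if u < b:
                splits[k].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off in the last bound
    return splits


train, test = random_split(list(range(1000)), [0.7, 0.3], seed=1)
print(len(train), len(test))
```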

In [162]:
#Option 1


# Linear Regression

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='closeTime', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_df)

print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.0001516852538944442,0.0,-1.9405492493167985e-06,3.0664923939159556,-0.08321899215320436,0.0,-0.0009994276615069389,1.0649955413826916e-05,0.0,0.0,0.0]
Intercept: 50.301184577481756
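With `regParam=0.3` and `elasticNetParam=0.8`, the model is fit with an elastic-net penalty. As I understand Spark's documented linear-methods objective (with λ = regParam and α = elasticNetParam), it minimizes roughly:

$$
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^\top x_i + b - y_i\right)^2
+ \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right),
\qquad \lambda = 0.3,\quad \alpha = 0.8
$$

The L1 term (weighted by α = 0.8) can push coefficients to exactly zero, which is why five of the eleven coefficients printed above (indices 1, 5, 8, 9, 10: ComplaintID, MinorCategoryID, Families, Families with Children, Families without Children) are 0.0. The coefficients are reported on the original feature scale, so their magnitudes are not directly comparable across features (ProblemID, for example, is in the millions).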


In [163]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 42.064267
r2: 0.020839


In [164]:
train_df.describe().show()

+-------+------------------+
|summary|         closeTime|
+-------+------------------+
|  count|            192824|
|   mean|13.865213873791644|
| stddev| 42.50962975662086|
|    min|            -168.0|
|    max|            3335.0|
+-------+------------------+



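RMSE measures the typical size of the difference between the model's predicted values and the actual values, in the units of closeTime. On its own it is hard to judge, so compare it with the distribution of closeTime: the training RMSE (about 42.06) is barely below the standard deviation of closeTime (about 42.51), which is roughly the RMSE of a model that always predicts the mean. Equivalently, R² ≈ 1 - (42.06 / 42.51)² ≈ 0.021, matching the r2 printed above.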
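A short pure-Python sketch of both metrics (`rmse` and `r2` are hypothetical helpers) makes the comparison explicit: a baseline that always predicts the mean has an RMSE equal to the population standard deviation of the label and an R² of exactly 0. The five labels are the first test closeTime values printed further down:

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error, in the same units as the label."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot


close_time = [6.0, 1.0, 6.0, 6.0, 53.0]
baseline = [sum(close_time) / len(close_time)] * len(close_time)  # always predict the mean
population_std = math.sqrt(sum((t - baseline[0]) ** 2 for t in close_time) / len(close_time))

print(rmse(close_time, baseline), population_std)  # the same number
print(r2(close_time, baseline))                     # 0.0
```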

### Interpretation? 

So the linear model we built is pretty bad: an R2 of 0.020839 means it explains only about 2% of the variability in closeTime on the training data, and the test R2 below (0.0215144) is no better.

In [166]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","closeTime","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="closeTime",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+---------+--------------------+
|        prediction|closeTime|            features|
+------------------+---------+--------------------+
| 22.33798946858534|      6.0|[10001.0,7036561....|
|16.573003607326875|      1.0|[10001.0,7055145....|
| 25.39223056385068|      6.0|[10001.0,7066834....|
| 22.77454386618384|      6.0|[10001.0,7066834....|
|  14.4697152669263|     53.0|[10001.0,7525334....|
+------------------+---------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.0215144


In [167]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 40.5343


In [168]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 11
objectiveHistory: [0.5, 0.49363459502247903, 0.4912857186929599, 0.491096842856474, 0.4910389319933901, 0.49101920513001057, 0.4909560450922158, 0.4909261916755941, 0.4909170348964062, 0.49087468340101764, 0.4908524079510636]
+-------------------+
|          residuals|
+-------------------+
|  677.7265402216044|
| -1.480573265228145|
|-1.4805713246788912|
| -10.03157264336572|
|-10.209000450850198|
|-10.645814955710492|
| -10.03156488116872|
|-10.031562940619466|
|-13.593094113705057|
|-16.323737374062464|
| -14.64706246543307|
| -9.467656920632997|
| -16.10753949369103|
|-14.891979491574645|
|-17.083755577330724|
|  382.5648083775758|
|-10.612802239427353|
|-10.490130418631793|
| -7.311381141100391|
|-13.304912889375828|
+-------------------+
only showing top 20 rows



In [169]:
# Using our Linear Regression model to make some predictions:
predictions = lr_model.transform(test_df)
predictions.select("prediction","closeTime","features").show()

+------------------+---------+--------------------+
|        prediction|closeTime|            features|
+------------------+---------+--------------------+
| 22.33798946858534|      6.0|[10001.0,7036561....|
|16.573003607326875|      1.0|[10001.0,7055145....|
| 25.39223056385068|      6.0|[10001.0,7066834....|
| 22.77454386618384|      6.0|[10001.0,7066834....|
|  14.4697152669263|     53.0|[10001.0,7525334....|
| 14.44968487277675|     23.0|[10001.0,7528706....|
|14.412754985152453|      6.0|[10001.0,7661040....|
|14.370586849964802|      2.0|[10001.0,7674345....|
| 14.34431147067803|      0.0|[10001.0,7708426....|
|19.931273761198767|      0.0|[10001.0,7708426....|
|20.527268133961364|    125.0|[10001.0,7719730....|
| 19.89802664449699|    125.0|[10001.0,7719730....|
| 24.59936870908634|      9.0|[10001.0,7730812....|
| 12.79042290997917|      6.0|[10001.0,8008630....|
|18.420418346810834|     45.0|[10001.0,8087123....|
|12.439292169614212|      3.0|[10001.0,8160811....|
|22.27926915

# Decision Tree Regression

In [171]:
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'closeTime')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluator = RegressionEvaluator(
    labelCol="closeTime", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 40.6957


In [172]:
dt_model.featureImportances


SparseVector(11, {0: 0.1598, 1: 0.1559, 3: 0.1161, 4: 0.2012, 5: 0.0713, 6: 0.0153, 7: 0.0, 8: 0.0405, 9: 0.2398})

### Interpretation:
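None of the variables predicts closeTime well (the tree's test RMSE of 40.6957 is no better than the linear model's 40.5343), but the importances do rank them. Spark normalizes tree feature importances to sum to 1, so they are relative, not absolute, measures.

The indices follow the order of the inputCols given to the VectorAssembler: 0 Zip, 1 ComplaintID, 2 ProblemID, 3 TypeID, 4 MajorCategoryID, 5 MinorCategoryID, 6 CodeID, 7 All Households, 8 Families, 9 Families with Children, 10 Families without Children. Indices missing from the SparseVector (2 and 10) have importance 0.

For example, index 7 (All Households) has 0.0 importance, while index 9 (Families with Children) has the highest, 0.2398, followed by index 4 (MajorCategoryID) at 0.2012.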
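To avoid mapping indices by hand, the importances can be paired with the assembler's inputCols. The sketch below hard-codes the values printed above; with the live model, something like `dict(enumerate(dt_model.featureImportances.toArray()))` should give the same index-to-importance dictionary. `rank_importances` is a hypothetical helper:

```python
feats = ['Zip', 'ComplaintID', 'ProblemID', 'TypeID', 'MajorCategoryID', 'MinorCategoryID',
         'CodeID', 'All Households', 'Families', 'Families with Children', 'Families without Children']

# non-zero entries of dt_model.featureImportances as printed above (index -> importance)
importances = {0: 0.1598, 1: 0.1559, 3: 0.1161, 4: 0.2012, 5: 0.0713, 6: 0.0153,
               7: 0.0, 8: 0.0405, 9: 0.2398}


def rank_importances(names, sparse_importances):
    """Pair each input column with its importance (missing indices mean 0.0), sorted descending."""
    pairs = [(name, sparse_importances.get(i, 0.0)) for i, name in enumerate(names)]
    return sorted(pairs, key=lambda p: p[1], reverse=True)


for name, score in rank_importances(feats, importances):
    print(f"{name:28s} {score:.4f}")
```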

# Note:
I'm going to keep testing models here, but it's becoming clear that the variable selection process needs to change: what we put into the model matters a lot.

It might make sense to put 100% of the variables into the model at first and see how that performs, especially because some of the models we run have feature selection built into their algorithms (the elastic-net linear regression above already set 5 of its 11 coefficients to exactly 0.0, and the decision tree gave several features zero importance). Then we could work to pare down the insignificant terms.

# Gradient-boosted tree regression

In [176]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'closeTime', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'closeTime', 'features').show(5)

+------------------+---------+--------------------+
|        prediction|closeTime|            features|
+------------------+---------+--------------------+
|50.984674801984916|      6.0|[10001.0,7036561....|
| 4.322943505230454|      1.0|[10001.0,7055145....|
|  23.0004734004963|      6.0|[10001.0,7066834....|
|47.503186649662226|      6.0|[10001.0,7066834....|
|21.848660001687364|     53.0|[10001.0,7525334....|
+------------------+---------+--------------------+
only showing top 5 rows



In [177]:
gbt_evaluator = RegressionEvaluator(
    labelCol="closeTime", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 40.3526


# This performed about as badly as all the others (test RMSE 40.3526, vs. 40.5343 for linear regression and 40.6957 for the decision tree)

# Now I'm going to go ahead and actually throw all the features in and see how that performs

# K-Means
MLlib Clustering ipynb from Mod5

In [181]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler

In [187]:
# Assemble the features

feats = ['Zip', 'ComplaintID', 'ProblemID', 'TypeID', 'MajorCategoryID', 'MinorCategoryID', 'CodeID', 'All Households', 'Families', 'Families with Children', 'Families without Children']
assembler = VectorAssembler(inputCols=feats, outputCol="features")

dataset = assembler.transform(df)
dataset.select(['features','closeTime']).show(truncate=False)
#dataset.select("*").show(truncate=False)

+---------------------------------------------------------------------------------------+---------+
|features                                                                               |closeTime|
+---------------------------------------------------------------------------------------+---------+
|[11432.0,2397487.0,3768602.0,2.0,13.0,106.0,886.0,62148.0,67198.0,55290.0,79468.0]     |53.0     |
|[11432.0,2397487.0,3768603.0,2.0,13.0,106.0,884.0,62148.0,67198.0,55290.0,79468.0]     |53.0     |
|[11432.0,2397487.0,3768604.0,2.0,28.0,198.0,1400.0,62148.0,67198.0,55290.0,79468.0]    |53.0     |
|[11432.0,2397487.0,3768605.0,1.0,13.0,101.0,1309.0,62148.0,67198.0,55290.0,79468.0]    |53.0     |
|[11432.0,2397487.0,3768606.0,1.0,9.0,68.0,1282.0,62148.0,67198.0,55290.0,79468.0]      |53.0     |
|[11432.0,2397487.0,3768607.0,2.0,28.0,198.0,1366.0,62148.0,67198.0,55290.0,79468.0]    |1179.0   |
|[11230.0,2784991.0,4559325.0,1.0,12.0,196.0,1358.0,53070.0,66240.0,61640.0,72023.0]    |49.0     |


In [183]:
# Train a k-means model with k=2

kmeans = KMeans().setK(2).setSeed(314)
model = kmeans.fit(dataset)
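For intuition about what `kmeans.fit()` does, here is a pure-Python sketch of Lloyd's algorithm. It assigns every point to its nearest center, moves each center to the mean of its points, and repeats until nothing changes. Spark's `KMeans` initializes with k-means|| by default and runs a distributed version of these iterations. This sketch instead assumes a simple farthest-first initialization and small in-memory data. The helper names (`squared_distance`, `assign`, `kmeans_lloyd`) are hypothetical. It also returns the within-set sum of squared errors (WSSSE), the quantity the old `computeCost()` call printed.

```python
def squared_distance(a, b):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def assign(points, centers):
    """Index of the nearest center for every point."""
    return [min(range(len(centers)), key=lambda j: squared_distance(p, centers[j])) for p in points]


def kmeans_lloyd(points, k, max_iter=20):
    """Lloyd's k-means; returns (centers, labels, wssse)."""
    # Farthest-first initialization (an assumption of this sketch; Spark defaults to k-means||):
    # start from the first point, then repeatedly add the point farthest from all chosen centers.
    centers = [list(points[0])]
    while len(centers) < k:
        farthest = max(points, key=lambda p: min(squared_distance(p, c) for c in centers))
        centers.append(list(farthest))

    for _ in range(max_iter):
        labels = assign(points, centers)
        new_centers = []
        for j in range(k):
            members = [p for p, lab in zip(points, labels) if lab == j]
            # Move the center to the mean of its members; keep it in place if the cluster is empty.
            new_centers.append([sum(col) / len(members) for col in zip(*members)] if members else centers[j])
        if new_centers == centers:  # converged: no center moved
            break
        centers = new_centers

    labels = assign(points, centers)
    # Within-set sum of squared errors (WSSSE) for the final assignment.
    wssse = sum(squared_distance(p, centers[lab]) for p, lab in zip(points, labels))
    return centers, labels, wssse


points = [(0, 0), (0, 1), (10, 0), (10, 1)]  # toy data
centers, labels, wssse = kmeans_lloyd(points, k=2)
print(centers, labels, wssse)  # → [[0.0, 0.5], [10.0, 0.5]] [0, 0, 1, 1] 1.0
```

The distance is plain squared Euclidean, so a column whose values are in the millions (such as an ID) outweighs columns in the single digits unless the features are scaled first.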

In [190]:
# Make predictions
# transform() assigns each row to a cluster, adding a "prediction" column with the cluster index

predictions = model.transform(dataset)
predictions.select(['features','closeTime','prediction']).show()

+--------------------+---------+----------+
|            features|closeTime|prediction|
+--------------------+---------+----------+
|[11432.0,2397487....|     53.0|         0|
|[11432.0,2397487....|     53.0|         0|
|[11432.0,2397487....|     53.0|         0|
|[11432.0,2397487....|     53.0|         0|
|[11432.0,2397487....|     53.0|         0|
|[11432.0,2397487....|   1179.0|         0|
|[11230.0,2784991....|     49.0|         0|
|[10467.0,3976078....|   3045.0|         0|
|[10467.0,3976078....|    152.0|         0|
|[10467.0,3976078....|   3045.0|         0|
|[10467.0,3976078....|     84.0|         0|
|[10467.0,3976078....|     84.0|         0|
|[10467.0,3976078....|   3045.0|         0|
|[10456.0,4249936....|     52.0|         0|
|[11211.0,4827199....|   2362.0|         0|
|[10035.0,5299242....|     16.0|         0|
|[10035.0,5446548....|     12.0|         0|
|[11209.0,6020766....|    100.0|         0|
|[11220.0,6102817....|     50.0|         0|
|[10029.0,6156209....|    621.0|

In [192]:
# Evaluate the Model:

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))



# cost = model.computeCost(dataset)
# print("Within Set Sum of Squared Errors = " + str(cost))
# computeCost() was deprecated in Spark 2.4 and removed from KMeansModel in Spark 3.0.0,
# which is why it raises an AttributeError here:
# https://stackoverflow.com/questions/65195312/kmeansmodel-object-has-no-attribute-computecost-in-apache-pyspark
# The within-set sum of squared errors on the training data is still available as
# model.summary.trainingCost (Spark 2.4+); ClusteringEvaluator (above) is the suggested replacement for evaluation.


print("Cluster Centers: ")
centers = model.clusterCenters()
print(centers)

Silhouette with squared euclidean distance = 0.7786344236324183
Cluster Centers: 
[array([1.07353882e+04, 7.79518686e+06, 1.61352546e+07, 1.81410997e+00,
       4.53313726e+01, 2.92043470e+02, 2.56444802e+03, 5.51138692e+04,
       6.67303528e+04, 6.35402681e+04, 7.09187095e+04]), array([1.07290812e+04, 9.50751532e+06, 1.94349597e+07, 1.76571904e+00,
       4.67669404e+01, 2.98089929e+02, 2.59335079e+03, 5.40721223e+04,
       6.52975332e+04, 6.17738328e+04, 6.96796428e+04])]


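# Performance: good but not great
The silhouette is about 0.78. However, the two cluster centers are nearly identical except in ComplaintID and ProblemID, whose values are orders of magnitude larger than the other (unscaled) features, so the split is driven mostly by those ID columns.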
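To make the silhouette number concrete, here is a naive pure-Python version of the standard silhouette score. It uses squared Euclidean distance, which is `ClusteringEvaluator`'s default distance measure. For each point, a is its mean distance to the rest of its own cluster, and b is its mean distance to the nearest other cluster. The point's score (b - a) / max(a, b) is close to 1 for tight, well-separated clusters and negative for points closer to another cluster. Spark computes this far more efficiently. This O(n²) sketch (hypothetical helper names, with singleton clusters scored 0 as a common convention) is only meant for small illustrative inputs.

```python
def squared_distance(a, b):
    """Squared Euclidean distance between two equal-length vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def silhouette_squared_euclidean(points, labels):
    """Mean silhouette over all points, using squared Euclidean distance.

    labels must be a list with at least two distinct values; points in singleton clusters score 0.
    """
    clusters = sorted(set(labels))
    if len(clusters) < 2:
        raise ValueError("silhouette needs at least two clusters")
    scores = []
    for i, p in enumerate(points):
        own = labels[i]
        # a(i): mean distance to the other members of p's own cluster
        same = [squared_distance(p, q) for j, q in enumerate(points) if labels[j] == own and j != i]
        if not same:
            scores.append(0.0)
            continue
        a = sum(same) / len(same)
        # b(i): smallest mean distance to the members of any other cluster
        b = min(
            sum(squared_distance(p, q) for q, lab in zip(points, labels) if lab == c) / labels.count(c)
            for c in clusters
            if c != own
        )
        denom = max(a, b)
        scores.append(0.0 if denom == 0 else (b - a) / denom)
    return sum(scores) / len(scores)


points = [(0, 0), (0, 1), (10, 0), (10, 1)]  # toy data
print(silhouette_squared_euclidean(points, [0, 0, 1, 1]))  # well-separated clusters
print(silhouette_squared_euclidean(points, [0, 1, 0, 1]))  # deliberately bad clustering
```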

# Gaussian Mixture Model
A Gaussian Mixture Model represents the data as a weighted combination of k Gaussian distributions. Each component has a mixing weight (the probability that a point is drawn from that component), a mean vector, and a covariance matrix.
Spark (spark.mllib, and the spark.ml GaussianMixture used below) estimates these parameters with the expectation-maximization (EM) algorithm.
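For reference, the mixture density and one EM iteration for K components are written out below in their textbook form. This is the standard EM update for Gaussian mixtures, shown for orientation rather than taken from Spark's source. Spark's initialization and its stopping parameters (`tol`, `maxIter`) are not reflected here. In the fitted PySpark model, π_k corresponds to `model.weights`, and μ_k, Σ_k to the `mean` and `cov` columns of `model.gaussiansDF`.

$$
p(\mathbf{x}) = \sum_{k=1}^{K} \pi_k \, \mathcal{N}(\mathbf{x} \mid \boldsymbol{\mu}_k, \boldsymbol{\Sigma}_k), \qquad \pi_k \ge 0, \quad \sum_{k=1}^{K} \pi_k = 1
$$

E-step (responsibility of component k for point i):

$$
\gamma_{ik} = \frac{\pi_k \, \mathcal{N}(\mathbf{x}_i \mid \boldsymbol{\mu}_k, \boldsymbol{\Sigma}_k)}{\sum_{j=1}^{K} \pi_j \, \mathcal{N}(\mathbf{x}_i \mid \boldsymbol{\mu}_j, \boldsymbol{\Sigma}_j)}
$$

M-step, with $N_k = \sum_{i=1}^{n} \gamma_{ik}$:

$$
\pi_k = \frac{N_k}{n}, \qquad \boldsymbol{\mu}_k = \frac{1}{N_k} \sum_{i=1}^{n} \gamma_{ik} \, \mathbf{x}_i, \qquad \boldsymbol{\Sigma}_k = \frac{1}{N_k} \sum_{i=1}^{n} \gamma_{ik} \, (\mathbf{x}_i - \boldsymbol{\mu}_k)(\mathbf{x}_i - \boldsymbol{\mu}_k)^{\top}
$$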

Fit Mixture of Two Gaussians

In [193]:
from pyspark.ml.clustering import GaussianMixture

# reuse data from K-Means example above

gmm = GaussianMixture().setK(2).setSeed(314)
model = gmm.fit(dataset)

print("Gaussians shown as a DataFrame: ")
print("component mean vectors")
model.gaussiansDF.select("mean").show(truncate=False)

print("component covariance matrices")
model.gaussiansDF.select("cov").show(truncate=False)

Gaussians shown as a DataFrame: 
component mean vectors
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|mean                                                                                                                                                                                                        |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[10732.274651954249,8640507.667964537,1.776421155258006E7,1.7902209404953453,46.04006566832411,295.0284102672134,2578.7163767647453,54599.59264936093,66023.01643160942,62668.23653672232,70307.02262070368]|
|[10732.274651954249,8640507.667964537,1.776421155258006E7,1.7902209404953453,46.04006566832411,295.0284102672134,25
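To make the E-step/M-step alternation concrete, here is a one-dimensional, pure-Python sketch of EM for a Gaussian mixture. It is not Spark's implementation, which works with multivariate Gaussians and its own initialization. This sketch assumes 1-D data, initializes the means deterministically from evenly spaced order statistics, and floors each variance at a hypothetical `min_var` so a component cannot collapse onto a single point. The helper names are likewise hypothetical.

```python
import math


def normal_pdf(x, mean, var):
    """Density of a 1-D Gaussian N(mean, var) at x."""
    return math.exp(-((x - mean) ** 2) / (2 * var)) / math.sqrt(2 * math.pi * var)


def fit_gmm_1d(xs, k=2, iters=100, min_var=1e-6):
    """Fit a k-component 1-D Gaussian mixture with EM; returns (weights, means, variances)."""
    n = len(xs)
    ordered = sorted(xs)
    # Deterministic initialization (an assumption of this sketch, not Spark's):
    # means at evenly spaced order statistics, the overall variance for every component, equal weights.
    means = [ordered[int((j + 0.5) * n / k)] for j in range(k)]
    overall_mean = sum(xs) / n
    overall_var = max(sum((x - overall_mean) ** 2 for x in xs) / n, min_var)
    variances = [overall_var] * k
    weights = [1.0 / k] * k

    for _ in range(iters):
        # E-step: responsibility of each component for each point.
        resp = []
        for x in xs:
            dens = [w * normal_pdf(x, m, v) for w, m, v in zip(weights, means, variances)]
            total = sum(dens)
            resp.append([d / total for d in dens] if total > 0 else [1.0 / k] * k)

        # M-step: re-estimate weights, means and variances from the responsibilities.
        for j in range(k):
            nj = sum(r[j] for r in resp)
            if nj < 1e-12:
                continue  # component lost all its points; leave its parameters unchanged
            weights[j] = nj / n
            means[j] = sum(r[j] * x for r, x in zip(resp, xs)) / nj
            var_j = sum(r[j] * (x - means[j]) ** 2 for r, x in zip(resp, xs)) / nj
            variances[j] = max(var_j, min_var)
    return weights, means, variances


xs = [-0.2, -0.1, 0.0, 0.1, 0.2, 9.8, 9.9, 10.0, 10.1, 10.2]  # made-up 1-D data
weights, means, variances = fit_gmm_1d(xs)
print(weights, means, variances)
```

On well-separated toy data like this, the two components settle on the two groups. If both components ever end up with the same mean, the responsibilities are split evenly and the mixture is no longer separating the data.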

# Note:
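It's great that all of the models now run, but it is painfully clear how important appropriate feature selection is going to be for this project. For example, the two GMM components above appear to have identical mean vectors, so the mixture does not seem to be separating the data at all.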


# Useful notebooks will be the cal_housing assignment and the fidelity clustering assignment from Mod5

Looking at the classification_wisc_breast_cancer notebook from Mod4 will also be helpful.
