###What's been done in this notebook:

1. Stood up Spark cluster

2. uploaded listings.csv as a table into the default database

3. Cleaned dirty data since sample was locked down.

4. Replicated Siavash's notebook he sent me for timings

5. Tested how long it would take to calculate a root mean squared error on 82029 records.

###Functions and timing:

1. Count() - 14.8s

2. Replace nulls with '0', regex_replace special characters, change 6 columns to float data type - 1.13s

3. Show histogram of all prices - 17.31s

4. Plot a GroupBy and Count of 'Neighborhood_cleansed' column - 14.80s

5. Calculate average price and median price - 27s

6. Calculate RMSE for both Avg Price and Median columns - 28.5s

In [2]:
%%time

# loading table into spark format
listings = spark.table('listings_csv')

# count total number of records
recordCount = listings.count()

print("Total number of records: " + str(recordCount))

This isn't a clean dataset. I pulled this from http://insideairbnb.com/get-the-data.html for London (02 June, 2019)
Download link: http://data.insideairbnb.com/united-kingdom/england/london/2019-06-05/data/listings.csv.gz

While loading the data in - I noticed that the prices in the data had money signs and were loaded in as strings. We need to take out "$" signs and convert their data types to a float since they have to do with money.

In [4]:
%%time
from pyspark.sql import functions as F

# tried to iterate through with this function but for some reason it would just regex the last column...
def cleanColumn(tmpdf,colName,findChar,replaceChar):
  """Pass in DF, column, and character to replace and replaces it with replaceChar variable"""
  tmpdf = tmpdf.withColumn(colName, F.regexp_replace(colName, findChar, replaceChar))
  return tmpdf

def fillNA(tmpdf, colName, nullReplacement):
  """pass DF, column name, and what to replace nulls with"""
  tmpdf = tmpdf.fillna({colName:nullReplacement})
  return tmpdf

def changeColumnDataType(tmpdf, colName, dataType):
  """pass in DF, column name, and datatype to convert to"""
  tmpdf = tmpdf.withColumn(colName, tmpdf[colName].cast(dataType))
  return tmpdf

monetaryColumns = ['price', 'weekly_price', 'monthly_price', 'security_deposit', 'cleaning_fee', 'extra_people']

# this is calling the function to try and iterate through and regex the columns I needed but would not work.
charToRemove='\W'
replaceWith =''
nullReplacement = '0'
dataType = "float"
for colName in monetaryColumns:
  listings = cleanColumn(listings,colName,charToRemove,replaceWith)
  listings = fillNA(listings, colName, nullReplacement)
  listings = changeColumnDataType(listings, colName, dataType)

# the columns we're interested in are 'price', 'weekly_price', 'monthly_price', 'security_deposit', 'cleaning_fee', 'extra_people'
# listings.select('price', 'weekly_price', 'monthly_price', 'security_deposit', 'cleaning_fee', 'extra_people').show()

# notice there are some null values. let's change those to 0.
  
listings.select('price', 'weekly_price', 'monthly_price', 'security_deposit', 'cleaning_fee', 'extra_people').show()

print(listings.select('price', 'weekly_price', 'monthly_price', 'security_deposit', 'cleaning_fee', 'extra_people').dtypes)

print(len(listings.columns))

In [5]:
# IGNORE THIS CELL. THIS IS BACKUP. 
# %%time
# from pyspark.sql import functions as F

# # tried to iterate through with this function but for some reason it would just regex the last column...
# # def cleanColumn(tmpdf,colName,findChar,replaceChar):
# #   """Pass in DF, column, and character to replace and replaces it with replaceChar variable"""
# #   tmpdf = tmpdf.withColumn(colName, F.regexp_replace(colName, findChar, replaceChar))
# #   return tmpdf

# def fillNA(tmpdf, colName, nullReplacement):
#   """pass DF, column name, and what to replace nulls with"""
#   tmpdf = tmpdf.fillna({colName:nullReplacement})
#   return tmpdf

# def changeColumnDataType(tmpdf, colName, dataType):
#   """pass in DF, column name, and datatype to convert to"""
#   tmpdf = tmpdf.withColumn(colName, tmpdf[colName].cast(dataType))
#   return tmpdf

# monetaryColumns = ['price', 'weekly_price', 'monthly_price', 'security_deposit', 'cleaning_fee', 'extra_people']

# # this is calling the function to try and iterate through and regex the columns I needed but would not work.
# # charToRemove='\W'
# # replaceWith =''
# # for colName in monetaryColumns:
# #   listings_clean = cleanColumn(listings,colName,charToRemove,replaceWith)

# # regex column values to take out dollar signs
# # honestly this was the most frustrating thing ever. I had to make 6 separate dataframes? I would try to overwrite the listings_clean dataframe but it would only regex the last column.
# listings_clean = listings.withColumn('price', F.regexp_replace('price', '\W', ''))
# listings_clean_2 = listings_clean.withColumn('weekly_price', F.regexp_replace('weekly_price', '\W', ''))
# listings_clean_3 = listings_clean_2.withColumn('monthly_price', F.regexp_replace('monthly_price', '\W', ''))
# listings_clean_4 = listings_clean_3.withColumn('security_deposit', F.regexp_replace('security_deposit', '\W', ''))
# listings_clean_5 = listings_clean_4.withColumn('cleaning_fee', F.regexp_replace('cleaning_fee', '\W', ''))
# listings_clean_6 = listings_clean_5.withColumn('extra_people', F.regexp_replace('extra_people', '\W', ''))

# # the columns we're interested in are 'price', 'weekly_price', 'monthly_price', 'security_deposit', 'cleaning_fee', 'extra_people'
# listings_clean_6.select('price', 'weekly_price', 'monthly_price', 'security_deposit', 'cleaning_fee', 'extra_people').show()

# # notice there are some null values. let's change those to 0.

# nullReplacement = '0'
# for col in monetaryColumns:
#   listings_clean_6 = fillNA(listings_clean_6, col, nullReplacement)
  
# listings_clean_6.select('price', 'weekly_price', 'monthly_price', 'security_deposit', 'cleaning_fee', 'extra_people').show()

# # now that nulls are cleaned up, change the datatype to float
# dataType = "float"
# for col in monetaryColumns:
#   listings_clean_6 = changeColumnDataType(listings_clean_6, col, dataType)

# print(listings_clean_6.select('price', 'weekly_price', 'monthly_price', 'security_deposit', 'cleaning_fee', 'extra_people').dtypes)

# print(len(listings_clean_6.columns))

In [6]:
%%time
display(listings.select("price"))

price
8800.0
6500.0
10000.0
30000.0
17500.0
6500.0
2900.0
14700.0
7500.0
3400.0


In [7]:
%%time
display(listings.select(F.log(F.col("price"))))

LOG(price)
9.0825070004663
8.779557455883728
9.210340371976184
10.308952660644293
9.769956159911606
8.779557455883728
7.972466015974565
9.595602772766828
8.922658299524402
8.131530710604252


In [8]:
%%time
display(listings.groupBy("neighbourhood_cleansed").count())

neighbourhood_cleansed,count
Wandsworth,4135
Croydon,1081
Bexley,240
Lambeth,4712
Barking and Dagenham,334
Camden,5830
Greenwich,1640
Newham,1926
Tower Hamlets,7885
Barnet,1564


In [9]:
%%time
from pyspark.sql.functions import desc
display(listings.groupBy("neighbourhood_cleansed").count().sort(desc("count")))

neighbourhood_cleansed,count
Westminster,9275
Tower Hamlets,7885
Hackney,6007
Camden,5830
Kensington and Chelsea,5770
Islington,5020
Southwark,4784
Lambeth,4712
Wandsworth,4135
Hammersmith and Fulham,4134


In [10]:
%%time
avgPrice = listings.select(F.avg((listings['price']))).collect()[0]["avg(price)"]
print("Average price: ", avgPrice)

medianPrice = listings.approxQuantile('price', [0.5], 0.25)[0]
print("Median: ", medianPrice)

In [11]:
%%time
predDF = listings.withColumn('avgPrediction', F.lit(avgPrice).cast("float"))
predDF = predDF.withColumn('medianPrediction', F.lit(medianPrice).cast("float"))

print(len(predDF.columns))

predDF.select('avgPrediction', 'medianPrediction').show()

In [12]:
%%time
from pyspark.ml.evaluation import RegressionEvaluator

regressionMeanEvaluator = RegressionEvaluator(predictionCol="avgPrediction", labelCol="price", metricName="rmse")

print("The RMSE for predicting the average price is: ", regressionMeanEvaluator.evaluate(predDF))

regressionMedianEvaluator = RegressionEvaluator(predictionCol="medianPrediction", labelCol="price", metricName="rmse")
  
print("The RMSE for predicting the median price is: ", regressionMedianEvaluator.evaluate(predDF))

This is like the worst RMSE ever since it's just an average price and median. The compute time is what we're interested in. To calculate RMSE you have to compute each row with this formula: