# 0. Importing Libraries

In [1]:
import pandas as pd
import numpy as np
import pyspark as sp
import findspark

from pyspark.sql import SparkSession

# 1. Find Spark

Adding pyspark to sys.path at runtime using the library findspark

In [2]:
findspark.init()
findspark.find()

'C:\\Users\\alexs\\Downloads\\spark-3.4.0-bin-hadoop3\\spark-3.4.0-bin-hadoop3'

# 2. Creating SparkSession

One aspect of the explanation why SparkSession is preferable over SparkContext in SparkSession Vs SparkContext battle is that SparkSession unifies all of Spark’s numerous contexts, removing the developer’s need to worry about generating separate contexts.

In [3]:
#Create the SparkSession
my_spark = SparkSession.builder.getOrCreate()

#print the session
print(my_spark)

FileNotFoundError: [WinError 2] El sistema no puede encontrar el archivo especificado

# 3. Dataset overview

In [None]:
df = my_spark.read.csv('2017_StPaul_MN_Real_Estate.csv', header=True)
df.columns

In [None]:
# Select our dependent variable
Y_df = df.select(['SalesClosePrice'])

# Display summary statistics
Y_df.describe().show()

# 4. Preprocessing data I

In [None]:
#Check data of the feature to predict 
df.select(["SalesClosePrice"]).dtypes

In [None]:
# convert the data type of SalesClosePrice to integer
df = df.withColumn("SalesClosePrice", df.SalesClosePrice.cast("integer"))

In [None]:
#Check again the type after converting it.
df.select(["SalesClosePrice"]).dtypes

In [None]:
df.select('SalesClosePrice').describe().show()

In [None]:
#Modifying some types of key variables
df = df.withColumn("AssessedValuation", df.AssessedValuation.cast("double"))
df = df.withColumn("AssociationFee", df.AssociationFee.cast("bigint"))
df = df.withColumn("SQFTBELOWGROUND", df.SQFTBELOWGROUND.cast("bigint"))

In [None]:
#modifying name of column to capital letters
required_dtypes = [('NO', 'bigint'),
 ('MLSID', 'string'),
 ('STREETNUMBERNUMERIC', 'bigint'),
 ('STREETADDRESS', 'string'),
 ('STREETNAME', 'string'),
 ('POSTALCODE', 'bigint'),
 ('STATEORPROVINCE', 'string'),
 ('CITY', 'string'),
 ('SALESCLOSEPRICE', 'bigint'),
 ('LISTDATE', 'string'),
 ('LISTPRICE', 'bigint'),
 ('LISTTYPE', 'string'),
 ('ORIGINALLISTPRICE', 'bigint'),
 ('PRICEPERTSFT', 'double'),
 ('FOUNDATIONSIZE', 'bigint'),
 ('FENCE', 'string'),
 ('MAPLETTER', 'string'),
 ('LOTSIZEDIMENSIONS', 'string'),
 ('SCHOOLDISTRICTNUMBER', 'string'),
 ('DAYSONMARKET', 'bigint'),
 ('OFFMARKETDATE', 'string'),
 ('FIREPLACES', 'bigint'),
 ('ROOMAREA4', 'string'),
 ('ROOMTYPE', 'string'),
 ('ROOF', 'string'),
 ('ROOMFLOOR4', 'string'),
 ('POTENTIALSHORTSALE', 'string'),
 ('POOLDESCRIPTION', 'string'),
 ('PDOM', 'bigint'),
 ('GARAGEDESCRIPTION', 'string'),
 ('SQFTABOVEGROUND', 'bigint'),
 ('TAXES', 'bigint'),
 ('ROOMFLOOR1', 'string'),
 ('ROOMAREA1', 'string'),
 ('TAXWITHASSESSMENTS', 'double'),
 ('TAXYEAR', 'bigint'),
 ('LIVINGAREA', 'bigint'),
 ('UNITNUMBER', 'string'),
 ('YEARBUILT', 'bigint'),
 ('ZONING', 'string'),
 ('STYLE', 'string'),
 ('ACRES', 'double'),
 ('COOLINGDESCRIPTION', 'string'),
 ('APPLIANCES', 'string'),
 ('BACKONMARKETDATE', 'double'),
 ('ROOMFAMILYCHAR', 'string'),
 ('ROOMAREA3', 'string'),
 ('EXTERIOR', 'string'),
 ('ROOMFLOOR3', 'string'),
 ('ROOMFLOOR2', 'string'),
 ('ROOMAREA2', 'string'),
 ('DININGROOMDESCRIPTION', 'string'),
 ('BASEMENT', 'string'),
 ('BATHSFULL', 'bigint'),
 ('BATHSHALF', 'bigint'),
 ('BATHQUARTER', 'bigint'),
 ('BATHSTHREEQUARTER', 'double'),
 ('CLASS', 'string'),
 ('BATHSTOTAL', 'bigint'),
 ('BATHDESC', 'string'),
 ('ROOMAREA5', 'string'),
 ('ROOMFLOOR5', 'string'),
 ('ROOMAREA6', 'string'),
 ('ROOMFLOOR6', 'string'),
 ('ROOMAREA7', 'string'),
 ('ROOMFLOOR7', 'string'),
 ('ROOMAREA8', 'string'),
 ('ROOMFLOOR8', 'string'),
 ('BEDROOMS', 'bigint'),
 ('SQFTBELOWGROUND', 'bigint'),
 ('ASSUMABLEMORTGAGE', 'string'),
 ('ASSOCIATIONFEE', 'bigint'),
 ('ASSESSMENTPENDING', 'string'),
 ('ASSESSEDVALUATION', 'double')]

In [None]:
old_columns = df.columns

In [None]:
new_columns = [c for c, d in required_dtypes]

In [None]:
for n, o in zip(new_columns, old_columns): 
    df = df.withColumnRenamed(o, n)

In [None]:
#Checking the transformation of column names to capital letters
df.dtypes

In [None]:
for required_type, current_column in zip(required_dtypes, df.columns):
    # since the required and current column names are the exact order we can do:
    if required_type[1] != 'string':
        df = df.withColumn(current_column, df["{:}".format(current_column)].cast(required_type[1]))

In [None]:
df.dtypes

In [None]:
check_columns = ['FOUNDATIONSIZE',
 'DAYSONMARKET',
 'FIREPLACES',
 'PDOM',
 'SQFTABOVEGROUND',
 'TAXES',
 'TAXWITHASSESSMENTS',
 'TAXYEAR',
 'LIVINGAREA',
 'YEARBUILT',
 'ACRES',
 'BACKONMARKETDATE',
 'BATHSFULL',
 'BATHSHALF',
 'BATHQUARTER',
 'BATHSTHREEQUARTER',
 'BATHSTOTAL',
 'BEDROOMS',
 'SQFTBELOWGROUND',
 'ASSOCIATIONFEE',
 'ASSESSEDVALUATION']

In [None]:
# Name and value of col with max corr
corr_max = 0
corr_max_col = check_columns[0]

# Loop to check all columns contained in list
for col in check_columns:
    # Check the correlation of a pair of columns
    corr_val = df.corr(col, 'SALESCLOSEPRICE')
    # Logic to compare corr_max with current corr_val
    if corr_val > corr_max:
        # Update the column name and corr value
        corr_max = corr_val
        corr_max_col = col

print(corr_max_col)

# 5. Visualizations

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt
import seaborn as sns

import warnings
warnings.filterwarnings("ignore")

In [None]:
# Select a single column and sample and convert to pandas
# sample 50% and not use replacement and setting the random seed to 42.
sample_df = df.select(['LISTPRICE']).sample(False, .5, 42)
pandas_df = sample_df.toPandas()

# Plot distribution of pandas_df and display plot
sns.distplot(pandas_df)
plt.show()

# Import skewness function
from pyspark.sql.functions import skewness

# Compute and print skewness of LISTPRICE
print(df.agg({'LISTPRICE': 'skewness'}).collect())

In [None]:
# Select a the relevant columns and sample
sample_df = df.select(['SALESCLOSEPRICE','LIVINGAREA']).sample(False, .5, 42)

# Convert to pandas dataframe
pandas_df = sample_df.toPandas()

In [None]:
# Linear model plot of pandas_df
sns.lmplot(x='LIVINGAREA', y='SALESCLOSEPRICE', data=pandas_df)

We can see a relation. If Livingarea increase, the salescloseprice also increases.

# 6. Preprocessing data II

### 6.1 Dropping a list of columns

In [None]:
# List of columns to remove from dataset
cols_to_drop = ['STREETNUMBERNUMERIC', 'LOTSIZEDIMENSIONS']

# Drop columns in list
df = df.drop(*cols_to_drop)

### 6.2 Using text filters to remove records

In [None]:
# Inspect unique values in the column 'ASSUMABLEMORTGAGE'
df.select(['ASSUMABLEMORTGAGE']).distinct().show()

In [None]:
df.groupBy('ASSUMABLEMORTGAGE').count().show()

In [None]:
df.count()

In [None]:
# List of possible values containing 'yes'
yes_values = ['Yes w/ Qualifying', 'Yes w/No Qualifying']

# Filter the text values out of df but keep null values
text_filter = ~df['ASSUMABLEMORTGAGE'].isin(yes_values) | df['ASSUMABLEMORTGAGE'].isNull()
df = df.where(text_filter)

In [None]:
df.count()

In [None]:
# We kept values that were null and values that where not in the list provided.
df.groupBy('ASSUMABLEMORTGAGE').count().show()

### 6.3 Filtering numeric fields conditionally

In [None]:
#Changing column to his log value.
from pyspark.sql.functions import log
df = df.withColumn('log_SalesClosePrice', log('SalesClosePrice'))

In [None]:
# Select a the relevant columns and sample
sample_df = df.select(['log_SalesClosePrice'])

# Convert to pandas dataframe
pandas_df = sample_df.toPandas()

In [None]:
pandas_df.head()

In [None]:
sns.boxplot(x=pandas_df["log_SalesClosePrice"])

In [None]:
from pyspark.sql.functions import mean, stddev

# Calculate values used for outlier filtering
mean_val = df.agg({'log_SalesClosePrice': 'mean'}).collect()[0][0]
stddev_val = df.agg({'log_SalesClosePrice': 'stddev'}).collect()[0][0]

In [None]:
# Create three standard deviation (μ ± 3σ) lower and upper bounds for data
low_bound = mean_val - (3 * stddev_val)
hi_bound = mean_val + (3 * stddev_val)

In [None]:
df.count()

In [None]:
# Filter the data to fit between the lower and upper bounds
df = df.where((df['log_SalesClosePrice'] < hi_bound) & (df['log_SalesClosePrice'] > low_bound))

In [None]:
df.count()

In [None]:
# Check that we have deleted outliers
sample_df = df.select(['log_SalesClosePrice'])
pandas_df = sample_df.toPandas()
sns.boxplot(x=pandas_df["log_SalesClosePrice"])

### 6.4 Custom Percentage Scaling

Creating a manual scaling of the Daysonmarket column

In [None]:
from pyspark.sql.functions import round

In [None]:
# Define max and min values and collect them
max_days = df.agg({'DAYSONMARKET': 'max'}).collect()[0][0]
min_days = df.agg({'DAYSONMARKET': 'min'}).collect()[0][0]

In [None]:
# Create a new column based off the scaled data using the formula manually
df = df.withColumn('percentage_scaled_days', 
                  round((df['DAYSONMARKET'] - min_days) / (max_days - min_days)) * 100)


In [None]:
# Calc max and min for new column
print(df.agg({'percentage_scaled_days': 'max'}).collect())
print(df.agg({'percentage_scaled_days': 'min'}).collect())

In [None]:
sample_df = df.select(['DAYSONMARKET'])
pandas_df = sample_df.toPandas()
sns.boxplot(x=pandas_df["DAYSONMARKET"])

In [None]:
sample_df = df.select(['percentage_scaled_days'])
pandas_df = sample_df.toPandas()
sns.boxplot(x=pandas_df["percentage_scaled_days"])

### 6.5 Scaling your scalers

Creating a function that will scale automatically the desired features.

In [None]:
def min_max_scaler(df, cols_to_scale):
    # Takes a dataframe and list of columns to minmax scale. Returns a dataframe.
    for col in cols_to_scale:
        # Define min and max values and collect them
        max_days = df.agg({col: 'max'}).collect()[0][0]
        min_days = df.agg({col: 'min'}).collect()[0][0]
        new_column_name = 'scaled_' + col
        # Create a new column based off the scaled data
        df = df.withColumn(new_column_name, 
                          (df[col] - min_days) / (max_days - min_days))
    return df

In [None]:
df = min_max_scaler(df, ['FOUNDATIONSIZE', 'DAYSONMARKET', 'FIREPLACES'])

In [None]:
# Show that our data is now between 0 and 1
df[['DAYSONMARKET', 'scaled_DAYSONMARKET']].show()

### 6.6 Correcting Right Skew Data

In [None]:
# Compute the skewness
print(df.agg({'YEARBUILT': 'skewness'}).collect())

# Calculate the max year
max_year = df.agg({'YEARBUILT': 'max'}).collect()[0][0]

# Create a new column of reflected data
df = df.withColumn('Reflect_YearBuilt', (max_year + 1) - df['YEARBUILT'])

# Create a new column based reflected data
df = df.withColumn('adj_yearbuilt', 1 / log(df['Reflect_YearBuilt']))

### 6.7 Visualizing Missing Data

In [None]:
columns = ['APPLIANCES',
 'BACKONMARKETDATE',
 'ROOMFAMILYCHAR',
 'BASEMENT',
 'DININGROOMDESCRIPTION']

In [None]:
df.select(columns).show()

In [None]:
# Sample the dataframe and convert to Pandas
sample_df = df.select(columns).sample(False, 0.5, 42)
pandas_df = sample_df.toPandas()

# Convert all values to T/F
tf_df = pandas_df.isnull()

In [None]:
# Plot it
sns.heatmap(data=tf_df)
plt.xticks(rotation=30, fontsize=10)
plt.yticks(rotation=0, fontsize=10)
plt.show()

### 6.8 Imputing Missing Data

In [None]:
# Count missing rows
# In this case there aren't nulls. But the way to input would be the same.
df.where(df['PDOM'].isNull()).count()

In [None]:
# Calculate the mean value
col_mean = df.agg({'PDOM': 'mean'}).collect()[0][0]

In [None]:
# Replacing with the mean value for that column
df.fillna(col_mean, subset=['PDOM'])

In [None]:
# Count missing rows after imputing 
df.where(df['PDOM'].isNull()).count()

### 6.9 Calculate Missing Percents

We are going to create a function that will drop columns that have X percentage os missing values.

In [None]:
def column_dropper(df, threshold):
    # Takes a dataframe and threshold for missing values. Returns a dataframe.
    total_records = df.count()
    for col in df.columns:
    # Calculate the percentage of missing values
        missing = df.where(df[col].isNull()).count()
        missing_percent = missing / total_records
        # Drop column if percent of missing is more than threshold
        if missing_percent > threshold:
            df = df.drop(col)
    return df

In [None]:
#Number of columns before
print(len(df.columns))

In [None]:
# Drop columns that are more than 60% missing
df = column_dropper(df, .6)

In [None]:
#Number of columns after
print(len(df.columns))

# 7. Feature Engineering

### 7.1 Differences

In [None]:
# Lot size in square feet
acres_to_sqfeet = 43560
df = df.withColumn('LOT_SIZE_SQFT', df['ACRES'] * acres_to_sqfeet)

In [None]:
# Create new column YARD_SIZE
df = df.withColumn('YARD_SIZE', df['LOT_SIZE_SQFT'] - df['FOUNDATIONSIZE'])

In [None]:
# Corr of ACRES vs SALESCLOSEPRICE
print("Corr of ACRES vs SALESCLOSEPRICE: " + str(df.corr('ACRES', 'SALESCLOSEPRICE')))
# Corr of FOUNDATIONSIZE vs SALESCLOSEPRICE
print("Corr of FOUNDATIONSIZE vs SALESCLOSEPRICE: " + str(df.corr('FOUNDATIONSIZE', 'SALESCLOSEPRICE')))
# Corr of YARD_SIZE vs SALESCLOSEPRICE
print("Corr of YARD_SIZE vs SALESCLOSEPRICE: " + str(df.corr('YARD_SIZE', 'SALESCLOSEPRICE')))

### 7.2 Ratios

In [None]:
# ASSESSED_TO_LIST
df = df.withColumn('ASSESSED_TO_LIST', df['ASSESSEDVALUATION'] / df['LISTPRICE'])
df[['ASSESSEDVALUATION', 'LISTPRICE', 'ASSESSED_TO_LIST']].show(5)

In [None]:
# TAX_TO_LIST
df = df.withColumn('TAX_TO_LIST', df['TAXES'] / df['LISTPRICE'])
df[['TAX_TO_LIST', 'TAXES', 'LISTPRICE']].show(5)

In [None]:
# BED_TO_BATHS
df = df.withColumn('BED_TO_BATHS', df['BEDROOMS'] / df['BATHSTOTAL'])
df[['BED_TO_BATHS', 'BEDROOMS', 'BATHSTOTAL']].show(5)

### 7.3 Deeper Features

In [None]:
from scipy import stats

In [None]:
def r2(x, y):
    return stats.pearsonr(x, y)[0] ** 2

In [None]:
# Create new feature by adding two features together
df = df.withColumn('Total_SQFT', df['SQFTBELOWGROUND'] + df['SQFTABOVEGROUND'])

In [None]:
# Create additional new feature using previously created feature
df = df.withColumn('BATHS_PER_1000SQFT', df['BATHSTOTAL'] / (df['Total_SQFT'] / 1000))
df[['BATHS_PER_1000SQFT']].describe().show()

In [None]:
# Sample and create pandas dataframe
pandas_df = df.sample(False, 0.5, 0).toPandas()

In [None]:
pandas_df=pandas_df[pandas_df["BATHS_PER_1000SQFT"]<1000]

In [None]:
# Linear model plots
sns.jointplot(x='Total_SQFT', y='SALESCLOSEPRICE', data=pandas_df)
sns.jointplot(x='BATHS_PER_1000SQFT', y='SALESCLOSEPRICE', data=pandas_df)

### 7.4 Time Components

In [None]:

# Import needed functions
from pyspark.sql.functions import to_date, dayofweek

#Important to set this. If not, issues with datetime appear
my_spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

In [None]:
# Convert to date type
df = df.withColumn('LISTDATE', to_date(df['LISTDATE'], format='MM/dd/yyyy HH:mm'))

In [None]:
# Get the day of the week
df = df.withColumn('List_Day_of_Week', dayofweek(df['LISTDATE']))

In [None]:
# Sample and convert to pandas dataframe
sample_df = df.sample(False, .5, 42).toPandas()

In [None]:
# Plot count plot of of day of week
sns.countplot(x="List_Day_of_Week", data=sample_df)
plt.show()

### 7.5 Joining On Time Components

In [None]:
import pandas as pd
data = dict(City=['LELM - Lake Elmo', 'MAPW - Maplewood','STP - Saint Paul','WB - Woodbury', \
                  'OAKD - Oakdale', 'LELM - Lake Elmo', 'MAPW - Maplewood', \
                  'STP - Saint Paul', 'WB - Woodbury', 'OAKD - Oakdale'],
     MedianHomeValue=[401000, 193000, 172000, 291000, 210000, 385000, 187000, 162000, 277000, 192000],
     Year= [2016,2016,2016,2016,2016,2015,2015,2015,2015, 2015])

In [None]:
df_price = pd.DataFrame(data)

In [None]:
price_df = my_spark.createDataFrame(df_price)
price_df.show()

In [None]:
from pyspark.sql.functions import year

# Create year column
df = df.withColumn('list_year', year(df['LISTDATE']))

# Adjust year to match
df = df.withColumn('report_year', (df['list_year'] - 1))

In [None]:
# Create join condition
condition = [df['CITY'] == price_df['City'], df['report_year'] == price_df['year']]

# Join the dataframes together
df = df.join(price_df, on=condition, how='left')

In [None]:
# Inspect that new columns are available
df[['MedianHomeValue']].show()

In [None]:
df[['MedianHomeValue']].describe().show()

### 7.6 Date Math

In [None]:
from pyspark.sql.functions import lag, datediff, to_date
from pyspark.sql.window import Window

In [None]:
# Cast data type
mort_df = df.withColumn('DATE', to_date(df['LISTDATE']))

In [None]:
# Create window
w = Window().orderBy(mort_df['DATE'])
# Create lag column
mort_df = mort_df.withColumn('DATE-1', lag(mort_df['DATE'], offset=1).over(w))

In [None]:
mort_df.select(['DATE','DATE-1']).show(10)

In [None]:
# Calculate difference between date columns
mort_df = mort_df.withColumn('Days_Between_Report', datediff(mort_df['DATE'], mort_df['DATE-1']))

In [None]:
mort_df.select(['DATE','DATE-1',"Days_Between_Report"]).show(10)

In [None]:
# Print results
mort_df.select('Days_Between_Report').distinct().show()

### 7.7 Extracting Text to New Features

In [None]:
# Import needed functions
from pyspark.sql.functions import when

In [None]:
# Create boolean conditions for string matches
has_attached_garage = df['GARAGEDESCRIPTION'].like('%Attached%')
has_detached_garage = df['GARAGEDESCRIPTION'].like('%Detached%')


In [None]:
# Conditional value assignment 
df = df.withColumn('has_attached_garage', (when(has_attached_garage, 1)
                                          .when(has_detached_garage, 0)
                                          .otherwise(None)))


In [None]:
# Inspect results
df[['GARAGEDESCRIPTION', 'has_attached_garage']].show(truncate=100)

Sources:
https://github.com/ozlerhakan/datacamp/blob/master/Feature%20Engineering%20with%20PySpark/Feature%20Engineering%20with%20PySpark.ipynb