In [40]:
%load_ext autoreload 
%autoreload 2

%matplotlib inline

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [38]:
name = 'Behtash'

In [39]:
f'Hello {name}'

'Hello Behtash'

In [1]:
from pyspark.sql import SparkSession, Row
import numpy as np
import pandas as pd
import feather
from pyspark.ml.stat import Correlation
#from pyspark.sql.functions import * 
from pyspark.sql.functions import avg, log, col, isnull
from pyspark.sql.functions import year, month, count
import pyspark.sql.functions as fcn 
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import IntegerType, StringType, TimestampType
#------------------------------------------
from sklearn.metrics import accuracy_score
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor
#from pyspark.ml.evaluation import BinaryClassificationEvaluator
#from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml import Pipeline

In [2]:
spark = SparkSession.builder.appName('Bulldozer').getOrCreate()

# Load the data 
### Convention : use 'r' for 'raw'

In [5]:
# CV Set
# Description of the abbreviations 
# rdt: raw data  # prs: parasitics (may include columns that will be deleted) # rlbl: raw label

rdt_t_prs_rlbl = spark.read.csv('train.csv', inferSchema=True, header=True)   
#===============================================================

# Test Set
rdt_T_t_prs = spark.read.csv('Valid.csv', inferSchema=True, header=True) 

# Number of Rows and Columns 

In [6]:
%%time
NumRows_CV = rdt_t_prs_rlbl.count()
NumCols_CV = len(rdt_t_prs_rlbl.columns)
print('No. of rows in the training set: ' , NumRows_CV)
print('No. of columns in the training set: ' , NumCols_CV)
 
print('-----------------------------------------')
NumRows_T = rdt_T_t_prs.count()
NumCols_T = len(rdt_T_t_prs.columns)
print('No. of rows in the validation set: ' , NumRows_T)
print('No. of columns in the validation set: ' , NumCols_T)


No. of rows in the training set:  401125
No. of columns in the training set:  53
-----------------------------------------
No. of rows in the validation set:  11573
No. of columns in the validation set:  52
Wall time: 1.02 s


# Print Schema

In [5]:
rdt_t_prs_rlbl.printSchema()

# Change column names if necessary 
'''
for name in df.schema.names:
  df = df.withColumnRenamed(name, name.replace(' ', ''))'''

root
 |-- SalesID: integer (nullable = true)
 |-- SalePrice: integer (nullable = true)
 |-- MachineID: integer (nullable = true)
 |-- ModelID: integer (nullable = true)
 |-- datasource: integer (nullable = true)
 |-- auctioneerID: integer (nullable = true)
 |-- YearMade: integer (nullable = true)
 |-- MachineHoursCurrentMeter: integer (nullable = true)
 |-- UsageBand: string (nullable = true)
 |-- saledate: string (nullable = true)
 |-- fiModelDesc: string (nullable = true)
 |-- fiBaseModel: string (nullable = true)
 |-- fiSecondaryDesc: string (nullable = true)
 |-- fiModelSeries: string (nullable = true)
 |-- fiModelDescriptor: string (nullable = true)
 |-- ProductSize: string (nullable = true)
 |-- fiProductClassDesc: string (nullable = true)
 |-- state: string (nullable = true)
 |-- ProductGroup: string (nullable = true)
 |-- ProductGroupDesc: string (nullable = true)
 |-- Drive_System: string (nullable = true)
 |-- Enclosure: string (nullable = true)
 |-- Forks: string (nullable =

"\nfor name in df.schema.names:\n  df = df.withColumnRenamed(name, name.replace(' ', ''))"

# Convert all forms of "null" to Pyspark null

In [21]:
%%time 
Null_Representations =  ["None or Unspecified", "NA", "NaN"]   # This list should be defined based on each dataframe
cols = [fcn.when(~col(c).isin(*Null_Representations), col(c)).alias(c)  for c in rdt_t_prs_rlbl.columns]
rdt_t_prs_rlbl = rdt_t_prs_rlbl.select(*cols)

Wall time: 839 ms


In [23]:
# Perform spot tests 
%time rdt_t_prs_rlbl.select('Ripper').show()

+------+
|Ripper|
+------+
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
|  null|
+------+
only showing top 20 rows

Wall time: 219 ms


# Perform operations on the label column if necessary

In [24]:
rdt_t_prs = rdt_t_prs_rlbl.withColumn('logSalePrice',log(rdt_t_prs_rlbl.SalePrice)).drop('SalePrice')
rdt_t_prs.printSchema()

root
 |-- SalesID: integer (nullable = true)
 |-- MachineID: integer (nullable = true)
 |-- ModelID: integer (nullable = true)
 |-- datasource: integer (nullable = true)
 |-- auctioneerID: integer (nullable = true)
 |-- YearMade: integer (nullable = true)
 |-- MachineHoursCurrentMeter: integer (nullable = true)
 |-- UsageBand: string (nullable = true)
 |-- saledate: string (nullable = true)
 |-- fiModelDesc: string (nullable = true)
 |-- fiBaseModel: string (nullable = true)
 |-- fiSecondaryDesc: string (nullable = true)
 |-- fiModelSeries: string (nullable = true)
 |-- fiModelDescriptor: string (nullable = true)
 |-- ProductSize: string (nullable = true)
 |-- fiProductClassDesc: string (nullable = true)
 |-- state: string (nullable = true)
 |-- ProductGroup: string (nullable = true)
 |-- ProductGroupDesc: string (nullable = true)
 |-- Drive_System: string (nullable = true)
 |-- Enclosure: string (nullable = true)
 |-- Forks: string (nullable = true)
 |-- Pad_Type: string (nullable = t

# Identify the Label and Index columns

In [11]:
LBL = 'logSalePrice'
IDX = 'SalesID'

# ==========================

# Handling datetime 

# ==========================

### Define the Pandas function to convert the string series to timestamp

In [26]:
def pd_StringToTime(s):    
    '''
    The current format to be converted is: 'YYYY/M/D 0:00'
    '''
    return pd.to_datetime(s)

### Define the pandas_udf function

In [27]:
# Syntax: pyspark.sql.functions.pandas_udf(f=None, returnType=None, functionType=None)[source]
Time_udf = pandas_udf(pd_StringToTime, TimestampType())   

### Apply the pandas_udf function to timeseries (string type) of the orignial dataframe 
### Add time-features 

In [28]:
%%time 

# CV set 

rdt_prs = rdt_t_prs.withColumn('Sale_Date',    Time_udf(rdt_t_prs.saledate)).drop('saledate')\
                      .withColumn('Year', fcn.year('Sale_Date'))\
                      .withColumn('yDay', fcn.dayofyear('Sale_Date'))\
                      .withColumn('mDay', fcn.dayofmonth('Sale_Date'))\
                      .withColumn('wDay', fcn.dayofweek('Sale_Date')).orderBy('Sale_Date')#.drop('Sale_Date')


# ================================================================================================


# Test set 
rdt_T_prs = rdt_T_t_prs.withColumn('Sale_Date', Time_udf(rdt_T_t_prs.saledate)).drop('saledate')\
                      .withColumn('Year', fcn.year('Sale_Date'))\
                      .withColumn('yDay', fcn.dayofyear('Sale_Date'))\
                      .withColumn('mDay', fcn.dayofmonth('Sale_Date'))\
                      .withColumn('wDay', fcn.dayofweek('Sale_Date')).orderBy('Sale_Date')#.drop('Sale_Date')




Wall time: 135 ms


# Have a look at the first few rows

In [29]:
pd.set_option('display.max_columns', 100)

pd.DataFrame(rdt_prs.head(8))



Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56
0,1646770,1126363,8434,132,18,1974,,,TD20,TD20,,,,Medium,"Track Type Tractor, Dozer - 105.0 to 130.0 Hor...",Texas,TTT,Track Type Tractors,,OROPS,,,,,Direct Drive,,,,,,2 Valve,,,,,,,,,,,,,,,,,Straight,,,,9.159047,1989-01-17,1989,17,17,3
1,1451684,1434619,3854,132,99,1969,,,966C,966,C,,,Medium,Wheel Loader - 150.0 to 175.0 Horsepower,Florida,WL,Wheel Loader,,OROPS,,,,,,,,,,,2 Valve,,,,,,,,,,,,,,,,,,,Standard,Conventional,10.043249,1989-01-31,1989,31,31,3
2,1671174,1327630,8591,132,99,1980,,,A62,A62,,,,,Wheel Loader - Unidentified,Florida,WL,Wheel Loader,,EROPS,,,,,,,,,,,2 Valve,,,,,,,,,,,,,,,,,,,Standard,Conventional,9.680344,1989-01-31,1989,31,31,3
3,1449754,1486544,3356,132,99,1967,,,12F,12,F,,,,Motorgrader - 45.0 to 130.0 Horsepower,Florida,MG,Motor Graders,No,OROPS,,,,,,,,,,No,Base + 1 Function,,,Yes,Sideshift & Tip,,,,,,,,,,,,,,,,,9.305651,1989-01-31,1989,31,31,3
4,1725330,1538651,8948,132,99,1968,,,466,466,,,,Large / Medium,"Hydraulic Excavator, Track - 24.0 to 28.0 Metr...",Florida,TEX,Track Excavators,,EROPS,,,,,,,,,,,Standard,,,,,,,,,,Steel,,,,,Double,,,,,,10.126631,1989-01-31,1989,31,31,3
5,1728883,1523610,9105,132,99,1986,,,WA150,WA150,,,,,Wheel Loader - 90.0 to 100.0 Horsepower,Florida,WL,Wheel Loader,,EROPS,,,,,,,,,,,,,,,,26.5,,,,,,,,,,,,,,,,10.308953,1989-01-31,1989,31,31,3
6,1740836,1531656,11933,132,99,1975,,,D65,D65,,,,Large,"Track Type Tractor, Dozer - 190.0 to 260.0 Hor...",Florida,TTT,Track Type Tractors,,OROPS,,,,,Standard,,,,,,2 Valve,,Single Shank,,,,,,,,,,,,,,,Straight,,,,9.21034,1989-01-31,1989,31,31,3
7,1740272,1090529,8988,132,18,1979,,,D31,D31,,,,,"Track Type Tractor, Dozer - 20.0 to 75.0 Horse...",Georgia,TTT,Track Type Tractors,,OROPS,,,,,Powershift,,,,,,2 Valve,,,,,,,,,,,,,,,,,PAT,,,,9.21034,1989-01-31,1989,31,31,3


# Check the percentage of Null values in each column

## CV Set Null counts 

In [32]:
'''
%%time 
NullCheck = rdt_prs.select([fcn.bround(((fcn.sum(fcn.when(isnull(c), 1).otherwise(0)))/NumRows_CV*100),2).alias(c)\
                   for c in rdt_prs.columns])
'''

'\n%%time \nNullCheck = rdt_prs.select([fcn.bround(((fcn.sum(fcn.when(isnull(c), 1).otherwise(0)))/NumRows_CV*100),2).alias(c)                   for c in rdt_prs.columns])\n'

In [33]:
'''
counter = 0
thresh = 50
for c in NullCheck.columns:
    if NullCheck.select(c).collect()[0][0]>thresh:
        print(c,NullCheck.select(c).collect()[0][0])  
        counter +=1
print('No. of columns with more than {}% Null values; '.format(thresh), counter) 

'''

"\ncounter = 0\nthresh = 50\nfor c in NullCheck.columns:\n    if NullCheck.select(c).collect()[0][0]>thresh:\n        print(c,NullCheck.select(c).collect()[0][0])  \n        counter +=1\nprint('No. of columns with more than {}% Null values; '.format(thresh), counter) \n\n"

## Test Set Null counts 

In [34]:
'''
NullCheck = rdt_prs.select([fcn.bround(((fcn.sum(fcn.when(isnull(c), 1).otherwise(0)))/rdt_prs.count()*100),2).alias(c)\
                   for c in rdt_prs.columns])
'''

'\nNullCheck = rdt_prs.select([fcn.bround(((fcn.sum(fcn.when(isnull(c), 1).otherwise(0)))/rdt_prs.count()*100),2).alias(c)                   for c in rdt_prs.columns])\n'

In [35]:
'''
counter = 0
thresh = 50
for c in NullCheck_T.columns:
    if NullCheck_T.select(c).collect()[0][0]>thresh:
        print(c, NullCheck_T.select(c).collect()[0][0])  
        counter +=1
print('No. of columns with more than {}% Null values; '.format(thresh), counter) 
'''

"\ncounter = 0\nthresh = 50\nfor c in NullCheck_T.columns:\n    if NullCheck_T.select(c).collect()[0][0]>thresh:\n        print(c, NullCheck_T.select(c).collect()[0][0])  \n        counter +=1\nprint('No. of columns with more than {}% Null values; '.format(thresh), counter) \n"

# Generate the list of string columns : includes all columns

In [37]:
STR_clms_prs = [List_element[0] for List_element in rdt_prs.dtypes if List_element[1].startswith('string')]
print('No. of string columns in the test set: ',len(STR_clms_prs))
STR_clms_prs

No. of string columns in the test set:  44


['UsageBand',
 'fiModelDesc',
 'fiBaseModel',
 'fiSecondaryDesc',
 'fiModelSeries',
 'fiModelDescriptor',
 'ProductSize',
 'fiProductClassDesc',
 'state',
 'ProductGroup',
 'ProductGroupDesc',
 'Drive_System',
 'Enclosure',
 'Forks',
 'Pad_Type',
 'Ride_Control',
 'Stick',
 'Transmission',
 'Turbocharged',
 'Blade_Extension',
 'Blade_Width',
 'Enclosure_Type',
 'Engine_Horsepower',
 'Hydraulics',
 'Pushblock',
 'Ripper',
 'Scarifier',
 'Tip_Control',
 'Tire_Size',
 'Coupler',
 'Coupler_System',
 'Grouser_Tracks',
 'Hydraulics_Flow',
 'Track_Type',
 'Undercarriage_Pad_Width',
 'Stick_Length',
 'Thumb',
 'Pattern_Changer',
 'Grouser_Type',
 'Backhoe_Mounting',
 'Blade_Type',
 'Travel_Controls',
 'Differential_Type',
 'Steering_Controls']

# Check out the uniqueness of values in each column
### The idea is to see if each string column is worthwhile for ML training and prediction
### Large ratios (close to 1) show that there are very few categorical variables that have been repeated
### Small ratios (close to 0) show that there are many examples to learn from for each distinct value 

###   !  Becareful about the append function  !


In [68]:
%%time 
# 1min 1s 
minRep_cnt = 5       # Minimum allowed AVERAGE number of repetitions for each feature   
maxBin_cnt = 100

# The initial columns are the ones with large percentage of Nulls
clms_drop_list = []
Dis_counts     = np.array([])
Rep_counts     = np.array([])


for c in STR_clms_prs:
    if c != IDX and c != LBL:          # Index columns will not be used as a feature
        '''
        Records columns that have less than Rep_cnt samples per feature
        Recrods columns that have more than maxBin_cnt in feature veriety  
        '''
        Distinct_cnt   = rdt_prs.select(c).distinct().count()
        Rep_cnt = NumRows_CV / Distinct_cnt
        
        if Rep_cnt < minRep_cnt or Distinct_cnt > maxBin_cnt:
            
            print(c, '|' ,  Distinct_cnt, '|', Rep_cnt)
            clms_drop_list.append(c)     
            np.append(Dis_counts, Distinct_cnt)
            np.append(Rep_counts, Rep_cnt)

print(clms_drop_list)        
print(len(clms_drop_list))      


fiModelDesc | 4999 | 80.24104820964193
fiBaseModel | 1950 | 205.7051282051282
fiSecondaryDesc | 176 | 2279.119318181818
fiModelSeries | 123 | 3261.1788617886177
fiModelDescriptor | 140 | 2865.1785714285716
['fiModelDesc', 'fiBaseModel', 'fiSecondaryDesc', 'fiModelSeries', 'fiModelDescriptor']
5
Wall time: 43min 26s


In [69]:
rdt_prs.columns

['SalesID',
 'MachineID',
 'ModelID',
 'datasource',
 'auctioneerID',
 'YearMade',
 'MachineHoursCurrentMeter',
 'UsageBand',
 'fiModelDesc',
 'fiBaseModel',
 'fiSecondaryDesc',
 'fiModelSeries',
 'fiModelDescriptor',
 'ProductSize',
 'fiProductClassDesc',
 'state',
 'ProductGroup',
 'ProductGroupDesc',
 'Drive_System',
 'Enclosure',
 'Forks',
 'Pad_Type',
 'Ride_Control',
 'Stick',
 'Transmission',
 'Turbocharged',
 'Blade_Extension',
 'Blade_Width',
 'Enclosure_Type',
 'Engine_Horsepower',
 'Hydraulics',
 'Pushblock',
 'Ripper',
 'Scarifier',
 'Tip_Control',
 'Tire_Size',
 'Coupler',
 'Coupler_System',
 'Grouser_Tracks',
 'Hydraulics_Flow',
 'Track_Type',
 'Undercarriage_Pad_Width',
 'Stick_Length',
 'Thumb',
 'Pattern_Changer',
 'Grouser_Type',
 'Backhoe_Mounting',
 'Blade_Type',
 'Travel_Controls',
 'Differential_Type',
 'Steering_Controls',
 'logSalePrice',
 'Sale_Date',
 'Year',
 'yDay',
 'mDay',
 'wDay']

# This segment filters out categorical features with large unique values


In [70]:
# CV Set
selec_lst   = list (set(rdt_prs.columns)  - set(clms_drop_list ))   # List of acceptable columns
rdt = rdt_prs.select(selec_lst)


# Test Set
selec_T_lst = list (set(rdt_T_prs.columns)  - set(clms_drop_list ))
rdt_T = rdt_T_prs.select(selec_T_lst)


In [71]:
rdt_prs.select(clms_drop_list).columns

['fiModelDesc',
 'fiBaseModel',
 'fiSecondaryDesc',
 'fiModelSeries',
 'fiModelDescriptor']

# Generate a list of string columns 

In [74]:
STR_clms = [List_element[0] for List_element in rdt.dtypes if List_element[1].startswith('string')]
print('No. of string columns in the test set: ',len(STR_clms))
STR_clms

No. of string columns in the test set:  39


['Stick',
 'Blade_Extension',
 'Track_Type',
 'state',
 'Tire_Size',
 'Coupler',
 'Stick_Length',
 'Enclosure_Type',
 'Transmission',
 'UsageBand',
 'Ride_Control',
 'Thumb',
 'Drive_System',
 'Forks',
 'Pattern_Changer',
 'Blade_Width',
 'Pushblock',
 'Hydraulics',
 'ProductSize',
 'Turbocharged',
 'Hydraulics_Flow',
 'Travel_Controls',
 'Engine_Horsepower',
 'ProductGroupDesc',
 'Scarifier',
 'Backhoe_Mounting',
 'Ripper',
 'Coupler_System',
 'Enclosure',
 'fiProductClassDesc',
 'Blade_Type',
 'Tip_Control',
 'Grouser_Tracks',
 'Grouser_Type',
 'Undercarriage_Pad_Width',
 'Pad_Type',
 'Differential_Type',
 'ProductGroup',
 'Steering_Controls']

# Generate the list of numerical columns

In [75]:
# CV Set 
NUM_clms = [item[0] for item in rdt.dtypes if item[1].startswith('int') or\
                                              item[1].startswith('double') or\
                                              item[1].startswith('float')]
print('No. of numerical columns in the test set: ',len(NUM_clms)) 
# =============================================================================

# Test Set
NUM_T_clms = [item[0] for item in rdt_T.dtypes if item[1].startswith('int') or\
                                                  item[1].startswith('double') or\
                                                  item[1].startswith('float')]
print(NUM_T_clms)

No. of numerical columns in the test set:  12
['yDay', 'ModelID', 'mDay', 'Year', 'datasource', 'SalesID', 'auctioneerID', 'YearMade', 'MachineID', 'wDay', 'MachineHoursCurrentMeter']


# Convert string columns to numerical indexes

In [76]:
%%time 

# Define the indexer for all string columns
indexers        = [StringIndexer(inputCol= c, outputCol= c+'_IDX', handleInvalid='keep').fit(rdt) for c in STR_clms]


# Create the pipeline
pipe_StrToIdx   = Pipeline(stages=indexers)


# Execute the pipeline to index the string columns
# Also drop the original string columns

# -------------------------------------------------------------------------------
# CV set from train data 
sfdt_IDXed         = pipe_StrToIdx.fit(rdt).transform(rdt).drop(*STR_clms)          #sf: string-filled
# ========================================================================

# Test set 
sfdt_T_IDXed       = pipe_StrToIdx.fit(rdt_T).transform(rdt_T).drop(*STR_clms)      #sf: string-filled
# -------------------------------------------------------------------------------


#sfdt_IDXed.show()
#sfdt_T_IDXed.show()

Wall time: 13min 23s


# Handling Null Values for numerical columns

### Null Percentages per column

#### CV Set

In [94]:
%%time
sfdt_IDXed.select([fcn.bround(((fcn.sum(when(fcn.isnull(c), 1).otherwise(0)))/NumRows_CV*100),2).alias(c)\
                   for c in sfdt_IDXed.columns]).show()

+----------+--------+----+----+------------+------------------------+-------+-------+----+------------+----------------+-------------------+--------------------+-----------------+--------------+----------------------+----------+------------------+-------------+----------------+----------------+--------------+---------------+---------------------+----------------+-------------+-------------------+------------------+---------------+--------------------+-------------------+----------------+------------+-------------------+---------------+--------------+----------------+-------------+---------+-------------------+---------------------+-------------+---------------------------+---------------------+---------+---------------+---------+---------------------+------------------+---------+-----------+---------------+-------------+----------------+
|datasource|YearMade|mDay|yDay|logSalePrice|MachineHoursCurrentMeter|SalesID|ModelID|wDay|auctioneerID|ProductGroup_IDX|Blade_Extension_IDX|Backhoe_Mo

#### Test Set

In [95]:
%%time 
sfdt_T_IDXed.select([fcn.bround(((fcn.sum(when(fcn.isnull(c), 1).otherwise(0)))/NumRows_T*100),2).alias(c)\
                     for c in sfdt_T_IDXed.columns]).show()

+----------+--------+----+----+------------------------+-------+-------+----+------------+----------------+-------------------+--------------------+-----------------+--------------+----------------------+----------+------------------+-------------+----------------+----------------+--------------+---------------+---------------------+----------------+-------------+-------------------+------------------+---------------+--------------------+-------------------+----------------+------------+-------------------+---------------+--------------+----------------+-------------+---------+-------------------+---------------------+-------------+---------------------------+---------------------+---------+---------------+---------+---------------------+------------------+---------+-----------+---------------+-------------+----------------+
|datasource|YearMade|mDay|yDay|MachineHoursCurrentMeter|SalesID|ModelID|wDay|auctioneerID|ProductGroup_IDX|Blade_Extension_IDX|Backhoe_Mounting_IDX|fiModelSeries_I

### Compute Null replacements if any 
#### Figure out the catogories and their frequencies

### Replacement value calculations  (only performed based on the CV set)

In [81]:
%%time 
# Calculate the median for numerical columns 
Med_dict = {}
for c in NUM_T_clms:
    Med_dict[c] = sfdt_IDXed.approxQuantile(c, [0.5], 0.05)[0]

Med_dict

Wall time: 8min 15s


In [82]:
Med_dict

{'yDay': 192.0,
 'ModelID': 4781.0,
 'mDay': 19.0,
 'Year': 2006.0,
 'datasource': 132.0,
 'SalesID': 1711491.0,
 'auctioneerID': 2.0,
 'YearMade': 1997.0,
 'MachineID': 1323881.0,
 'wDay': 5.0,
 'MachineHoursCurrentMeter': 672.0}

### Replace the Null values

In [83]:
%%time 

# CV Set 
fdt = sfdt_IDXed.na.fill(Med_dict)         # fdt: filled data (includes both string and numerical values)
fdt.show()

# ===================================================================

fdt_T = sfdt_T_IDXed.na.fill(Med_dict)     # fdt: filled data (includes both string and numerical values)
fdt_T.show()


+-------------------+----+------------------+-------+----+----+----------+-------+------------+--------+---------+----+------------------------+---------+-------------------+--------------+---------+-------------+-----------+----------------+------------------+----------------+-------------+----------------+---------+----------------+---------+-------------------+---------------+-------------+--------------+---------------+----------------+-------------------+-------------------+---------------------+--------------------+-------------+--------------------+----------+------------------+-------------+----------------------+--------------+---------------+------------------+----------------+---------------------------+------------+---------------------+----------------+---------------------+
|          Sale_Date|yDay|      logSalePrice|ModelID|mDay|Year|datasource|SalesID|auctioneerID|YearMade|MachineID|wDay|MachineHoursCurrentMeter|Stick_IDX|Blade_Extension_IDX|Track_Type_IDX|state_IDX|Ti

#### Check for any left over Null values

In [50]:
%%time 
# CV SET
fdt.dropna().count()/fdt.count()

Wall time: 15.7 s


1.0

In [51]:
%%time 
# Test Set
fdt_T.dropna().count()/fdt_T.count()

Wall time: 10.9 s


1.0

# Rename the processed dataframes as well as the labels column

In [84]:
# CV Set 
Xy = fdt.withColumnRenamed(LBL,'Labels')#.drop('ModelID','datasource','MachineID')

# ======================================

# Test Set (No Labels)
X = fdt_T


In [85]:
Xy.printSchema()

root
 |-- Sale_Date: timestamp (nullable = true)
 |-- yDay: integer (nullable = true)
 |-- Labels: double (nullable = true)
 |-- ModelID: integer (nullable = true)
 |-- mDay: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- datasource: integer (nullable = true)
 |-- SalesID: integer (nullable = true)
 |-- auctioneerID: integer (nullable = true)
 |-- YearMade: integer (nullable = true)
 |-- MachineID: integer (nullable = true)
 |-- wDay: integer (nullable = true)
 |-- MachineHoursCurrentMeter: integer (nullable = true)
 |-- Stick_IDX: double (nullable = false)
 |-- Blade_Extension_IDX: double (nullable = false)
 |-- Track_Type_IDX: double (nullable = false)
 |-- state_IDX: double (nullable = false)
 |-- Tire_Size_IDX: double (nullable = false)
 |-- Coupler_IDX: double (nullable = false)
 |-- Stick_Length_IDX: double (nullable = false)
 |-- Enclosure_Type_IDX: double (nullable = false)
 |-- Transmission_IDX: double (nullable = false)
 |-- UsageBand_IDX: double (nullabl

# Save to feather format      =======================================

In [88]:
# CV Set 
pd_Xy = Xy.toPandas()

# Test Set 
pd_X =  X.toPandas()

In [89]:
# CV Set
pd_Xy.to_feather('Xy')

# Test Set
pd_X.to_feather('X')


In [3]:
Xy = spark.createDataFrame(pd.read_feather('Xy'))
X  = spark.createDataFrame(pd.read_feather('X'))

# EDA
### EDA-1 summary statistics

In [88]:
# CV Set
for c in Xy.columns:
    #XY.select(c).describe().select(bround(c,2)).show()
    Xy.select(c).describe().select('summary',bround(c,2).alias(c)).show()



+-------+--------+
|summary|FullBath|
+-------+--------+
|  count|  1460.0|
|   mean|    1.57|
| stddev|    0.55|
|    min|     0.0|
|    max|     3.0|
+-------+--------+

+-------+---------+
|summary|GrLivArea|
+-------+---------+
|  count|   1460.0|
|   mean|  1515.46|
| stddev|   525.48|
|    min|    334.0|
|    max|   5642.0|
+-------+---------+

+-------+------------+
|summary|BsmtHalfBath|
+-------+------------+
|  count|      1460.0|
|   mean|        0.06|
| stddev|        0.24|
|    min|         0.0|
|    max|         2.0|
+-------+------------+

+-------+-----------+
|summary|TotalBsmtSF|
+-------+-----------+
|  count|     1460.0|
|   mean|    1057.43|
| stddev|     438.71|
|    min|        0.0|
|    max|     6110.0|
+-------+-----------+

+-------+------------+
|summary|KitchenAbvGr|
+-------+------------+
|  count|      1460.0|
|   mean|        1.05|
| stddev|        0.22|
|    min|         0.0|
|    max|         3.0|
+-------+------------+

+-------+----------+
|summary|Ga

### EDA-2 vectorize

In [89]:
# Drop the index column for the CV set 
Xy_clms = Xy.columns # XY labels 
Xy_clms.remove(IDX)
# Drop the index column for the test set 
X_clms  = X.columns
X_clms.remove(IDX)


EDA_Vectorizer = VectorAssembler(inputCols= Xy_clms , outputCol='features_labels')
Vectorized_lng = EDA_Vectorizer.transform(Xy)
#Vectorized_lng.show(truncate=False)
Vectorized_sht = Vectorized_lng.select('features_labels')
Vectorized_sht.show()

+--------------------+
|     features_labels|
+--------------------+
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,2,3,4,5,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,4,5,6,8,...|
|(80,[0,1,3,4,5,6,...|
|(80,[0,1,3,4,5,6,...|
+--------------------+
only showing top 20 rows



### EDA-3 correlations

In [90]:
# Calculate the pairwise correlations
CorrMat_Spark = Correlation.corr(Vectorized_sht,'features_labels').head()
#print("Pearson correlation matrix:\n" + str(CorrMat_Spark[0]))

#=======================================================================

# Find out the correlation matrix size

CorrMat_Spark[0]

DenseMatrix(80, 80, [1.0, 0.63, -0.0545, 0.3237, 0.1331, 0.4057, 0.5948, 0.1877, ..., 0.0082, 0.0039, 0.0255, -0.0181, -0.0148, 0.1351, -0.0221, 1.0], False)

#### Record the size

In [91]:
N_corr = 80

#### EDA-4 Convert to Pandas Dataframe For Visual Checks 

In [92]:
CorrMat_np = -2*np.ones( (N_corr,N_corr) )
for ii in range(N_corr):
    for jj in range(N_corr):
            CorrMat_np[ii,jj] = CorrMat_Spark[0][ii,jj]
CorrMat_np = np.around(CorrMat_np,2)
#CorrMat_np

# ----------------------------------------------------

CorrMat_pd = pd.DataFrame(CorrMat_np, columns = enumerate(Xy_clms))
#CorrMat_pd


Unnamed: 0,"(0, FullBath)","(1, GrLivArea)","(2, BsmtHalfBath)","(3, TotalBsmtSF)","(4, KitchenAbvGr)","(5, GarageArea)","(6, Labels)","(7, WoodDeckSF)","(8, BedroomAbvGr)","(9, PoolArea)","(10, LotArea)","(11, OverallQual)","(12, MasVnrArea)","(13, YearRemodAdd)","(14, OverallCond)","(15, LotFrontage)","(16, HalfBath)","(17, MSSubClass)","(18, BsmtFinSF1)","(19, GarageCars)","(20, TotRmsAbvGrd)","(21, GarageYrBlt)","(22, BsmtUnfSF)","(23, YearBuilt)","(24, YrSold)","(25, 3SsnPorch)","(26, MiscVal)","(27, MoSold)","(28, OpenPorchSF)","(29, 2ndFlrSF)","(30, LowQualFinSF)","(31, EnclosedPorch)","(32, ScreenPorch)","(33, BsmtFinSF2)","(34, 1stFlrSF)","(35, Fireplaces)","(36, BsmtFullBath)","(37, Heating_IDX)","(38, LandSlope_IDX)","(39, FireplaceQu_IDX)","(40, SaleCondition_IDX)","(41, RoofStyle_IDX)","(42, HouseStyle_IDX)","(43, MasVnrType_IDX)","(44, Exterior1st_IDX)","(45, CentralAir_IDX)","(46, GarageCond_IDX)","(47, Exterior2nd_IDX)","(48, BsmtFinType1_IDX)","(49, HeatingQC_IDX)","(50, Alley_IDX)","(51, RoofMatl_IDX)","(52, Functional_IDX)","(53, ExterQual_IDX)","(54, Condition1_IDX)","(55, MSZoning_IDX)","(56, LotShape_IDX)","(57, LandContour_IDX)","(58, Fence_IDX)","(59, Street_IDX)","(60, BsmtQual_IDX)","(61, Neighborhood_IDX)","(62, Foundation_IDX)","(63, GarageType_IDX)","(64, ExterCond_IDX)","(65, Utilities_IDX)","(66, BsmtCond_IDX)","(67, SaleType_IDX)","(68, BsmtExposure_IDX)","(69, BldgType_IDX)","(70, PoolQC_IDX)","(71, MiscFeature_IDX)","(72, KitchenQual_IDX)","(73, LotConfig_IDX)","(74, PavedDrive_IDX)","(75, Electrical_IDX)","(76, BsmtFinType2_IDX)","(77, GarageQual_IDX)","(78, GarageFinish_IDX)","(79, Condition2_IDX)"
0,1.00,0.63,-0.05,0.32,0.13,0.41,0.59,0.19,0.36,0.05,0.13,0.55,0.27,0.44,-0.19,0.12,0.14,0.13,0.06,0.47,0.55,0.14,0.29,0.47,-0.02,0.04,-0.01,0.06,0.26,0.42,-0.00,-0.12,-0.01,-0.08,0.38,0.24,-0.06,-0.03,-0.05,0.20,0.03,-0.01,-0.03,0.27,-0.17,-0.11,-0.14,-0.14,-0.24,-0.28,-0.01,0.01,-0.01,0.40,0.02,-0.06,0.18,-0.03,-0.19,-0.05,0.32,0.09,-0.35,-0.12,-0.09,-0.03,-0.02,0.05,0.09,0.06,0.06,-0.02,0.30,0.06,-0.14,-0.16,-0.07,-0.11,0.28,0.06
1,0.63,1.00,-0.02,0.45,0.10,0.47,0.70,0.25,0.52,0.17,0.26,0.59,0.39,0.29,-0.08,0.22,0.42,0.07,0.21,0.47,0.83,0.16,0.24,0.20,-0.04,0.02,-0.00,0.05,0.33,0.69,0.13,0.01,0.10,-0.01,0.57,0.46,0.03,-0.02,0.04,0.32,0.03,0.15,0.14,0.23,-0.01,-0.09,-0.11,-0.01,-0.15,-0.22,-0.00,0.19,0.06,0.37,0.08,-0.09,0.21,0.05,-0.10,-0.04,0.19,0.12,-0.20,-0.07,-0.05,-0.01,-0.05,0.04,0.07,-0.07,0.15,-0.02,0.31,0.06,-0.07,-0.12,-0.06,-0.06,0.19,0.09
2,-0.05,-0.02,1.00,-0.00,-0.04,-0.02,-0.01,0.04,0.05,0.02,0.05,-0.04,0.03,-0.01,0.12,-0.03,-0.01,-0.00,0.07,-0.02,-0.02,0.02,-0.10,-0.04,-0.05,0.04,-0.01,0.03,-0.03,-0.02,-0.01,-0.01,0.03,0.07,0.00,0.03,-0.15,-0.01,0.07,0.07,0.05,0.04,0.05,0.02,0.02,-0.04,-0.04,0.05,0.07,0.07,-0.04,0.02,-0.01,-0.07,-0.02,-0.06,0.06,0.03,0.03,-0.02,-0.05,0.01,0.03,-0.01,0.05,0.10,0.00,-0.00,0.02,-0.02,0.03,0.02,-0.03,0.04,-0.01,-0.02,0.06,-0.05,-0.05,-0.02
3,0.32,0.45,-0.00,1.00,-0.07,0.49,0.61,0.23,0.05,0.13,0.26,0.54,0.36,0.29,-0.17,0.24,-0.05,-0.24,0.52,0.43,0.29,0.18,0.42,0.39,-0.01,0.04,-0.02,0.01,0.25,-0.17,-0.03,-0.10,0.08,0.10,0.82,0.34,0.31,-0.14,0.03,0.20,0.04,0.16,-0.27,0.36,-0.09,-0.21,-0.15,-0.07,-0.20,-0.27,-0.11,0.20,-0.03,0.39,-0.00,-0.20,0.20,0.06,-0.11,-0.01,0.04,0.04,-0.38,-0.28,-0.13,-0.01,-0.33,0.09,-0.01,-0.13,0.10,-0.06,0.28,0.04,-0.19,-0.18,-0.11,-0.13,0.20,0.05
4,0.13,0.10,-0.04,-0.07,1.00,-0.06,-0.15,-0.09,0.20,-0.01,-0.02,-0.18,-0.04,-0.15,-0.09,0.03,-0.07,0.28,-0.08,-0.05,0.26,-0.16,0.03,-0.17,0.03,-0.02,0.06,0.03,-0.07,0.06,0.01,0.04,-0.05,-0.04,0.07,-0.12,-0.04,0.08,-0.04,-0.11,0.12,0.01,0.07,-0.06,0.10,0.25,0.14,0.12,0.10,0.14,0.02,0.02,0.03,-0.09,0.04,0.09,-0.09,0.02,-0.06,-0.01,0.07,-0.06,0.20,0.22,0.01,-0.01,0.22,0.03,0.10,0.50,-0.01,0.05,-0.10,-0.03,0.11,0.16,0.07,0.14,-0.04,0.15
5,0.41,0.47,-0.02,0.49,-0.06,1.00,0.65,0.22,0.07,0.06,0.18,0.56,0.37,0.37,-0.15,0.20,0.16,-0.10,0.30,0.88,0.34,0.56,0.18,0.48,-0.03,0.04,-0.03,0.03,0.24,0.14,-0.07,-0.12,0.05,-0.02,0.49,0.27,0.18,-0.10,0.00,0.23,-0.00,0.08,-0.12,0.34,-0.19,-0.23,-0.30,-0.17,-0.19,-0.28,-0.06,0.09,-0.07,0.41,0.02,-0.10,0.17,0.04,-0.12,0.05,0.25,-0.01,-0.34,-0.31,-0.16,0.01,-0.12,0.05,0.12,-0.13,0.06,-0.04,0.27,0.07,-0.24,-0.21,-0.04,-0.28,-0.03,0.05
6,0.59,0.70,-0.01,0.61,-0.15,0.65,1.00,0.33,0.21,0.07,0.26,0.82,0.43,0.57,-0.04,0.18,0.31,-0.07,0.37,0.68,0.53,0.35,0.22,0.59,-0.04,0.05,-0.02,0.06,0.32,0.32,-0.04,-0.15,0.12,0.00,0.60,0.49,0.24,-0.15,0.04,0.36,-0.04,0.13,-0.09,0.39,-0.22,-0.35,-0.28,-0.19,-0.25,-0.41,-0.09,0.07,-0.13,0.52,-0.02,-0.24,0.29,0.06,-0.16,-0.06,0.26,0.09,-0.49,-0.33,-0.17,-0.01,-0.23,0.06,0.11,-0.18,0.07,-0.07,0.42,0.11,-0.26,-0.29,-0.10,-0.22,0.26,-0.00
7,0.19,0.25,0.04,0.23,-0.09,0.22,0.33,1.00,0.05,0.07,0.17,0.24,0.16,0.21,-0.00,-0.02,0.11,-0.01,0.20,0.23,0.17,0.12,-0.01,0.22,0.02,-0.03,-0.01,0.02,0.06,0.09,-0.03,-0.13,-0.07,0.07,0.24,0.20,0.18,-0.08,0.10,0.20,-0.05,0.07,-0.04,0.11,-0.08,-0.15,-0.09,-0.04,-0.11,-0.10,-0.12,0.07,-0.01,0.17,-0.01,-0.15,0.16,0.09,0.03,0.02,0.11,0.13,-0.18,-0.14,-0.03,-0.02,-0.09,-0.02,0.11,-0.07,0.04,0.01,0.13,0.06,-0.10,-0.14,0.01,-0.08,0.14,-0.00
8,0.36,0.52,0.05,0.05,0.20,0.07,0.21,0.05,1.00,0.07,0.12,0.10,0.10,-0.04,0.01,0.14,0.23,-0.02,-0.11,0.09,0.68,-0.01,0.17,-0.07,-0.04,-0.02,0.01,0.05,0.09,0.50,0.11,0.04,0.04,-0.02,0.13,0.11,-0.15,-0.02,-0.05,0.08,0.02,0.03,0.14,0.03,-0.01,-0.01,-0.00,0.00,0.02,0.00,-0.01,-0.02,0.01,-0.05,0.09,-0.11,0.06,-0.04,0.01,-0.03,-0.10,-0.06,0.04,0.06,0.02,0.00,0.01,-0.01,-0.05,-0.04,0.08,0.03,-0.03,0.04,0.03,-0.05,-0.02,-0.02,-0.00,0.00
9,0.05,0.17,0.02,0.13,-0.01,0.06,0.07,0.07,0.07,1.00,0.08,0.07,0.01,0.01,-0.00,0.11,0.02,0.01,0.14,0.02,0.08,0.02,-0.04,0.00,-0.06,-0.01,0.03,-0.03,0.06,0.08,0.06,0.05,0.05,0.04,0.13,0.10,0.07,-0.01,-0.02,0.11,0.10,0.06,0.03,-0.00,0.07,-0.02,0.05,0.08,-0.01,0.04,-0.02,0.18,-0.02,0.03,0.03,-0.03,0.05,-0.01,0.10,-0.00,0.01,0.00,-0.01,-0.04,0.03,-0.00,-0.02,-0.01,0.00,-0.03,0.90,0.13,0.05,0.03,-0.02,-0.02,0.03,0.05,0.01,-0.01


# End of EDA ================================================

# Define Train Validation Sets

In [8]:
Ratio         = 0.8
NumRows_Train = round(NumRows_CV * Ratio)

# Train Set 
Train_dt = Xy.limit(NumRows_Train)

In [98]:
Train_dt.first()


Row(Sale_Date=datetime.datetime(1989, 1, 17, 0, 0), yDay=17, Labels=9.159047077588632, ModelID=8434, mDay=17, Year=1989, datasource=132, SalesID=1646770, auctioneerID=18, YearMade=1974, MachineID=1126363, wDay=3, MachineHoursCurrentMeter=672, Stick_IDX=2.0, Blade_Extension_IDX=1.0, Track_Type_IDX=2.0, state_IDX=1.0, Tire_Size_IDX=16.0, Coupler_IDX=2.0, Stick_Length_IDX=28.0, Enclosure_Type_IDX=2.0, Transmission_IDX=4.0, UsageBand_IDX=3.0, Ride_Control_IDX=2.0, Thumb_IDX=2.0, Drive_System_IDX=4.0, Forks_IDX=1.0, Pattern_Changer_IDX=2.0, Blade_Width_IDX=5.0, Pushblock_IDX=1.0, Hydraulics_IDX=0.0, ProductSize_IDX=0.0, Turbocharged_IDX=1.0, Hydraulics_Flow_IDX=2.0, Travel_Controls_IDX=6.0, Engine_Horsepower_IDX=2.0, ProductGroupDesc_IDX=1.0, Scarifier_IDX=1.0, Backhoe_Mounting_IDX=1.0, Ripper_IDX=3.0, Coupler_System_IDX=1.0, Enclosure_IDX=0.0, fiProductClassDesc_IDX=33.0, Blade_Type_IDX=1.0, Tip_Control_IDX=2.0, Grouser_Tracks_IDX=1.0, Grouser_Type_IDX=3.0, Undercarriage_Pad_Width_IDX=18.0

In [101]:
Train_dt.orderBy('Sale_Date', ascending=0).first()

Row(Sale_Date=datetime.datetime(2009, 9, 12, 0, 0), yDay=255, Labels=9.95227771670556, ModelID=3162, mDay=12, Year=2009, datasource=132, SalesID=1265778, auctioneerID=99, YearMade=2005, MachineID=1397545, wDay=7, MachineHoursCurrentMeter=1712, Stick_IDX=2.0, Blade_Extension_IDX=1.0, Track_Type_IDX=2.0, state_IDX=2.0, Tire_Size_IDX=16.0, Coupler_IDX=2.0, Stick_Length_IDX=28.0, Enclosure_Type_IDX=2.0, Transmission_IDX=0.0, UsageBand_IDX=1.0, Ride_Control_IDX=2.0, Thumb_IDX=2.0, Drive_System_IDX=4.0, Forks_IDX=1.0, Pattern_Changer_IDX=2.0, Blade_Width_IDX=5.0, Pushblock_IDX=1.0, Hydraulics_IDX=0.0, ProductSize_IDX=6.0, Turbocharged_IDX=1.0, Hydraulics_Flow_IDX=2.0, Travel_Controls_IDX=6.0, Engine_Horsepower_IDX=2.0, ProductGroupDesc_IDX=1.0, Scarifier_IDX=1.0, Backhoe_Mounting_IDX=1.0, Ripper_IDX=3.0, Coupler_System_IDX=1.0, Enclosure_IDX=0.0, fiProductClassDesc_IDX=1.0, Blade_Type_IDX=0.0, Tip_Control_IDX=2.0, Grouser_Tracks_IDX=1.0, Grouser_Type_IDX=3.0, Undercarriage_Pad_Width_IDX=18.0

# Validation set 

In [9]:
Dev_dt = Xy.where(Xy.Sale_Date > fcn.to_date(fcn.lit('2009-9-13').cast(TimestampType()) ))

In [12]:
Xy_clms = []
for c in Train_dt.columns:
    if c not in [IDX ,'Sale_Date', 'datasource']: 
        Xy_clms.append(c)
        
X_clms = []
for c in X.columns:
    if c not in [IDX ,'Sale_Date', 'datasource']: 
        X_clms.append(c)

'''        
# Drop the index column for the CV set 
Xy_clms = Xy.columns # XY labels 
Xy_clms.remove(IDX)#
# Drop the index column for the test set 
X_clms  = X.columns
X_clms.remove(IDX)
'''

'        \n# Drop the index column for the CV set \nXy_clms = Xy.columns # XY labels \nXy_clms.remove(IDX)#\n# Drop the index column for the test set \nX_clms  = X.columns\nX_clms.remove(IDX)\n'

# Assemble for Spark training 

In [21]:
# Train Set 
ML_Vectorizer     = VectorAssembler(inputCols= X_clms , outputCol='features')
ML_Vectorized_trn = ML_Vectorizer.transform(Train_dt.select(*X_clms, 'Labels'))


# Dev Set 
ML_Vectorizer     = VectorAssembler(inputCols= X_clms , outputCol='features')
ML_Vectorized_dev = ML_Vectorizer.transform(Dev_dt.select(*X_clms, 'Labels'))


# ===================================================================================

# Validation Set 
ML_Vectorizer     = VectorAssembler(inputCols= X_clms , outputCol='features')
ML_Vectorized_T   = ML_Vectorizer.transform(Train_dt.select(IDX,*X_clms))


# Peform feature scaling if necessary

In [197]:
'''
scaler = StandardScaler(inputCol="uns_features", outputCol="features",
                        withStd=True, withMean=False)
Vec_scaled = scaler.fit(Vectorized).transform(Vectorized)
'''

'\nscaler = StandardScaler(inputCol="uns_features", outputCol="features",\n                        withStd=True, withMean=False)\nVec_scaled = scaler.fit(Vectorized).transform(Vectorized)\n'

# Define the ML Data Set

#### train Set

In [16]:
%%time
train_set = ML_Vectorized_trn.select('features','Labels')
train_set.show()

+--------------------+------------------+
|            features|            Labels|
+--------------------+------------------+
|[17.0,8434.0,17.0...| 9.159047077588632|
|[31.0,6788.0,31.0...| 9.159047077588632|
|[31.0,6788.0,31.0...| 8.987196820661973|
|[31.0,6788.0,31.0...| 9.104979856318357|
|[31.0,6788.0,31.0...| 8.948975607841776|
|[31.0,6953.0,31.0...| 9.740968623038354|
|[31.0,6788.0,31.0...| 9.903487552536127|
|[31.0,6788.0,31.0...|  9.95227771670556|
|[31.0,4124.0,31.0...| 10.46310334047155|
|[31.0,4124.0,31.0...|10.799575577092764|
|[31.0,4123.0,31.0...|  9.95227771670556|
|[31.0,4123.0,31.0...|10.085809109330082|
|[31.0,4123.0,31.0...| 10.06475570013225|
|[31.0,3356.0,31.0...| 9.740968623038354|
|[31.0,3356.0,31.0...| 9.740968623038354|
|[31.0,4089.0,31.0...| 9.852194258148577|
|[31.0,4089.0,31.0...| 9.998797732340453|
|[31.0,4089.0,31.0...|10.184900011974314|
|[31.0,4089.0,31.0...|10.221941283654663|
|[31.0,4089.0,31.0...|10.184900011974314|
+--------------------+------------

In [17]:
train_set.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Labels: double (nullable = true)



#### dev set 

In [18]:
%%time
dev_set = ML_Vectorized_dev.select('features','Labels')
dev_set.show()

+--------------------+------------------+
|            features|            Labels|
+--------------------+------------------+
|[257.0,1535.0,14....|10.325481962595504|
|[257.0,4754.0,14....|10.239959789157341|
|[257.0,3893.0,14....| 9.350102314351341|
|[258.0,4147.0,15....|10.373491181781864|
|[258.0,3362.0,15....|11.002099841204238|
|[258.0,3171.0,15....| 9.680344001221918|
|[258.0,3161.0,15....| 9.581903928408666|
|[258.0,3178.0,15....| 9.769956159911606|
|[258.0,3178.0,15....| 9.769956159911606|
|[258.0,2759.0,15....| 9.838949031398556|
|[258.0,7261.0,15....|  9.28266103355581|
|[258.0,3876.0,15....| 9.104979856318357|
|[258.0,4138.0,15....|  9.95227771670556|
|[258.0,4138.0,15....| 9.711115659888671|
|[258.0,23920.0,15...|11.320553572322773|
|[258.0,22072.0,15...| 9.104979856318357|
|[258.0,22074.0,15...| 9.047821442478408|
|[258.0,14451.0,15...| 9.798127036878302|
|[258.0,14451.0,15...| 9.615805480084347|
|[258.0,14451.0,15...| 9.546812608597396|
+--------------------+------------

In [19]:
dev_set.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Labels: double (nullable = true)



#### Test Set

In [22]:
%%time
test_set = ML_Vectorized_T.select(IDX,'features')


Wall time: 7.98 ms


# Define the ML model 

In [23]:
mdl_RF  = RandomForestRegressor(featuresCol='features', labelCol='Labels', )
#mdl_GBT = GBTClassifier(featuresCol='features', labelCol='Labels') 
# mdl_LG =  LogisticRegression(featuresCol='features', labelCol='Labels')

# Create the evaluation object
#### Options: rmse - root mean squared error (default)   |   mse - mean squared error  |  r2 - r^2 metric   |  mae - mean absolute error

In [31]:
eval1_obj = RegressionEvaluator(predictionCol='prediction', labelCol='Labels', metricName='rmse')

# Parameter Grids

In [25]:
paramGrid_RF = ParamGridBuilder()\
    .addGrid(mdl_RF.maxDepth, [5])\
    .addGrid(mdl_RF.maxBins, [100])\
    .addGrid(mdl_RF.impurity, ['variance'])\
    .addGrid(mdl_RF.minInstancesPerNode, [1])\
    .addGrid(mdl_RF.checkpointInterval, [10])\
    .addGrid(mdl_RF.subsamplingRate, [1.0])\
    .addGrid(mdl_RF.numTrees, [20])\
    .addGrid(mdl_RF.featureSubsetStrategy, ['auto'])\
    .build()

#  maxBin default is 32




Np_RF = len(paramGrid_RF)
#Options for subsamplingRate auto, all, onethird, sqrt, log2 

# -------------------------------------
'''
paramGrid_GBT = ParamGridBuilder() \
    .addGrid(mdl_GBT.maxDepth, [5]) \
    .addGrid(mdl_GBT.maxIter,  [6,8,12,20]) \
    .addGrid(mdl_GBT.stepSize, [0.01,0.05,0.1,0.2]) \
    .build()
Np_GBT = len(paramGrid_GBT)
'''
# -------------------------------------
'''
paramGrid_LG = ParamGridBuilder() \
    .addGrid(mdl_LG.regParam, [0.01, 0.1, 0.5]) \
    .addGrid(mdl_LG.maxIter, [100,200]) \
    .build()
Np_LG = len(paramGrid_LG)'''

'\nparamGrid_LG = ParamGridBuilder()     .addGrid(mdl_LG.regParam, [0.01, 0.1, 0.5])     .addGrid(mdl_LG.maxIter, [100,200])     .build()\nNp_LG = len(paramGrid_LG)'

In [65]:


'''
xval_RF = TrainValidationSplit(
                         estimator         = mdl_RF,
                         estimatorParamMaps= paramGrid_RF,
                         evaluator         = eval1_obj,
                         parallelism       = 8,
                         trainRatio        = 0.8)
'''
'''
xval_GBT = CrossValidator(estimator         = mdl_GBT,
                         estimatorParamMaps= paramGrid_GBT,
                         evaluator         = eval1_obj,
                         numFolds          = 10,
                         parallelism       = 8)
'''
'''
# ---------------------------------------------------------
xval_LG = CrossValidator(estimator         = mdl_LG,
                         estimatorParamMaps= paramGrid_LG,
                         evaluator         = eval1_obj,
                         numFolds          = 10,
                         parallelism       = 8)
'''

'\n# ---------------------------------------------------------\nxval_LG = CrossValidator(estimator         = mdl_LG,\n                         estimatorParamMaps= paramGrid_LG,\n                         evaluator         = eval1_obj,\n                         numFolds          = 10,\n                         parallelism       = 8)\n'

# Fit the model to train set

In [32]:
%%time
mdl_RF_ftd = mdl_RF.fit(train_set)

Wall time: 19.8 s


# Use the trained model to make prediction for the dev set

In [33]:
mdl_RF_preds_dev = mdl_RF_ftd.transform(dev_set) 

# Evaluate the model on the dev set

In [34]:
eval1_obj.evaluate(mdl_RF_preds_dev)

0.4482490531074701

# Test Data

In [27]:
mdl_xv_RF_preds_T = mdl_xv_RF_ftd.transform(test_set) 

In [28]:
mdl_xv_RF_preds_T = mdl_xv_RF_preds_T.withColumnRenamed('prediction','logSalePrice')
mdl_xv_RF_preds_T.columns

['SalesID', 'features', 'logSalePrice']

In [30]:
fnl = mdl_xv_RF_preds_T.select('SalesId',fcn.exp('logSalePrice').alias('SalePrice'))
fnl.show()

+-------+------------------+
|SalesId|         SalePrice|
+-------+------------------+
|1646770|20321.448420497047|
|1259551|  15444.3740569086|
|1259760|  15444.3740569086|
|1259842|  15444.3740569086|
|1259973|  15444.3740569086|
|1263797|18390.809466800278|
|1264985|16325.315002688381|
|1265202|15961.130657648622|
|1297898|23657.152212590525|
|1298915| 34296.62589246201|
|1301225| 22814.54858983258|
|1301582| 22284.42621195536|
|1301884|24219.930256120024|
|1305336| 23561.83944433397|
|1305337| 23561.83944433397|
|1328950|25495.764982560017|
|1329056|25495.764982560017|
|1329607|25495.764982560017|
|1329619| 27055.29119154657|
|1329620|25495.764982560017|
+-------+------------------+
only showing top 20 rows



In [213]:
%%time
Sub_PId = fnl.rdd.map(lambda r: r.SalesId).collect()
Sub_Prd = fnl.rdd.map(lambda r: r.SalePrice).collect()

Wall time: 17.4 s


In [214]:
Submission = pd.DataFrame(columns=['SalesId', 'SalePrice'])

In [215]:
Submission['SalesId'] = Sub_PId 
Submission['SalePrice']    = Sub_Prd

In [216]:
Submission 

Unnamed: 0,SalesId,SalePrice
0,1222837,23418.037046
1,1222839,58317.659949
2,1222841,43092.220123
3,1222843,23267.160353
4,1222845,46692.748085
5,1222847,11594.325510
6,1222849,21653.606464
7,1222850,21653.606464
8,1222855,27612.322412
9,1222863,24103.718419


In [165]:
Submission.to_csv('Submission.csv')