In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# Install Kaggle library
!pip install -q kaggle

In [None]:
# Colab library to upload the kaggle.json file
from google.colab import files
uploaded = files.upload()

Saving kaggle.json to kaggle.json


In [None]:
!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/kaggle.json
!chmod 600 /root/.kaggle/kaggle.json
!kaggle datasets download --force -d census/2013-american-community-survey/ss13pusa.csv
!unzip /content/2013-american-community-survey.zip

Downloading 2013-american-community-survey.zip to /content
 98% 898M/916M [00:15<00:00, 115MB/s]
100% 916M/916M [00:15<00:00, 61.3MB/s]
Archive:  /content/2013-american-community-survey.zip
  inflating: shapefiles/._tl_2013_32_puma10.dbf  
  inflating: shapefiles/._tl_2013_32_puma10.prj  
  inflating: shapefiles/._tl_2013_32_puma10.shp  
  inflating: shapefiles/._tl_2013_32_puma10.shp.xml  
  inflating: shapefiles/._tl_2013_32_puma10.shx  
  inflating: shapefiles/._tl_2013_33_puma10.dbf  
  inflating: shapefiles/._tl_2013_33_puma10.prj  
  inflating: shapefiles/._tl_2013_33_puma10.shp  
  inflating: shapefiles/._tl_2013_33_puma10.shp.xml  
  inflating: shapefiles/._tl_2013_33_puma10.shx  
  inflating: shapefiles/._tl_2013_34_puma10.dbf  
  inflating: shapefiles/._tl_2013_34_puma10.prj  
  inflating: shapefiles/._tl_2013_34_puma10.shp  
  inflating: shapefiles/._tl_2013_34_puma10.shp.xml  
  inflating: shapefiles/._tl_2013_34_puma10.shx  
  inflating: shapefiles/._tl_2013_35_puma10.dbf 

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

In [None]:
import findspark

findspark.init("spark-3.0.0-bin-hadoop2.7")# SPARK_HOME
from pyspark.sql import SparkSession
from pyspark import SparkContext

global sc
sc = SparkContext("AmericanSurveyProject").getOrCreate()
sc.setCheckpointDir('checkpoint')
sc._conf.getAll()


[('spark.rdd.compress', 'True'),
 ('spark.driver.memory', '8g'),
 ('spark.driver.host', '9883aca3d49b'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.memory', '8g'),
 ('spark.submit.pyFiles', ''),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1595529924688'),
 ('spark.driver.port', '33029'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'AmericanProject')]

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext

sql = SQLContext(sc)

df = (sql.read
         .format("com.databricks.spark.csv")
         .option("header", "true")
         .load("ss13pusa.csv"))

df = df.drop('PERNP').drop('PINCP').drop('FINCP')

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

names = df.schema.names
df = df.filter(df.AGEP > 17)
for name in names:
    df = df.withColumn(name, df[name].cast(FloatType()))
    if(name == "WAGP"):
        df = df.where(F.col(name).isNotNull())
    else:
        df = df.withColumn(name, (F.when(F.isnull(F.col(name)), 0.0).otherwise(F.col(name)+1)))

In [None]:
import numpy as np
from pyspark.sql import Window
from pyspark.sql import functions as F

sampleData = df.sample(fraction=0.2, seed=42)
sampleLen = sampleData.count()
print(sampleLen)

windowSpec = Window.orderBy("WAGP")
sampleData = sampleData.withColumn("WAGPranks", F.dense_rank().over(windowSpec))

WAGPmean = sampleData.select(F.mean("WAGPranks")).first()[0]
WAGPsquaredSum = sampleData.select(F.sum(F.pow(F.col("WAGPranks"), 2.0))).first()[0]


names = sampleData.schema.names
for name in names:
    windowSpec = Window.orderBy(name)
    dfRanks = sampleData.withColumn("featureRanks", F.dense_rank().over(windowSpec))

    featureMean = dfRanks.select(F.mean("featureRanks")).first()[0]
    featureSquaredSum = dfRanks.select(F.sum(F.pow(F.col("featureRanks"), 2.0))).first()[0]

    numerator = dfRanks.select(F.sum(F.col("WAGPranks")  * (F.col("featureRanks")))).first()[0] - sampleLen * WAGPmean * featureMean
    denominator = np.sqrt(WAGPsquaredSum - sampleLen*WAGPmean**2) * np.sqrt(featureSquaredSum - sampleLen*featureMean**2)

    spearmanScore = numerator / denominator

    spearmanScoreString = name + " "+ str(spearmanScore) + ","
    print(name + " "+ str(spearmanScore))
print(spearmanScoreString)

In [None]:
import numpy as np

spearmanScore = "SERIALNO -0.0020784158091472315,SPORDER -0.10869866076605482,PUMA -0.011662933124747074,PWGTP 0.03831662718612401,AGEP -0.19545717528793344,CIT -0.0031177120225270288,COW 0.15337668651095446,DDRS 0.173975602182416,DEAR 0.13620252116207485,DEYE 0.11136701216422948,DOUT 0.2311544418353776,DPHY 0.24849386227877404,DREM 0.19381425122714702,GCL 0.11378659445481011,HINS1 -0.43768951177239485,HINS2 0.14233189993576748,HINS3 0.41581175856517527,HINS4 0.257510478496328,HINS5 0.03359587366969293,HINS6 0.07909815622858651,HINS7 0.024318295105544573,INTP -0.029266726300754633,JWMNP 0.5378887433593286,JWTR 0.24480312463477116,LANX 0.023936327552290897,MAR -0.12886501495859393,MARHD 0.09643311741634122,MARHM 0.09194645383871781,MARHT 0.04110803724362987,MARHW 0.10315137324717988,MARHYP 0.2562151216355247,MIG -0.019524927911365612,MIL 0.03788996268914506,NWAB 0.5104833151073246,NWAV 0.15418381582704102,NWLA 0.5255833573566556,NWLK 0.5119835882121269,NWRE 0.07578666841629522,OIP -0.08347644995438,PAP -0.06622802919642291,RELP -0.17706829536376753,RETP -0.1889872208405454,SCH -0.10856936680468796,SCHL 0.31128765398222685,SEMP -0.0871368347951742,SEX -0.14062374017091178,SSIP -0.13260268359031704,SSP -0.3846345378274127,WKHP 0.8060869157915436,WKL -0.6978058313890892,WKW 0.14429865056724916,WRK -0.43865533759483255,ANC -0.021130102881405172,ANC1P -0.0711576505837427,ANC2P -0.05846032682167059,DIS 0.2930208150285274,ESR -0.7307648871041318,HICOV -0.11238831459708738,HISP -0.009493094094053724,INDP 0.41345641811078493,JWAP 0.5621214045551339,JWDP 0.4970412369092851,MSP -0.14691579868961321,NATIVITY 0.0009231722057621701,OC -0.09696022978877797,OCCP 0.23689747525255453,POBP 0.006343994688512026,POVPIP 0.4911509723677551,POWPUMA 0.5007935930927065,POWSP 0.5367410396482478,PRIVCOV -0.3523852616329459,PUBCOV 0.4679975309216134,QTRBIR 0.003351846944711651,RAC1P -0.010357988751166813,RAC2P -0.003951307916705394,RAC3P -0.008849351503604563,RACAIAN -0.025984926065625122,RACASN 0.035984002051809044,RACBLK -0.06418511044588104,RACNH 0.004107371512837685,RACNUM -0.0056927849686350205,RACPI -0.0008595993822165405,RACSOR -0.023402659089444628,RACWHT 0.04430749026505265,RC -0.1023436719714487,SOCP 0.20658230736243177,WAOB 0.015264902981217559,FAGEP -0.008823003659286755,FCITP -0.03022060842048593,FCITWP -0.002387207053894812,FCOWP 0.027112192619047612,FDDRSP -0.036763804815907505,FDEARP -0.028582957936466848,FDEYEP -0.030032351200338897,FDISP -0.04575772197767356,FDOUTP -0.03719589158429998,FDPHYP -0.03754071517826975,FDRATP -0.010373304042186811,FDRATXP -0.02445605658620091,FDREMP -0.03895735482221284,FENGP -0.014511345978248002,FESRP -0.08303479000623476,FFERP -0.015386126121487476,FFODP 0.06162525776156262,FGCLP -0.02328231273428114,FGCMP -0.004743984215656845,FGCRP -0.013974714315249364,FHINS1P -0.10158802585209184,FHINS2P -0.10458502290413461,FHINS3P -0.030359481476627072,FHINS4P -0.11783561394606905,FHINS5P -0.12723149769265502,FHINS6P -0.12404838259336806,FHINS7P -0.13146103073176846,FHISP -0.05532002399041921,FINDP 0.03303462837327663,FINTP -0.04797920896623181,FJWDP 0.18043249682489404,FJWMNP 0.1370647007513432,FJWRIP 0.12070813613216115,FJWTRP 0.11646827246265434,FLANP -0.015731674050858468,FLANXP -0.03599001380834269,FMARHDP -0.026379669725700404,FMARHMP -0.03594921816690989,FMARHTP -0.028637064831818625,FMARHWP -0.032723563652003775,FMARHYP -0.07198084794736916,FMARP -0.05472406434255027,FMIGP -0.05006992750967437,FMIGSP -0.021163978990726154,FMILPP -0.030649010367749346,FMILSP -0.045179506926485535,FOCCP 0.03714626849915537,FOIP -0.04531515170873,FPAP -0.04600108355568222,FPERNP 0.02574784305635562,FPINCP -0.0636048405328097,FPOBP -0.04649902333810055,FPOWSP 0.12847612918098938,FPRIVCOVP -0.13305905237136365,FPUBCOVP -0.1342943525789352,FRACP -0.016121059513604163,FRELP -0.027466114344437517,FRETP -0.06259502378476665,FSCHGP -0.0453997441268146,FSCHLP -0.05972358593628446,FSCHP -0.034033105060077846,FSEMP -0.09712768533570895,FSEXP -0.006573480689103249,FSSIP -0.051434787093568,FSSP -0.09365977552362797,FWAGP 0.04283029867890299,FWKHP 0.10621284458084658,FWKLP -0.07062243231958558,FWKWP 0.10409969040461643,FWRKP -0.010432307175015747,FYOEP -0.01241361058073579,pwgtp1 0.03158299292518658,pwgtp2 0.03172267268650871,pwgtp3 0.03096720422215968,pwgtp4 0.029647373998850535,pwgtp5 0.032473927287425965,pwgtp6 0.030415754851848133,pwgtp7 0.029897456927422343,pwgtp8 0.03034785809924581,pwgtp9 0.03123785618340319,pwgtp10 0.030630212411359072,pwgtp11 0.029773117998110513,pwgtp12 0.02990377015032525,pwgtp13 0.03187109749935374,pwgtp14 0.029222631760484948,pwgtp15 0.03193252905278535,pwgtp16 0.03169903413238581,pwgtp17 0.030015846310864173,pwgtp18 0.030816648411786822,pwgtp19 0.03146196227919121,pwgtp20 0.031005082785677842,pwgtp21 0.030546123001490465,pwgtp22 0.03191738835800542,pwgtp23 0.030556588570418854,pwgtp24 0.02907061920654028,pwgtp25 0.03169999590050073,pwgtp26 0.030614815568843014,pwgtp27 0.029187714806324632,pwgtp28 0.02935521330919687,pwgtp29 0.031979644551292125,pwgtp30 0.031003514904279873,pwgtp31 0.031931435918044544,pwgtp32 0.031654318910148936,pwgtp33 0.029776448405285698,pwgtp34 0.03061267867204305,pwgtp35 0.030754822278991734,pwgtp36 0.03040332733091676,pwgtp37 0.030113900559069887,pwgtp38 0.031156182736380964,pwgtp39 0.03181783743167289,pwgtp40 0.03221239522696887,pwgtp41 0.03199245698023919,pwgtp42 0.030039050241370574,pwgtp43 0.0301523862942185,pwgtp44 0.03095932171711289,pwgtp45 0.03206086453632531,pwgtp46 0.029813629749851727,pwgtp47 0.031463613687896394,pwgtp48 0.030144273449728417,pwgtp49 0.02973699631989878,pwgtp50 0.02961606086897918,pwgtp51 0.03100373000730614,pwgtp52 0.030386652577423397,pwgtp53 0.03082372383986165,pwgtp54 0.02988361370251483,pwgtp55 0.03298623707765321,pwgtp56 0.03120163232215752,pwgtp57 0.029778118188522076,pwgtp58 0.031554971407541765,pwgtp59 0.03182669412263354,pwgtp60 0.03168265245062419,pwgtp61 0.031873825661978474,pwgtp62 0.02936145744703401,pwgtp63 0.030338739907796574,pwgtp64 0.029699578955670353,pwgtp65 0.02995745472072087"

spilttedStr = spearmanScore.split(',')

featureScoreList = []
for x in spilttedStr:
    record = x.split(' ')
    featureScoreList.append([record[0], abs(float(record[1]))])

def takeSecond(elem):
    return elem[1]
featureScoreList.sort(key=takeSecond, reverse=True)

featureList = []
for x in featureScoreList:
    if abs(x[1]) > 0.1:
        featureList.append(x[0])
        print("feature:"+x[0] + "score:"+str(x[1]))
print(featureList)
print(len(featureList))

feature:WKHPscore:0.8060869157915436
feature:ESRscore:0.7307648871041318
feature:WKLscore:0.6978058313890892
feature:JWAPscore:0.5621214045551339
feature:JWMNPscore:0.5378887433593286
feature:POWSPscore:0.5367410396482478
feature:NWLAscore:0.5255833573566556
feature:NWLKscore:0.5119835882121269
feature:NWABscore:0.5104833151073246
feature:POWPUMAscore:0.5007935930927065
feature:JWDPscore:0.4970412369092851
feature:POVPIPscore:0.4911509723677551
feature:PUBCOVscore:0.4679975309216134
feature:WRKscore:0.43865533759483255
feature:HINS1score:0.43768951177239485
feature:HINS3score:0.41581175856517527
feature:INDPscore:0.41345641811078493
feature:SSPscore:0.3846345378274127
feature:PRIVCOVscore:0.3523852616329459
feature:SCHLscore:0.31128765398222685
feature:DISscore:0.2930208150285274
feature:HINS4score:0.257510478496328
feature:MARHYPscore:0.2562151216355247
feature:DPHYscore:0.24849386227877404
feature:JWTRscore:0.24480312463477116
feature:OCCPscore:0.23689747525255453
feature:DOUTscore:0

In [None]:
import numpy as np

def squaredError(label, prediction):
    return (label - prediction) ** 2

def calcRMSE(labelsAndPreds):
    return np.sqrt(labelsAndPreds.map(lambda p: squaredError(p[0],p[1])).mean())

def getLabeledPrediction(weights, observation, featureCol, labelCol):
    return (observation[labelCol], weights.dot(DenseVector(observation[featureCol])))

def gradientSummand(weights, lp, featureCol, labelCol):
    return (weights.dot(DenseVector(lp[featureCol])) - lp[labelCol]) * lp[featureCol]

def myRidgeRegressionBGD(trainingData, valData, labelCol, featureCol, maxIterations = 10, alpha = 0.0, learningRate = 1.0):
    sizeTrainData = trainingData.count()
    sizeOfFeatures = len(trainingData.take(1)[0][featureCol])
    #Itialization of the weights with zeros, adding 1 more weight that will correspond to the  intercept
    weights = np.zeros(sizeOfFeatures+1)
    
    #Added a new column of all 1 to the training data for the intercept updates
    trainingData = trainingData.withColumn("intercept", F.lit(1))
    assembler = VectorAssembler(inputCols=["intercept", featureCol] , outputCol="featuresI")
    trainingData = assembler.transform(trainingData)
    
    #Added a new column of all 1 to the validation data for the intercept
    valData = valData.withColumn("intercept", F.lit(1))
    valData = assembler.transform(valData)
    featureCol = "featuresI"
    
    minImprovement = 1
    trainingErrors = [] 
    for i in range(maxIterations):
        #Calculation of the accuracy of the model during the iteration to better understand the improvents of the model during the training
        #The accuracy is calculated through the compute of root mean square error on the validation data 
        labelsAndPredsTrain = valData.rdd.map(lambda p: getLabeledPrediction(weights, p, featureCol, labelCol))
        rmse = calcRMSE(labelsAndPredsTrain)
        trainingErrors.append(rmse)
        
        print("Iteration:"+str(i)+" rmse:"+str(trainingErrors[i]))
        
        #The difference between the error of the current iteration and the error of the previous iteration 
        #has to be greater than a threshold fixed to 1
        if(i > 0 and (trainingErrors[i-1] - trainingErrors[i] <= minImprovement)):
            print("Error doesn't go down. Stopped the training.")
            break
            
        #To calculate the gradient of the current iteration will be executed a map job through spark, will be calculated for every     
        gradient = sum(trainingData.rdd.map(lambda p: (DenseVector(gradientSummand(weights, p, featureCol, labelCol))))
                       .collect())
        
        weights = weights - learningRate * (2 * gradient + 2*alpha*sum(weights))
        
    return weights, trainingErrors

In [None]:
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import StandardScaler 
from pyspark.ml.feature import VectorAssembler

sampleData = df.sample(fraction = 0.5, seed = 42)

assembler = VectorAssembler(inputCols=featureList, outputCol="features")
sampleData = assembler.transform(sampleData)

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",withStd=True, withMean=True)
sampleData = scaler.fit(sampleData).transform(sampleData)
sampleData = sampleData.select("features", "scaledFeatures", "WAGP")

(trainingData, testData) = sampleData.randomSplit([0.7, 0.3])
trainingData.cache()
testData.cache()

In [None]:
iterations = [10]
learningRate = [0.1, 0.001, 0.0001, 0.00001, 0.000001, 0.0000001, 0.00000001]
firstIteration = True
for i in iterations:
    for lr in learningRate:
        print("iteration:"+str(i)+" learningRate:" + str(lr))
        weights, trainingError = myRidgeRegressionBGD(trainingData = trainingData, valData = testData, labelCol="WAGP", featureCol = "scaledFeatures", iterations = i, learningRate=lr, verbose = True)
        if(firstIteration):
            bestError = trainingError
            firstIteration = False
        
        if(trainingError < bestError):
            bestError = trainingError
            bestIteration = i
            bestLr = lr
print("Best hyperparameters iterations "+str(bestIteration)+" learningRate:"+str(bestLr))

iteration:10 learningRate:0.1
Iteration:0 rmse:55976.190484794206
Updating weights
Iteration:1 rmse:20212994788.12141
Error doesn't go down. Stopped the training.
iteration:10 learningRate:0.001
Iteration:0 rmse:55976.190484794206
Updating weights
Iteration:1 rmse:202102281.75862274
Error doesn't go down. Stopped the training.
iteration:10 learningRate:0.0001
Iteration:0 rmse:55976.190484794206
Updating weights
Iteration:1 rmse:20185129.546028767
Error doesn't go down. Stopped the training.
iteration:10 learningRate:1e-05
Iteration:0 rmse:55976.190484794206
Updating weights
Iteration:1 rmse:1993940.705076324
Error doesn't go down. Stopped the training.
iteration:10 learningRate:1e-06
Iteration:0 rmse:55976.190484794206
Updating weights
Iteration:1 rmse:180804.57573199284
Error doesn't go down. Stopped the training.
iteration:10 learningRate:1e-07
Iteration:0 rmse:55976.190484794206
Updating weights
Iteration:1 rmse:49111.438354204685
Updating weights
Iteration:2 rmse:47181.85549580404


In [None]:
import time

startTime = time.time()
weights, trainingError = myRidgeRegressionBGD(trainingData = trainingData, valData = testData, labelCol="WAGP", featureCol = "scaledFeatures", maxIterations = 1000, learningRate=0.0000001)
print("elapsed time in seconds:"+str(time.time() - startTime))

Iteration:0 rmse:55202.4317154726
Iteration:1 rmse:48382.50742216523
Iteration:2 rmse:45066.95251789525
Iteration:3 rmse:42967.58630616818
Iteration:4 rmse:41477.28046184557
Iteration:5 rmse:40428.401801446606
Iteration:6 rmse:39640.46340799318
Iteration:7 rmse:39059.01404026377
Iteration:8 rmse:38611.82674387964
Iteration:9 rmse:38273.06988119003
Iteration:10 rmse:38009.021990423316
Iteration:11 rmse:37804.97290388352
Iteration:12 rmse:37643.787829881134
Iteration:13 rmse:37516.743149453214
Iteration:14 rmse:37414.75968628273
Iteration:15 rmse:37332.66969169662
Iteration:16 rmse:37265.5395309745
Iteration:17 rmse:37210.31967421946
Iteration:18 rmse:37164.27093258963
Iteration:19 rmse:37125.5884181779
Iteration:20 rmse:37092.71555397639
Iteration:21 rmse:37064.57186741475
Iteration:22 rmse:37040.24829880005
Iteration:23 rmse:37019.085227465686
Iteration:24 rmse:37000.534325092514
Iteration:25 rmse:36984.18188803487
Iteration:26 rmse:36969.684724980274
Iteration:27 rmse:36956.7747251385

In [None]:
def predictLabels(testData, weights, featureCol, labelCol):
    testData = testData.withColumn("intercept", F.lit(1))
    assembler = VectorAssembler(inputCols=["intercept", featureCol] , outputCol="featuresI")
    testData = assembler.transform(testData)
    featureCol = "featuresI"
    
    labelsAndPreds = testData.rdd.map(lambda p: getLabeledPrediction(weights, p, featureCol, labelCol))
    return labelsAndPreds
    

In [None]:
labelsAndPreds = predictLabels(testData, weights, "scaledFeatures", "WAGP")
rmse = calcRMSE(labelsAndPreds)
print(rmse)

36834.96402605909


In [None]:
from pyspark.sql import functions as F

def squaredError(label, prediction):
    return (label - prediction) ** 2

def checkType(data, feature, maxCatValues):
    distinctValues = np.unique(data[feature])

    if (len(distinctValues) <= maxCatValues):
        featureType =  "categorical"
    else:
        featureType = "continuous"
    return [feature, featureType]

def getTypeOfFeature(data, featureList, maxCatValues = 15):
    
    featureList = sc.parallelize(featureList)
    featureTypes = featureList.map(lambda f: checkType(data, f, maxCatValues)).collect()
    featureDictTypes = {}
    for f in featureTypes:
        featureDictTypes[f[0]] = f[1]
    return featureDictTypes

def getCandidates(data, feature, bins = 32):
    
    candidates = [feature]

    if FEATURE_TYPES[feature] == "continuous":
        minValue = min(data[feature])
        maxValue = max(data[feature])
        
        if len(data) < bins:
            bins = len(data)
            
        step = (maxValue - minValue)/bins

        value = minValue
        candidates = [feature]
        while(value <= maxValue):
            candidates.append(value)
            value += step

    else: 
        distinctValues = np.unique(data[feature])
        distinctValues.sort()
        for values in distinctValues:
            candidates.append(values)
            
    return candidates

def getPotentialSplits(data, featureNameList, bins = 32):
    featureList = sc.parallelize(featureNameList)
    candidates = featureList.map(lambda f: getCandidates(data, f, bins))
    
    return candidates

def splitData(data, splitColumn, splitValue):

    typeOfFeature = FEATURE_TYPES[splitColumn]
    if typeOfFeature == "continuous":
        dataLeft = data.loc[data[splitColumn]<= splitValue]
        dataRight = data.loc[data[splitColumn] > splitValue]
    # feature is categorical   
    else:
        dataLeft = data.loc[data[splitColumn] == splitValue]
        dataRight = data.loc[data[splitColumn] != splitValue]
    
    return dataLeft, dataRight

def variance(data, labelCol):

    if len(data) == 0:   # empty data
        v = 0
        
    else:
        prediction = np.mean(data[labelCol])
        v = np.mean((data[labelCol] - prediction) **2)
    
    return v


def totalVariance(dataLeft, dataRight, labelCol):

    n = len(dataLeft) + len(dataRight) 
    dataLeftProb = len(dataLeft) / n
    dataRightProb = len(dataRight) / n

    metric = (dataLeftProb * variance(dataLeft, labelCol)  + dataRightProb * variance(dataRight, labelCol))
    return metric

def getBestMetric(data, feature, values, labelCol):    
    firstIteration = True
    gotBestMetric = False
    
    for value in values:
        dataLeft, dataRight = splitData(data, splitColumn=feature, splitValue=value)
        metric = totalVariance(dataLeft, dataRight, labelCol)

        if firstIteration or metric < bestMetric:
            gotBestMetric = True
            firstIteration = False
            bestSplitValue = value
            bestMetric = metric
            if(bestMetric == 0.0):
                break
        elif gotBestMetric:
            break
    
    return [feature, bestSplitValue, bestMetric]

def getBestSplit(data, potentialSplits, labelCol):    
    metrics = potentialSplits.map(lambda candidates: getBestMetric(data, candidates[0], candidates[1:], labelCol)).collect()
    
    firstIteration = True
    for m in metrics:
        if firstIteration or m[2] < bestMetric:
            firstIteration = False
            bestSplitColumn = m[0]
            bestSplitValue = m[1]
            bestMetric = m[2]
    
    return bestSplitColumn, bestSplitValue, bestMetric
                   
def createLeaf(data, labelCol):
    return np.mean(data[labelCol])

def myDecisionTreeRegression(data, featureList, labelCol, counter=0, minSamples=2, maxDepth=5, maxCatValues = 30, bins = 32):
    
    # data preparations
    if counter == 0:
        global COLUMN_HEADERS, FEATURE_TYPES
        data = data.toPandas()
        COLUMN_HEADERS = featureList
        FEATURE_TYPES = getTypeOfFeature(data, featureList, maxCatValues)  
        print(FEATURE_TYPES)
        
    
    # base cases
    if (len(data) < minSamples) or (counter == maxDepth):
        leaf = createLeaf(data, labelCol)
        return leaf
    
    # recursive part
    else:    
        counter += 1
        
        print("Depth:"+str(counter))
        
        #Taking the cadidates for which evaluate the split of the data
        potentialSplits = getPotentialSplits(data, featureList, bins)
        
        #Finding the best cadidate for the split
        splitColumn, splitValue, metric = getBestSplit(data, potentialSplits, labelCol)
        
        #Split the data with the best cadidate
        dataLeft, dataRight = splitData(data, splitColumn, splitValue) 
        
        # if the split produce an empty set of observations a leaf will be created and the recursion will 
        # stop a this level
        if len(dataLeft) == 0 or len(dataRight) == 0 or metric == 0.0: 
            leaf = createLeaf(data, labelCol)
            return leaf
        
        # decision point construction
        featureName = [feature for feature in COLUMN_HEADERS if feature == splitColumn]
        typeOfFeature = FEATURE_TYPES[splitColumn]
        if typeOfFeature == "continuous":
            question = "{} <= {}".format(featureName[0], splitValue)
            
        # if feature is categorical
        else:
            question = "{} = {}".format(featureName[0], splitValue)
        
        print(question)
        
        # instantiate sub-tree
        subTree = {question: []}
        
        del data
        
        # recursion
        yesAnswer = myDecisionTreeRegression(dataLeft, featureList, labelCol, counter, minSamples, maxDepth, maxCatValues, bins)
        noAnswer = myDecisionTreeRegression(dataRight, featureList, labelCol, counter, minSamples, maxDepth, maxCatValues, bins)
        
        subTree[question].append(yesAnswer)
        subTree[question].append(noAnswer)
        
    return subTree

In [None]:
def predictLabel(observation, tree, labelCol):
    question = list(tree.keys())[0]
    featureName, condition, value = question.split(" ")

    # ask question
    if condition == "<=":
        if observation[featureName] <= float(value):
            answer = tree[question][0]
        else:
            answer = tree[question][1]
    
    # feature is categorical
    else:
        if str(observation[featureName]) == value:
            answer = tree[question][0]
        else:
            answer = tree[question][1]

    # base case
    if not isinstance(answer, dict):
        return (observation[labelCol], answer)
    
    # recursive part
    else:
        residualTree = answer
        return predictLabel(observation, residualTree, labelCol)

In [None]:
from pyspark.ml.linalg import DenseVector
from pyspark.ml.feature import StandardScaler 
from pyspark.ml.feature import VectorAssembler 

featureLabelList = list(featureList)
featureLabelList.append("WAGP")

sampleData = df.sample(fraction = 0.5, seed = 42)
sampleData = sampleData.select(featureLabelList)

(trainingData, testData) = sampleData.randomSplit([0.7, 0.3])
trainingData.cache()
testData.cache()

DataFrame[WKHP: double, ESR: double, WKL: double, JWAP: double, JWMNP: double, POWSP: double, NWLA: double, NWLK: double, NWAB: double, POWPUMA: double, JWDP: double, POVPIP: double, PUBCOV: double, WRK: double, HINS1: double, HINS3: double, INDP: double, SSP: double, PRIVCOV: double, SCHL: double, DIS: double, HINS4: double, MARHYP: double, DPHY: double, JWTR: double, OCCP: double, DOUT: double, SOCP: double, AGEP: double, DREM: double, RETP: double, FJWDP: double, RELP: double, DDRS: double, NWAV: double, COW: double, MSP: double, WKW: double, HINS2: double, SEX: double, FJWMNP: double, DEAR: double, FPUBCOVP: double, FPRIVCOVP: double, SSIP: double, FHINS7P: double, MAR: double, FPOWSP: double, FHINS5P: double, FHINS6P: double, FJWRIP: double, FHINS4P: double, FJWTRP: double, GCL: double, HICOV: double, DEYE: double, SPORDER: double, SCH: double, FWKHP: double, FHINS2P: double, FWKWP: double, MARHW: double, RC: double, FHINS1P: double, WAGP: float]

In [None]:
depth = [5, 10, 15, 20]
maxCatValues = [2, 15, 50, 1000]
bins = [4, 8, 16, 32, 64]
firstIteration = True
for d in depth:
    for numBins in bins:
      if d == 20 and bins > ):
        for numCat in maxCatValues:
            print("MaxDepth:"+str(d)+" maxCatValues:"+str(numCat) + " bins:"+str(numBins))
            tree = myDecisionTreeRegression(data = trainingData, featureList = featureList, minSamples = 1000, maxDepth=d, maxCatValues = numCat, bins = numBins, labelCol = "WAGP")        

            predsAndLabels = testData.rdd.map(lambda y: predictLabel(y, tree, "WAGP"))
            rmse = calcRMSE(predsAndLabels)
            print("RMSE:"+str(rmse))

            if(firstIteration or rmse < bestError):
                firstIteration = False
                bestError = rmse
                bestDepth = d
                bestMaxCat = numCat
                bestBins = numBins
                
print("Best model, RMSE:"+str(bestError)+" maxDepth:"+str(bestDepth)+" maxCatValues:"+str(bestMaxCat) + " bins:"+str(bestBins))


In [None]:
import time
startTime = time.time()
tree = myDecisionTreeRegression(data = trainingData, featureList = featureList, minSamples = 1000, maxDepth=15, maxCatValues = 2, bins = 4, labelCol = "WAGP")        
print("elapsed time in seconds:"+str(time.time() - startTime))

{'WKHP': 'continuous', 'ESR': 'continuous', 'WKL': 'continuous', 'JWAP': 'continuous', 'JWMNP': 'continuous', 'POWSP': 'continuous', 'NWLA': 'continuous', 'NWLK': 'continuous', 'NWAB': 'continuous', 'POWPUMA': 'continuous', 'JWDP': 'continuous', 'POVPIP': 'continuous', 'PUBCOV': 'categorical', 'WRK': 'continuous', 'HINS1': 'categorical', 'HINS3': 'categorical', 'INDP': 'continuous', 'SSP': 'continuous', 'PRIVCOV': 'categorical', 'SCHL': 'continuous', 'DIS': 'categorical', 'HINS4': 'categorical', 'MARHYP': 'continuous', 'DPHY': 'categorical', 'JWTR': 'continuous', 'OCCP': 'continuous', 'DOUT': 'categorical', 'SOCP': 'continuous', 'AGEP': 'continuous', 'DREM': 'categorical', 'RETP': 'continuous', 'FJWDP': 'categorical', 'RELP': 'continuous', 'DDRS': 'categorical', 'NWAV': 'continuous', 'COW': 'continuous', 'MSP': 'continuous', 'WKW': 'continuous', 'HINS2': 'categorical', 'SEX': 'categorical', 'FJWMNP': 'categorical', 'DEAR': 'categorical', 'FPUBCOVP': 'categorical', 'FPRIVCOVP': 'categor