In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan, split, row_number, lit
import pandas as pd
from pyspark.ml.feature import PCA
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.window import Window

In [2]:
spSession = SparkSession.builder.master("local").appName("Local-Session").getOrCreate()

In [3]:
# Loading files into a Data Frame

trainDF1 = spSession.read.csv("../dataset/train_1.csv", header = True) 
testDF1 = spSession.read.csv("../dataset/test_1.csv", header = True) 
type(trainDF1)

pyspark.sql.dataframe.DataFrame

In [6]:
# Converting string values to float

trainDF2 = trainDF1.select(*(col(c).cast("float").alias(c) for c in trainDF1.columns))
testDF2 = testDF1.select(*(col(c).cast("float").alias(c) for c in testDF1.columns)).orderBy("ID")

In [10]:
# Removing columns with correlation lower than 0.15 to the target variable

train_corr = [trainDF2.corr("TARGET", col, method = "pearson") for col in trainDF2.columns]
train_corr = pd.Series(train_corr, index = trainDF2.columns)

w = Window().orderBy(lit('A'))

trainDF3 = trainDF2.select([c for c in trainDF2.columns if abs(train_corr[c]) > 0.15]).withColumn("ID_temp", row_number().over(w))
testDF3 = testDF2.select([c for c in trainDF2.columns if (abs(train_corr[c]) > 0.15
                        and c != "TARGET") or c == "ID"]).withColumn("ID_temp", row_number().over(w))
testDF3.toPandas()

+----+-----+--------+---------+--------+--------+---------+---------+---------+-----+-------------------+-----+-------+
|  ID|var15|ind_var5|ind_var30|num_var4|num_var5|num_var30|num_var35|num_var42|var36|num_meses_var5_ult3|Zeros|ID_temp|
+----+-----+--------+---------+--------+--------+---------+---------+---------+-----+-------------------+-----+-------+
| 2.0| 32.0|     1.0|      1.0|     1.0|     3.0|      3.0|      3.0|      3.0|  3.0|                3.0| 32.0|      1|
| 5.0| 35.0|     1.0|      1.0|     1.0|     3.0|      3.0|      3.0|      3.0|  3.0|                3.0| 29.0|      2|
| 6.0| 23.0|     1.0|      1.0|     2.0|     3.0|      3.0|      6.0|      3.0|  3.0|                3.0| 48.0|      3|
| 7.0| 24.0|     0.0|      0.0|     0.0|     0.0|      0.0|      0.0|      0.0| 99.0|                0.0| 15.0|      4|
| 9.0| 23.0|     1.0|      1.0|     1.0|     3.0|      3.0|      3.0|      3.0|  3.0|                3.0| 30.0|      5|
|11.0| 43.0|     0.0|      0.0|     0.0|

In [11]:
# Applying PCA to the low correlation variables

trainPCA = trainDF2.select([c for c in trainDF2.columns if 
                            (abs(train_corr[c]) <= 0.15 and train_corr[c] > 0.03)])
vecAssembler = VectorAssembler(inputCols=trainPCA.columns, outputCol="features")
train_vector = vecAssembler.transform(trainPCA)
pca = PCA(k=4, inputCol="features", outputCol="pcaFeatures")
train_model = pca.fit(train_vector)
trainPCA = train_model.transform(train_vector).select("pcaFeatures")
trainPCA = sqlContext.createDataFrame([row.pcaFeatures.toArray().tolist() for row in trainPCA.collect()], 
                                      ["PCA0","PCA1","PCA2","PCA3"]).withColumn("ID_temp", row_number().over(w))

testPCA = testDF2.select([c for c in trainDF2.columns if 
                          (abs(train_corr[c]) <= 0.15 and train_corr[c] > 0.03 and c != "TARGET")])
vecAssembler = VectorAssembler(inputCols=testPCA.columns, outputCol="features")
test_vector = vecAssembler.transform(testPCA)
test_model = pca.fit(test_vector)
testPCA = test_model.transform(test_vector).select("pcaFeatures")
testPCA = sqlContext.createDataFrame([row.pcaFeatures.toArray().tolist() for row in testPCA.collect()], 
                                      ["PCA0","PCA1","PCA2","PCA3"]).withColumn("ID_temp", row_number().over(w))

print("The explained variance ratio on train data is: " + str(sum(train_model.explainedVariance)))
print("The explained variance ratio on test data is: " + str(sum(test_model.explainedVariance)))

trainPCA.show(10)

The explained variance ratio on train data is: 0.9993754116664153
The explained variance ratio on test data is: 0.9985690332592299
+--------------------+--------------------+--------------------+--------------------+-------+
|                PCA0|                PCA1|                PCA2|                PCA3|ID_temp|
+--------------------+--------------------+--------------------+--------------------+-------+
|5.730756982406865E-7|-2.06264472058925...|-2.52694897074928...|-1.17289877545407...|      1|
|5.730756982406865E-7|-2.06264472058925...|-2.52694897074928...|-1.17289877545407...|      2|
|-7.85541762425793...|-0.00187603504195...|5.151522965696654E-4|0.006647019048736...|      3|
|5.730756982406865E-7|-2.06264472058925...|-2.52694897074928...|-1.17289877545407...|      4|
|  -952.9135165006217|  -494.2469384776781| -140.63734106636034|  -11.59890753569684|      5|
|5.730756982406865E-7|-2.06264472058925...|-2.52694897074928...|-1.17289877545407...|      6|
| -1669.6743350929253| 

In [15]:
# Merging PCA to dataframe

trainDF4 = trainDF3.join(trainPCA, on = "ID_temp").drop("ID_temp")
testDF4 = testDF3.join(testPCA, on = "ID_temp").drop("ID_temp")
testDF4.toPandas()

Unnamed: 0,ID,var15,ind_var5,ind_var30,num_var4,num_var5,num_var30,num_var35,num_var42,var36,num_meses_var5_ult3,Zeros,PCA0,PCA1,PCA2,PCA3
0,2.0,32.0,1.0,1.0,1.0,3.0,3.0,3.0,3.0,3.0,3.0,32.0,0.000002,0.000007,0.000014,0.000052
1,5.0,35.0,1.0,1.0,1.0,3.0,3.0,3.0,3.0,3.0,3.0,29.0,0.000002,0.000007,0.000014,0.000052
2,6.0,23.0,1.0,1.0,2.0,3.0,3.0,6.0,3.0,3.0,3.0,48.0,-145.550378,2.958999,20.252669,0.084338
3,7.0,24.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,99.0,0.0,15.0,0.000002,0.000007,0.000014,0.000052
4,9.0,23.0,1.0,1.0,1.0,3.0,3.0,3.0,3.0,3.0,3.0,30.0,0.000002,0.000007,0.000014,0.000052
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
75813,151831.0,23.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,99.0,0.0,17.0,0.000002,0.000007,0.000014,0.000052
75814,151832.0,26.0,1.0,1.0,1.0,3.0,3.0,3.0,3.0,3.0,3.0,30.0,0.000002,0.000007,0.000014,0.000052
75815,151833.0,24.0,1.0,1.0,1.0,3.0,3.0,3.0,3.0,99.0,3.0,30.0,0.000002,0.000007,0.000014,0.000052
75816,151834.0,40.0,1.0,1.0,1.0,3.0,3.0,3.0,3.0,3.0,3.0,30.0,0.000002,0.000007,0.000014,0.000052


In [16]:
# Saving data

trainDF4.write.csv("../dataset/train_2.csv", header = "true", mode = "overwrite")
testDF4.write.csv("../dataset/test_2.csv", header = "true", mode = "overwrite")