In [1]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import MLUtils
import numpy as np
from pyspark.ml.feature import StandardScaler
import pyspark.sql.functions as f
import pyspark.sql.types
import pandas as pd
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler

In [2]:
# Read the Iris CSV with pandas, discard the 'Species' column, and convert
# the remaining columns into a Spark DataFrame.
iris_url = "https://raw.githubusercontent.com/venky14/Machine-Learning-with-Iris-Dataset/master/Iris.csv"
df = pd.read_csv(iris_url).drop(columns=['Species'])
spark_df = spark.createDataFrame(df)

In [3]:
# Preview the first ten rows of the Spark DataFrame.
spark_df.show(n=10)

In [4]:
# Names of every column except 'Id'; these are the inputs that get packed
# into a single vector column.
non_id_df = spark_df.drop('Id')
cols = non_id_df.columns
cols

In [5]:
# Pack the columns listed in `cols` into one vector column named 'features',
# keeping 'Id' alongside it.
assembler = VectorAssembler(inputCols=cols, outputCol='features')
assembled = assembler.transform(spark_df)
output_dat = assembled.select('Id', 'features')
output_dat.show(n=5, truncate=False)

In [6]:
# Centre the data: withMean=True subtracts each feature's mean, while
# withStd=False leaves the spread of every feature unchanged (no scaling
# to unit standard deviation happens in this cell).
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=False, withMean=True)

# Compute the summary statistics the scaler needs by fitting it on the data.
scalerModel = scaler.fit(output_dat)

# Apply the centring. The output column is called "scaledFeatures", but with
# withStd=False its values are only mean-centred, not rescaled.
scaledData = scalerModel.transform(output_dat)
scaledData.select(['Id', 'scaledFeatures']).show(5, truncate=False)  # sample centred data

In [7]:
# Fit a 4-component PCA on the scaler's output column, then add the
# projected vectors as the 'pcaFeatures' column.
pca = PCA(
    k=4,
    inputCol=scaler.getOutputCol(),
    outputCol="pcaFeatures",
)
model = pca.fit(scaledData)
transformed_feature = model.transform(scaledData)

In [8]:
# Explained variance per component, multiplied by 100 and rounded to 4 decimals.
(100.00 * model.explainedVariance.toArray()).round(4)

In [9]:
# Principal-component matrix of the fitted model as a NumPy array, rounded to 4 decimals.
pcs = model.pc.toArray().round(4)
pcs

In [10]:
# Loadings table: one row per input column in `cols`, one column per
# principal component, values rounded to 4 decimals.
pcs = np.round(model.pc.toArray(), 4)

# Build the PC1..PCk labels from the matrix width instead of hard-coding four
# names, so the table still lines up if the number of components changes.
pc_labels = ['PC' + str(i) for i in range(1, pcs.shape[1] + 1)]
df_pc = pd.DataFrame(pcs, columns=pc_labels, index=cols)
df_pc

Unnamed: 0,PC1,PC2,PC3,PC4
SepalLengthCm,-0.3616,-0.6565,0.581,0.3173
SepalWidthCm,0.0823,-0.7297,-0.5964,-0.3241
PetalLengthCm,-0.8566,0.1758,-0.0725,-0.4797
PetalWidthCm,-0.3588,0.0747,-0.5491,0.7511


In [11]:
def _format_scores(row):
    """Return a Row whose 'pcaFeatures' field is the rounded score list as a string.

    `row[0]` is the vector held in the selected 'pcaFeatures' column. The
    values are rounded to 4 decimals and converted with `.tolist()` so they
    are plain Python floats: the resulting text is "[a, b, ...]" rather than
    "[np.float64(a), ...]", which is what NumPy >= 2 scalars print as.
    """
    scores = np.round(row[0].toArray(), 4).tolist()
    return Row(pcaFeatures=str(scores))


# Display the projected rows with each vector rendered as a rounded list.
transformed_feature.select('pcaFeatures').rdd.map(_format_scores).toDF().show(truncate=False)

In [12]:
# Second data set, used for its differently scaled features: load the CSV
# with pandas, convert it to a Spark DataFrame, and print the schema.
climate_url = 'https://courses.edx.org/asset-v1:MITx+15.071x_2a+2T2015+type@asset+block/climate_change.csv'
df = pd.read_csv(climate_url)
spark_df = spark.createDataFrame(df)
spark_df.printSchema()

In [13]:
# Preview the first five rows of the Spark DataFrame.
spark_df.show(n=5)

In [14]:
# Names of every column except 'Temp'; these become the assembler inputs.
non_temp_df = spark_df.drop('Temp')
cols = non_temp_df.columns
cols

In [15]:
# Pack the columns listed in `cols` into one vector column named 'features',
# keeping 'Temp' alongside it.
assembler = VectorAssembler(inputCols=cols, outputCol='features')
assembled = assembler.transform(spark_df)
output_dat = assembled.select('Temp', 'features')
output_dat.show(n=5, truncate=False)

In [16]:
# Standardise the features: withMean=True centres each one and withStd=True
# scales it to unit standard deviation.
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=True,
)

# Fitting computes the summary statistics the scaler needs.
scalerModel = scaler.fit(output_dat)

# Apply the standardisation and preview a few rows next to 'Temp'.
scaledData = scalerModel.transform(output_dat)
scaledData.select('Temp', 'scaledFeatures').show(n=5, truncate=False)

In [17]:
# Fit a 10-component PCA on the scaler's output column, then add the
# projected vectors as the 'pcaFeatures' column.
pca = PCA(
    k=10,
    inputCol=scaler.getOutputCol(),
    outputCol="pcaFeatures",
)
model = pca.fit(scaledData)
transformed_feature = model.transform(scaledData)

In [18]:
# Explained variance per component, multiplied by 100 and rounded to 4 decimals.
(100.00 * model.explainedVariance.toArray()).round(4)

In [19]:
# Principal-component matrix of the fitted model as a NumPy array, rounded to 4 decimals.
pcs = model.pc.toArray().round(4)
pcs

In [20]:
# Loadings table: one row per input column in `cols`, one column per
# principal component, values rounded to 4 decimals.
pcs = np.round(model.pc.toArray(), 4)

# Size the PC1..PCk labels from the matrix width rather than a literal
# range(1, 11), so the labels cannot drift from the number of components.
pc_labels = ['PC' + str(i) for i in range(1, pcs.shape[1] + 1)]
df_pc = pd.DataFrame(pcs, columns=pc_labels, index=cols)
df_pc

Unnamed: 0,PC1,PC2,PC3,PC4,PC5,PC6,PC7,PC8,PC9,PC10
Year,-0.4237,0.1014,-0.127,-0.0212,0.2036,0.0916,0.1556,-0.3187,-0.0458,0.7848
Month,0.011,0.0091,-0.0696,0.9886,0.0319,0.0906,0.054,0.07,-0.0181,0.018
MEI,0.0697,-0.5125,-0.5834,-0.0822,-0.21,0.5833,-0.0286,0.001,0.0099,0.0003
CO2,-0.4128,0.1341,-0.1264,-0.0971,0.2475,0.1322,0.3007,0.7632,-0.1558,-0.1017
CH4,-0.4262,-0.092,-0.0107,0.0492,-0.0282,-0.087,-0.8778,0.1638,-0.0077,0.0389
N2O,-0.417,0.1201,-0.1153,0.014,0.2715,0.1342,0.0796,-0.3456,0.603,-0.466
CFC-11,-0.2862,-0.4596,0.138,0.0466,-0.4918,-0.3956,0.2781,0.1735,0.4,0.1409
CFC-12,-0.4192,-0.1951,0.0093,0.0357,-0.1428,-0.1381,0.1653,-0.3669,-0.6702,-0.3664
TSI,-0.0591,-0.3811,0.748,0.0033,0.2102,0.4968,0.0059,0.0053,-0.0046,0.0266
Aerosols,0.1687,-0.54,-0.1751,0.0065,0.6865,-0.4215,-0.0015,0.0063,-0.0166,0.0057


In [21]:
# A sample of the projected rows (the 'pcaFeatures' vectors) is shown below.
pc_scores = transformed_feature.select('pcaFeatures')
pc_scores.show(n=10, truncate=False)