In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import functions as f
from pyspark.sql import types as t
import pandas as pd
import numpy as np
import matrix_multiplier as mp
import pca

In [2]:
# Obtain the Spark session used by the cells below (reuses one if it already exists).
spark = (
    SparkSession.builder
    .appName('lol')
    .getOrCreate()
)

In [3]:
# Load the Titanic training set with pandas, then mirror it as a Spark DataFrame.
pdf = pd.read_csv('./train.csv')

# pandas dtype name -> Spark SQL type used to build the explicit schema.
# NOTE(review): int64/float64 are mapped to Spark's 32-bit Integer/Float types;
# the mapping is kept unchanged so the outputs recorded further down stay valid.
converter = {'int64':t.IntegerType(),'float64':t.FloatType(),'object':t.StringType()}

# Columns are declared nullable: later cells call fillna(0) on this data, so it
# contains missing values and a non-nullable schema would misdescribe it.
schema = t.StructType([
    t.StructField(col_name, converter[str(dtype)], True)
    for col_name, dtype in pdf.dtypes.items()
])
df = spark.createDataFrame(pdf, schema=schema)

In [4]:
# Numeric feature columns used by the matrix-multiplication and PCA demos.
selected_cols = ['Age', 'Parch', 'Pclass']
# Spark view restricted to those columns.
df_new = df.select(*selected_cols)

In [5]:
# 2x2 identity weight matrix for the first layer.
m = np.array([
    [1, 0],
    [0, 1],
])

In [6]:
# Bias vector for the first layer: +1 on the first output, 0 on the second.
n = np.array([1, 0])

In [7]:
# Print the SQL generated for one layer (weights m, bias n, ReLU) applied to the
# Age and Pclass columns; the remaining column (Parch) is passed through.
print(
    mp.MatrixMultiplier(
        df_new.fillna(0),
        m,
        n,
        activation='relu',
        df_cols=['Age', 'Pclass'],
    )
)

select
	Parch,
	case when 1*Age + 0*Pclass + 1 > 0 then 1*Age + 0*Pclass + 1 else 0 end as col_0,
	case when 0*Age + 1*Pclass + 0 > 0 then 0*Age + 1*Pclass + 0 else 0 end as col_1
from
	table


In [9]:
# 3x2 weight matrix for the second layer: one row per input column
# (Parch, col_0, col_1), one column per output.
m2 = np.array([
    [0, 1],
    [1, 0],
    [0, 1],
])

In [10]:
# Stack a second layer: the first layer's SQL is executed with spark.sql and the
# resulting DataFrame is fed to another MatrixMultiplier (weights m2, no bias
# argument, ReLU). Only the generated SQL of the outer layer is printed.
print(
    mp.MatrixMultiplier(
        spark.sql(
            str(
                mp.MatrixMultiplier(
                    df_new.fillna(0),
                    m,
                    n,
                    activation='relu',
                    df_cols=['Age', 'Pclass'],
                )
            )
        ),
        m2,
        activation='relu',
    )
)

select
	case when 0*Parch + 1*col_0 + 0*col_1 + 0.0 > 0 then 0*Parch + 1*col_0 + 0*col_1 + 0.0 else 0 end as col_0,
	case when 1*Parch + 0*col_0 + 1*col_1 + 0.0 > 0 then 1*Parch + 0*col_0 + 1*col_1 + 0.0 else 0 end as col_1
from
	table


In [11]:
# Preview the input rows (missing values replaced by 0) to check the results by hand.
df_new.fillna(0).show(n=20)

+----+-----+------+
| Age|Parch|Pclass|
+----+-----+------+
|22.0|    0|     3|
|38.0|    0|     1|
|26.0|    0|     3|
|35.0|    0|     1|
|35.0|    0|     3|
| 0.0|    0|     3|
|54.0|    0|     1|
| 2.0|    1|     3|
|27.0|    2|     3|
|14.0|    0|     2|
| 4.0|    1|     3|
|58.0|    0|     1|
|20.0|    0|     3|
|39.0|    5|     3|
|14.0|    0|     3|
|55.0|    0|     2|
| 2.0|    1|     3|
| 0.0|    0|     2|
|31.0|    0|     3|
| 0.0|    0|     3|
+----+-----+------+
only showing top 20 rows



Successive multiplications, something essential in Neural Nets, seem to be working.

In [12]:
# Execute the stacked two-layer SQL and display the resulting columns.
spark.sql(
    str(
        mp.MatrixMultiplier(
            spark.sql(
                str(
                    mp.MatrixMultiplier(
                        df_new.fillna(0),
                        m,
                        n,
                        activation='relu',
                        df_cols=['Age', 'Pclass'],
                    )
                )
            ),
            m2,
            activation='relu',
        )
    )
).show()

+-----+-----+
|col_0|col_1|
+-----+-----+
| 23.0|  3.0|
| 39.0|  1.0|
| 27.0|  3.0|
| 36.0|  1.0|
| 36.0|  3.0|
|  1.0|  3.0|
| 55.0|  1.0|
|  3.0|  4.0|
| 28.0|  5.0|
| 15.0|  2.0|
|  5.0|  4.0|
| 59.0|  1.0|
| 21.0|  3.0|
| 40.0|  8.0|
| 15.0|  3.0|
| 56.0|  2.0|
|  3.0|  4.0|
|  1.0|  2.0|
| 32.0|  3.0|
|  1.0|  3.0|
+-----+-----+
only showing top 20 rows



The code below uses my PCA implementation, which can also transform Spark DataFrames:

In [69]:
# Two-component PCA, fitted on the pandas copy of the selected features
# (missing values replaced by 0).
pca2 = pca.PCA_enhanced(n_components=2)
pca2.fit(
    pdf[selected_cols].fillna(0)
)

In [70]:
# Display the column names held by the PCA object after fitting
# (presumably recorded from the training frame during fit -- confirm in pca.py).
pca2.df_cols

['Age', 'Parch', 'Pclass']

In [71]:
# Project the Spark DataFrame onto the two fitted principal components.
pca2.transform(
    df_new.fillna(0)
).show()

+---------------------+---------------------+
|principal_component_0|principal_component_1|
+---------------------+---------------------+
|   -1.810059623879277|  -0.3770845059197297|
|   14.221934644472002| -0.36346856543709183|
|    2.189338571502903| -0.36724077345834105|
|   11.222385997935366|  -0.3708513647831333|
|   11.187984511112807| -0.34509237542021665|
|  -23.806749698481266| -0.43122503445736715|
|   30.219527426000724|  -0.3240936355915373|
|  -21.809289819283187|   0.5736108592901694|
|   3.1847096833624273|   1.6350482146906906|
|   -9.791655271232358| -0.40965146552396525|
|  -19.809590721592098|   0.5785327255208638|
|   34.218925621382894| -0.31424990313014867|
|   -3.809758721570367|   -0.382006372150424|
|   15.176186614029936|   4.6643214946253835|
|   -9.808856014643636|  -0.3967719708425069|
|   31.202176231434986|  -0.3087532077947318|
|  -21.809289819283187|   0.5736108592901694|
|  -23.789548955069986|  -0.4441045291388255|
|    7.188586315730628|  -0.354936

In [72]:
# Same projection, with an extra non-feature column (Sex) carried through to the output.
pca2.transform(
    df.select('Sex', *selected_cols).fillna(0)
).show()

+------+---------------------+---------------------+
|   Sex|principal_component_0|principal_component_1|
+------+---------------------+---------------------+
|  male|   -1.810059623879277|  -0.3770845059197297|
|female|   14.221934644472002| -0.36346856543709183|
|female|    2.189338571502903| -0.36724077345834105|
|female|   11.222385997935366|  -0.3708513647831333|
|  male|   11.187984511112807| -0.34509237542021665|
|  male|  -23.806749698481266| -0.43122503445736715|
|  male|   30.219527426000724|  -0.3240936355915373|
|  male|  -21.809289819283187|   0.5736108592901694|
|female|   3.1847096833624273|   1.6350482146906906|
|female|   -9.791655271232358| -0.40965146552396525|
|female|  -19.809590721592098|   0.5785327255208638|
|female|   34.218925621382894| -0.31424990313014867|
|  male|   -3.809758721570367|   -0.382006372150424|
|  male|   15.176186614029936|   4.6643214946253835|
|female|   -9.808856014643636|  -0.3967719708425069|
|female|   31.202176231434986|  -0.30875320779

Comparing it with the output from the standard PCA, we have a match!

In [67]:
# Reference result computed on the pandas frame, for comparison with the Spark output above.
pca2.fit_transform(
    pdf[selected_cols].fillna(0)
)

array([[ -1.81005962,  -0.37708451],
       [ 14.22193464,  -0.36346857],
       [  2.18933857,  -0.36724077],
       ...,
       [-23.81122814,   1.56860302],
       [  2.22374006,  -0.39299976],
       [  8.18843586,  -0.35247517]])