In [1]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation, Summarizer
from pyspark.sql.functions import col, when, lit


In [2]:
if sc:
    print(sc.appName)
else:
    print('Could not initialise pyspark session')

PySparkShell


In [3]:
shipData = sqlContext.read.option('header', True) \
    .option('delimiter', ',') \
    .option('inferSchema', True) \
    .csv('file:///home/ckkhandare/spark-3.1.2-bin-hadoop3.2/datasets/train.csv')

In [4]:
shipData.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [5]:
shipData.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [6]:
shipData = shipData.drop('Pclass').drop('Name')

In [7]:
shipData = shipData.withColumn('Sex', when(col('Sex') == 'male', 1.0).otherwise(0.0))
shipData.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Sex: double (nullable = false)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [8]:
shipData.show()

+-----------+--------+---+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+---+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|1.0|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|0.0|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|0.0|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|0.0|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|1.0|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|1.0|null|    0|    0|          330877| 8.4583| null|       Q|
|          7|       0|1.0|54.0|    0|    0|           17463|51.8625|  E46|       S|
|          8|       0|1.0| 2.0|    3|    1|          349909| 21.075| null|       S|
|          9|       1|0.0|27.0|    0|    2|          347742|11.1333| null|  

In [9]:
assembler = VectorAssembler(inputCols=['PassengerId', 'Survived', 'Sex', 'Age', 'SibSp', 'Parch'], outputCol='features')\
    .setHandleInvalid('skip')
vectorDF = assembler.transform(shipData)
vectorDF.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Sex: double (nullable = false)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- features: vector (nullable = true)



In [10]:
vectorDF.show()

+-----------+--------+---+----+-----+-----+----------------+-------+-----+--------+--------------------+
|PassengerId|Survived|Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|            features|
+-----------+--------+---+----+-----+-----+----------------+-------+-----+--------+--------------------+
|          1|       0|1.0|22.0|    1|    0|       A/5 21171|   7.25| null|       S|[1.0,0.0,1.0,22.0...|
|          2|       1|0.0|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|[2.0,1.0,0.0,38.0...|
|          3|       1|0.0|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|[3.0,1.0,0.0,26.0...|
|          4|       1|0.0|35.0|    1|    0|          113803|   53.1| C123|       S|[4.0,1.0,0.0,35.0...|
|          5|       0|1.0|35.0|    0|    0|          373450|   8.05| null|       S|[5.0,0.0,1.0,35.0...|
|          7|       0|1.0|54.0|    0|    0|           17463|51.8625|  E46|       S|[7.0,0.0,1.0,54.0...|
|          8|       0|1.0| 2.0|    3|    1|          34

In [11]:
df1 = Correlation.corr(vectorDF, "features")
df1.printSchema()

root
 |-- pearson(features): matrix (nullable = false)



In [12]:
r1 = df1.head()

In [13]:
print("Pearson correlation matrix:\n" + str(r1[0]))

Pearson correlation matrix:
DenseMatrix([[ 1.        ,  0.02934016,  0.02457511,  0.0368472 , -0.08239772,
              -0.01161741],
             [ 0.02934016,  1.        , -0.53882559, -0.07722109, -0.01735836,
               0.09331701],
             [ 0.02457511, -0.53882559,  1.        ,  0.09325358, -0.10394968,
              -0.24697204],
             [ 0.0368472 , -0.07722109,  0.09325358,  1.        , -0.30824676,
              -0.18911926],
             [-0.08239772, -0.01735836, -0.10394968, -0.30824676,  1.        ,
               0.38381986],
             [-0.01161741,  0.09331701, -0.24697204, -0.18911926,  0.38381986,
               1.        ]])


In [14]:
df2 = Correlation.corr(vectorDF, "features", "spearman")
df2.printSchema()

root
 |-- spearman(features): matrix (nullable = false)



In [15]:
r2 = df2.head()
print("Spearman correlation matrix:\n" + str(r2[0]))


Spearman correlation matrix:
DenseMatrix([[ 1.        ,  0.02971975,  0.0240503 ,  0.04100992, -0.0659825 ,
              -0.00540992],
             [ 0.02971975,  1.        , -0.53882559, -0.0525653 ,  0.07324381,
               0.1564441 ],
             [ 0.0240503 , -0.53882559,  1.        ,  0.08333033, -0.16488745,
              -0.25469659],
             [ 0.04100992, -0.0525653 ,  0.08333033,  1.        , -0.18206126,
              -0.25421212],
             [-0.0659825 ,  0.07324381, -0.16488745, -0.18206126,  1.        ,
               0.42695488],
             [-0.00540992,  0.1564441 , -0.25469659, -0.25421212,  0.42695488,
               1.        ]])


In [16]:
summarizer = Summarizer.metrics("mean", "count", "variance", "max", "min")
df3 = vectorDF.select(summarizer.summary(vectorDF.features))
df3.printSchema()

root
 |-- aggregate_metrics(features, 1.0): struct (nullable = false)
 |    |-- mean: vector (nullable = false)
 |    |-- count: long (nullable = false)
 |    |-- variance: vector (nullable = false)
 |    |-- max: vector (nullable = false)
 |    |-- min: vector (nullable = false)



In [17]:
df3.show()

+--------------------------------+
|aggregate_metrics(features, 1.0)|
+--------------------------------+
|            {[448.58263305322...|
+--------------------------------+



In [18]:
kMeans = KMeans().setK(2).setSeed(1)
kMeansModel = kMeans.fit(vectorDF)

In [19]:
shipTest = sqlContext.read.option('header', True) \
    .option('delimiter', ',') \
    .option('inferSchema', True) \
    .csv('file:///home/ckkhandare/spark-3.1.2-bin-hadoop3.2/datasets/test.csv')\
    .drop('Pclass').drop('Name')\
    .withColumn('Sex', when(col('Sex') == 'male', 1.0).otherwise(0.0))\
    .withColumn('Survived', lit(1))

In [20]:
vectorDFTest = assembler.transform(shipTest)
vectorDFTest.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Sex: double (nullable = false)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Survived: integer (nullable = false)
 |-- features: vector (nullable = true)



In [21]:
predictions = kMeansModel.transform(vectorDFTest)
modelSummary = kMeansModel.summary

In [22]:
predictions.printSchema()
print(f'Model predictionCol = {modelSummary.predictionCol}')
predictions.show()

root
 |-- PassengerId: integer (nullable = true)
 |-- Sex: double (nullable = false)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Survived: integer (nullable = false)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)

Model predictionCol = prediction
+-----------+---+----+-----+-----+----------------+-------+-----+--------+--------+--------------------+----------+
|PassengerId|Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Survived|            features|prediction|
+-----------+---+----+-----+-----+----------------+-------+-----+--------+--------+--------------------+----------+
|        892|1.0|34.5|    0|    0|          330911| 7.8292| null|       Q|       1|[892.0,1.0,1.0,34...|         1|
|        893|0.0|47.0|    1| 

In [23]:
print(f'Model cluster size = {modelSummary.clusterSizes}')
print(f'Model training Cost = {modelSummary.trainingCost}')

Model cluster size = [363, 351]
Model training Cost = 11838192.555020407


In [24]:
vectorDF = vectorDF.withColumn('Survived', shipData.Survived)
vectorDF.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Sex: double (nullable = false)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- features: vector (nullable = true)



In [25]:
lr = LogisticRegression(labelCol='Survived')
pipeline = Pipeline(stages=[assembler, lr])
pipelineModel = pipeline.fit(shipData)
pipelinePrediction = pipelineModel.transform(shipTest)
print(f'Prediction schema from Logistic Regression')
pipelinePrediction.printSchema()

Prediction schema from Logistic Regression
root
 |-- PassengerId: integer (nullable = true)
 |-- Sex: double (nullable = false)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Survived: integer (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [26]:
pipelinePrediction.show()

+-----------+---+----+-----+-----+----------------+-------+-----+--------+--------+--------------------+--------------------+--------------------+----------+
|PassengerId|Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Survived|            features|       rawPrediction|         probability|prediction|
+-----------+---+----+-----+-----+----------------+-------+-----+--------+--------+--------------------+--------------------+--------------------+----------+
|        892|1.0|34.5|    0|    0|          330911| 7.8292| null|       Q|       1|[892.0,1.0,1.0,34...|[-17.834524704029...|[1.79706723448756...|       1.0|
|        893|0.0|47.0|    1|    0|          363272|    7.0| null|       S|       1|[893.0,1.0,0.0,47...|[-19.049483301884...|[5.33229927976653...|       1.0|
|        894|1.0|62.0|    0|    0|          240276| 9.6875| null|       Q|       1|[894.0,1.0,1.0,62...|[-15.553819219383...|[1.75817485321158...|       1.0|
|        895|1.0|27.0|    0|    0|          315154| 

In [27]:
pipeline = Pipeline(stages=[assembler, kMeansModel])
pipelineModel = pipeline.fit(shipData)
pipelinePrediction = pipelineModel.transform(shipTest)

pipelinePrediction.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Sex: double (nullable = false)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Survived: integer (nullable = false)
 |-- features: vector (nullable = true)
 |-- prediction: integer (nullable = false)

