In [1]:
import pyspark 
import pandas as pd 

In [15]:
from pyspark.sql import SQLContext,SparkSession
from pyspark import SparkConf, SparkContext
from pyspark import SparkFiles
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col
spark = SparkSession.builder.appName("titanic-survival-prediction").getOrCreate()

In [12]:
titanicpath = '../../home/nasir/Desktop/Machine Learning/data/titanic/train.csv'

In [16]:
sc = spark.sparkContext
sc.addFile(titanicpath)

23/01/21 19:40:01 WARN SparkContext: The path ../../home/nasir/Desktop/Machine Learning/data/titanic/train.csv has been added already. Overwriting of added paths is not supported in the current version.


In [17]:
sqlContext = SQLContext(spark.sparkContext)
sqlContext

<pyspark.sql.context.SQLContext at 0x7fd7e5ec04c0>

In [18]:
traindf = sqlContext.read.csv(SparkFiles.get('train.csv'),header=True,inferSchema=True)

                                                                                

In [19]:
traindf.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [21]:
traindf.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [22]:
from pyspark.ml.feature import StringIndexer

In [23]:
indexer = StringIndexer(inputCol='Sex',outputCol='Sex-index')
indexed = indexer.fit(traindf).transform(traindf)
indexed.show(5)

                                                                                

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+---------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|Sex-index|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+---------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|      0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|      1.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|      1.0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|      1.0|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|      0.0|


In [28]:
df = indexed.select('Sex-index','Survived')

In [29]:
df.show(5)

+---------+--------+
|Sex-index|Survived|
+---------+--------+
|      0.0|       0|
|      1.0|       1|
|      1.0|       1|
|      1.0|       1|
|      0.0|       0|
+---------+--------+
only showing top 5 rows



In [33]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol = 'Sex-index',outputCol = 'features')
encoded = encoder.fit(df).transform(df)

In [34]:
train_data,test_data = encoded.randomSplit([.8,.2],seed= 1234)

In [36]:
train_data.show(5)

+---------+--------+-------------+
|Sex-index|Survived|     features|
+---------+--------+-------------+
|      0.0|       0|(1,[0],[1.0])|
|      0.0|       0|(1,[0],[1.0])|
|      0.0|       0|(1,[0],[1.0])|
|      0.0|       0|(1,[0],[1.0])|
|      0.0|       0|(1,[0],[1.0])|
+---------+--------+-------------+
only showing top 5 rows



In [43]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8,featuresCol='features',labelCol='Survived')
lrmodel = lr.fit(train_data)

In [44]:
print("Coefficients: " + str(lrmodel.coefficients))
print("Intercept: " + str(lrmodel.intercept))

Coefficients: [-0.1538246623870983]
Intercept: -0.32828634931482414


In [45]:
predictions = lrmodel.transform(test_data)

In [47]:
predictions.show(5)

+---------+--------+-------------+--------------------+--------------------+----------+
|Sex-index|Survived|     features|       rawPrediction|         probability|prediction|
+---------+--------+-------------+--------------------+--------------------+----------+
|      0.0|       0|(1,[0],[1.0])|[0.48211101170192...|[0.61824623538093...|       0.0|
|      0.0|       0|(1,[0],[1.0])|[0.48211101170192...|[0.61824623538093...|       0.0|
|      0.0|       0|(1,[0],[1.0])|[0.48211101170192...|[0.61824623538093...|       0.0|
|      0.0|       0|(1,[0],[1.0])|[0.48211101170192...|[0.61824623538093...|       0.0|
|      0.0|       0|(1,[0],[1.0])|[0.48211101170192...|[0.61824623538093...|       0.0|
+---------+--------+-------------+--------------------+--------------------+----------+
only showing top 5 rows



In [50]:
predicted = predictions.select('Survived','prediction')

In [51]:
predicted.show(5)

+--------+----------+
|Survived|prediction|
+--------+----------+
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 5 rows



In [2]:
spark.stop()