In [100]:
!pip install pyspark==3.2



In [101]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt   # data visualization
import seaborn as sns

In [102]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.master("local[2]").appName("ml").getOrCreate()
sc = spark.sparkContext

In [103]:
irisdata = spark.read.csv("/content/TitanicData1 (1).csv",inferSchema=True, header = True)

In [104]:
# Summary statistics
print(irisdata.describe())

DataFrame[summary: string, PassengerId: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, Survived: string]


In [105]:
#checking data's in the correct formate

irisdata.dtypes


[('PassengerId', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string'),
 ('Survived', 'int')]

In [106]:
# Drop rows with missing close
irisdata =irisdata.dropna()

In [107]:
irisdata.show()
# check data type
print(irisdata.printSchema())

+-----------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch|     Ticket|    Fare|      Cabin|Embarked|Survived|
+-----------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+--------+
|          2|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|   PC 17599| 71.2833|        C85|       C|       1|
|          4|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|     113803|    53.1|       C123|       S|       1|
|          7|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|      17463| 51.8625|        E46|       S|       0|
|         11|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|    PP 9549|    16.7|         G6|       S|       1|
|         12|     1|Bonnell, Miss. El...|female|58.0|    0|    0|     113783|   26.55|       C103|       S|       1|
|         22|     2|Beesley, Mr. Lawr...|  male|34.0|    0|    0

In [108]:
# Display the DataFrame
irisdata.show()





+-----------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch|     Ticket|    Fare|      Cabin|Embarked|Survived|
+-----------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+--------+
|          2|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|   PC 17599| 71.2833|        C85|       C|       1|
|          4|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|     113803|    53.1|       C123|       S|       1|
|          7|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|      17463| 51.8625|        E46|       S|       0|
|         11|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|    PP 9549|    16.7|         G6|       S|       1|
|         12|     1|Bonnell, Miss. El...|female|58.0|    0|    0|     113783|   26.55|       C103|       S|       1|
|         22|     2|Beesley, Mr. Lawr...|  male|34.0|    0|    0

In [109]:
from pyspark.ml.feature import VectorAssembler,StringIndexer

In [110]:
Assembler_features = VectorAssembler(inputCols=[
'Pclass','Age','SibSp','Parch','Fare','Survived'],outputCol='features')

In [111]:
species_to_num = StringIndexer(inputCol='Embarked',outputCol='label')


In [112]:
#Cleaning drop missing values
irisdata.na.drop().show(truncate=True)

+-----------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch|     Ticket|    Fare|      Cabin|Embarked|Survived|
+-----------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+--------+
|          2|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|   PC 17599| 71.2833|        C85|       C|       1|
|          4|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|     113803|    53.1|       C123|       S|       1|
|          7|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|      17463| 51.8625|        E46|       S|       0|
|         11|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|    PP 9549|    16.7|         G6|       S|       1|
|         12|     1|Bonnell, Miss. El...|female|58.0|    0|    0|     113783|   26.55|       C103|       S|       1|
|         22|     2|Beesley, Mr. Lawr...|  male|34.0|    0|    0

In [113]:
from pyspark.ml import Pipeline
data_prep_pipe = Pipeline(stages=[Assembler_features,species_to_num])
cleaner = data_prep_pipe.fit(irisdata)
clean_data = cleaner.transform(irisdata)

In [114]:
clean_data.show()

+-----------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+--------+--------------------+-----+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch|     Ticket|    Fare|      Cabin|Embarked|Survived|            features|label|
+-----------+------+--------------------+------+----+-----+-----+-----------+--------+-----------+--------+--------+--------------------+-----+
|          2|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|   PC 17599| 71.2833|        C85|       C|       1|[1.0,38.0,1.0,0.0...|  1.0|
|          4|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|     113803|    53.1|       C123|       S|       1|[1.0,35.0,1.0,0.0...|  0.0|
|          7|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|      17463| 51.8625|        E46|       S|       0|[1.0,54.0,0.0,0.0...|  0.0|
|         11|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|    PP 9549|    16.7|         G6|       S|       1|[3.0,4.0,1.0,1.0,...

In [115]:
# Step 5.1 Split the data into training and test sets (70/30%) using randomSplit
(training,testing) = clean_data.randomSplit([0.7,0.3])
# Step 5.2 import NaiveBayes and Train Naive bayes model using training data
from pyspark.ml.classification import NaiveBayes
NB = NaiveBayes()
# Train Naive bayes model using training data
species_predictor = NB.fit(training)
# Step 5.3 Test the model on testing data
test_results = species_predictor.transform(testing)
test_results.show()


+-----------+------+--------------------+------+----+-----+-----+-----------------+-------+-------+--------+--------+--------------------+-----+--------------------+--------------------+----------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch|           Ticket|   Fare|  Cabin|Embarked|Survived|            features|label|       rawPrediction|         probability|prediction|
+-----------+------+--------------------+------+----+-----+-----+-----------------+-------+-------+--------+--------+--------------------+-----+--------------------+--------------------+----------+
|          7|     1|McCarthy, Mr. Tim...|  male|54.0|    0|    0|            17463|51.8625|    E46|       S|       0|[1.0,54.0,0.0,0.0...|  0.0|[-87.773775159878...|[0.99630225845360...|       0.0|
|         98|     1|Greenfield, Mr. W...|  male|23.0|    0|    1|         PC 17759|63.3583|D10 D12|       C|       1|[1.0,23.0,0.0,1.0...|  1.0|[-68.886440458751...|[0.44602699775527...|       1.0|
|        1

In [116]:
# Step 5.4 import multiclassmetrics for precision, recall, accuracy and confusion matrix calculation
# extract prediction and ground truth label from test_results
PredicationAndLabel = test_results['prediction','label']
PredicationAndLabel.show()
# import multiclassmetrics for precision, recall, and confusion matrix calculation
from pyspark.mllib.evaluation import MulticlassMetrics
multi_metrics = MulticlassMetrics(PredicationAndLabel.rdd)
precision_score = multi_metrics.weightedPrecision
recall_score = multi_metrics.weightedRecall
accuracy_score = multi_metrics.accuracy
print(recall_score)
print(precision_score)
print(accuracy_score)
multi_metrics.confusionMatrix().toArray()



+----------+-----+
|prediction|label|
+----------+-----+
|       0.0|  0.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  1.0|
|       0.0|  0.0|
|       0.0|  1.0|
|       0.0|  0.0|
|       0.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  1.0|
|       1.0|  0.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       0.0|  0.0|
|       0.0|  0.0|
+----------+-----+
only showing top 20 rows

0.6491228070175439
0.6491228070175439
0.6491228070175439


array([[28., 10.],
       [10.,  9.]])

In [117]:
#Step 6 Logistic Regression algorithm Training and Evaluation (Your Task)
# Step 6.1 Split the data into training and test sets (70/30%) using randomSplit
(training,testing) = clean_data.randomSplit([0.7,0.3])
# Step 6.2 import logistic regression and Train the model using training data
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression()
# Train logistic regression model using training data
spam_predictor_lr = lr.fit(training)
# Step 6.3 Test the model on testing data
test_results = spam_predictor_lr.transform(testing)
test_results.show()
# Step 6.4 import multiclassmetrics for precision, recall, accuracy and confusion matrix calculation
# extract prediction and ground truth label from test_results
PredicationAndLabel = test_results['prediction','label']
PredicationAndLabel.show()
# import multiclassmetrics for precision, recall, and confusion matrix calculation
from pyspark.mllib.evaluation import MulticlassMetrics
multi_metrics = MulticlassMetrics(PredicationAndLabel.rdd)
precision_score = multi_metrics.weightedPrecision
recall_score = multi_metrics.weightedRecall
accuracy_score = multi_metrics.accuracy
print(recall_score)
print(precision_score)
print(accuracy_score)
multi_metrics.confusionMatrix().toArray()


+-----------+------+--------------------+------+----+-----+-----+-------------+--------+---------------+--------+--------+--------------------+-----+--------------------+--------------------+----------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch|       Ticket|    Fare|          Cabin|Embarked|Survived|            features|label|       rawPrediction|         probability|prediction|
+-----------+------+--------------------+------+----+-----+-----+-------------+--------+---------------+--------+--------+--------------------+-----+--------------------+--------------------+----------+
|         11|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|      PP 9549|    16.7|             G6|       S|       1|[3.0,4.0,1.0,1.0,...|  0.0|[81.4395485551614...|[1.0,4.7803135067...|       0.0|
|         24|     1|Sloper, Mr. Willi...|  male|28.0|    0|    0|       113788|    35.5|             A6|       S|       1|[1.0,28.0,0.0,0.0...|  0.0|[14.0671811437204...|[0.59985966118274.

array([[29.,  3.],
       [14.,  6.]])