# ML  

## Loading Data

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("TitanicDataSet").getOrCreate()
spark

In [3]:
df=spark.read.csv("titanic_clean.csv",header=True,inferSchema=True)

## Data Cleaning

In [4]:
age_missing=df.filter(df["Age"].isNull()) #isNull() 
age_missing.show()

+---+------+--------+----+---+---+-----+-----+------+----+-----+--------+----+----+---------+----------------+
|_c0|pclass|survived|name|sex|age|sibsp|parch|ticket|fare|cabin|embarked|boat|body|home.dest|has_cabin_number|
+---+------+--------+----+---+---+-----+-----+------+----+-----+--------+----+----+---------+----------------+
+---+------+--------+----+---+---+-----+-----+------+----+-----+--------+----+----+---------+----------------+



In [5]:
df.show()

+---+------+--------+--------------------+------+----------------+-----+-----+--------+--------+-------+--------+----+----+--------------------+----------------+
|_c0|pclass|survived|                name|   sex|             age|sibsp|parch|  ticket|    fare|  cabin|embarked|boat|body|           home.dest|has_cabin_number|
+---+------+--------+--------------------+------+----------------+-----+-----+--------+--------+-------+--------+----+----+--------------------+----------------+
|  1|     1|       1|Allen, Miss. Elis...|female|            29.0|    0|    0|   24160|211.3375|     B5|       S|   2|  NA|        St Louis, MO|               1|
|  2|     1|       1|Allison, Master. ...|  male|          0.9167|    1|    2|  113781|  151.55|C22 C26|       S|  11|  NA|Montreal, PQ / Ch...|               1|
|  3|     1|       0|Allison, Miss. He...|female|             2.0|    1|    2|  113781|  151.55|C22 C26|       S|  NA|  NA|Montreal, PQ / Ch...|               1|
|  4|     1|       0|Allison

In [6]:
df=df.replace('NA','0')

In [7]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- pclass: string (nullable = true)
 |-- survived: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: string (nullable = true)
 |-- parch: string (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- body: string (nullable = true)
 |-- home.dest: string (nullable = true)
 |-- has_cabin_number: integer (nullable = true)



In [8]:
df=df.drop("_c0")
df.show(5)

+------+--------+--------------------+------+------+-----+-----+------+--------+-------+--------+----+----+--------------------+----------------+
|pclass|survived|                name|   sex|   age|sibsp|parch|ticket|    fare|  cabin|embarked|boat|body|           home.dest|has_cabin_number|
+------+--------+--------------------+------+------+-----+-----+------+--------+-------+--------+----+----+--------------------+----------------+
|     1|       1|Allen, Miss. Elis...|female|  29.0|    0|    0| 24160|211.3375|     B5|       S|   2|   0|        St Louis, MO|               1|
|     1|       1|Allison, Master. ...|  male|0.9167|    1|    2|113781|  151.55|C22 C26|       S|  11|   0|Montreal, PQ / Ch...|               1|
|     1|       0|Allison, Miss. He...|female|   2.0|    1|    2|113781|  151.55|C22 C26|       S|   0|   0|Montreal, PQ / Ch...|               1|
|     1|       0|Allison, Mr. Huds...|  male|  30.0|    1|    2|113781|  151.55|C22 C26|       S|   0| 135|Montreal, PQ / Ch

In [9]:
df.printSchema()

root
 |-- pclass: string (nullable = true)
 |-- survived: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: string (nullable = true)
 |-- parch: string (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- body: string (nullable = true)
 |-- home.dest: string (nullable = true)
 |-- has_cabin_number: integer (nullable = true)



In [10]:
df_indexed=df

In [11]:
from pyspark.ml.feature import StringIndexer
indexer0 = StringIndexer(inputCol='pclass',outputCol='pclass_indexed',handleInvalid='keep')
df_indexed=indexer0.fit(df_indexed).transform(df_indexed)
indexer1 = StringIndexer(inputCol='Sex',outputCol='Gender',handleInvalid='keep')
df_indexed=indexer1.fit(df_indexed).transform(df_indexed)
indexer2 = StringIndexer(inputCol='sibsp',outputCol='SiblingsOrSpouses',handleInvalid='keep')
df_indexed=indexer2.fit(df_indexed).transform(df_indexed)
indexer3 = StringIndexer(inputCol='parch',outputCol='ParentsOrChildren',handleInvalid='keep')
df_indexed=indexer3.fit(df_indexed).transform(df_indexed)
indexer4 = StringIndexer(inputCol='fare',outputCol='Fare',handleInvalid='keep')
df_indexed=indexer4.fit(df_indexed).transform(df_indexed)
indexer5 = StringIndexer(inputCol='embarked',outputCol='PortOfEmbarkation',handleInvalid='keep')
df_indexed=indexer5.fit(df_indexed).transform(df_indexed)
indexer6 = StringIndexer(inputCol='survived',outputCol='Survived_indexed',handleInvalid='keep')
df_indexed=indexer6.fit(df_indexed).transform(df_indexed)

In [12]:
df_indexed.printSchema()

root
 |-- pclass: string (nullable = true)
 |-- survived: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: string (nullable = true)
 |-- parch: string (nullable = true)
 |-- ticket: string (nullable = true)
 |-- Fare: double (nullable = false)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- body: string (nullable = true)
 |-- home.dest: string (nullable = true)
 |-- has_cabin_number: integer (nullable = true)
 |-- pclass_indexed: double (nullable = false)
 |-- Gender: double (nullable = false)
 |-- SiblingsOrSpouses: double (nullable = false)
 |-- ParentsOrChildren: double (nullable = false)
 |-- PortOfEmbarkation: double (nullable = false)
 |-- Survived_indexed: double (nullable = false)



In [13]:
df.printSchema()

root
 |-- pclass: string (nullable = true)
 |-- survived: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: string (nullable = true)
 |-- parch: string (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- body: string (nullable = true)
 |-- home.dest: string (nullable = true)
 |-- has_cabin_number: integer (nullable = true)



In [14]:
df_indexed=df_indexed.drop('pclass','Sex','sibsp','parch','fare','embarked','survived')

In [15]:
df_indexed.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- ticket: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- body: string (nullable = true)
 |-- home.dest: string (nullable = true)
 |-- has_cabin_number: integer (nullable = true)
 |-- pclass_indexed: double (nullable = false)
 |-- Gender: double (nullable = false)
 |-- SiblingsOrSpouses: double (nullable = false)
 |-- ParentsOrChildren: double (nullable = false)
 |-- PortOfEmbarkation: double (nullable = false)
 |-- Survived_indexed: double (nullable = false)



In [16]:
df_indexed.select('PortOfEmbarkation').groupBy('PortOfEmbarkation').count().show()

+-----------------+-----+
|PortOfEmbarkation|count|
+-----------------+-----+
|              0.0|  917|
|              1.0|  270|
|              2.0|  123|
+-----------------+-----+



In [17]:
df.select('cabin').show() #Has Null Values -> Cannot take as an input

+-------+
|  cabin|
+-------+
|     B5|
|C22 C26|
|C22 C26|
|C22 C26|
|C22 C26|
|    E12|
|     D7|
|    A36|
|   C101|
|   NULL|
|C62 C64|
|C62 C64|
|    B35|
|   NULL|
|    A23|
|   NULL|
|B58 B60|
|B58 B60|
|    D15|
|     C6|
+-------+
only showing top 20 rows


In [18]:
df_indexed.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- ticket: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- body: string (nullable = true)
 |-- home.dest: string (nullable = true)
 |-- has_cabin_number: integer (nullable = true)
 |-- pclass_indexed: double (nullable = false)
 |-- Gender: double (nullable = false)
 |-- SiblingsOrSpouses: double (nullable = false)
 |-- ParentsOrChildren: double (nullable = false)
 |-- PortOfEmbarkation: double (nullable = false)
 |-- Survived_indexed: double (nullable = false)



In [19]:
df_indexed.columns

['name',
 'age',
 'ticket',
 'cabin',
 'boat',
 'body',
 'home.dest',
 'has_cabin_number',
 'pclass_indexed',
 'Gender',
 'SiblingsOrSpouses',
 'ParentsOrChildren',
 'PortOfEmbarkation',
 'Survived_indexed']

## Data Preparation

In [20]:
from pyspark.ml.feature import VectorAssembler

In [21]:
assembler = VectorAssembler(inputCols=['age',
                                       'has_cabin_number',
                                       'pclass_indexed',
                                       'Gender',
                                       'SiblingsOrSpouses',
                                       'ParentsOrChildren',
                                       'PortOfEmbarkation'],outputCol='features')

In [22]:
assembled_df=assembler.transform(df_indexed).select('features','Survived_indexed')
assembled_df.show(truncate=False)

+--------------------------------+----------------+
|features                        |Survived_indexed|
+--------------------------------+----------------+
|[29.0,1.0,1.0,1.0,0.0,0.0,0.0]  |1.0             |
|[0.9167,1.0,1.0,0.0,1.0,2.0,0.0]|1.0             |
|[2.0,1.0,1.0,1.0,1.0,2.0,0.0]   |0.0             |
|[30.0,1.0,1.0,0.0,1.0,2.0,0.0]  |0.0             |
|[25.0,1.0,1.0,1.0,1.0,2.0,0.0]  |0.0             |
|(7,[0,1,2],[48.0,1.0,1.0])      |1.0             |
|[63.0,1.0,1.0,1.0,1.0,0.0,0.0]  |1.0             |
|(7,[0,1,2],[39.0,1.0,1.0])      |0.0             |
|[53.0,1.0,1.0,1.0,2.0,0.0,0.0]  |1.0             |
|(7,[0,2,6],[71.0,1.0,1.0])      |0.0             |
|[47.0,1.0,1.0,0.0,1.0,0.0,1.0]  |0.0             |
|[18.0,1.0,1.0,1.0,1.0,0.0,1.0]  |1.0             |
|[24.0,1.0,1.0,1.0,0.0,0.0,1.0]  |1.0             |
|(7,[0,2,3],[26.0,1.0,1.0])      |1.0             |
|(7,[0,1,2],[80.0,1.0,1.0])      |1.0             |
|(7,[0,2],[29.8811345124283,1.0])|0.0             |
|[24.0,1.0,1

## Train and Test Data Split

In [23]:
train_df,test_df = assembled_df.randomSplit([0.8,0.2],seed=42)

## Model Building & Training

In [24]:
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier,DecisionTreeClassifier

### Classification

#### Logistic Regression

In [25]:
lr=LogisticRegression(featuresCol='features',labelCol='Survived_indexed',maxIter=4)
model_lr=lr.fit(train_df)

In [26]:
lr_train_predictions=model_lr.transform(train_df)
lr_test_predictions=model_lr.transform(test_df)

#### Decision Tree

In [27]:
dt=DecisionTreeClassifier(featuresCol='features',labelCol='Survived_indexed',maxDepth=3)
model_dt=dt.fit(train_df)

In [28]:
dt_train_predictions=model_dt.transform(train_df)
dt_test_predictions=model_dt.transform(test_df)

#### Random Forest

In [29]:
rf=RandomForestClassifier(featuresCol='features',labelCol='Survived_indexed',maxDepth=4,numTrees=6)
model_rf=rf.fit(train_df)

In [30]:
rf_train_predictions=model_rf.transform(train_df)
rf_test_predictions=model_rf.transform(test_df)

## Model Evaluation

In [31]:
lr_train_predictions.show(4)

+--------------+----------------+--------------------+--------------------+----------+
|      features|Survived_indexed|       rawPrediction|         probability|prediction|
+--------------+----------------+--------------------+--------------------+----------+
|(7,[0],[14.0])|             1.0|[1.74107269823676...|[0.78683127848220...|       0.0|
|(7,[0],[16.0])|             0.0|[1.78427875663145...|[0.80057769225976...|       0.0|
|(7,[0],[16.0])|             0.0|[1.78427875663145...|[0.80057769225976...|       0.0|
|(7,[0],[16.0])|             0.0|[1.78427875663145...|[0.80057769225976...|       0.0|
+--------------+----------------+--------------------+--------------------+----------+
only showing top 4 rows


In [32]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
accuracy_evaluator=MulticlassClassificationEvaluator(predictionCol='prediction',labelCol='Survived_indexed',metricName='accuracy')
F1_evaluator=MulticlassClassificationEvaluator(predictionCol='prediction',labelCol='Survived_indexed',metricName='f1')
precision_evaluator=MulticlassClassificationEvaluator(predictionCol='prediction',labelCol='Survived_indexed',metricName='weightedPrecision')
recall_evaluator=MulticlassClassificationEvaluator(predictionCol='prediction',labelCol='Survived_indexed',metricName='weightedRecall')
#accuracy_evaluator.evaluate(lr_train_predictions)

In [33]:
def evaluate_model(model_predictions):
    print("Accuracy = ",accuracy_evaluator.evaluate(model_predictions))
    print("F1 = ",F1_evaluator.evaluate(model_predictions))
    print("Precision = ",precision_evaluator.evaluate(model_predictions))
    print("Recall = ",recall_evaluator.evaluate(model_predictions))

    print("Confusion Matrix = ", model_predictions.groupBy("Survived_indexed","prediction").count().show())

### Logistic Regression

In [34]:
print("Logistic Regression Training Predictions")
print("-------------------------------")
evaluate_model(lr_train_predictions)

Logistic Regression Training Predictions
-------------------------------
Accuracy =  0.7813653136531366
F1 =  0.7798642631823984
Precision =  0.7793728227086418
Recall =  0.7813653136531366
+----------------+----------+-----+
|Survived_indexed|prediction|count|
+----------------+----------+-----+
|             1.0|       1.0|  284|
|             0.0|       1.0|  105|
|             1.0|       0.0|  132|
|             0.0|       0.0|  563|
+----------------+----------+-----+

Confusion Matrix =  None


In [35]:
print("Logistic Regression Testing Predictions")
print("-------------------------------")
evaluate_model(lr_test_predictions)

Logistic Regression Testing Predictions
-------------------------------
Accuracy =  0.8185840707964602
F1 =  0.8192070261630203
Precision =  0.8201263218903058
Recall =  0.8185840707964602
+----------------+----------+-----+
|Survived_indexed|prediction|count|
+----------------+----------+-----+
|             1.0|       1.0|   65|
|             0.0|       1.0|   22|
|             1.0|       0.0|   19|
|             0.0|       0.0|  120|
+----------------+----------+-----+

Confusion Matrix =  None


### Decision Tree

In [36]:
print("Decision Tree Training Predictions")
print("-------------------------------")
evaluate_model(dt_train_predictions)

Decision Tree Training Predictions
-------------------------------
Accuracy =  0.7952029520295203
F1 =  0.7913508170795993
Precision =  0.7934862508088567
Recall =  0.7952029520295203
+----------------+----------+-----+
|Survived_indexed|prediction|count|
+----------------+----------+-----+
|             1.0|       1.0|  273|
|             0.0|       1.0|   79|
|             1.0|       0.0|  143|
|             0.0|       0.0|  589|
+----------------+----------+-----+

Confusion Matrix =  None


In [37]:
print("Decision Tree Testing Predictions")
print("-------------------------------")
evaluate_model(dt_test_predictions)

Decision Tree Testing Predictions
-------------------------------
Accuracy =  0.8230088495575221
F1 =  0.8225614879842409
Precision =  0.8222489027987624
Recall =  0.8230088495575221
+----------------+----------+-----+
|Survived_indexed|prediction|count|
+----------------+----------+-----+
|             1.0|       1.0|   63|
|             0.0|       1.0|   19|
|             1.0|       0.0|   21|
|             0.0|       0.0|  123|
+----------------+----------+-----+

Confusion Matrix =  None


### Random Forest

In [38]:
print("Random Forest Training Predictions")
print("-------------------------------")
evaluate_model(rf_train_predictions)

Random Forest Training Predictions
-------------------------------
Accuracy =  0.7970479704797048
F1 =  0.7834860205007605
Precision =  0.8143758810702613
Recall =  0.7970479704797048
+----------------+----------+-----+
|Survived_indexed|prediction|count|
+----------------+----------+-----+
|             1.0|       1.0|  225|
|             0.0|       1.0|   29|
|             1.0|       0.0|  191|
|             0.0|       0.0|  639|
+----------------+----------+-----+

Confusion Matrix =  None


In [39]:
print("Random Forest Testing Predictions")
print("-------------------------------")
evaluate_model(rf_test_predictions)

Random Forest Testing Predictions
-------------------------------
Accuracy =  0.827433628318584
F1 =  0.8164010569023166
Precision =  0.8448320958365867
Recall =  0.827433628318584
+----------------+----------+-----+
|Survived_indexed|prediction|count|
+----------------+----------+-----+
|             1.0|       1.0|   49|
|             0.0|       1.0|    4|
|             1.0|       0.0|   35|
|             0.0|       0.0|  138|
+----------------+----------+-----+

Confusion Matrix =  None


## Hyperparameter Tuning

### Logistic Regression

In [40]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
param_grid=ParamGridBuilder().addGrid(lr.maxIter,range(2,11)).build()
evaluator=MulticlassClassificationEvaluator(labelCol='Survived_indexed',metricName='accuracy')
cv = CrossValidator(estimator=lr,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5,seed=42)
cv_model=cv.fit(train_df)

print(cv_model.bestModel)

LogisticRegressionModel: uid=LogisticRegression_b7f2cae9810d, numClasses=3, numFeatures=7


In [41]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
param_grid=ParamGridBuilder().addGrid(lr.maxIter,range(2,11)).build()
evaluator=MulticlassClassificationEvaluator(labelCol='Survived_indexed',metricName='f1')
cv = CrossValidator(estimator=lr,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5,seed=42)
cv_model=cv.fit(train_df)

print(cv_model.bestModel)

LogisticRegressionModel: uid=LogisticRegression_b7f2cae9810d, numClasses=3, numFeatures=7


In [42]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
param_grid=ParamGridBuilder().addGrid(lr.maxIter,range(2,11)).build()
evaluator=MulticlassClassificationEvaluator(labelCol='Survived_indexed',metricName='weightedPrecision')
cv = CrossValidator(estimator=lr,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5,seed=42)
cv_model=cv.fit(train_df)

print(cv_model.bestModel)

LogisticRegressionModel: uid=LogisticRegression_b7f2cae9810d, numClasses=3, numFeatures=7


In [43]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
param_grid=ParamGridBuilder().addGrid(lr.maxIter,range(2,11)).build()
evaluator=MulticlassClassificationEvaluator(labelCol='Survived_indexed',metricName='weightedRecall')
cv = CrossValidator(estimator=lr,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5,seed=42)
cv_model=cv.fit(train_df)

print(cv_model.bestModel)

LogisticRegressionModel: uid=LogisticRegression_b7f2cae9810d, numClasses=3, numFeatures=7


### Decision Tree

In [44]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
param_grid=ParamGridBuilder().addGrid(dt.maxDepth,range(2,11)).build()
evaluator=MulticlassClassificationEvaluator(labelCol='Survived_indexed',metricName='weightedRecall')
cv = CrossValidator(estimator=dt,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5,seed=42)
cv_model=cv.fit(train_df)

print(cv_model.bestModel)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_dea1c6886f78, depth=3, numNodes=11, numClasses=3, numFeatures=7


In [45]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
param_grid=ParamGridBuilder().addGrid(dt.maxDepth,range(2,11)).build()
evaluator=MulticlassClassificationEvaluator(labelCol='Survived_indexed',metricName='weightedPrecision')
cv = CrossValidator(estimator=dt,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5,seed=42)
cv_model=cv.fit(train_df)

print(cv_model.bestModel)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_dea1c6886f78, depth=3, numNodes=11, numClasses=3, numFeatures=7


In [46]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
param_grid=ParamGridBuilder().addGrid(dt.maxDepth,range(2,11)).build()
evaluator=MulticlassClassificationEvaluator(labelCol='Survived_indexed',metricName='f1')
cv = CrossValidator(estimator=dt,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5,seed=42)
cv_model=cv.fit(train_df)

print(cv_model.bestModel)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_dea1c6886f78, depth=3, numNodes=11, numClasses=3, numFeatures=7


In [47]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
param_grid=ParamGridBuilder().addGrid(dt.maxDepth,range(2,11)).build()
evaluator=MulticlassClassificationEvaluator(labelCol='Survived_indexed',metricName='accuracy')
cv = CrossValidator(estimator=dt,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5,seed=42)
cv_model=cv.fit(train_df)

print(cv_model.bestModel)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_dea1c6886f78, depth=3, numNodes=11, numClasses=3, numFeatures=7


### Random Forest

In [48]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
param_grid=ParamGridBuilder().addGrid(rf.maxDepth,range(2,11)).addGrid(rf.numTrees, range(2,5)).build()
evaluator=MulticlassClassificationEvaluator(labelCol='Survived_indexed',metricName='weightedRecall')
cv = CrossValidator(estimator=rf,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5,seed=42)
cv_model=cv.fit(train_df)

print(cv_model.bestModel)

RandomForestClassificationModel: uid=RandomForestClassifier_4552105a59cf, numTrees=2, numClasses=3, numFeatures=7


In [49]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
param_grid=ParamGridBuilder().addGrid(rf.maxDepth,range(2,11)).addGrid(rf.numTrees, range(2,5)).build()
evaluator=MulticlassClassificationEvaluator(labelCol='Survived_indexed',metricName='weightedPrecision')
cv = CrossValidator(estimator=rf,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5,seed=42)
cv_model=cv.fit(train_df)

print(cv_model.bestModel)

RandomForestClassificationModel: uid=RandomForestClassifier_4552105a59cf, numTrees=4, numClasses=3, numFeatures=7


In [50]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
param_grid=ParamGridBuilder().addGrid(rf.maxDepth,range(2,11)).addGrid(rf.numTrees, range(2,5)).build()
evaluator=MulticlassClassificationEvaluator(labelCol='Survived_indexed',metricName='f1')
cv = CrossValidator(estimator=rf,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5,seed=42)
cv_model=cv.fit(train_df)

print(cv_model.bestModel)

RandomForestClassificationModel: uid=RandomForestClassifier_4552105a59cf, numTrees=2, numClasses=3, numFeatures=7


In [51]:
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator
param_grid=ParamGridBuilder().addGrid(rf.maxDepth,range(2,11)).addGrid(rf.numTrees, range(2,5)).build()
evaluator=MulticlassClassificationEvaluator(labelCol='Survived_indexed',metricName='accuracy')
cv = CrossValidator(estimator=rf,estimatorParamMaps=param_grid,evaluator=evaluator,numFolds=5,seed=42)
cv_model=cv.fit(train_df)

print(cv_model.bestModel)

RandomForestClassificationModel: uid=RandomForestClassifier_4552105a59cf, numTrees=2, numClasses=3, numFeatures=7


# Map Reduce

#### Q1. Find the total number of passengers by gender

In [52]:
df.columns

['pclass',
 'survived',
 'name',
 'sex',
 'age',
 'sibsp',
 'parch',
 'ticket',
 'fare',
 'cabin',
 'embarked',
 'boat',
 'body',
 'home.dest',
 'has_cabin_number']

In [53]:
from mrjob.job import MRJob

In [54]:
!python TotalGender.py --mapper titanic_clean.csv

"\"female\""	1
"\"male\""	1
"\"female\""	1
"\"male\""	1
"\"female\""	1
"\"male\""	1
"\"female\""	1
"\"male\""	1
"\"female\""	1
"\"male\""	1
"\"male\""	1
"\"female\""	1
"\"female\""	1
"\"female\""	1
"\"male\""	1
"\"male\""	1
"\"male\""	1
"\"female\""	1
"\"female\""	1
"\"male\""	1
"\"male\""	1
"\"female\""	1
"\"male\""	1
"\"female\""	1
"\"female\""	1
"\"male\""	1
"\"male\""	1
"\"female\""	1
"\"female\""	1
"\"male\""	1
"\"male\""	1
"\"male\""	1
"\"female\""	1
"\"female\""	1
"\"male\""	1
"\"female\""	1
"\"female\""	1
"\"male\""	1
"\"male\""	1
"\"male\""	1
"\"male\""	1
"\"female\""	1
"\"female\""	1
"\"female\""	1
"\"female\""	1
"\"male\""	1
"\"male\""	1
"\"male\""	1
"\"female\""	1
"\"male\""	1
"\"female\""	1
"\"male\""	1
"\"male\""	1
"\"male\""	1
"\"male\""	1
"\"female\""	1
"\"male\""	1
"\"female\""	1
"\"male\""	1
"\"female\""	1
"\"male\""	1
"\"female\""	1
"\"male\""	1
"\"female\""	1
"\"male\""	1
"\"female\""	1
"\"female\""	1
"\"female\""	1
"\"male\""	1
"\"female\""	1
"\"male\""	1
"\"male\"

In [55]:
!python TotalGender.py titanic_clean.csv

"\"female\""	144

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\amits\AppData\Local\Temp\TotalGender.amits.20251102.135126.178288
Running step 1 of 1...
job output is in C:\Users\amits\AppData\Local\Temp\TotalGender.amits.20251102.135126.178288\output
Streaming final output from C:\Users\amits\AppData\Local\Temp\TotalGender.amits.20251102.135126.178288\output...
Removing temp directory C:\Users\amits\AppData\Local\Temp\TotalGender.amits.20251102.135126.178288...



"\"male\""	179


#### Question 2: Calculate Average Age of Survivors vs Non-Survivors

In [56]:
from mrjob.job import MRJob

In [57]:
df.printSchema()

root
 |-- pclass: string (nullable = true)
 |-- survived: string (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: string (nullable = true)
 |-- parch: string (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: string (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- body: string (nullable = true)
 |-- home.dest: string (nullable = true)
 |-- has_cabin_number: integer (nullable = true)



In [58]:
df.columns

['pclass',
 'survived',
 'name',
 'sex',
 'age',
 'sibsp',
 'parch',
 'ticket',
 'fare',
 'cabin',
 'embarked',
 'boat',
 'body',
 'home.dest',
 'has_cabin_number']

In [59]:
!python averageage.py --mapper titanic_clean.csv

"1"	29.0
"1"	0.9167
"0"	2.0
"0"	30.0
"0"	25.0
"1"	48.0
"1"	63.0
"0"	39.0
"1"	53.0
"0"	71.0
"0"	47.0
"1"	18.0
"1"	24.0
"1"	26.0
"1"	80.0
"0"	29.8811345124283
"0"	24.0
"1"	50.0
"1"	32.0
"0"	36.0
"1"	37.0
"1"	47.0
"1"	26.0
"1"	42.0
"1"	29.0
"0"	25.0
"1"	25.0
"1"	19.0
"1"	35.0
"1"	28.0
"0"	45.0
"1"	40.0
"1"	30.0
"1"	58.0
"0"	42.0
"1"	45.0
"1"	22.0
"1"	29.8811345124283
"0"	41.0
"0"	48.0
"0"	29.8811345124283
"1"	44.0
"1"	59.0
"1"	60.0
"1"	41.0
"0"	45.0
"0"	29.8811345124283
"1"	42.0
"1"	53.0
"1"	36.0
"1"	58.0
"0"	33.0
"0"	28.0
"0"	17.0
"1"	11.0
"1"	14.0
"1"	36.0
"1"	36.0
"0"	49.0
"1"	29.8811345124283
"0"	36.0
"1"	76.0
"0"	46.0
"1"	47.0
"1"	27.0
"1"	33.0
"1"	36.0
"1"	30.0
"1"	45.0
"1"	29.8811345124283
"0"	29.8811345124283
"0"	27.0
"1"	26.0
"1"	22.0
"0"	29.8811345124283
"0"	47.0
"1"	39.0
"0"	37.0
"1"	64.0
"1"	55.0
"0"	29.8811345124283
"0"	70.0
"1"	36.0
"1"	64.0
"0"	39.0
"1"	38.0
"1"	51.0
"1"	27.0
"1"	33.0
"0"	31.0
"1"	27.0
"1"	31.0
"1"	17.0
"1"	53.0
"1"	4.0
"1"	54.0
"0"	50.0
"1"	27.0
"1"	48.0
"

In [60]:
!python averageage.py titanic_clean.csv

"0"	30.38936817968011
"1"	29.058812438814574


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory C:\Users\amits\AppData\Local\Temp\averageage.amits.20251102.135129.733378
Running step 1 of 1...
job output is in C:\Users\amits\AppData\Local\Temp\averageage.amits.20251102.135129.733378\output
Streaming final output from C:\Users\amits\AppData\Local\Temp\averageage.amits.20251102.135129.733378\output...
Removing temp directory C:\Users\amits\AppData\Local\Temp\averageage.amits.20251102.135129.733378...


# AirFlow

In [None]:
# etl.py

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Transformer').getOrCreate()

df = spark.read.csv('sample.csv', header=True, inferSchema=True)
df_cleaned = df.filter(df['age']>20)
df_cleaned.write.mode('overwrite').csv('sample_data') 
#write.mode('overwrite').csv('sample_data') â†’ saves the cleaned data as a new CSV directory named sample_data.

In [None]:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id='spark_job_every_minute',
    description='Runs the etl spark job every minute',
    start_date=datetime(2025, 10, 1),
    catchup=False,
    schedule='* * * * *',
    default_args={
        'owner':'apoorva',
        'retries':1,
        'retry_delay':timedelta(minutes=2)
    },
    tags=['minute', 'spark', 'etl']
) as dag:

  spark_job = BashOperator(
      task_id='spark_task',
      bash_command='spark-submit etl.py'
  )


In [None]:
from airflow import DAG
from datetime import timedelta, datetime
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.sensors.filesystem import FileSensor

with DAG(
      dag_id='etl_on_file_arrival',
      description='This is an ETL process that first checks for the file and then transforms it',
      start_date=datetime(2025, 10, 1),
      catchup=False,
      schedule=None,
      default_args={
          'owner':'apoorva',
          'retries': 1,
          'retry_delay': timedelta(minutes=2)
      },
      tags=['etl', 'filesystem']
) as dag:

  wait_for_file = FileSensor(
      task_id='wait_for_file',
      filepath='sample.csv',
      poke_interval =30,
      timeout=600,
      mode='poke'
  )

  spark_task = BashOperator(
      task_id='spark_task',
      bash_command='spark-submit etl.py'
  )

  wait_for_file >> spark_task

In [None]:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.sensors.filesystem import FileSensor

with DAG(
    start_date=
    default_args=
    catchup=
    schedule=
    dag_id=
    description={}
    tags=[]
) as dag:

wait_for_file  = FileSensor(
    mode='poke',
    timeout=600,
    poke_interval=30,
    filepath='sample.csv',
    task_id='wait_for_file'
)

spark_job = BashOperator(
    task_id='spark_task'
    bash_command='spark-sumnit etl.py' 
)