# Project 1
## Machine Learning without Mllib Pipeline

%pip install 

In [89]:
import findspark
from pyspark.sql import SparkSession

In [90]:
# Let findspark locate the Spark installation and make pyspark importable.
findspark.init()
# Return the Spark home that was resolved; it is displayed as the cell output.
findspark.find()

'c:\\Users\\ellin\\Documents\\MSc_IoT_MAU\\DA642E-AI-and-Data-Management-for-IOT\\.venv\\Lib\\site-packages\\pyspark'

In [91]:
# Build the Spark session for the Titanic analysis, or reuse the one that is
# already active in this kernel.
spark = (
    SparkSession.builder
    .appName("Titanic Data")
    .getOrCreate()
)

In [92]:
# Show the SparkSession object as the cell output.
spark

In [93]:
# Load the Titanic training set from CSV; the first row supplies the column names.
# No schema inference is requested, so the numeric columns are cast explicitly later.
df = spark.read.option("header", "true").csv("data/train.csv")

In [94]:
# Preview the first five rows of the raw data.
df.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+---------------

In [95]:
from pyspark.sql.functions import col

In [96]:
# Keep only the label and the features used for modelling.
# Columns listed in `float_columns` are cast to float; Sex and Embarked stay as strings.
selected_columns = ['Survived', 'Pclass', 'Sex', 'Age', 'Fare', 'Embarked']
float_columns = {'Survived', 'Pclass', 'Age', 'Fare'}

dataset = df.select(*[
    col(name).cast('float') if name in float_columns else col(name)
    for name in selected_columns
])

In [97]:
# Preview the selected and cast columns.
dataset.show(4)

+--------+------+------+----+-------+--------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|
+--------+------+------+----+-------+--------+
|     0.0|   3.0|  male|22.0|   7.25|       S|
|     1.0|   1.0|female|38.0|71.2833|       C|
|     1.0|   3.0|female|26.0|  7.925|       S|
|     1.0|   1.0|female|35.0|   53.1|       S|
+--------+------+------+----+-------+--------+
only showing top 4 rows



In [98]:
from pyspark.sql.functions import isnull, when, count, col

In [99]:
# Count the null entries in each column of the dataset.
null_count_exprs = [
    count(when(isnull(name), name)).alias(name)
    for name in dataset.columns
]
dataset.select(null_count_exprs).show()

+--------+------+---+---+----+--------+
|Survived|Pclass|Sex|Age|Fare|Embarked|
+--------+------+---+---+----+--------+
|       0|     0|  0|177|   0|       2|
+--------+------+---+---+----+--------+



In [100]:
# Treat '?' placeholders as missing values, then drop every row that
# contains a null in any column.
dataset = (
    dataset
    .replace('?', None)
    .dropna(how='any')
)

In [101]:
# Re-count nulls per column to confirm the cleaning step removed them all.
remaining_null_exprs = [
    count(when(isnull(name), name)).alias(name)
    for name in dataset.columns
]
dataset.select(remaining_null_exprs).show()

+--------+------+---+---+----+--------+
|Survived|Pclass|Sex|Age|Fare|Embarked|
+--------+------+---+---+----+--------+
|       0|     0|  0|  0|   0|       0|
+--------+------+---+---+----+--------+



In [102]:
import numpy as np
from pyspark.ml.feature import StringIndexer

In [103]:
# Encode the string column Sex as a numeric label index in a new Gender column.
# handleInvalid='keep' puts labels unseen during fit into an extra bucket.
gender_indexer = StringIndexer(inputCol='Sex', outputCol='Gender', handleInvalid='keep')
gender_indexer_model = gender_indexer.fit(dataset)
dataset = gender_indexer_model.transform(dataset)

In [104]:
# Encode the string column Embarked as a numeric label index in a new Boarded column.
# handleInvalid='keep' puts labels unseen during fit into an extra bucket.
boarded_indexer = StringIndexer(inputCol='Embarked', outputCol='Boarded', handleInvalid='keep')
boarded_indexer_model = boarded_indexer.fit(dataset)
dataset = boarded_indexer_model.transform(dataset)

In [105]:
# Check the indexed columns (Gender, Boarded) next to their string sources.
dataset.show(2)

+--------+------+------+----+-------+--------+------+-------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|Gender|Boarded|
+--------+------+------+----+-------+--------+------+-------+
|     0.0|   3.0|  male|22.0|   7.25|       S|   0.0|    0.0|
|     1.0|   1.0|female|38.0|71.2833|       C|   1.0|    1.0|
+--------+------+------+----+-------+--------+------+-------+
only showing top 2 rows



In [106]:
# The string columns are no longer needed now that their indexed
# counterparts (Gender, Boarded) exist.
dataset = dataset.drop('Sex', 'Embarked')

In [107]:
# Assemble features with VectorAssembler
from pyspark.ml.feature import VectorAssembler

In [108]:
# Input columns that are packed into the model's single feature vector.
require_featured = [
    'Pclass',
    'Age',
    'Fare',
    'Gender',
    'Boarded',
]

# VectorAssembler is a plain transformer: no fit step, it just appends
# a 'features' vector column built from the inputs above.
assembler = VectorAssembler(
    inputCols=require_featured,
    outputCol='features',
)
transformed_data = assembler.transform(dataset)

In [109]:
# Preview the assembled feature vectors alongside their source columns.
transformed_data.show(5)

+--------+------+----+-------+------+-------+--------------------+
|Survived|Pclass| Age|   Fare|Gender|Boarded|            features|
+--------+------+----+-------+------+-------+--------------------+
|     0.0|   3.0|22.0|   7.25|   0.0|    0.0|[3.0,22.0,7.25,0....|
|     1.0|   1.0|38.0|71.2833|   1.0|    1.0|[1.0,38.0,71.2833...|
|     1.0|   3.0|26.0|  7.925|   1.0|    0.0|[3.0,26.0,7.92500...|
|     1.0|   1.0|35.0|   53.1|   1.0|    0.0|[1.0,35.0,53.0999...|
|     0.0|   3.0|35.0|   8.05|   0.0|    0.0|[3.0,35.0,8.05000...|
+--------+------+----+-------+------+-------+--------------------+
only showing top 5 rows



In [110]:
# Split into train and test sets (80% / 20%).
# A fixed seed makes the split, and every metric computed from it,
# reproducible across kernel restarts. The seed matches the one used for
# the split in Project 2.
(training_data, test_data) = transformed_data.randomSplit([0.8, 0.2], seed=11)
print("N training samples: " + str(training_data.count()))
print("N testing samples: " + str(test_data.count()))

N training samples: 569
N testing samples: 143


In [111]:
from pyspark.ml.classification import RandomForestClassifier

In [112]:
# Random forest that predicts Survived from the assembled feature vector,
# with each tree limited to a depth of 5.
rf = RandomForestClassifier(
    featuresCol='features',
    labelCol='Survived',
    maxDepth=5,
)

In [113]:
# Train the random forest on the training split only.
model = rf.fit(training_data)

In [114]:
# Score the held-out test split with the fitted model.
predictions = model.transform(test_data)

In [115]:
# Evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [116]:
# Evaluator that compares the 'prediction' column with the 'Survived'
# label and reports plain accuracy.
evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    labelCol='Survived',
    predictionCol='prediction',
)

In [117]:
# `predictions` was produced from test_data, so this is the accuracy on the
# held-out test split, not on the training data. The label says so.
accuracy = evaluator.evaluate(predictions)
print('Test Accuracy= ', accuracy)

Training Accuracy=  0.7622377622377622


# Project 2
## Machine Learning Project with Mllib Pipeline

In [162]:
# setup pyspark env

# download JVM
## !apt-get install -y openjdk-8-jdk-headless -qq > /dev/null   (Linux/Colab only; apt-get is a shell command, not a pip package)

In [175]:
# libraries

from pyspark.sql import SparkSession
import findspark

In [176]:
# Debugging aid: initialise findspark and display the Spark home it resolved,
# to verify which pyspark installation this kernel is using.
findspark.init()
findspark.find()

'c:\\Users\\ellin\\Documents\\MSc_IoT_MAU\\DA642E-AI-and-Data-Management-for-IOT\\.venv\\Lib\\site-packages\\pyspark'

Set `path_to_spark_archive` to the path of the downloaded Spark archive (`.tgz`) before running the extraction below:
# Optional setup step: unpack a manually downloaded Spark archive.
# NOTE(review): the value below points at a site-packages directory, not at
# an archive file. Replace it with the real path of the Spark .tgz.
path_to_spark_archive = 'c:/users/ellin/appdata/local/programs/python/python311/lib/site-packages'

#!tar xf "{path_to_spark_archive}"

import os
import tarfile

# Only extract when the path really is a tar archive. Opening a directory
# or a missing file with tarfile.open raises instead of extracting anything.
if os.path.isfile(path_to_spark_archive) and tarfile.is_tarfile(path_to_spark_archive):
    with tarfile.open(path_to_spark_archive, "r:*") as tar:
        # Extract next to the archive. The archive path itself is a file and
        # cannot serve as the destination directory.
        tar.extractall(path=os.path.dirname(path_to_spark_archive))
else:
    print("Skipping extraction: not a tar archive -> " + path_to_spark_archive)

In [177]:
# Start a local-mode Spark session for the pipeline project, or reuse the
# session that is already active in this kernel.
session_builder = SparkSession.builder.master("local").appName("Titanic Data")
spark = session_builder.getOrCreate()

# Display the session as the cell output.
spark

In [178]:
# Load the Titanic training CSV (header row gives the column names; no
# schema inference, so numeric columns are cast explicitly later).
df = spark.read.option("header", "true").csv("data/train.csv")

# Quick look at the first three rows.
df.show(3)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
only showing top 3 rows



In [179]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

from pyspark.ml.feature import StringIndexer, OneHotEncoder

from pyspark.ml.feature import VectorAssembler

from pyspark.ml.classification import RandomForestClassifier

In [180]:
from pyspark.ml import Pipeline

In [181]:
# 80/20 train/test split; the fixed seed keeps the split reproducible.
train_df, test_df = df.randomSplit([0.8, 0.2], seed=11)

# Report how many rows landed in each split.
print(f"N of train samples: {train_df.count()}")
print(f"N of test samples: {test_df.count()}")

N of train samples: 703
N of test samples: 188


In [186]:
from pyspark.sql.functions import col

# --- Pipeline stages -------------------------------------------------------
# Index the categorical string columns. handleInvalid="keep" routes labels
# that were not seen while fitting into an extra bucket instead of raising
# at transform time (the same setting Project 1 uses).
Sex_indexer = StringIndexer(inputCol="Sex", outputCol="Gender", handleInvalid="keep")
Embarked_indexer = StringIndexer(inputCol="Embarked", outputCol="Boarded", handleInvalid="keep")

inputCols = ['Pclass', 'Age', 'Fare', 'Gender', 'Boarded']
outputCol = "features"

# --- Type conversion -------------------------------------------------------
# The CSV was read without a schema, so the numeric features arrive as strings.
NUMERIC_FEATURES = ["Pclass", "Age", "Fare"]


def _cast_numeric(frame):
    """Return `frame` with every column in NUMERIC_FEATURES cast to double."""
    for name in NUMERIC_FEATURES:
        frame = frame.withColumn(name, col(name).cast("double"))
    return frame


train_df = _cast_numeric(train_df)
test_df = _cast_numeric(test_df)

# --- Missing values --------------------------------------------------------
# Imputation statistics come from the TRAINING split only and are applied to
# both splits. Filling the test set with its own means would leak test
# information and preprocess it differently from the data the model saw.
# Both means are computed in one aggregation, so this is a single Spark job.
train_means = train_df.agg(
    F.mean("Age").alias("Age"),
    F.mean("Fare").alias("Fare"),
).first()

fill_values = {
    "Age": train_means["Age"],
    "Fare": train_means["Fare"],
    "Embarked": "missing",
    "Sex": "missing",
}
# A mean is None when the column has no non-null values; fillna rejects None.
fill_values = {name: value for name, value in fill_values.items() if value is not None}

train_df = train_df.fillna(fill_values)
test_df = test_df.fillna(fill_values)

# --- Remaining stages ------------------------------------------------------
vector_assembler = VectorAssembler(inputCols=inputCols, outputCol=outputCol)

# NOTE: despite the `dt_` prefix, this is a random forest. The name is kept
# because the pipeline cell references it.
dt_model = RandomForestClassifier(labelCol="Survived", featuresCol="features")



In [187]:
# Gender and Boarded are the output columns of the StringIndexer stages and
# are not part of train_df itself, so the select below stays disabled here.
#train_df.select("Gender", "Boarded").show(5)
# Check the column types after casting and imputation.
train_df.printSchema()
# List the distinct Sex/Embarked combinations present in the training split
# (includes the "missing" placeholder introduced by fillna).
train_df.select("Sex", "Embarked").distinct().show()


root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: double (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = false)
 |-- Age: double (nullable = false)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = false)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = false)

+------+--------+
|   Sex|Embarked|
+------+--------+
|female| missing|
|  male|       C|
|  male|       S|
|female|       Q|
|female|       S|
|  male|       Q|
|female|       C|
+------+--------+



In [188]:
# Define the indexers used by the pipeline. handleInvalid="keep" maps any
# label that was absent from the training split (for example a port or the
# "missing" placeholder that only occurs in the test split) to an extra
# bucket. The default ("error") would make pipeline.transform(test_df) fail
# on such rows. Project 1 uses the same setting.
Sex_indexer = StringIndexer(inputCol="Sex", outputCol="Gender", handleInvalid="keep")
Embarked_indexer = StringIndexer(inputCol="Embarked", outputCol="Boarded", handleInvalid="keep")

# Preview only: fit and apply the indexers by hand to inspect their output.
# train_df itself is left unchanged; the pipeline refits these stages.
indexed_df = Sex_indexer.fit(train_df).transform(train_df)
indexed_df = Embarked_indexer.fit(indexed_df).transform(indexed_df)

indexed_df.printSchema()
indexed_df.select("Sex", "Gender", "Embarked", "Boarded").show(5)


root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: double (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = false)
 |-- Age: double (nullable = false)
 |-- SibSp: string (nullable = true)
 |-- Parch: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = false)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- Gender: double (nullable = false)
 |-- Boarded: double (nullable = false)

+------+------+--------+-------+
|   Sex|Gender|Embarked|Boarded|
+------+------+--------+-------+
|  male|   0.0|       S|    0.0|
|female|   1.0|       C|    1.0|
|  male|   0.0|       S|    0.0|
|female|   1.0|       S|    0.0|
|  male|   0.0|       S|    0.0|
+------+------+--------+-------+
only showing top 5 rows



In [190]:
from pyspark.sql.functions import col

# The label column was read as a string; cast it to double in both splits
# so it can serve as the classifier's numeric label column.
train_df, test_df = (
    split.withColumn("Survived", col("Survived").cast("double"))
    for split in (train_df, test_df)
)


In [191]:
# Chain indexing, feature assembly and the classifier into one estimator.
pipeline_stages = [Sex_indexer, Embarked_indexer, vector_assembler, dt_model]
pipeline = Pipeline(stages=pipeline_stages)

# Fit every stage on the training split only.
final_pipeline = pipeline.fit(train_df)

# Apply the fitted stages to the held-out split and inspect a few predictions.
test_predictions_pipeline = final_pipeline.transform(test_df)
test_predictions_pipeline.show(5, truncate=False)

+-----------+--------+------+-------------------------------+------+------------------+-----+-----+-------+-------+-----+--------+------+-------+--------------------------------------+---------------------------------------+----------------------------------------+----------+
|PassengerId|Survived|Pclass|Name                           |Sex   |Age               |SibSp|Parch|Ticket |Fare   |Cabin|Embarked|Gender|Boarded|features                              |rawPrediction                          |probability                             |prediction|
+-----------+--------+------+-------------------------------+------+------------------+-----+-----+-------+-------+-----+--------+------+-------+--------------------------------------+---------------------------------------+----------------------------------------+----------+
|103        |0.0     |1.0   |White, Mr. Richard Frasar      |male  |21.0              |0    |1    |35281  |77.2875|D26  |S       |0.0   |0.0    |[1.0,21.0,77.2875,0.0,0.