<a href="https://colab.research.google.com/github/Alekhya-pvsns/assignments/blob/master/Lab_5_Project_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Project 2: Machine Learning Project with an MLlib Pipeline


Setting up the PySpark environment

In [None]:
# Downloading the JVM

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Mount Google Drive first: the Spark archive and the Titanic CSV are both read
# from /content/drive/MyDrive, and no earlier cell mounts it, so a fresh
# "Restart & Run All" would otherwise fail. Re-mounting is harmless.
from google.colab import drive
drive.mount('/content/drive')

# Location of the Spark 3.0.1 archive on the mounted Drive (extracted in the next cell)
path_to_spark_archive = '/content/drive/MyDrive/spark-3.0.1-bin-hadoop2.7.tgz'

In [None]:
# Unzipping the archived file

!tar xf "{path_to_spark_archive}"

In [None]:
# Tell PySpark where Java and Spark live. These paths must match the JDK
# installed via apt-get and the folder extracted from the Spark archive.

import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop2.7",
})

In [None]:
# Installing findspark

!pip install -q findspark

In [None]:
# Register the extracted Spark distribution with findspark, then echo the
# Spark home it resolved as a quick sanity check.

import findspark

path_to_spark = '/content/spark-3.0.1-bin-hadoop2.7'
findspark.init(spark_home=path_to_spark)

findspark.find()

'/content/spark-3.0.1-bin-hadoop2.7'

Initializing Spark Session

In [None]:
# Start (or reuse, via getOrCreate) a local Spark session for this notebook.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")
    .appName("Titanic Data")
    .getOrCreate()
)

# Evaluate the session last so the notebook displays its summary
spark

Reading the data

In [None]:
# Load the Titanic training CSV. No schema inference is requested, so every
# column arrives as a string; numeric casts are applied when selecting features.
df = spark.read.csv("/content/drive/MyDrive/train.csv", header=True)

df.show(3)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
only showing top 3 rows



Importing functions

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Encoders for categorical (string) columns
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Combines individual feature columns into a single vector column
from pyspark.ml.feature import VectorAssembler

# Random forest classifier (an ensemble of decision trees) used as the model
from pyspark.ml.classification import RandomForestClassifier

Using the Pipeline

In [None]:
# Importing pipeline

from pyspark.ml import Pipeline


In [None]:
# Keep only the label and the candidate features. The CSV was read without schema
# inference, so the numeric columns are cast to float here; Sex and Embarked stay strings.

from pyspark.sql.functions import col

numeric_cols = {'Survived', 'Pclass', 'Age', 'Fare'}
selected_cols = ['Survived', 'Pclass', 'Sex', 'Age', 'Fare', 'Embarked']

dataset = df.select([
    col(name).cast('float') if name in numeric_cols else col(name)
    for name in selected_cols
])

dataset.show(4)

+--------+------+------+----+-------+--------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|
+--------+------+------+----+-------+--------+
|     0.0|   3.0|  male|22.0|   7.25|       S|
|     1.0|   1.0|female|38.0|71.2833|       C|
|     1.0|   3.0|female|26.0|  7.925|       S|
|     1.0|   1.0|female|35.0|   53.1|       S|
+--------+------+------+----+-------+--------+
only showing top 4 rows



In [None]:
# Count the missing (null) values in each column. Nothing is removed yet;
# that happens in the next cell.

from pyspark.sql.functions import isnull, when, count, col

# count() skips nulls, so when() must yield a non-null marker for the null rows;
# a literal 1 makes that explicit.
dataset.select([
    count(when(isnull(col(c)), 1)).alias(c) for c in dataset.columns
]).show()

+--------+------+---+---+----+--------+
|Survived|Pclass|Sex|Age|Fare|Embarked|
+--------+------+---+---+----+--------+
|       0|     0|  0|177|   0|       2|
+--------+------+---+---+----+--------+



In [None]:
# Treat '?' placeholders in string columns as missing, then drop every row with a
# null in any column (per the count above: 177 missing Age, 2 missing Embarked).
dataset = (
    dataset
    .replace(to_replace='?', value=None)
    .na.drop(how='any')
)

# Re-run the null count to confirm that nothing is missing any more
dataset.select(
    [count(when(isnull(c), c)).alias(c) for c in dataset.columns]
).show()

+--------+------+---+---+----+--------+
|Survived|Pclass|Sex|Age|Fare|Embarked|
+--------+------+---+---+----+--------+
|       0|     0|  0|  0|   0|       0|
+--------+------+---+---+----+--------+



In [None]:
# 80/20 train/test split; the fixed seed (11) keeps the split reproducible.
train_df, test_df = dataset.randomSplit([0.8, 0.2], seed=11)

print(f"Number of train samples: {train_df.count()}")
print(f"Number of test samples:{test_df.count()}")

Number of train samples: 562
Number of test samples:150


In [None]:
# Define the pipeline stages without fitting them; the Pipeline cell fits them on train_df.

# Label-encode the two string features: Sex -> Gender, Embarked -> Boarded
Sex_indexer = StringIndexer(inputCol="Sex", outputCol="Gender")
Embarked_indexer = StringIndexer(inputCol="Embarked", outputCol="Boarded")

# Assemble the numeric and encoded columns into a single "features" vector
inputCols = ['Pclass', 'Age', 'Fare', 'Gender', 'Boarded']
outputCol = "features"
vector_assembler = VectorAssembler(inputCols=inputCols, outputCol=outputCol)

# Random forest classifier predicting Survived.
# NOTE: the variable keeps its historical name `dt_model` because the pipeline
# cell refers to it, but this is a random forest, not a single decision tree.
dt_model = RandomForestClassifier(labelCol="Survived", featuresCol="features")

In [None]:
# Chain encoding, vector assembly and the classifier into one estimator.
pipeline = Pipeline(stages=[
    Sex_indexer,
    Embarked_indexer,
    vector_assembler,
    dt_model,
])

# A single fit() trains every stage on the training split only,
# so the test split never influences the encoders or the model.
final_pipeline = pipeline.fit(train_df)

# Apply the fitted pipeline to the held-out split and preview the predictions
test_predictions_from_pipeline = final_pipeline.transform(test_df)
test_predictions_from_pipeline.show(n=5, truncate=False)

+--------+------+----+----+-------+--------+------+-------+-----------------------------------+--------------------------------------+----------------------------------------+----------+
|Survived|Pclass|Sex |Age |Fare   |Embarked|Gender|Boarded|features                           |rawPrediction                         |probability                             |prediction|
+--------+------+----+----+-------+--------+------+-------+-----------------------------------+--------------------------------------+----------------------------------------+----------+
|0.0     |1.0   |male|19.0|263.0  |S       |0.0   |0.0    |[1.0,19.0,263.0,0.0,0.0]           |[11.225006325654135,8.774993674345865]|[0.5612503162827067,0.4387496837172932] |0.0       |
|0.0     |1.0   |male|21.0|77.2875|S       |0.0   |0.0    |[1.0,21.0,77.2874984741211,0.0,0.0]|[10.846314240712049,9.153685759287951]|[0.5423157120356025,0.45768428796439753]|0.0       |
|0.0     |1.0   |male|28.0|82.1708|C       |0.0   |1.0    |[1.0,2

Using Spark to load images as a DataFrame

In [None]:
!pip install sparkdl
!pip install matplotlib scipy scikit-image

!pip install tensorframes

!pip install kafka-python image

!pip install tensorflowonspark

from sparkdl import readImages

#image_df.show()

Collecting image
  Downloading image-1.5.33.tar.gz (15 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting django (from image)
  Downloading Django-5.0.3-py3-none-any.whl (8.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.2/8.2 MB[0m [31m13.8 MB/s[0m eta [36m0:00:00[0m
Collecting asgiref<4,>=3.7.0 (from django->image)
  Downloading asgiref-3.7.2-py3-none-any.whl (24 kB)
Building wheels for collected packages: image
  Building wheel for image (setup.py) ... [?25l[?25hdone
  Created wheel for image: filename=image-1.5.33-py2.py3-none-any.whl size=19482 sha256=4ecfa1b5edc1beb84ac64842424482ee83ed3f11ac9db1c1eeac932b89a870db
  Stored in directory: /root/.cache/pip/wheels/70/0c/a4/7cfa53a5c6225c2db2bfec08e782b43d0f25fdae2e995b69be
Successfully built image
Installing collected packages: asgiref, django, image
Successfully installed asgiref-3.7.2 django-5.0.3 image-1.5.33


In [None]:
# Load a folder of images as a DataFrame using Spark's built-in "image" data
# source (part of Spark 3.0.1). It produces one struct column named "image".
# TODO: point sample_img_dir at your own data. Labels are derived from each
# file's parent folder name, so this assumes one sub-folder per class,
# e.g. images/cats/*.jpg and images/dogs/*.jpg.
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer

sample_img_dir = '/content/drive/MyDrive/images'

raw_image_df = (spark.read
                     .format("image")
                     .option("dropInvalid", True)          # skip files that are not decodable images
                     .option("recursiveFileLookup", True)  # descend into the per-class sub-folders
                     .load(sample_img_dir)
                     # parent folder of image.origin, e.g. ".../images/cats/a.jpg" -> "cats"
                     .withColumn("class_name",
                                 F.element_at(F.split(F.col("image.origin"), "/"), -2)))

# LogisticRegression needs a numeric label, so index the class folder names
image_df = (StringIndexer(inputCol="class_name", outputCol="label")
            .fit(raw_image_df)
            .transform(raw_image_df))

# Preview metadata only; showing the raw "image" struct would dump binary pixel data
image_df.select("image.origin", "class_name", "label").show(truncate=False)

NameError: name 'sample_img_dir' is not defined

In [None]:
# Transfer learning on images: sparkdl's DeepImageFeaturizer turns each image
# into a feature vector using InceptionV3, and a logistic regression is trained on top.
# NOTE(review): assumes a sparkdl release whose featurizer accepts Spark's image
# schema (the "image" column from spark.read.format("image")). Confirm this.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer

# This pipeline must be trained on image data. The Titanic `train_df` has neither
# an "image" nor a "label" column, so split the labelled image_df instead
# (same 80/20 ratio and seed as the Titanic split).
missing_cols = {"image", "label"} - set(image_df.columns)
if missing_cols:
    raise ValueError(f"image_df is missing required columns: {sorted(missing_cols)}")

image_train_df, image_test_df = image_df.randomSplit([0.8, 0.2], seed=11)

featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
lr = LogisticRegression(maxIter=10, regParam=0.05, elasticNetParam=0.3, labelCol="label")
p = Pipeline(stages=[featurizer, lr])

p_model = p.fit(image_train_df)

NameError: name 'image_df' is not defined