# Setup dependencies
This notebook uses PySpark (Spark ML) for data handling and machine learning; pandas, scikit-learn and numpy are installed as supporting libraries.
<details>
    <summary>pip install...</summary>

```python
# Install a Python package
pip install package-name
# or install a specific version of a Python package
pip install package-name==version
```
</details>


!pip install pandas
!pip install scikit-learn
!pip install numpy

# Modules

In [1]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer

from pyspark.sql import SparkSession


# import functions/Classes for pipeline creation

from pyspark.ml import Pipeline

# import functions/Classes for metrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# Create the SparkSession used by the rest of the notebook, or reuse one that
# already exists (getOrCreate). The application is named "ML Pipeline Example".
# Any warnings printed by this command can be ignored.
spark = SparkSession.builder.appName("ML Pipeline Example").getOrCreate()

# Linear Regression

## Reading dataset

In [2]:
# Read the MPG CSV into a Spark DataFrame: the first row supplies the column
# names and Spark infers each column's type from the data.
mpg_data = spark.read.options(header=True, inferSchema=True).csv("data/mpg.csv")

## Split into training and testing sets

In [3]:
# Randomly partition the MPG rows 70% / 30% into training and test sets.
# The fixed seed keeps the split reproducible between runs.
trainingData, testData = mpg_data.randomSplit(weights=[0.7, 0.3], seed=42)

## Define pipeline stages

In [4]:
# Stage 1: gather the three predictor columns into one "features" vector column.
vectorAssembler = VectorAssembler(
    inputCols=["Weight", "Horsepower", "Engine Disp"],
    outputCol="features",
)

# Stage 2: standardise the assembled vector, writing it to "scaledFeatures".
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
)

# Stage 3: linear regression that predicts the MPG column from the scaled features.
lr = LinearRegression(
    featuresCol="scaledFeatures",
    labelCol="MPG",
)

# Chain the three stages, in execution order, into a single pipeline.
pipeline = Pipeline(
    stages=[
        vectorAssembler,
        scaler,
        lr,
    ]
)

## Fit the pipeline

In [5]:
# Fit all pipeline stages on the training split; `model` is the fitted pipeline.
model = pipeline.fit(trainingData)

## Predict and evaluate the model

In [6]:
# Score the held-out rows with the fitted pipeline.
predictions = model.transform(testData)

# Compare the "prediction" column against the true MPG values using RMSE.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="MPG",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) =", rmse)

Root Mean Squared Error (RMSE) = 3.875664618383922


## Save the model

In [16]:
# Ensure the target directory for the saved model exists.
# exist_ok=True keeps this cell idempotent: re-running the notebook does not
# fail when the directory is already there, and no shell is required.
import os

os.makedirs("model_storage", exist_ok=True)

In [None]:
# Persist the fitted pipeline to ./model_storage; overwrite() replaces whatever
# is already stored at that path.
model.write().overwrite().save("./model_storage")

## Load the model

In [None]:
# PipelineModel provides load() for restoring a pipeline model saved to disk.
from pyspark.ml.pipeline import PipelineModel

# Load persisted model
# Reads the pipeline model stored under ./model_storage/ back into memory.
loaded_model = PipelineModel.load("./model_storage/")

In [None]:
# Make predictions on test data
# Runs the reloaded pipeline over `testData`; the result is stored in
# `predictions`, replacing any earlier value of that name.
predictions = loaded_model.transform(testData)

# Logistic Regression

## Reading dataset

In [7]:
# Read the iris CSV into a Spark DataFrame: the first row supplies the column
# names and Spark infers each column's type from the data.
iris_data = spark.read.options(header=True, inferSchema=True).csv("data/iris.csv")

# Preview the first five rows.
iris_data.show(5)

+-------------+------------+-------------+------------+------+-----------+
|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Detail|    Species|
+-------------+------------+-------------+------------+------+-----------+
|          5.1|         3.5|          1.4|         0.2|    mn|Iris-setosa|
|          4.9|         3.0|          1.4|         0.2|    mn|Iris-setosa|
|          4.7|         3.2|          1.3|         0.2|    mn|Iris-setosa|
|          4.6|         3.1|          1.5|         0.2|    mn|Iris-setosa|
|          5.0|         3.6|          1.4|         0.2|    mn|Iris-setosa|
+-------------+------------+-------------+------------+------+-----------+
only showing top 5 rows



## Split the data

In [8]:
# Randomly partition the iris rows 70% / 30% into training and test sets.
# The fixed seed keeps the split reproducible between runs.
trainingData, testData = iris_data.randomSplit(weights=[0.7, 0.3], seed=42)

## Defining pipeline

In [9]:
# Encode the string target column "Species" as a numeric "label" column.
indexer = StringIndexer(
    inputCol="Species",
    outputCol="label",
)

# Encode the string feature column "Detail" as a numeric "DetailIdx" column.
indexer_x = StringIndexer(
    inputCol="Detail",
    outputCol="DetailIdx",
)

# Gather the four measurements plus the indexed "Detail" into one "features" vector.
vectorAssembler = VectorAssembler(
    inputCols=[
        "SepalLengthCm",
        "SepalWidthCm",
        "PetalLengthCm",
        "PetalWidthCm",
        "DetailIdx",
    ],
    outputCol="features",
)

# Feature preparation is wrapped in its own pipeline, which is then used as a
# single stage of the outer pipeline below.
subpipeline = Pipeline(
    stages=[
        indexer_x,
        vectorAssembler,
    ]
)

# Standardise the assembled vector, writing it to "scaledFeatures".
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
)

# Logistic regression that predicts "label" from the scaled features.
classifier = LogisticRegression(
    featuresCol="scaledFeatures",
    labelCol="label",
)

# Outer pipeline: label indexing, feature preparation, scaling, classification.
pipeline = Pipeline(
    stages=[
        indexer,
        subpipeline,
        scaler,
        classifier,
    ]
)

## Fit the pipeline

In [13]:
# Fit all pipeline stages on the training split; `model` is the fitted pipeline.
model = pipeline.fit(trainingData)

## Evaluate the model

In [11]:
# Score the held-out rows with the fitted pipeline.
predictions = model.transform(testData)

# Accuracy: compares the "prediction" column against the indexed "label" column.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)

print("Accuracy =", accuracy)

Accuracy = 1.0


# Conclusion
- This model explains around 93% of the data.
- The rate of increase of the dependent variable is around 44.24.
- 0.1 is the starting value of the dependent variable.