## Spark

#### PySpark is the Python interface to Apache Spark. Besides letting us write Spark applications with Python APIs, it also provides the PySpark shell for interactively analyzing data in a distributed environment. PySpark supports most Spark features, such as Spark SQL, DataFrames, Streaming, MLlib (machine learning), and Spark Core.

### Importing the findspark Package

#### `findspark.init()` locates the Spark installation (via `SPARK_HOME`) and adds PySpark to `sys.path`, so that `pyspark` can be imported like any other library.

In [1]:
%pip install findspark

In [2]:
import findspark
findspark.init()

### Invoking Spark Components

#### SparkContext is the main entry point for Spark functionality. Running a Spark application starts a driver program, whose main function creates the SparkContext. The driver then runs operations inside executors on the worker nodes.

#### SparkContext uses Py4J to launch a JVM and create a JavaSparkContext. In the PySpark shell a SparkContext is already available as `sc`, and only one SparkContext can be active at a time, so creating a second one fails; `SparkContext.getOrCreate()` returns the existing context instead.

### Configuration initialization

#### `pyspark.SparkConf` holds the configuration of a Spark application as key-value pairs, for example the master URL (`setMaster`) and the application name (`setAppName`).

### Initializing the Spark Session (SparkSession)

#### SparkSession is the entry point for programming Spark with the Dataset and DataFrame APIs. In environments where a session has already been created (REPL, notebooks, etc.), use the builder to get the existing session: `SparkSession.builder.getOrCreate()`.


#### With Spark set up, we load and display the dataset, which was collected from external sources.

In [None]:

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Build the configuration and create (or reuse) a local SparkSession.
# SQLContext is deprecated; SparkSession replaces it as the DataFrame entry point.
conf = SparkConf().setMaster("local").setAppName("sample")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext  # the underlying SparkContext

# Read the preprocessed CSV, using the first row as the header and inferring column types.
df = spark.read.csv('C:/Users/JASHWANTH/project/Performance Evaluation of Distributed Machine Learning/M2/preprocessed/preprocessed2.csv',
                    header=True, inferSchema=True)
print(df.take(1))  # first row as a list of Row objects
df.show()

### Display Total Number of Rows and Columns

In [None]:

print("Rows= ",df.count())
print("Columns= ",len(df.columns))

### Checking the Dataset for Empty (Null) Values

In [None]:
'''check null values'''
from pyspark.sql.functions import isnull, when, count, col
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()

### Displaying the Column Names, Grouped by Data Type

In [None]:

print(df.count(),len(df.columns))
CONT_COLS_NAMES=[]
STRING_COLS_NAMES=[]
for i in df.dtypes:
    if i[1] == "string":
        STRING_COLS_NAMES.append(i[0])
    else:
        CONT_COLS_NAMES.append(i[0])
print(STRING_COLS_NAMES)
print(CONT_COLS_NAMES)

### Indexing the Labels (StringIndexer)

#### StringIndexer is a label indexer that maps a string column of labels to an ML column of label indices. If the input column is numeric, it is cast to string and the string values are indexed. The indices are in [0, numLabels). By default they are ordered by label frequency, so the most frequent label gets index 0. The ordering is controlled by the `stringOrderType` parameter, whose default value is `"frequencyDesc"`.
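#### As a rough illustration of the frequency ordering described above, here is a plain-Python sketch (not PySpark's implementation). The helper `fit_string_indexer` is hypothetical; breaking ties alphabetically and ignoring `None` values are assumptions of this sketch.

```python
from collections import Counter

def fit_string_indexer(values):
    """Hypothetical helper: label -> index mapping ordered like 'frequencyDesc'.

    Numeric values are cast to strings first. The most frequent label gets 0.0;
    ties are broken alphabetically and None values are ignored (sketch assumptions).
    """
    counts = Counter(str(v) for v in values if v is not None)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

labels = fit_string_indexer([63, 37, 41, 63, 56, 63, 41])
print(labels)  # {'63': 0.0, '41': 1.0, '37': 2.0, '56': 3.0}
```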

### Handling Invalid Values (handleInvalid)

#### `setHandleInvalid()` sets the value of the `handleInvalid` parameter.

#### `handleInvalid` controls how invalid data (unseen labels or null values) in string-typed feature and label columns is handled. The options are `'skip'` (filter out rows with invalid data), `'error'` (throw an error, the default), or `'keep'` (put invalid data in a special additional bucket at index numLabels).
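#### The three `handleInvalid` modes can be sketched in plain Python as below. This only mimics the behaviour described above on a Python list; `transform_string_indexer` and the example mapping are hypothetical, not part of PySpark.

```python
def transform_string_indexer(values, mapping, handle_invalid="error"):
    """Hypothetical helper: apply a fitted label -> index mapping with handleInvalid-like modes."""
    num_labels = len(mapping)
    out = []
    for v in values:
        key = None if v is None else str(v)
        if key in mapping:
            out.append(mapping[key])
        elif handle_invalid == "skip":
            continue  # drop rows with unseen or null labels
        elif handle_invalid == "keep":
            out.append(float(num_labels))  # extra bucket at index numLabels
        else:
            raise ValueError(f"Unseen or null label: {v!r}")
    return out

mapping = {"1": 0.0, "0": 1.0}
print(transform_string_indexer(["1", "0", "2", None], mapping, "keep"))  # [0.0, 1.0, 2.0, 2.0]
print(transform_string_indexer(["1", "0", "2", None], mapping, "skip"))  # [0.0, 1.0]
```

#### With `"keep"`, as in the cell below, unseen values share one extra index instead of stopping the job.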

In [None]:
'''label'''
from pyspark.ml.feature import StringIndexer
stage_1 = StringIndexer(inputCol= 'age', outputCol= 'age_index')
stage_1.setHandleInvalid("keep")
stage_2 = StringIndexer(inputCol= 'sex', outputCol= 'sex_index')
stage_2.setHandleInvalid("keep")
stage_3 = StringIndexer(inputCol= 'cp', outputCol= 'cp_index')
stage_3.setHandleInvalid("keep")
stage_4 = StringIndexer(inputCol= 'trestbps', outputCol= 'trestbps_index')
stage_4.setHandleInvalid("keep")
stage_5 = StringIndexer(inputCol= 'chol', outputCol= 'chol_index')
stage_5.setHandleInvalid("keep")
stage_6 = StringIndexer(inputCol= 'fbs', outputCol= 'fbs_index')
stage_6.setHandleInvalid("keep")
stage_7 = StringIndexer(inputCol= 'restecg', outputCol= 'restecg_index')
stage_7.setHandleInvalid("keep")
stage_8= StringIndexer(inputCol= 'thalach', outputCol= 'thalach_index')
stage_8.setHandleInvalid("keep")

### Encoding into Vectors (OneHotEncoder)

#### OneHotEncoder maps columns of category indices to columns of binary vectors, with at most a single one-value per row that indicates the input category index.
#### For example, with five categories, an input value of 2.0 maps to the output vector [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via `dropLast`), because including it would make the vector entries sum to one and hence be linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].

#### If `handleInvalid` is set to `'keep'`, an extra category indicating invalid values is added as the last category. So when `dropLast` is true, invalid values are encoded as an all-zeros vector.
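#### The `dropLast` behaviour from the example above can be checked with a small plain-Python sketch. It returns dense lists for readability, whereas Spark produces sparse vectors; the `one_hot` helper is hypothetical.

```python
def one_hot(index, num_categories, drop_last=True):
    """Hypothetical helper: encode a category index as a dense 0/1 list, mimicking dropLast."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:  # the dropped last category stays all zeros
        vec[int(index)] = 1.0
    return vec

print(one_hot(2.0, 5))  # [0.0, 0.0, 1.0, 0.0]
print(one_hot(4.0, 5))  # [0.0, 0.0, 0.0, 0.0]
# With handleInvalid='keep', the invalid bucket is the extra last category
# (5 valid + 1 invalid = 6), so with dropLast it also becomes all zeros.
print(one_hot(5.0, 6))  # [0.0, 0.0, 0.0, 0.0, 0.0]
```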

### Assembling the Feature Vector (VectorAssembler)

#### A feature transformer that merges multiple columns into a vector column.
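#### Conceptually, the assembler flattens each row's input columns, in the given order, into one list of numbers. The sketch below shows that idea on a Python dict standing in for a row; the `assemble` helper and the sample values are hypothetical, and real Spark output is a dense or sparse `Vector`.

```python
def assemble(row, input_cols):
    """Hypothetical helper: flatten scalar and vector columns of one row into a feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            features.extend(float(x) for x in value)  # vector column: append every entry
        else:
            features.append(float(value))  # numeric column: append one entry
    return features

row = {"sex_encoded": [1.0], "cp_encoded": [0.0, 1.0, 0.0], "oldpeak": 2.3, "ca": 0}
print(assemble(row, ["sex_encoded", "cp_encoded", "oldpeak", "ca"]))
# [1.0, 0.0, 1.0, 0.0, 2.3, 0.0]
```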

### Creating Pipeline

#### `Pipeline` is a simple pipeline that acts as an estimator. It consists of a sequence of stages, each of which is either an Estimator or a Transformer.
#### When `Pipeline.fit()` is called, the stages are executed in order. If a stage is an Estimator, its `Estimator.fit()` method is called on the input dataset to fit a model. The resulting model, which is a Transformer, is then used to transform the dataset into the input for the next stage.
#### If a stage is a Transformer, its `Transformer.transform()` method is called to produce the dataset for the next stage.

#### Regression pipelines let you predict the value of a numeric attribute for each of your users: if you know that attribute for a subset of users, a regression pipeline can turn that information into broad insights about the entire set. Note that `regression_pipeline` in this notebook only indexes, encodes, and assembles features; the `target` column is predicted later by classifiers.
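#### The fit logic described above can be sketched with toy classes. These are hypothetical stand-ins written for illustration, not `pyspark.ml` classes, and they operate on a plain list of numbers instead of a DataFrame.

```python
class ToyTransformer:
    def transform(self, dataset):
        raise NotImplementedError

class ToyEstimator:
    def fit(self, dataset):
        raise NotImplementedError

class ToyPipelineModel(ToyTransformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, dataset):
        for stage in self.stages:  # every fitted stage is a transformer
            dataset = stage.transform(dataset)
        return dataset

class ToyPipeline(ToyEstimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, dataset):
        fitted = []
        for stage in self.stages:
            # An estimator is fitted into a model (a transformer); a transformer is used as-is.
            model = stage.fit(dataset) if isinstance(stage, ToyEstimator) else stage
            dataset = model.transform(dataset)  # output becomes the next stage's input
            fitted.append(model)
        return ToyPipelineModel(fitted)

class Shift(ToyTransformer):
    """Toy transformer: adds a constant to every value."""
    def __init__(self, offset):
        self.offset = offset

    def transform(self, dataset):
        return [x + self.offset for x in dataset]

class MeanCenterer(ToyEstimator):
    """Toy estimator: learns the mean and returns a transformer that subtracts it."""
    def fit(self, dataset):
        return Shift(-sum(dataset) / len(dataset))

toy_model = ToyPipeline([Shift(1.0), MeanCenterer()]).fit([1.0, 2.0, 3.0])
print(toy_model.transform([1.0, 2.0, 3.0]))  # [-1.0, 0.0, 1.0]
```

#### `MeanCenterer` is fitted on the output of the preceding `Shift(1.0)` stage, which is why the learned mean is 3.0 rather than 2.0.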

In [None]:

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, VectorAssembler

indexers = [stage_1, stage_2, stage_3, stage_4, stage_5, stage_6, stage_7, stage_8]

# One-hot encode the indexed columns (multi-column OneHotEncoder, Spark 3.0+).
stage_14 = OneHotEncoder(
    inputCols=[s.getOutputCol() for s in indexers],
    outputCols=['age_encoded', 'sex_encoded', 'cp_encoded', 'trestbps_encoded',
                'chol_encoded', 'fbs_encoded', 'restecg_encoded', 'thalach_encoded'])

# Merge the encoded vectors and the remaining numeric columns into one 'features' vector.
stage_15 = VectorAssembler(
    inputCols=stage_14.getOutputCols() + ['exang', 'oldpeak', 'slope', 'ca', 'thal'],
    outputCol='features')

regression_pipeline = Pipeline(stages=indexers + [stage_14, stage_15])
ppmodel = regression_pipeline.fit(df)
data = ppmodel.transform(df)
data.select('features').show(1)  # show() prints the rows itself and returns None

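### Feature Scaling (StandardScaler)

#### StandardScaler standardizes features by removing the mean and scaling to unit variance, using column summary statistics computed on the samples in the training set. Note that in PySpark only the scaling is on by default (`withStd=True`, `withMean=False`), so the scaler in the next cell divides each feature by its standard deviation without centering it.

#### The "unit std" is computed using the corrected sample standard deviation, i.e. the square root of the unbiased sample variance.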
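#### The scaling rule can be sketched for a single column with the standard library. `statistics.stdev` uses the n - 1 (corrected) denominator, matching the description above. The helper names are hypothetical, and mapping a zero-variance column to 0.0 is an assumption of this sketch.

```python
import statistics

def fit_standard_scaler(column, with_mean=False, with_std=True):
    """Hypothetical helper: learn (mean, std); std is the corrected sample std (n - 1)."""
    mean = statistics.mean(column) if with_mean else 0.0
    std = statistics.stdev(column) if with_std else 1.0
    return mean, std

def apply_scaler(column, mean, std):
    # Zero-variance columns map to 0.0 instead of dividing by zero (sketch assumption).
    return [(x - mean) / std if std != 0 else 0.0 for x in column]

col = [1, 3, 5]  # sample variance = ((1-3)^2 + 0^2 + (5-3)^2) / 2 = 4, so std = 2
print(apply_scaler(col, *fit_standard_scaler(col)))                  # [0.5, 1.5, 2.5]
print(apply_scaler(col, *fit_standard_scaler(col, with_mean=True)))  # [-1.0, 0.0, 1.0]
```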

In [None]:
'''Standard Scaling'''
from pyspark.ml.feature import StandardScaler
standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
scalar_fit=standardscaler.fit(data)
data=scalar_fit.transform(data)

data.select("features","Scaled_features").show(5)


### Splitting the Data into Training and Test Sets

#### `randomSplit` randomly assigns about 80% of the rows to the training set and 20% to the test set; fixing the `seed` makes the split reproducible.
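#### A seeded, weight-proportional split can be sketched in plain Python as below: each row is assigned independently, so the split sizes are only approximately 80/20. This is an illustration of the idea, not Spark's internal sampling code; `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, weights, seed):
    """Hypothetical helper: assign each row to a split with probability proportional to its weight."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative boundaries, e.g. [0.8, 1.0]
    rng = random.Random(seed)  # fixed seed -> reproducible assignment
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, b in enumerate(bounds):
            if r < b or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

toy_train, toy_test = random_split(list(range(1000)), [0.8, 0.2], seed=12345)
print(len(toy_train), len(toy_test))  # roughly 800 and 200, not exactly
```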

In [None]:
train, test = data.randomSplit([0.8, 0.2], seed=12345)

### 1st ML Classifier --> Decision Tree
#### Decision trees and their ensembles are popular methods for classification and regression tasks in machine learning.
#### Decision trees are widely used because they are easy to interpret, handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and can capture nonlinearities and feature interactions.

#### The Pipelines API for decision trees offers slightly more functionality than the original API.
#### In particular, for classification, users can obtain the predicted probability of each class (also called the class conditional probabilities).
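#### `DecisionTreeClassifier` uses Gini impurity by default (`impurity="gini"`). The plain-Python sketch below shows how one greedy split on a single continuous feature could be chosen by minimising weighted Gini impurity. Spark bins continuous features (`maxBins`) before searching, so its candidate thresholds differ; the helpers and sample values here are hypothetical.

```python
from collections import Counter

def gini(labels):
    """Gini impurity: 1 - sum(p_k^2) over the class proportions p_k."""
    n = len(labels)
    return 1.0 - sum((c / n) ** 2 for c in Counter(labels).values())

def best_threshold(xs, ys):
    """Find the split 'x <= t' that minimises the size-weighted Gini impurity."""
    best = (float("inf"), None)
    for t in sorted(set(xs))[:-1]:  # splitting at the max would leave the right side empty
        left = [y for x, y in zip(xs, ys) if x <= t]
        right = [y for x, y in zip(xs, ys) if x > t]
        score = (len(left) * gini(left) + len(right) * gini(right)) / len(ys)
        if score < best[0]:
            best = (score, t)
    return best

print(best_threshold([120, 130, 150, 160, 170], [0, 0, 1, 1, 1]))  # (0.0, 130)
```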

In [None]:

from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol = 'Scaled_features', labelCol = 'target')
dtModel = dt.fit(train)
dt_predictions = dtModel.transform(test)
dt_predictions.select('target', 'prediction', 'probability').show(10)



### Multiclass Classification --> Test Accuracy Measurement (1st Round)

### MulticlassClassificationEvaluator

#### Evaluator for Multiclass Classification, which expects input columns: prediction, label, weight (optional) and probabilityCol (only for logLoss).
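#### The `accuracy` metric is the (optionally weighted) fraction of rows whose prediction equals the label. A minimal plain-Python sketch, assuming equal weights when none are given; `weighted_accuracy` is a hypothetical helper, not the evaluator's code.

```python
def weighted_accuracy(preds, labels, weights=None):
    """Hypothetical helper: fraction of (weighted) rows whose prediction equals the label."""
    if weights is None:
        weights = [1.0] * len(labels)
    correct = sum(w for p, y, w in zip(preds, labels, weights) if p == y)
    return correct / sum(weights)

print(weighted_accuracy([1.0, 0.0, 1.0, 1.0], [1.0, 0.0, 0.0, 1.0]))  # 0.75
```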

In [None]:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol='target', 
    predictionCol='prediction', 
    metricName='accuracy')
accuracy = evaluator.evaluate(dt_predictions)
print('Test Accuracy = ', accuracy)  # evaluated on the held-out test set
DT_SC=accuracy*100


### 2nd ML Classifier --> Logistic Regression

#### Logistic regression. PySpark's `LogisticRegression` supports both binomial logistic regression and multinomial logistic (softmax) regression.
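#### The two prediction rules can be sketched in plain Python: the binomial case passes a linear score through the sigmoid and predicts class 1 when the probability exceeds the threshold (0.5 by default in PySpark), while the multinomial case turns one score per class into probabilities with softmax. The coefficients below are made-up values for illustration, not fitted ones.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def predict_binomial(x, coefficients, intercept, threshold=0.5):
    """Return (P(class 1), predicted class) for one feature vector."""
    z = sum(c * v for c, v in zip(coefficients, x)) + intercept
    p1 = sigmoid(z)
    return p1, 1.0 if p1 > threshold else 0.0

def softmax(scores):
    """Multinomial case: convert one score per class into class probabilities."""
    m = max(scores)  # subtract the max for numerical stability
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical coefficients: the score is exactly 0, so P(class 1) = 0.5, which is not > 0.5.
print(predict_binomial([1.0, 2.0], coefficients=[0.5, -0.25], intercept=0.0))  # (0.5, 0.0)
print(softmax([0.0, 0.0]))  # [0.5, 0.5]
```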

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol = 'Scaled_features', labelCol = 'target', maxIter=10)
lrModel = lr.fit(train)
lr_predictions = lrModel.transform(test)
print(lr_predictions)
lr_predictions.select('target', 'prediction', 'probability').show(10)

### Accuracy Check --> 2nd Round (Logistic Regression)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol='target', 
    predictionCol='prediction', 
    metricName='accuracy')
accuracy = evaluator.evaluate(lr_predictions)
print('Test Accuracy = ', accuracy)  # evaluated on the held-out test set
LR_SC=accuracy*100

### 3rd ML Classifier --> Random Forest
#### A random forest learning algorithm for classification. It supports both binary and multiclass labels, as well as continuous and categorical features.
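#### A random forest combines the outputs of many trees. The sketch below shows two common combination rules, hard majority voting and averaging per-tree class probabilities; it does not claim which exact rule Spark's `RandomForestClassifier` uses internally, and the per-tree outputs are made-up values.

```python
from collections import Counter

def majority_vote(tree_predictions):
    """Hard voting: the most frequent class wins; ties go to the smallest class label."""
    counts = Counter(tree_predictions)
    top = max(counts.values())
    return min(label for label, c in counts.items() if c == top)

def average_probabilities(tree_probabilities):
    """Soft voting: average the per-tree class-probability vectors element-wise."""
    n = len(tree_probabilities)
    return [sum(column) / n for column in zip(*tree_probabilities)]

print(majority_vote([1.0, 0.0, 1.0]))  # 1.0
print(average_probabilities([[0.75, 0.25], [0.25, 0.75], [0.5, 0.5]]))  # [0.5, 0.5]
```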

In [None]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'Scaled_features', labelCol = 'target')
rfModel = rf.fit(train)
rf_predictions = rfModel.transform(test)

rf_predictions.select('target', 'prediction', 'probability').show(8)

### Accuracy Check --> 3rd Round (Random Forest)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(
    labelCol='target', 
    predictionCol='prediction', 
    metricName='accuracy')
accuracy = evaluator.evaluate(rf_predictions)
model=accuracy
print('Test Accuracy = ', accuracy)  # evaluated on the held-out test set
RF_SC=accuracy*100

### Feature Engineering

#### Feature engineering should select the minimum set of features required to build a valid model.
#### The more features a model contains, the more complex it becomes (and the sparser the data), and the more prone it is to errors caused by variance, i.e. overfitting.
#### A common approach to excluding features is to measure their relative importance to the model, eliminate weak features or feature combinations, and then re-evaluate whether the model performs better under cross-validation.
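#### The elimination step can be sketched as thresholding importance scores and keeping the strongest features, after which the model would be refit and re-evaluated. The importance values below are hypothetical placeholders, not results from this dataset, and `select_features` is a hypothetical helper.

```python
def select_features(importances, threshold):
    """Hypothetical helper: keep names with importance >= threshold, most important first."""
    kept = [(name, score) for name, score in importances.items() if score >= threshold]
    return [name for name, _ in sorted(kept, key=lambda item: item[1], reverse=True)]

# Hypothetical importance scores, for illustration only.
toy_importances = {"thal": 0.30, "ca": 0.25, "oldpeak": 0.20, "fbs_encoded_1": 0.01}
print(select_features(toy_importances, threshold=0.05))  # ['thal', 'ca', 'oldpeak']
```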

In [None]:
'''Feature_Importance'''
print(rfModel.featureImportances)
import pandas as pd

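def ExtractFeatureImp(featureImp, dataset, featuresCol):
    """Map feature-importance scores back to column names using the vector's ML metadata."""
    list_extract = []
    # The 'ml_attr' metadata groups attributes by type (e.g. 'binary', 'numeric').
    for attr_type in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][attr_type]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return varlist.sort_values('score', ascending=False)

# The model was trained on 'Scaled_features', which has the same element order as 'features',
# so the names stored in the 'features' metadata line up with the importance indices.
ExtractFeatureImp(rfModel.featureImportances, train, "features").head(10)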

### Model Testing

#### Model testing is the process of evaluating the performance of a fully trained model on a held-out test set.

In [None]:
'''model_testing'''

print(test.count(),len(test.columns))
rf_predictions.select('target', 'prediction', 'probability').show()

import os
os.makedirs("./output", exist_ok=True)  # to_csv fails if the folder does not exist
a = rf_predictions.select('target', 'prediction', 'probability').toPandas()
print(a)
a.to_csv("./output/RANFOR_test.csv")

### Bias Checking and Prediction

#### Prediction bias is the difference between a model's apparent prediction error (measured on the data it was trained on) and its actual prediction error (on new data). In the next cell, the random forest is applied to the training set so that the two can be compared.
#### Prediction bias can occur when the model contains many independent variables relative to the sample size, or when different sets of independent variables are tested in stepwise procedures.
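#### The comparison can be expressed as an "optimism gap": training accuracy minus test accuracy (equivalently, test error minus training error). A minimal sketch on made-up predictions; `optimism_gap` is a hypothetical helper.

```python
def fraction_correct(preds, labels):
    return sum(p == y for p, y in zip(preds, labels)) / len(labels)

def optimism_gap(train_preds, train_labels, test_preds, test_labels):
    """Apparent (training) accuracy minus actual (test) accuracy; large values hint at overfitting."""
    return fraction_correct(train_preds, train_labels) - fraction_correct(test_preds, test_labels)

# Hypothetical predictions, for illustration only: 4/4 correct on train, 2/4 on test.
gap = optimism_gap([1, 0, 1, 1], [1, 0, 1, 1], [1, 0, 0, 1], [1, 1, 0, 0])
print(gap)  # 0.5
```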

In [None]:

'''bias'''
predictions = rfModel.transform(train)

print(train.count(),len(train.columns))

predictions.select('target', 'prediction', 'probability').show()

import os
os.makedirs("./output", exist_ok=True)  # to_csv fails if the folder does not exist
a = predictions.select('target', 'prediction', 'probability').toPandas()
print(a)
a.to_csv("./output/RANFOR_Train.csv")


### Confusion Matrix

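#### `MulticlassMetrics` (from `pyspark.mllib.evaluation`) is an evaluator for multiclass classification. Its `confusionMatrix()` method returns the confusion matrix, with actual classes in rows and predicted classes in columns, both ordered by ascending label.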
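#### The layout of a confusion matrix (actual classes as rows, predicted classes as columns, ascending label order) can be reproduced in plain Python as a sanity check. `confusion_matrix` is a hypothetical helper and the labels are made up.

```python
def confusion_matrix(preds, labels):
    """Hypothetical helper: rows = actual class, columns = predicted class, ascending label order."""
    classes = sorted(set(labels) | set(preds))
    pos = {c: i for i, c in enumerate(classes)}
    matrix = [[0] * len(classes) for _ in classes]
    for p, y in zip(preds, labels):
        matrix[pos[y]][pos[p]] += 1
    return classes, matrix

print(confusion_matrix([1.0, 0.0, 1.0, 0.0, 1.0], [1.0, 0.0, 0.0, 0.0, 1.0]))
# ([0.0, 1.0], [[2, 1], [0, 2]])
```

#### Here the single off-diagonal count (row 0.0, column 1.0) is an actual 0.0 that was predicted as 1.0.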

### Importing the FloatType Data Type from PySpark SQL

#### `MulticlassMetrics` expects (prediction, label) pairs of floating-point values, so FloatType is imported to cast the integer label column.

In [None]:
'''confusion matrix'''
import pyspark.sql.functions as F
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType


#### Select the prediction and label columns (here from the training-set predictions), cast the label to float, and compute the confusion matrix.

In [None]:
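# MulticlassMetrics works on an RDD of (prediction, label) float pairs.
pred_and_labels = predictions.select('prediction', F.col('target').cast(FloatType()).alias('target'))
metrics = MulticlassMetrics(pred_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())  # rows: actual class, columns: predicted class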


### Graphical Representation

In [None]:
import numpy as np
import matplotlib.pyplot as plt

#### Now we compare the test accuracy of the three classifiers and, based on that, select the model to carry forward.

In [None]:
height = [RF_SC, DT_SC, LR_SC]
bars = ('RF', 'DT', 'LR')
x_pos = np.arange(len(bars))
plt.bar(x_pos, height, color=['#E32227', '#267055', '#050A30'])
plt.xticks(x_pos, bars)
plt.ylabel('Test accuracy (%)')
plt.show()


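#### The result is stored as a pickle file for the 2nd module, which is the key part of our analysis program. Note that `model` currently holds the random forest's test accuracy rather than the fitted model: a fitted PySpark model wraps a JVM object and cannot be pickled, so it has to be persisted with its own `save()` method (for example `rfModel.save(path)`) instead.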

In [None]:
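import os
import pickle

os.makedirs('./model', exist_ok=True)  # open() fails if the folder does not exist

# `model` holds the random forest's test accuracy (set in the 3rd accuracy check).
with open('./model/heart.pkl', 'wb') as f:
    pickle.dump(model, f)  # the with-block closes the file; no explicit close() needed

with open('./model/heart.pkl', 'rb') as f:
    model = pickle.load(f)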