
# [Sample Notebook] AfterWork: Machine Learning with PySpark

# Pre-requisites

In [None]:
# Import Pandas for data manipulation
import pandas as pd

In [None]:
# Install PySpark
!pip install pyspark

In [None]:
# Import the PySpark library
import pyspark

You can download the datasets for this course from https://archive.org/download/mlpy-spark/MLPySpark.zip

# 1. Supervised Learning

## 1.1 Regression Techniques

### 1.1.1 Linear Regression

We use Linear Regression to model the relationship between a dependent variable and one or more independent variables by fitting a linear equation to the observed data points. We use this technique to understand and predict the linear relationship between variables, making it a fundamental tool in predictive modeling. In real life, we can apply Linear Regression to predict house prices based on features like square footage, number of bedrooms, and location. To apply Linear Regression, we first collect and preprocess the data, then split it into training and testing sets. Next, we train a Linear Regression model on the training data using PySpark, adjusting the model parameters to minimize the error. Finally, we evaluate the model's performance on the test data to assess its predictive accuracy.
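
To make "fitting a linear equation to minimize the error" concrete, here is a minimal sketch of ordinary least squares with a single feature, written in plain Python. The toy numbers and the function name `fit_simple_linear_regression` are made up for illustration; this is not how PySpark's `LinearRegression` is implemented internally, only the same idea at a small scale.

```python
# Toy data (made up for illustration): square footage and sale price.
areas = [1000.0, 1500.0, 2000.0, 2500.0]
prices = [200000.0, 300000.0, 400000.0, 500000.0]


def fit_simple_linear_regression(x, y):
    """Return (slope, intercept) that minimize the sum of squared errors."""
    n = len(x)
    mean_x = sum(x) / n
    mean_y = sum(y) / n
    # Closed-form least squares: slope = covariance(x, y) / variance(x)
    cov_xy = sum((xi - mean_x) * (yi - mean_y) for xi, yi in zip(x, y))
    var_x = sum((xi - mean_x) ** 2 for xi in x)
    slope = cov_xy / var_x
    intercept = mean_y - slope * mean_x
    return slope, intercept


slope, intercept = fit_simple_linear_regression(areas, prices)

# Predict the price of a hypothetical 1800 sqft house with the fitted line.
predicted_price = slope * 1800.0 + intercept
print("slope:", slope, "intercept:", intercept, "prediction:", predicted_price)
```

With several features, the same principle applies, but the line becomes a weighted sum of all feature values.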

In [None]:
# Data Importation and Exploration: https://afterwork.ai/ds/e/houseprices_7iht1.csv
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HousePricesLinearRegression').getOrCreate()
df = spark.read.csv('houseprices_7iht1.csv', header=True, inferSchema=True)
df.show(5)

In [None]:
# Data Preparation (Encoding Techniques)
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='Neighborhood', outputCol='NeighborhoodIndex')
df_encoded = indexer.fit(df).transform(df)
df_encoded.show(5)

In [None]:
# Machine Learning
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Area(sqft)', 'Bedrooms', 'Bathrooms', 'YearBuilt', 'BackyardSize', 'GarageCapacity', 'Stories', 'NeighborhoodIndex'], outputCol='features')
output = assembler.transform(df_encoded)
final_data = output.select('features', 'Price')
final_data.show(5)

In [None]:
from pyspark.ml.regression import LinearRegression
train_data, test_data = final_data.randomSplit([0.7, 0.3])
lr = LinearRegression(featuresCol='features', labelCol='Price')
lr_model = lr.fit(train_data)
test_results = lr_model.evaluate(test_data)
print('Root Mean Squared Error:', test_results.rootMeanSquaredError)
print('R2:', test_results.r2)

In [None]:
# Predict a New Record
new_data = [(1800, 3, 2, 1990, 600, 2, 1, 0)]
new_df = spark.createDataFrame(new_data, ['Area(sqft)', 'Bedrooms', 'Bathrooms', 'YearBuilt', 'BackyardSize', 'GarageCapacity', 'Stories', 'NeighborhoodIndex'])
new_output = assembler.transform(new_df)
prediction = lr_model.transform(new_output)
prediction.select('prediction').show()
spark.stop()

#### <font color="green">Challenge</font>

Create a simple Linear Regression model to predict house prices based on the features provided in the dataset from the URL: https://afterwork.ai/ds/ch/realestateprices_yxdp3.csv. Use PySpark to train the model and evaluate its performance.

In [None]:
# Data Importation and Exploration: https://afterwork.ai/ds/ch/realestateprices_yxdp3.csv
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('RealEstatePrices').getOrCreate()
df = spark.read.csv('realestateprices_yxdp3.csv', header=True, inferSchema=True)
df.show(5)

In [None]:
# Data Preparation (Encoding Techniques)
# Write your code here


In [None]:
# Machine Learning
# Write your code here


In [None]:
from pyspark.ml.regression import LinearRegression
# Write your code here


In [None]:
# Predict a New Record
new_data = [(1800, 3, 2, 1990, 600, 2, 1, 0)]
new_df = spark.createDataFrame(new_data, ['Area(sqft)', 'Bedrooms', 'Bathrooms', 'YearBuilt', 'BackyardSize', 'GarageCapacity', 'Stories', 'NeighborhoodIndex'])
# Write your code here

spark.stop()

### 1.1.2 Decision Trees

In Decision Trees for Regression Analysis, we build a tree-like structure to predict continuous values. We use this technique to model the relationship between input features and a continuous target variable. Decision Trees for Regression Analysis are useful when we want to understand how different features contribute to the prediction of a continuous outcome. For example, in real estate, we can use Decision Trees for Regression Analysis to predict the selling price of a house based on features like location, size, and number of bedrooms. To apply Decision Trees for Regression Analysis, we recursively split the dataset based on feature values to create a tree structure that predicts the continuous target variable for new data points.
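
The splitting step described above can be sketched in plain Python. The example searches one feature for the threshold that gives the lowest total squared error when each side predicts its own mean, which is one common criterion for regression trees. The toy data and the helper names `sum_squared_error` and `best_split` are assumptions for illustration; a full tree would repeat this search recursively on each side and across all features.

```python
def sum_squared_error(values):
    """Squared error when every value is predicted by the group mean."""
    if not values:
        return 0.0
    mean = sum(values) / len(values)
    return sum((v - mean) ** 2 for v in values)


def best_split(feature, target):
    """Try the midpoint between each pair of neighboring feature values and
    return the (threshold, error) pair with the lowest total squared error."""
    pairs = sorted(zip(feature, target))
    best_threshold, best_error = None, float("inf")
    for i in range(1, len(pairs)):
        if pairs[i - 1][0] == pairs[i][0]:
            continue  # identical feature values cannot be separated
        threshold = (pairs[i - 1][0] + pairs[i][0]) / 2
        left = [t for f, t in pairs if f <= threshold]
        right = [t for f, t in pairs if f > threshold]
        error = sum_squared_error(left) + sum_squared_error(right)
        if error < best_error:
            best_threshold, best_error = threshold, error
    return best_threshold, best_error


# Toy data (made up): square footage and price in thousands.
square_feet = [800, 900, 1000, 2000, 2100, 2200]
price = [100.0, 110.0, 120.0, 300.0, 310.0, 320.0]

threshold, error = best_split(square_feet, price)
print("best threshold:", threshold, "squared error:", error)
```

The chosen threshold separates the small, cheaper homes from the large, expensive ones, so each leaf can predict a mean that is close to every house in it.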

In [None]:
# Data Importation and Exploration: https://afterwork.ai/ds/e/houseprices_y206q.csv
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HousePrices').getOrCreate()
df = spark.read.csv('houseprices_y206q.csv', header=True, inferSchema=True)
df.show(5)

In [None]:
# Data Preparation (Encoding Techniques)
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCols=['Location', 'BackyardSize', 'Pool'], outputCols=['LocationIndex', 'BackyardSizeIndex', 'PoolIndex'])
df_encoded = indexer.fit(df).transform(df)
df_encoded.show(5)

In [None]:
# Machine Learning (Splitting, Model Training)
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['NumBedrooms', 'NumBathrooms', 'SquareFeet', 'YearBuilt', 'NumGarageSpaces', 'LocationIndex', 'BackyardSizeIndex', 'PoolIndex'], outputCol='features')
output = assembler.transform(df_encoded)
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol='features', labelCol='Price')
train_data, test_data = output.randomSplit([0.7, 0.3])
model = dt.fit(train_data)

In [None]:
# Evaluation (RMSE)
from pyspark.ml.evaluation import RegressionEvaluator
predictions = model.transform(test_data)
evaluator = RegressionEvaluator(labelCol='Price', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print('Root Mean Squared Error (RMSE):', rmse)
spark.stop()

### 1.1.3 Random Forests

In ensemble learning with Random Forests for regression analysis, we combine multiple decision trees to create a powerful predictive model. We use Random Forests to improve the accuracy and robustness of our regression analysis by reducing overfitting and increasing generalization. One real-life use case of Random Forests in regression is predicting housing prices based on various features like location, size, and amenities. To apply Random Forests, we first create a forest of decision trees where each tree is trained on a random subset of the data and features. During prediction, we aggregate the results from all trees to make the final regression prediction, resulting in a more reliable and accurate model.
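
As a rough sketch of the bagging idea described above, the plain-Python example below trains several one-split trees ("stumps") on bootstrap samples and averages their predictions. The toy data and the names `fit_stump`, `fit_forest`, and `predict_forest` are hypothetical; because the data has a single feature, only the rows are resampled here, whereas a real Random Forest also samples features.

```python
import random


def fit_stump(rows):
    """Fit a one-split regression tree on (x, y) rows; return a predict function."""
    rows = sorted(rows)
    overall_mean = sum(y for _, y in rows) / len(rows)
    best = None  # (error, threshold, left_mean, right_mean)
    for i in range(1, len(rows)):
        if rows[i - 1][0] == rows[i][0]:
            continue
        threshold = (rows[i - 1][0] + rows[i][0]) / 2
        left = [y for x, y in rows if x <= threshold]
        right = [y for x, y in rows if x > threshold]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        error = (sum((y - left_mean) ** 2 for y in left)
                 + sum((y - right_mean) ** 2 for y in right))
        if best is None or error < best[0]:
            best = (error, threshold, left_mean, right_mean)
    if best is None:
        # Every x in this sample was identical, so no split is possible.
        return lambda x: overall_mean
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean


def fit_forest(rows, num_trees, seed):
    """Train each stump on a bootstrap sample (rows drawn with replacement)."""
    rng = random.Random(seed)
    trees = []
    for _ in range(num_trees):
        sample = [rng.choice(rows) for _ in rows]
        trees.append(fit_stump(sample))
    return trees


def predict_forest(trees, x):
    """Aggregate by averaging the predictions of all trees."""
    return sum(tree(x) for tree in trees) / len(trees)


# Toy data (made up): (square feet, price in thousands).
data = [(800, 100.0), (900, 110.0), (1000, 120.0),
        (2000, 300.0), (2100, 310.0), (2200, 320.0)]

forest = fit_forest(data, num_trees=25, seed=0)
small_home = predict_forest(forest, 850)
large_home = predict_forest(forest, 2150)
print("small home:", small_home, "large home:", large_home)
```

Each stump sees a slightly different sample, so individual predictions vary; averaging them smooths out that variation.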



In [None]:
# Data Importation and Exploration: https://afterwork.ai/ds/e/realestateprices_k9emr.csv
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('RealEstatePrices').getOrCreate()
df = spark.read.csv('realestateprices_k9emr.csv', header=True, inferSchema=True)
df.show(5)

In [None]:
# Data Preparation (Encoding with StringIndexer)
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='Location', outputCol='LocationIndex')
df_indexed = indexer.fit(df).transform(df)
df_indexed.show(5)

In [None]:
# Data Preparation (VectorAssembler)
from pyspark.ml.feature import VectorAssembler
feature_cols = ['Bedrooms', 'Bathrooms', 'SquareFeet', 'LotSize(Acres)', 'YearBuilt', 'HOAFees', 'LocationIndex']
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
df_assembled = assembler.transform(df_indexed)
df_assembled.show(5)

In [None]:
# Machine Learning (Model Training and Test Set Prediction)
from pyspark.ml.regression import RandomForestRegressor
# Hold out 30% of the rows so the model is evaluated on data it has not seen
train_data, test_data = df_assembled.randomSplit([0.7, 0.3], seed=42)
rf = RandomForestRegressor(featuresCol='features', labelCol='Price')
rf_model = rf.fit(train_data)
predictions = rf_model.transform(test_data)
predictions.select('Price', 'prediction').show()

In [None]:
# Evaluation (RMSE, R2)
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol='Price', predictionCol='prediction', metricName='rmse')
rmse = evaluator.evaluate(predictions)
print('Root Mean Squared Error (RMSE):', rmse)

evaluator = RegressionEvaluator(labelCol='Price', predictionCol='prediction', metricName='r2')
r2 = evaluator.evaluate(predictions)
print('R-squared (R2):', r2)

In [None]:
# Predict a New Record (With Pre-processed Data)
new_data = [(2, 2, 1200, 0.5, 2010, 1300, 0)]
new_df = spark.createDataFrame(new_data, ['Bedrooms', 'Bathrooms', 'SquareFeet', 'LotSize(Acres)', 'YearBuilt', 'HOAFees', 'LocationIndex'])
new_assembled = assembler.transform(new_df)
prediction_new = rf_model.transform(new_assembled)
prediction_new.select('prediction').show()
spark.stop()

#### <font color="green">Challenge</font>

Create a Random Forest regression model to predict housing prices based on various features like location, size, and amenities using the dataset from the URL: https://afterwork.ai/ds/e/realestateprices_k9emr.csv.

In [None]:
# Data Importation and Exploration: https://afterwork.ai/ds/e/realestateprices_k9emr.csv
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('RealEstatePrices').getOrCreate()
df = spark.read.csv('realestateprices_k9emr.csv', header=True, inferSchema=True)
df.show(5)

In [None]:
# Data Preparation (Encoding with StringIndexer)
# Write your code here


In [None]:
# Data Preparation (VectorAssembler)
# Write your code here


In [None]:
# Machine Learning (Model Training and Test Set Prediction)
# Write your code here


In [None]:
# Evaluation (RMSE, R2)
# Write your code here


In [None]:
# Predict a New Record (With Pre-processed Data)
new_data = [(2, 2, 1200, 0.5, 2005, 1300, 0)]
new_df = spark.createDataFrame(new_data, ['Bedrooms', 'Bathrooms', 'SquareFeet', 'LotSize(Acres)', 'YearBuilt', 'HOAFees', 'LocationIndex'])
# Write your code here

spark.stop()

## 1.2 Classification Techniques

### 1.2.1 Logistic Regression

We use Logistic Regression for binary classification tasks where we want to predict the probability of an instance belonging to a particular class. In Logistic Regression, we model the relationship between the independent variables and the probability of the target variable using the logistic function. We use this technique when we have a binary outcome, such as classifying emails as spam or not spam. For example, in email filtering, we can use Logistic Regression to predict whether an incoming email is spam or not based on features like sender, subject, and content. To apply Logistic Regression, we first preprocess the data, split it into training and testing sets, train the model using PySpark's MLlib library, evaluate the model's performance using metrics like accuracy or AUC, and make predictions on new data.
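
The role of the logistic function can be illustrated with a small plain-Python sketch that fits one weight and one bias by gradient descent on the log loss. The toy data, the learning rate, and the function names are assumptions chosen for illustration; PySpark's `LogisticRegression` uses its own optimizer and supports many features.

```python
import math


def sigmoid(z):
    """Logistic function: maps any real score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


def fit_logistic(xs, ys, learning_rate=0.1, epochs=5000):
    """Fit weight w and bias b by gradient descent on the average log loss."""
    w, b = 0.0, 0.0
    n = len(xs)
    for _ in range(epochs):
        errors = [sigmoid(w * x + b) - y for x, y in zip(xs, ys)]
        grad_w = sum(e * x for e, x in zip(errors, xs)) / n
        grad_b = sum(errors) / n
        w -= learning_rate * grad_w
        b -= learning_rate * grad_b
    return w, b


# Toy data (made up): monthly usage hours and whether the customer churned (1) or not (0).
usage_hours = [1.0, 2.0, 3.0, 4.0, 6.0, 7.0, 8.0, 9.0]
churned = [1, 1, 1, 0, 1, 0, 0, 0]

w, b = fit_logistic(usage_hours, churned)

# Predicted churn probability for a light user and a heavy user.
p_low = sigmoid(w * 1.0 + b)
p_high = sigmoid(w * 9.0 + b)
print("P(churn | 1 hour):", p_low, "P(churn | 9 hours):", p_high)
```

A prediction is turned into a class by comparing the probability with a threshold, commonly 0.5.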



In [None]:
# Data Importation and Exploration: https://afterwork.ai/ds/e/customer_churn_p9uk5.csv
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('CustomerChurn').getOrCreate()
df = spark.read.csv("customer_churn_p9uk5.csv", header=True, inferSchema=True)
df.show(5)

In [None]:
# Data Preparation (Encode All Categorical Variables with StringIndexer)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol=col, outputCol=col+'_index', handleInvalid='skip') for col in ['Gender', 'Education', 'MaritalStatus', 'Churn']]
pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df).transform(df)
df_indexed.show(5)

In [None]:
# Data Preparation (VectorAssembler)
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Age', 'Income', 'FamilySize', 'Usage', 'SubscriptionLength', 'Gender_index', 'Education_index', 'MaritalStatus_index'], outputCol='features')
df_assembled = assembler.transform(df_indexed)
df_assembled.show(5)

In [None]:
# Machine Learning (Model Training and Test Set Prediction)
from pyspark.ml.classification import LogisticRegression
train_data, test_data = df_assembled.randomSplit([0.7, 0.3], seed=123)
lr = LogisticRegression(featuresCol='features', labelCol='Churn_index')
lr_model = lr.fit(train_data)
predictions = lr_model.transform(test_data)
predictions.select('Churn_index', 'prediction', 'probability').show()

In [None]:
# Evaluation (Accuracy)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='Churn_index', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Accuracy:', accuracy)

In [None]:
# Predict a New Record (With Pre-processed Data)
new_data = [(31, 50000, 2, 90, 12, 0.0, 1.0, 0.0)]
new_df = spark.createDataFrame(new_data, ['Age', 'Income', 'FamilySize', 'Usage', 'SubscriptionLength', 'Gender_index', 'Education_index', 'MaritalStatus_index'])
new_assembled = assembler.transform(new_df)
prediction_new = lr_model.transform(new_assembled)
prediction_new.select('prediction', 'probability').show()
spark.stop()

### 1.2.2 Naive Bayes

We use Naive Bayes for Text Classification to predict the category of a given text document based on the words present in it. We assume that the presence of each word is independent of the presence of other words in the document, which is a naive assumption but works well in practice. For example, we can use Naive Bayes to classify emails as spam or non-spam based on the words used in the email content. To apply Naive Bayes for text classification, we first preprocess the text data by tokenizing the words and converting them into numerical features using techniques like TF-IDF. We then train the Naive Bayes model on the labeled text data and use it to predict the category of new text documents. The PySpark example in this section applies the same classifier to tabular customer churn features rather than text; note that PySpark's NaiveBayes expects non-negative feature values.
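
The word-independence assumption can be sketched in plain Python with raw word counts in place of TF-IDF. The four training sentences, the labels, and the function names `train_naive_bayes` and `predict_label` are made up for illustration; the sketch uses add-one (Laplace) smoothing so that unseen word and class combinations do not produce a zero probability.

```python
import math
from collections import Counter, defaultdict


def train_naive_bayes(documents):
    """documents: list of (text, label). Count documents per class and words per class."""
    class_counts = Counter()
    word_counts = defaultdict(Counter)
    vocabulary = set()
    for text, label in documents:
        class_counts[label] += 1
        for word in text.lower().split():
            word_counts[label][word] += 1
            vocabulary.add(word)
    return class_counts, word_counts, vocabulary


def predict_label(text, class_counts, word_counts, vocabulary):
    """Pick the class with the highest log prior plus summed log word likelihoods."""
    total_docs = sum(class_counts.values())
    best_label, best_score = None, float("-inf")
    for label in class_counts:
        score = math.log(class_counts[label] / total_docs)
        total_words = sum(word_counts[label].values())
        for word in text.lower().split():
            if word not in vocabulary:
                continue  # ignore words never seen during training
            # Add-one smoothing keeps the likelihood strictly positive.
            likelihood = (word_counts[label][word] + 1) / (total_words + len(vocabulary))
            score += math.log(likelihood)
        if score > best_score:
            best_label, best_score = label, score
    return best_label


# Toy training set (made up).
training = [
    ("win free prize now", "spam"),
    ("free money offer", "spam"),
    ("meeting schedule for monday", "ham"),
    ("project report attached", "ham"),
]

model = train_naive_bayes(training)
spam_guess = predict_label("free prize offer", *model)
ham_guess = predict_label("monday project meeting", *model)
print(spam_guess, ham_guess)
```

Working with log probabilities avoids numerical underflow when many small likelihoods are multiplied together.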



In [None]:
# Data Importation and Exploration: https://afterwork.ai/ds/e/customer_churn_2xb5r.csv
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('CustomerChurn').getOrCreate()
df = spark.read.csv("customer_churn_2xb5r.csv", header=True, inferSchema=True)
df.show(5)

In [None]:
# Data Preparation (Encode All Categorical Variables with StringIndexer)
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol=col, outputCol=col+'_index').fit(df) for col in ['Gender', 'Location', 'Subscription Type', 'Payment Method', 'Churn']]
for indexer in indexers:
    df = indexer.transform(df)

In [None]:
# Data Preparation (VectorAssembler)
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Age', 'Monthly Charges', 'Data Usage', 'Contract Length', 'Gender_index', 'Location_index', 'Subscription Type_index', 'Payment Method_index'], outputCol='features')
output = assembler.transform(df)

In [None]:
# Machine Learning (Model Training and Test Set Prediction)
from pyspark.ml.classification import NaiveBayes
model = NaiveBayes(labelCol='Churn_index', featuresCol='features')
train_data, test_data = output.randomSplit([0.7, 0.3])
nb_model = model.fit(train_data)
predictions = nb_model.transform(test_data)

In [None]:
# Evaluation (Accuracy, Confusion Matrix)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='Churn_index', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print('Accuracy:', accuracy)
# Confusion matrix: number of rows for each (actual, predicted) label pair
predictions.groupBy('Churn_index', 'prediction').count().show()

In [None]:
# Predict a New Record (With Pre-processed Data)
new_data = spark.createDataFrame([(76, 110, 15, 9, 0.0, 2.0, 1.0, 0.0)], ['Age', 'Monthly Charges', 'Data Usage', 'Contract Length', 'Gender_index', 'Location_index', 'Subscription Type_index', 'Payment Method_index'])
new_output = assembler.transform(new_data)
prediction = nb_model.transform(new_output)
prediction.select('prediction').show()
spark.stop()

#### <font color="green">Challenge</font>

Write a PySpark code snippet that uses Naive Bayes to predict customer churn with the dataset at https://afterwork.ai/ds/ch/customer_churn_ty6do.csv.

In [None]:
# Data Importation and Exploration: https://afterwork.ai/ds/ch/customer_churn_ty6do.csv
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('CustomerChurn').getOrCreate()
df = spark.read.csv("customer_churn_ty6do.csv", header=True, inferSchema=True)
df.show(5)

In [None]:
# Data Preparation (Encode All Categorical Variables with StringIndexer)
# Write your code here


In [None]:
# Data Preparation (VectorAssembler)
# Write your code here


In [None]:
# Machine Learning (Model Training and Test Set Prediction)
# Write your code here


In [None]:
# Evaluation (Accuracy)
# Write your code here


In [None]:
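# Predict a New Record (With Pre-processed Data)
# Note: the tuple below holds 8 values but only 7 column names are listed.
# createDataFrame needs them to match, so align both with your feature columns first.
new_data = [(51, 1, 55, 4, 3, 70, 140, 18)]
new_df = spark.createDataFrame(new_data, ['Age', 'Gender_index', 'Monthly Charges', 'Data Usage', 'Contract Length', 'Location_index', 'Subscription Type_index'])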
# Write your code here

spark.stop()

### 1.2.3 Support Vector Classification (SVC)

We use Support Vector Classification (SVC) to classify data points into categories by finding the hyperplane that separates the classes with the largest margin. Kernel-based SVC is a common choice for small to medium-sized datasets with complex decision boundaries; PySpark provides LinearSVC, which fits a linear boundary for binary classification. An example of using SVC is in image classification, where we can classify images into different categories based on their features. To apply SVC, we first preprocess the data, split it into training and testing sets, then train the SVC model on the training data. Finally, we evaluate the model's performance on the testing data to assess its classification accuracy.
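
To show what "finding a separating hyperplane" can look like in code, here is a minimal plain-Python sketch of a linear SVM trained by subgradient descent on the regularized hinge loss. The toy points, the hyperparameters, and the function names are assumptions for illustration; this is a simplified stand-in and not the optimizer PySpark's `LinearSVC` uses.

```python
def train_linear_svm(points, labels, learning_rate=0.01, reg=0.01, epochs=500):
    """Learn weights w and bias b for 2-D points with labels in {-1, +1}."""
    w = [0.0, 0.0]
    b = 0.0
    for _ in range(epochs):
        for (x1, x2), y in zip(points, labels):
            margin = y * (w[0] * x1 + w[1] * x2 + b)
            # The regularization term shrinks the weights, which favors a wider margin.
            w[0] -= learning_rate * reg * w[0]
            w[1] -= learning_rate * reg * w[1]
            if margin < 1:
                # The point is misclassified or inside the margin: push the boundary away.
                w[0] += learning_rate * y * x1
                w[1] += learning_rate * y * x2
                b += learning_rate * y
    return w, b


def predict_svm(w, b, point):
    """Return +1 or -1 depending on which side of the hyperplane the point lies."""
    score = w[0] * point[0] + w[1] * point[1] + b
    return 1 if score >= 0 else -1


# Toy, linearly separable data (made up).
points = [(1.0, 1.0), (1.0, 2.0), (2.0, 1.0), (4.0, 4.0), (4.0, 5.0), (5.0, 4.0)]
labels = [-1, -1, -1, 1, 1, 1]

w, b = train_linear_svm(points, labels)
train_predictions = [predict_svm(w, b, p) for p in points]
print("weights:", w, "bias:", b)
print("training predictions:", train_predictions)
```

Only points that are misclassified or fall inside the margin trigger an update, which is why the final boundary depends mainly on the points closest to it.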



In [None]:
# Data Importation and Exploration: https://afterwork.ai/ds/e/customer_churn_jpeli.csv
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('CustomerChurnSVC').getOrCreate()
df = spark.read.csv("customer_churn_jpeli.csv", header=True, inferSchema=True)
df.show(5)

In [None]:
# Data Preparation (Encode All Categorical Variables with StringIndexer)
from pyspark.ml.feature import StringIndexer
gender_indexer = StringIndexer(inputCol='Gender', outputCol='GenderIndex').fit(df)
education_indexer = StringIndexer(inputCol='Education', outputCol='EducationIndex').fit(df)
marital_indexer = StringIndexer(inputCol='MaritalStatus', outputCol='MaritalIndex').fit(df)
churn_indexer = StringIndexer(inputCol='Churn', outputCol='label').fit(df)
df_encoded = gender_indexer.transform(df)
df_encoded = education_indexer.transform(df_encoded)
df_encoded = marital_indexer.transform(df_encoded)
df_encoded = churn_indexer.transform(df_encoded)
df_encoded.show(5)

In [None]:
# Data Preparation (VectorAssembler)
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['GenderIndex', 'Age', 'Income', 'FamilySize', 'EducationIndex', 'MaritalIndex', 'Usage', 'SubscriptionLength'], outputCol='features')
output = assembler.transform(df_encoded)

In [None]:
# Machine Learning (Model Training and Test Set Prediction)
from pyspark.ml.classification import LinearSVC
train_data, test_data = output.randomSplit([0.7, 0.3])
svc = LinearSVC(featuresCol='features', labelCol='label')
svc_model = svc.fit(train_data)
predictions = svc_model.transform(test_data)

In [None]:
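# Evaluation (Accuracy)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')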
accuracy = evaluator.evaluate(predictions)
print('Accuracy:', accuracy)

In [None]:
# Predict a New Record (With Pre-processed Data)
new_data = [(0.0, 40, 60000, 3, 1.0, 0.0, 100, 12)]
new_df = spark.createDataFrame(new_data, ['GenderIndex', 'Age', 'Income', 'FamilySize', 'EducationIndex', 'MaritalIndex', 'Usage', 'SubscriptionLength'])
new_output = assembler.transform(new_df)
new_prediction = svc_model.transform(new_output)
new_prediction.select('prediction').show()
spark.stop()

# 2. Unsupervised Learning

## 2.1 K-Means Clustering

In K-Means Clustering, we aim to partition a set of data points into K clusters based on their features. We use this technique to identify patterns and group similar data points together without any predefined labels. For example, we can apply K-Means Clustering to customer segmentation in marketing. By clustering customers based on their purchasing behavior, we can target specific groups with tailored marketing strategies. K-Means works iteratively: it initializes K centroids (PySpark's KMeans uses the k-means|| initialization by default), assigns each data point to the nearest centroid, recomputes each centroid as the mean of the data points assigned to it, and repeats until the centroids stop changing or a maximum number of iterations is reached.
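
The assign-and-update loop can be sketched in plain Python. The toy 2-D points, the choice of the first two points as starting centroids, and the function names are assumptions for illustration; the sketch shows the basic iteration only and is not PySpark's distributed implementation.

```python
def squared_distance(p, q):
    """Squared Euclidean distance between two points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def kmeans(points, centroids, max_iterations=100):
    """Alternate assignment and centroid updates until the centroids stop changing."""
    labels = []
    for _ in range(max_iterations):
        # Assignment step: each point joins its nearest centroid.
        labels = [
            min(range(len(centroids)), key=lambda c: squared_distance(p, centroids[c]))
            for p in points
        ]
        # Update step: each centroid moves to the mean of its assigned points.
        new_centroids = []
        for c in range(len(centroids)):
            members = [p for p, label in zip(points, labels) if label == c]
            if members:
                new_centroids.append(tuple(sum(axis) / len(members) for axis in zip(*members)))
            else:
                new_centroids.append(centroids[c])  # keep an empty cluster's centroid
        if new_centroids == centroids:
            break  # converged
        centroids = new_centroids
    return centroids, labels


# Toy data (made up): two visibly separate groups.
points = [(1.0, 1.0), (1.5, 2.0), (1.0, 0.5), (8.0, 8.0), (9.0, 9.5), (8.5, 9.0)]

# Deliberately poor start: both initial centroids come from the same group.
initial_centroids = [points[0], points[1]]

centroids, labels = kmeans(points, initial_centroids)
print("centroids:", centroids)
print("labels:", labels)
```

Even with both starting centroids in the same group, the loop separates the two groups within a few iterations on this data; on harder data the result can depend on the initialization.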



In [None]:
# Data Importation and Exploration: https://afterwork.ai/ds/e/customer_churn_3b8h4.csv
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('CustomerChurnKMeans').getOrCreate()
df = spark.read.csv('customer_churn_3b8h4.csv', header=True, inferSchema=True)
df.show(5)

In [None]:
# Data Preparation (Encode Categorical Variables with StringIndexer)
from pyspark.ml.feature import StringIndexer
categorical_cols = ['Server Name', 'IP Address', 'Operating System', 'Location', 'Environment', 'Status']
indexers = [StringIndexer(inputCol=col, outputCol=col+'_index', handleInvalid='keep').fit(df) for col in categorical_cols]
df_indexed = df
for indexer in indexers:
    df_indexed = indexer.transform(df_indexed)

In [None]:
# Machine Learning
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Memory (GB)', 'Storage (TB)', 'CPU Cores', 'Network Bandwidth (Gbps)',
                                      'Server Name_index', 'IP Address_index', 'Operating System_index',
                                      'Location_index', 'Environment_index', 'Status_index'],
                            outputCol='features')
df_assembled = assembler.transform(df_indexed)

In [None]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans(featuresCol='features', k=3)
model = kmeans.fit(df_assembled)

In [None]:
# Predictions
predictions = model.transform(df_assembled)
predictions.select('Server Name', 'prediction').show()

# Evaluation with Silhouette Score
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator(featuresCol='features', metricName='silhouette', distanceMeasure='squaredEuclidean')
silhouette = evaluator.evaluate(predictions)
print(f'Silhouette Score: {silhouette}')

In [None]:
# Predict a New Record (With Pre-processed Data)
new_data = [(74, 8, 2, 0.5, 1, 1, 1, 3, 2, 1)]
new_df = spark.createDataFrame(new_data, ['Memory (GB)', 'Storage (TB)', 'CPU Cores', 'Network Bandwidth (Gbps)',
                                          'Server Name_index', 'IP Address_index', 'Operating System_index',
                                          'Location_index', 'Environment_index', 'Status_index'])
new_assembled = assembler.transform(new_df)
new_prediction = model.transform(new_assembled)
new_prediction.select('prediction').show()
spark.stop()

#### <font color="green">Challenge</font>

Write PySpark code to perform K-Means Clustering on the customer churn dataset available at the URL: https://afterwork.ai/ds/ch/customer_churn_a4m15.csv. Use features such as Age, Income, FamilySize, and UsageDuration to cluster the customers.

In [None]:
# Data Importation and Exploration: https://afterwork.ai/ds/ch/customer_churn_a4m15.csv
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('CustomerChurnKMeans').getOrCreate()
df = spark.read.csv('customer_churn_a4m15.csv', header=True, inferSchema=True)
df.show(5)

In [None]:
# Machine Learning
# Write your code here


In [None]:
# Predictions
# Write your code here


In [None]:
# Predict a New Record (With Pre-processed Data)
new_data = [(30, 50000, 4, 60)]
new_df = spark.createDataFrame(new_data, ['Age', 'Income', 'FamilySize', 'UsageDuration'])
# Write your code here
