# PySpark Implementation for Distributed Computing

In [None]:
import os
import numpy as np
import pandas as pd
import missingno as msgno
import seaborn as sns
import matplotlib.pyplot as plt

In [None]:
# Open Colab's file-upload widget. The result is indexed by file name further down
# (uploaded['student_cleaned_data.csv']) and the value is wrapped in io.BytesIO,
# so each entry holds the raw bytes of one uploaded file.
from google.colab import files
uploaded = files.upload()

Saving student_cleaned_data.csv to student_cleaned_data.csv


In [None]:
import io

# Pull the raw bytes of the uploaded CSV and expose them to pandas through an
# in-memory buffer, so nothing has to be written to disk first.
# NOTE(review): the preview of this frame shows 'Unnamed: 0.1' and 'Unnamed: 0'
# columns, which look like index columns saved by earlier to_csv calls; they are
# not part of the feature list used for modelling - confirm they can be ignored.
csv_bytes = uploaded['student_cleaned_data.csv']
csv_buffer = io.BytesIO(csv_bytes)
student_df = pd.read_csv(csv_buffer)

In [None]:
# Preview the first rows of the loaded frame as a sanity check on the upload.
student_df.head()

Unnamed: 0.1,Unnamed: 0,newTarget,Marital.status,Application.mode,Application.order,Course,Day.eve.attendance,Previous.qualification,Nationality,Mother.qualification,...,Age.at.enrollment,Fsem.without.eval,Ssem.credit,Ssem.enrolled,Ssem.eval,Ssem.grade,Ssem.without.eval,Unemployment.rate,Inflation.rate,GDP
0,1,0,1,8,5,2,1,1,1,13,...,20,0,0,0,0,0.0,0,10.8,1.4,1.74
1,2,1,1,6,1,11,1,1,1,1,...,19,0,0,6,6,13.666667,0,13.9,-0.3,0.79
2,3,0,1,1,5,5,1,1,1,22,...,19,0,0,6,0,0.0,0,10.8,1.4,1.74
3,4,1,1,8,2,15,1,1,1,23,...,20,0,0,6,10,12.4,0,9.4,-0.8,-3.12
4,5,1,2,12,1,3,0,1,1,22,...,45,0,0,6,6,13.0,0,13.9,-0.3,0.79


In [None]:
# NOTE(review): student_df is assigned from pd.read_csv above, which already
# returns a DataFrame, so this re-wrap appears to be redundant - confirm before removing.
student_df = pd.DataFrame(student_df)

In [None]:
!pip install pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
import time


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

# Obtain the notebook's Spark session (getOrCreate reuses an existing one if present)
spark = (
    SparkSession.builder
    .appName("DDA_Assignment")
    .getOrCreate()
)

In [None]:
# Swap every literal "." in the column names for "_" before the frame is handed
# to Spark (presumably because Spark reads dots in column names as nested-field
# access - confirm). regex=False keeps "." a plain character, not a wildcard.
student_df.columns = student_df.columns.str.replace(".", "_", regex=False)


In [None]:
# Seed for the train/test split, so the metrics reported below can be reproduced.
SEED = 42

# Convert the pandas frame into a distributed Spark DataFrame.
data = spark.createDataFrame(student_df)

# Columns fed to the model as predictors.
feature_columns = [
    "Marital_status", "Application_mode", "Application_order", "Course", "Day_eve_attendance",
    "Previous_qualification", "Nationality", "Mother_qualification", "Father_qualification",
    "Mother_occupation", "Father_occupation", "Displaced", "Educational_special_needs", "Debtor",
    "Tuition_fees_up_to_date", "Gender", "Scholarship_holder", "Age_at_enrollment", "Fsem_without_eval",
    "Ssem_credit", "Ssem_enrolled", "Ssem_eval", "Ssem_grade", "Ssem_without_eval",
    "Unemployment_rate", "Inflation_rate", "GDP"
]

# Pack the predictor columns into the single vector column the classifier reads.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# No StringIndexer stages: an earlier version of this cell fitted one indexer per
# feature, but the assembler consumes the raw columns, so the "<column>_index"
# outputs were never used by the model. They only added fitting time, and with
# StringIndexer's default handleInvalid="error" they could raise on values that
# occur only in the test split.

# Decision tree predicting the "newTarget" label from the assembled vector.
dt = DecisionTreeClassifier(labelCol="newTarget", featuresCol="features")

# The pipeline is just feature assembly followed by the classifier.
pipeline = Pipeline(stages=[assembler, dt])

# 70/30 train/test split; the seed makes the split repeatable across runs.
train_data, test_data = data.randomSplit([0.7, 0.3], seed=SEED)

# Time the fit with a monotonic clock, which is the right tool for measuring intervals.
start_time = time.perf_counter()
model = pipeline.fit(train_data)
end_time = time.perf_counter()

training_time = end_time - start_time
print(f"Time taken to train the Decision Tree model: {training_time:.2f} seconds")

# Score the held-out rows; this adds the "prediction" column used for evaluation.
predictions = model.transform(test_data)



Time taken to train the Decision Tree model: 36.26 seconds


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Label/prediction columns shared by every evaluator below
evaluator_columns = {"labelCol": "newTarget", "predictionCol": "prediction"}

# Overall accuracy on the held-out predictions
accuracy_evaluator = MulticlassClassificationEvaluator(metricName="accuracy", **evaluator_columns)
accuracy = accuracy_evaluator.evaluate(predictions)

# Precision averaged over classes, weighted by class frequency
precision_evaluator = MulticlassClassificationEvaluator(metricName="weightedPrecision", **evaluator_columns)
precision = precision_evaluator.evaluate(predictions)

# Recall averaged over classes, weighted by class frequency
recall_evaluator = MulticlassClassificationEvaluator(metricName="weightedRecall", **evaluator_columns)
recall = recall_evaluator.evaluate(predictions)

# Report all three metrics to three decimal places
print(f"Accuracy: {accuracy:.3f}")
print(f"Weighted Precision: {precision:.3f}")
print(f"Weighted Recall: {recall:.3f}")


Accuracy: 0.862
Weighted Precision: 0.863
Weighted Recall: 0.862
