<a href="https://colab.research.google.com/github/Anirudh3104/BDA-Assignment-2/blob/main/BDAAssignment2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Big Data Analytics Assignment**

In [12]:
!pip install pyspark



## ***📧 Spam Classification using Decision Tree in PySpark***

In [13]:
# Spam classification on the UCI SMS Spam Collection with a TF-IDF + Decision Tree pipeline.
# The model is fitted on a training split and scored on a held-out test split so that the
# reported accuracy reflects generalisation rather than memorisation of the training rows.

# Step 1: Install and Load SMS Spam Collection Dataset
import nltk
import pandas as pd
import urllib.request
import zipfile
from pathlib import Path

# NOTE(review): these NLTK resources are not used by the pipeline in this cell;
# they are kept in case other cells rely on them — confirm and remove if not.
nltk.download('punkt')
nltk.download('stopwords')

# Download and extract the dataset (skipped when the extracted file is already present,
# so re-running the cell does not hit the network again)
dataset_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00228/smsspamcollection.zip"
if not Path("SMSSpamCollection").exists():
    urllib.request.urlretrieve(dataset_url, "smsspamcollection.zip")

    with zipfile.ZipFile("smsspamcollection.zip", 'r') as zip_ref:
        zip_ref.extractall(".")

# Load into pandas DataFrame (tab-separated, no header row in the file)
df = pd.read_csv("SMSSpamCollection", sep='\t', names=["label", "message"])
df["label"] = df["label"].map({"ham": 0, "spam": 1})  # Binary Encoding

# Step 2: Initialize Spark Session and Convert to Spark DataFrame
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SMS Spam Detection") \
    .getOrCreate()

spark_df = spark.createDataFrame(df)
spark_df.show(5, truncate=False)

# Hold out 20% of the messages for evaluation; the seed makes the split repeatable.
train_df, test_df = spark_df.randomSplit([0.8, 0.2], seed=42)

# Step 3: Preprocess Text and Extract Features
from pyspark.ml.feature import Tokenizer, HashingTF, IDF

tokenizer = Tokenizer(inputCol="message", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Step 4: Initialize and Apply Decision Tree Classifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

decision_tree = DecisionTreeClassifier(featuresCol="features", labelCol="label", seed=42)
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf, decision_tree])

# Train the Model on the training split only (IDF weights are also learned from it)
model = pipeline.fit(train_df)

# Step 5: Generate Predictions and Evaluate Model on the held-out split
predictions = model.transform(test_df)
predictions.select("message", "label", "prediction").show(5, truncate=False)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy:.4f}")


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|message                                                                                                                                                    |
+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------+
|0    |Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...                                            |
|0    |Ok lar... Joking wif u oni...                                                                                                                              |
|1    |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's|
|0    |U dun say

## ***KMeans Clustering on Wine Dataset using PySpark***

In [14]:
# KMeans clustering of the scikit-learn Wine dataset in PySpark.
# Features are standardised before clustering: KMeans uses Euclidean distance, so without
# scaling the large-magnitude columns (e.g. proline, in the hundreds/thousands) would
# dominate the distance and effectively decide the clusters on their own.

# Step 1: Import Required Libraries
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.evaluation import ClusteringEvaluator
from sklearn.datasets import load_wine
import pandas as pd

# Step 2: Initialize Spark Session
spark = SparkSession.builder \
    .appName("KMeans Clustering - Wine Dataset") \
    .getOrCreate()

# Step 3: Load Wine Dataset from scikit-learn
wine_data = load_wine()

# Convert the wine data into a Pandas DataFrame for easier manipulation and inspection
df = pd.DataFrame(wine_data.data, columns=wine_data.feature_names)
df['target'] = wine_data.target  # Add the target column (which indicates the wine class)

# Step 4: Convert Pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(df)

# Show the first few rows to understand the data
spark_df.show(5)

# Step 5: Feature Engineering - Assemble and Standardise Features
# The raw vector goes to "raw_features"; the scaled vector takes the name "features",
# which is what KMeans and ClusteringEvaluator read below.
assembler = VectorAssembler(inputCols=wine_data.feature_names, outputCol="raw_features")
assembled_df = assembler.transform(spark_df)

scaler = StandardScaler(inputCol="raw_features", outputCol="features", withMean=True, withStd=True)
spark_df = scaler.fit(assembled_df).transform(assembled_df)

# Step 6: Apply KMeans Clustering
# 3 clusters (Wine dataset has 3 classes), set seed for reproducibility
kmeans = KMeans(featuresCol="features").setK(3).setSeed(1)
model = kmeans.fit(spark_df)

# Step 7: Make Predictions
predictions = model.transform(spark_df)

# Step 8: Show the Results
predictions.select("features", "target", "prediction").show(5, truncate=False)

# Step 9: Evaluate the Clustering Performance (silhouette computed on the scaled features)
evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction")
silhouette_score = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette_score:.4f}")

# Step 10: Show Cluster Centers (Centroids)
# The centroids are expressed in standardised units (zero mean, unit variance per feature),
# not in the original measurement units.
centers = model.clusterCenters()
print("Cluster Centers (Centroids):")
for center in centers:
    print(center)


+-------+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+------+
|alcohol|malic_acid| ash|alcalinity_of_ash|magnesium|total_phenols|flavanoids|nonflavanoid_phenols|proanthocyanins|color_intensity| hue|od280/od315_of_diluted_wines|proline|target|
+-------+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+------+
|  14.23|      1.71|2.43|             15.6|    127.0|          2.8|      3.06|                0.28|           2.29|           5.64|1.04|                        3.92| 1065.0|     0|
|   13.2|      1.78|2.14|             11.2|    100.0|         2.65|      2.76|                0.26|           1.28|           4.38|1.05|                         3.4| 1050.0|     0|
|  13.16|      2.36|2.67|             18.6|    101.0|          2.8|      3.24|                 

## ***Movie Recommendation with PySpark***

In [15]:
# Movie recommendation on MovieLens 100k using ALS collaborative filtering.
# Both the train/test split and the ALS factor initialisation are seeded so that the RMSE
# and the recommendations are the same on every Restart & Run All.

# Step 1: Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd

# Step 2: Initialize Spark Session
spark = SparkSession.builder \
    .appName("MovieLens Recommendation Engine") \
    .getOrCreate()

# Step 3: Load the MovieLens 100k dataset
# The dataset contains 100,000 ratings from 943 users on 1682 movies
url = "https://files.grouplens.org/datasets/movielens/ml-100k/u.data"
columns = ['userId', 'movieId', 'rating', 'timestamp']

# Read using pandas (tab-separated, no header row in the file)
df_pd = pd.read_csv(url, sep='\t', names=columns)

# Drop unnecessary columns and convert to Spark DataFrame
df = spark.createDataFrame(df_pd.drop('timestamp', axis=1))

# Display sample data
print("Sample Ratings:")
df.show(5)

# Step 4: Split data into training and testing sets (80-20 split)
# A fixed seed keeps the same rows in each split across runs.
training_data, testing_data = df.randomSplit([0.8, 0.2], seed=42)

# Step 5: Build the ALS (Alternating Least Squares) recommendation model
als_model = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",  # Drop NaNs during prediction
    seed=42  # Fixed seed for ALS so repeated fits give the same model
)

# Train the ALS model on the training data
model = als_model.fit(training_data)

# Step 6: Predict ratings on the test data
predictions = model.transform(testing_data)

# Step 7: Evaluate the model using Root Mean Squared Error (RMSE)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse:.4f}")

# Step 8: Generate Top-5 movie recommendations for all users
recommendations = model.recommendForAllUsers(5)

# Display sample recommendations
print("Top-5 Movie Recommendations for Users:")
recommendations.show(5, truncate=False)


Sample Ratings:
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|   196|    242|     3|
|   186|    302|     3|
|    22|    377|     1|
|   244|     51|     2|
|   166|    346|     1|
+------+-------+------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data: 0.9262
Top-5 Movie Recommendations for Users:
+------+----------------------------------------------------------------------------------------------+
|userId|recommendations                                                                               |
+------+----------------------------------------------------------------------------------------------+
|1     |[{1589, 5.3769145}, {1449, 5.1196933}, {408, 5.0904303}, {1643, 5.0285673}, {1405, 4.9508567}]|
|2     |[{1449, 4.7590003}, {318, 4.6494007}, {963, 4.6178613}, {408, 4.6051507}, {1398, 4.5990634}]  |
|3     |[{1268, 4.514106}, {320, 4.509019}, {838, 4.491013}, {902, 4.4837756}, {1591, 4.390137}]      |
|4     |[{838, 5.7553835}, {