<a href="https://colab.research.google.com/github/7pyeshwanth/bda-assignment-2/blob/main/BDA_assignment_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Big Data Analytics Assignment**

In [None]:
!pip install pyspark



## ***💼 Classification Model using Logistic Regression in PySpark***

In [None]:
# Step 1: Import Required Libraries
from pyspark.sql import SparkSession
from sklearn.datasets import load_breast_cancer
import pandas as pd
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Step 2: Initialize Spark Session
spark = (
    SparkSession.builder
    .appName("Breast Cancer Classification")
    .getOrCreate()
)

# Step 3: Load Breast Cancer Dataset from sklearn
breast_cancer_data = load_breast_cancer()
pandas_df = pd.DataFrame(breast_cancer_data.data, columns=breast_cancer_data.feature_names)
pandas_df['target'] = breast_cancer_data.target

# Step 4: Convert Pandas DataFrame to Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)

# Step 5: Feature Engineering - Assemble Feature Columns
# Every column except the label goes into the single "features" vector.
feature_columns = [name for name in spark_df.columns if name != 'target']
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Step 6: Initialize Logistic Regression Model
logistic_regression = LogisticRegression(featuresCol="features", labelCol="target")

# Step 7: Build Pipeline for Transformation and Modeling
pipeline = Pipeline(stages=[vector_assembler, logistic_regression])

# Step 8: Split the data and Train the Model
# Hold out 20% of the rows so the reported metric is not measured on the rows
# the model was fitted on; the fixed seed keeps the split repeatable across runs.
train_df, test_df = spark_df.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_df)

# Step 9: Generate Predictions on the Held-Out Test Data
predictions = model.transform(test_df)
predictions.select("features", "target", "prediction").show(5, truncate=False)

# Step 10: Evaluate Model Performance (area under the ROC curve on the test split)
evaluator = BinaryClassificationEvaluator(
    labelCol="target",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC",
)
auc_score = evaluator.evaluate(predictions)
print(f"Area Under ROC Curve (AUC): {auc_score:.4f}")


+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+----------+
|features                                                                                                                                                                                                            |target|prediction|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+----------+
|[17.99,10.38,122.8,1001.0,0.1184,0.2776,0.3001,0.1471,0.2419,0.07871,1.095,0.9053,8.589,153.4,0.006399,0.04904,0.05373,0.01587,0.03003,0.006193,25.38,17.33,184.6,2019.0,0.1622,0.6656,0.7119,0.2654,0.4601,0.1189] |0     |0.0       |
|[20.57,17.77,132.9,1326.0,0.08474,0.07864,0.0869,0.07017,0.1812,0.0

## ***KMeans Clustering on Iris Dataset using PySpark***

In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from sklearn.datasets import load_iris
import pandas as pd

# Step 1: Pull the Iris measurements from sklearn into a pandas frame.
# The species index is kept as 'label' purely for reference; it is never fed to KMeans.
iris = load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names).assign(label=iris.target)

# Step 2: Obtain a Spark session and hand the pandas frame over to Spark.
spark = (
    SparkSession.builder
    .appName("Iris KMeans Clustering")
    .getOrCreate()
)
spark_df = spark.createDataFrame(df)

# Preview the raw rows.
spark_df.show(5)

# Step 3: Pack every measurement column (everything but 'label') into one vector column.
feature_columns = df.columns.drop('label').tolist()
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Preview what the assembler produces on its own, before it is used inside the pipeline.
assembled_df = assembler.transform(spark_df)
assembled_df.select("label", "features").show(5)

# Step 4: Three-cluster KMeans with a fixed seed, chained after the assembler.
kmeans = KMeans(k=3, seed=1, featuresCol="features", predictionCol="prediction")
pipeline = Pipeline(stages=[assembler, kmeans])
model = pipeline.fit(spark_df)

# Step 5: Assign a cluster to every row and compare a few against the reference labels.
predictions = model.transform(spark_df)
predictions.select("label", "prediction").show(5)

# Step 6: Report the training cost recorded in the summary of the fitted KMeans stage
# (the last stage of the fitted pipeline).
kmeans_model = model.stages[-1]
wssse = kmeans_model.summary.trainingCost
print(f"Within Set Sum of Squared Errors (WSSSE): {wssse}")


+-----------------+----------------+-----------------+----------------+-----+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|label|
+-----------------+----------------+-----------------+----------------+-----+
|              5.1|             3.5|              1.4|             0.2|    0|
|              4.9|             3.0|              1.4|             0.2|    0|
|              4.7|             3.2|              1.3|             0.2|    0|
|              4.6|             3.1|              1.5|             0.2|    0|
|              5.0|             3.6|              1.4|             0.2|    0|
+-----------------+----------------+-----------------+----------------+-----+
only showing top 5 rows

+-----+-----------------+
|label|         features|
+-----+-----------------+
|    0|[5.1,3.5,1.4,0.2]|
|    0|[4.9,3.0,1.4,0.2]|
|    0|[4.7,3.2,1.3,0.2]|
|    0|[4.6,3.1,1.5,0.2]|
|    0|[5.0,3.6,1.4,0.2]|
+-----+-----------------+
only showing top 5 rows

+-----+-------

## ***📚 Book Recommendation with PySpark***

In [None]:
# Step 1: Import required libraries
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd

# Step 2: Initialize a Spark Session
spark = (
    SparkSession.builder
    .appName("Book Recommendation Engine")
    .getOrCreate()
)

# Single seed shared by the data split and the ALS initialisation so that the
# reported RMSE and the recommendations are repeatable across runs.
SEED = 42

# Step 3: Load the ratings data from the Goodbooks-10k dataset
# The dataset contains user-book ratings with columns: user_id, book_id, rating
url = "https://raw.githubusercontent.com/zygmuntz/goodbooks-10k/master/ratings.csv"
df_pd = pd.read_csv(url)

# Select relevant columns for recommendation system
df_pd = df_pd[['user_id', 'book_id', 'rating']]

# Convert to Spark DataFrame
df = spark.createDataFrame(df_pd)

# Display sample data
print("Sample Book Ratings:")
df.show(5)

# Step 4: Split data into training and testing sets (seeded for reproducibility)
training_data, testing_data = df.randomSplit([0.8, 0.2], seed=SEED)

# Step 5: Build ALS (Alternating Least Squares) recommendation model
als_model = ALS(
    userCol="user_id",
    itemCol="book_id",
    ratingCol="rating",
    coldStartStrategy="drop",  # Avoids NaN predictions
    seed=SEED,
)

# Train the model on the training dataset
model = als_model.fit(training_data)

# Step 6: Generate predictions on test dataset
predictions = model.transform(testing_data)

# Step 7: Evaluate the model using Root Mean Squared Error (RMSE)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse:.4f}")

# Step 8: Generate Top-5 book recommendations for all users
recommendations = model.recommendForAllUsers(5)

# Display sample recommendations
print("Top-5 Book Recommendations for Users:")
recommendations.show(5, truncate=False)

Sample Book Ratings:
+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|      1|    258|     5|
|      2|   4081|     4|
|      2|    260|     5|
|      2|   9296|     5|
|      2|   2318|     3|
+-------+-------+------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data: 0.8214
Top-5 Movie Recommendations for Users:
+-------+----------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                               |
+-------+----------------------------------------------------------------------------------------------+
|1      |[{9566, 4.537521}, {3628, 4.52148}, {7254, 4.4257407}, {5580, 4.415051}, {6920, 4.411584}]    |
|3      |[{1338, 2.5264645}, {4154, 2.4865828}, {2236, 2.4503016}, {9347, 2.3921611}, {464, 2.3821602}]|
|5      |[{3628, 4.945353}, {6590, 4.8637643}, {9566, 4.8148603}, {5580, 4.799803}, {4483, 4.7857976}] |
|6      