In [1]:
!pip install pyspark




In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [3]:
spark

Q1) BUILDING CLASSIFICATION MODEL USING SPARK

In [6]:
from sklearn.datasets import load_iris
import pandas as pd

iris = load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names)
df['species'] = iris.target


In [7]:
spark_df = spark.createDataFrame(df)
spark_df.show(5)


+-----------------+----------------+-----------------+----------------+-------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|species|
+-----------------+----------------+-----------------+----------------+-------+
|              5.1|             3.5|              1.4|             0.2|      0|
|              4.9|             3.0|              1.4|             0.2|      0|
|              4.7|             3.2|              1.3|             0.2|      0|
|              4.6|             3.1|              1.5|             0.2|      0|
|              5.0|             3.6|              1.4|             0.2|      0|
+-----------------+----------------+-----------------+----------------+-------+
only showing top 5 rows



In [8]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="species", outputCol="label")
indexed_df = indexer.fit(spark_df).transform(spark_df)
indexed_df.show(5)


+-----------------+----------------+-----------------+----------------+-------+-----+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|species|label|
+-----------------+----------------+-----------------+----------------+-------+-----+
|              5.1|             3.5|              1.4|             0.2|      0|  0.0|
|              4.9|             3.0|              1.4|             0.2|      0|  0.0|
|              4.7|             3.2|              1.3|             0.2|      0|  0.0|
|              4.6|             3.1|              1.5|             0.2|      0|  0.0|
|              5.0|             3.6|              1.4|             0.2|      0|  0.0|
+-----------------+----------------+-----------------+----------------+-------+-----+
only showing top 5 rows



In [9]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=iris.feature_names,
    outputCol="features"
)
assembled_df = assembler.transform(indexed_df)
assembled_df.show(5)


+-----------------+----------------+-----------------+----------------+-------+-----+-----------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|species|label|         features|
+-----------------+----------------+-----------------+----------------+-------+-----+-----------------+
|              5.1|             3.5|              1.4|             0.2|      0|  0.0|[5.1,3.5,1.4,0.2]|
|              4.9|             3.0|              1.4|             0.2|      0|  0.0|[4.9,3.0,1.4,0.2]|
|              4.7|             3.2|              1.3|             0.2|      0|  0.0|[4.7,3.2,1.3,0.2]|
|              4.6|             3.1|              1.5|             0.2|      0|  0.0|[4.6,3.1,1.5,0.2]|
|              5.0|             3.6|              1.4|             0.2|      0|  0.0|[5.0,3.6,1.4,0.2]|
+-----------------+----------------+-----------------+----------------+-------+-----+-----------------+
only showing top 5 rows



In [10]:
train_data, test_data = assembled_df.randomSplit([0.8, 0.2], seed=42)


In [14]:
from pyspark.ml.classification import DecisionTreeClassifier

# Initialize Decision Tree Classifier with regularization parameters
dt = DecisionTreeClassifier(
    labelCol="label",
    featuresCol="features",
    maxDepth=5,  # Limit tree depth
    minInstancesPerNode=10,  # Minimum instances per node
    minInfoGain=0.01,  # Minimum information gain for a split
    maxBins=32  # Maximum number of bins
)

# Train the model
model = dt.fit(train_data)

# Make predictions on training data
train_predictions = model.transform(train_data)

# Evaluate training accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
train_accuracy = evaluator.evaluate(train_predictions)
print(f"Training Accuracy = {train_accuracy:.2f}")

# Make predictions on test data
test_predictions = model.transform(test_data)

# Evaluate test accuracy
test_accuracy = evaluator.evaluate(test_predictions)
print(f"Test Accuracy = {test_accuracy:.2f}")


Training Accuracy = 0.95
Test Accuracy = 0.96


In [15]:
import pandas as pd

# Extract feature importances
importances = model.featureImportances
importance_values = importances.toArray()

# Create a DataFrame for better visualization
importance_df = pd.DataFrame({
    "Feature": iris.feature_names,
    "Importance": importance_values
}).sort_values(by="Importance", ascending=False)

print("\nFeature Importance:")
print(importance_df)



Feature Importance:
             Feature  Importance
2  petal length (cm)         1.0
0  sepal length (cm)         0.0
1   sepal width (cm)         0.0
3   petal width (cm)         0.0


In [16]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define a parameter grid
paramGrid = (ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10])
    .addGrid(dt.minInstancesPerNode, [5, 10])
    .addGrid(dt.minInfoGain, [0.0, 0.01])
    .addGrid(dt.maxBins, [32, 64])
    .build())

# Initialize CrossValidator
cv = CrossValidator(
    estimator=dt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3  # Use 3-fold cross-validation
)

# Train the model with cross-validation
cv_model = cv.fit(train_data)

# Get the best model
best_model = cv_model.bestModel

# Evaluate on test data
test_predictions = best_model.transform(test_data)
test_accuracy = evaluator.evaluate(test_predictions)
print(f"Test Accuracy with Best Model = {test_accuracy:.2f}")


Test Accuracy with Best Model = 1.00


Q2)Build a Clustering Model With Spark

In [18]:
from sklearn.datasets import load_iris
import pandas as pd

iris = load_iris()
df = pd.DataFrame(iris.data, columns=iris.feature_names)
spark_df = spark.createDataFrame(df)
spark_df.show(5)



+-----------------+----------------+-----------------+----------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|
+-----------------+----------------+-----------------+----------------+
|              5.1|             3.5|              1.4|             0.2|
|              4.9|             3.0|              1.4|             0.2|
|              4.7|             3.2|              1.3|             0.2|
|              4.6|             3.1|              1.5|             0.2|
|              5.0|             3.6|              1.4|             0.2|
+-----------------+----------------+-----------------+----------------+
only showing top 5 rows



In [19]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

# Convert features to a single vector column
assembler = VectorAssembler(inputCols=iris.feature_names, outputCol="features")
assembled_df = assembler.transform(spark_df)

# Scale features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(assembled_df)
scaled_df = scaler_model.transform(assembled_df)
scaled_df.show(5)


+-----------------+----------------+-----------------+----------------+-----------------+--------------------+
|sepal length (cm)|sepal width (cm)|petal length (cm)|petal width (cm)|         features|     scaled_features|
+-----------------+----------------+-----------------+----------------+-----------------+--------------------+
|              5.1|             3.5|              1.4|             0.2|[5.1,3.5,1.4,0.2]|[6.15892840883878...|
|              4.9|             3.0|              1.4|             0.2|[4.9,3.0,1.4,0.2]|[5.9174018045706,...|
|              4.7|             3.2|              1.3|             0.2|[4.7,3.2,1.3,0.2]|[5.67587520030241...|
|              4.6|             3.1|              1.5|             0.2|[4.6,3.1,1.5,0.2]|[5.55511189816831...|
|              5.0|             3.6|              1.4|             0.2|[5.0,3.6,1.4,0.2]|[6.03816510670469...|
+-----------------+----------------+-----------------+----------------+-----------------+--------------------+
o

In [20]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=3, featuresCol="scaled_features", predictionCol="prediction")
model = kmeans.fit(scaled_df)
predictions = model.transform(scaled_df)
predictions.select("scaled_features", "prediction").show(5)


+--------------------+----------+
|     scaled_features|prediction|
+--------------------+----------+
|[6.15892840883878...|         1|
|[5.9174018045706,...|         1|
|[5.67587520030241...|         1|
|[5.55511189816831...|         1|
|[6.03816510670469...|         1|
+--------------------+----------+
only showing top 5 rows



In [21]:
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator(featuresCol="scaled_features", predictionCol="prediction")
silhouette_score = evaluator.evaluate(predictions)
print(f"Silhouette Score = {silhouette_score:.2f}")


Silhouette Score = 0.64


In [22]:
silhouette_scores = []
for k in range(2, 10):
    kmeans.setK(k)
    model = kmeans.fit(scaled_df)
    predictions = model.transform(scaled_df)
    score = evaluator.evaluate(predictions)
    silhouette_scores.append(score)
    print(f"Silhouette Score for k={k} = {score:.2f}")


Silhouette Score for k=2 = 0.77
Silhouette Score for k=3 = 0.64
Silhouette Score for k=4 = 0.58
Silhouette Score for k=5 = 0.52
Silhouette Score for k=6 = 0.52
Silhouette Score for k=7 = 0.51
Silhouette Score for k=8 = 0.53
Silhouette Score for k=9 = 0.52


Q3)Build a Recommendation Engine with Spark with a dataset .


In [29]:
!unzip /content/ml-1m.zip -d /content/


Archive:  /content/ml-1m.zip
   creating: /content/ml-1m/
  inflating: /content/ml-1m/movies.dat  
  inflating: /content/ml-1m/ratings.dat  
  inflating: /content/ml-1m/README   
  inflating: /content/ml-1m/users.dat  


In [30]:
ratings_raw = spark.read.text("/content/ml-1m/ratings.dat")


In [31]:
# Step 1: Unzip the dataset (only do this once)
!unzip /content/ml-1m.zip -d /content/

# Step 2: Read the unzipped ratings.dat file
ratings_raw = spark.read.text("/content/ml-1m/ratings.dat")

# Step 3: Parse data into structured format
ratings = ratings_raw.select(
    split(ratings_raw.value, "::").getItem(0).cast("int").alias("userId"),
    split(ratings_raw.value, "::").getItem(1).cast("int").alias("movieId"),
    split(ratings_raw.value, "::").getItem(2).cast("float").alias("rating")
).filter(col("userId").isNotNull() & col("movieId").isNotNull())


Archive:  /content/ml-1m.zip
replace /content/ml-1m/movies.dat? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: /content/ml-1m/movies.dat  
replace /content/ml-1m/ratings.dat? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: /content/ml-1m/ratings.dat  
replace /content/ml-1m/README? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: /content/ml-1m/README   
replace /content/ml-1m/users.dat? [y]es, [n]o, [A]ll, [N]one, [r]ename: y
  inflating: /content/ml-1m/users.dat  


In [32]:
print(ratings)

DataFrame[userId: int, movieId: int, rating: float]


In [34]:
ratings_raw.show(5, truncate=False)
print(f"Total records: {ratings_raw.count()}")

+---------------------+
|value                |
+---------------------+
|1::1193::5::978300760|
|1::661::3::978302109 |
|1::914::3::978301968 |
|1::3408::4::978300275|
|1::2355::5::978824291|
+---------------------+
only showing top 5 rows

Total records: 1000209


In [35]:
from pyspark.sql.functions import split, col

# Split the 'value' column using "::" and cast the parts into appropriate types
ratings = ratings_raw.select(
    split(col("value"), "::").getItem(0).cast("int").alias("userId"),
    split(col("value"), "::").getItem(1).cast("int").alias("movieId"),
    split(col("value"), "::").getItem(2).cast("float").alias("rating")
).na.drop()  # drop rows with nulls (just in case)

# Optional: check the parsed result
ratings.show(5)
ratings.printSchema()
print(f"Parsed records: {ratings.count()}")


+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|   1193|   5.0|
|     1|    661|   3.0|
|     1|    914|   3.0|
|     1|   3408|   4.0|
|     1|   2355|   5.0|
+------+-------+------+
only showing top 5 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)

Parsed records: 1000209


In [36]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

train, test = ratings.randomSplit([0.8, 0.2], seed=42)

als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True
)

model = als.fit(train)

predictions = model.transform(test)

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse:.4f}")


RMSE: 0.8711


In [39]:
# Load movie titles
movies_raw = spark.read.text("ml-1m/movies.dat")
movies = movies_raw.select(
    split(col("value"), "::").getItem(0).cast("int").alias("movieId"),
    split(col("value"), "::").getItem(1).alias("title")
)

# Explode recommendations into rows
from pyspark.sql.functions import explode
exploded_recs = user_recs.select("userId", explode("recommendations").alias("rec"))
exploded_recs = exploded_recs.select("userId", col("rec.movieId"), col("rec.rating"))

# Join with movie titles
final_recs = exploded_recs.join(movies, on="movieId", how="left")
final_recs.orderBy("userId").show(truncate=False)


+-------+------+---------+------------------------------------------+
|movieId|userId|rating   |title                                     |
+-------+------+---------+------------------------------------------+
|3603   |148   |4.708707 |Gay Deceivers, The (1969)                 |
|3314   |148   |4.495983 |Big Trees, The (1952)                     |
|1198   |148   |4.4924755|Raiders of the Lost Ark (1981)            |
|439    |148   |4.4616227|Dangerous Game (1993)                     |
|598    |148   |4.450812 |Window to Paris (1994)                    |
|3338   |463   |4.1977086|For All Mankind (1989)                    |
|2309   |463   |4.167856 |Inheritors, The (Die Siebtelbauern) (1998)|
|2905   |463   |4.1588063|Sanjuro (1962)                            |
|858    |463   |4.141514 |Godfather, The (1972)                     |
|2760   |463   |4.1289897|Gambler, The (A J�t�kos) (1997)           |
|2309   |471   |4.7961698|Inheritors, The (Die Siebtelbauern) (1998)|
|3245   |471   |4.67

In [40]:
#movie recommendation for the 5 unique users with the movie names:
user_subset = ratings.select("userId").distinct().limit(5)
user_recs = model.recommendForUserSubset(user_subset, 5)
movies_raw = spark.read.text("ml-1m/movies.dat")

movies = movies_raw.select(
    split(col("value"), "::").getItem(0).cast("int").alias("movieId"),
    split(col("value"), "::").getItem(1).alias("title")
)
from pyspark.sql.functions import explode

# Explode the nested recommendations array
exploded_recs = user_recs.select(
    col("userId"),
    explode(col("recommendations")).alias("rec")
).select(
    col("userId"),
    col("rec.movieId"),
    col("rec.rating")
)

# Join with movie titles
final_recs = exploded_recs.join(movies, on="movieId", how="left")

# Show the recommendations
final_recs.orderBy("userId").show(truncate=False)


+-------+------+---------+------------------------------------------+
|movieId|userId|rating   |title                                     |
+-------+------+---------+------------------------------------------+
|3603   |148   |4.708707 |Gay Deceivers, The (1969)                 |
|3314   |148   |4.495983 |Big Trees, The (1952)                     |
|1198   |148   |4.4924755|Raiders of the Lost Ark (1981)            |
|439    |148   |4.4616227|Dangerous Game (1993)                     |
|598    |148   |4.450812 |Window to Paris (1994)                    |
|3338   |463   |4.1977086|For All Mankind (1989)                    |
|2309   |463   |4.167856 |Inheritors, The (Die Siebtelbauern) (1998)|
|2905   |463   |4.1588063|Sanjuro (1962)                            |
|858    |463   |4.141514 |Godfather, The (1972)                     |
|2760   |463   |4.1289897|Gambler, The (A J�t�kos) (1997)           |
|2309   |471   |4.7961698|Inheritors, The (Die Siebtelbauern) (1998)|
|3245   |471   |4.67