<a href="https://colab.research.google.com/github/Vinay247-g/BDA_Assignment_02/blob/main/BDA__ASSIGNMENT__02.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark



Build a Classification Model with Spark with a dataset of your choice

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TweetSentimentClassifier").getOrCreate()


Loading Tweet_sentiment dataset

In [None]:
df = spark.read.csv("/content/tweet_sentiment.csv", header=True, inferSchema=True)
df.show(5)


+--------------------+---------+
|               tweet|sentiment|
+--------------------+---------+
|The event starts ...|  neutral|
|I hate how this t...| negative|
|Fantastic experie...| positive|
|Fantastic experie...| positive|
|This is the worst...| negative|
+--------------------+---------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import col, lower, regexp_replace

df_clean = df.withColumn("tweet_clean", lower(col("tweet")))
df_clean = df_clean.withColumn("tweet_clean", regexp_replace("tweet_clean", "http\\S+|www\\S+", ""))
df_clean = df_clean.withColumn("tweet_clean", regexp_replace("tweet_clean", "[^a-zA-Z\\s]", ""))
df_clean = df_clean.withColumn("tweet_clean", regexp_replace("tweet_clean", "\\s+", " "))
df_clean.select("tweet_clean", "sentiment").show(5, truncate=False)


+----------------------------+---------+
|tweet_clean                 |sentiment|
+----------------------------+---------+
|the event starts at pm      |neutral  |
|i hate how this turned out  |negative |
|fantastic experience        |positive |
|fantastic experience        |positive |
|this is the worst thing ever|negative |
+----------------------------+---------+
only showing top 5 rows



In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="tweet_clean", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tf = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=5000)
idf = IDF(inputCol="rawFeatures", outputCol="features")
label_indexer = StringIndexer(inputCol="sentiment", outputCol="label")
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

pipeline = Pipeline(stages=[tokenizer, remover, tf, idf, label_indexer, lr])


In [None]:
train_data, test_data = df_clean.randomSplit([0.8, 0.2], seed=42)
model = pipeline.fit(train_data)
predictions = model.transform(test_data)


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predictions.select("tweet", "sentiment", "prediction").show(5)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy: {accuracy:.4f}")


+--------------------+---------+----------+
|               tweet|sentiment|prediction|
+--------------------+---------+----------+
|Absolutely loved ...| positive|       0.0|
|Absolutely loved ...| positive|       0.0|
|Absolutely loved ...| positive|       0.0|
|Absolutely loved ...| positive|       0.0|
|Absolutely loved ...| positive|       0.0|
+--------------------+---------+----------+
only showing top 5 rows

Test Accuracy: 1.0000


Build a Clustering Model with Spark with a dataset of your choice

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CustomerClustering").getOrCreate()


Loading customer_segmentation dataset

In [None]:
df = spark.read.csv("/content/customer_segmentation.csv", header=True, inferSchema=True)
df.show(5)
df.printSchema()


+----+----------+----------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+------------+------------+------------+------------+------------+--------+-------------+---------+--------+
|  ID|Year_Birth| Education|Marital_Status|Income|Kidhome|Teenhome|Dt_Customer|Recency|MntWines|MntFruits|MntMeatProducts|MntFishProducts|MntSweetProducts|MntGoldProds|NumDealsPurchases|NumWebPurchases|NumCatalogPurchases|NumStorePurchases|NumWebVisitsMonth|AcceptedCmp3|AcceptedCmp4|AcceptedCmp5|AcceptedCmp1|AcceptedCmp2|Complain|Z_CostContact|Z_Revenue|Response|
+----+----------+----------+--------------+------+-------+--------+-----------+-------+--------+---------+---------------+---------------+----------------+------------+-----------------+---------------+-------------------+-----------------+-----------------+----------

In [None]:
from pyspark.sql.functions import col

# Drop non-numeric columns if any (e.g., customer name/ID)
numeric_cols = [field.name for field in df.schema.fields if str(field.dataType) in ['IntegerType', 'DoubleType']]
df_numeric = df.select(*numeric_cols)
df_numeric = df_numeric.dropna()
df_numeric.show(5)


++
||
++
||
||
||
||
||
++
only showing top 5 rows



In [None]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=df_numeric.columns, outputCol="features")
assembled_data = assembler.transform(df_numeric)
assembled_data.select("features").show(5, truncate=False)


+--------+
|features|
+--------+
|[]      |
|[]      |
|[]      |
|[]      |
|[]      |
+--------+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette:.4f}")


Silhouette Score: 0.1440


Build a Recommendation Engine with Spark with a dataset of your choice

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MovieRecommender").getOrCreate()


Loading movies and ratings dataset

In [None]:
movies = spark.read.csv("/content/movies.csv", header=True, inferSchema=True)
ratings = spark.read.csv("/content/ratings.csv", header=True, inferSchema=True)

movies.show(3)
ratings.show(3)


+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
+------+-------+------+---------+
only showing top 3 rows



In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Drop any NA
ratings_clean = ratings.dropna()

# Split into train/test
(train, test) = ratings_clean.randomSplit([0.8, 0.2], seed=42)


In [None]:
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",  # avoids NaN predictions
    nonnegative=True,
    implicitPrefs=False
)

model = als.fit(train)


In [None]:
predictions = model.transform(test)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error = {rmse:.4f}")


Root Mean Squared Error = 0.9363


In [None]:
user_recs = model.recommendForAllUsers(5)
user_recs.show(3, truncate=False)


+------+-------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                  |
+------+-------------------------------------------------------------------------------------------------+
|1     |[{53123, 5.6336946}, {1284, 5.591563}, {171, 5.5268326}, {104374, 5.526624}, {142488, 5.5156565}]|
|2     |[{86320, 5.21019}, {5181, 4.9875655}, {94959, 4.913584}, {142488, 4.9078627}, {131724, 4.892934}]|
|3     |[{56145, 5.327325}, {6835, 4.897882}, {5746, 4.897882}, {5181, 4.8322754}, {4518, 4.7941036}]    |
+------+-------------------------------------------------------------------------------------------------+
only showing top 3 rows



In [None]:
user_id = 123
model.recommendForUserSubset(ratings.filter(ratings.userId == user_id), 5).show(truncate=False)


+------+-------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                  |
+------+-------------------------------------------------------------------------------------------------+
|123   |[{89904, 4.643761}, {177593, 4.6376953}, {183897, 4.5980153}, {1658, 4.576263}, {3358, 4.574531}]|
+------+-------------------------------------------------------------------------------------------------+

