In [3]:
# 1. Build a Classification Model with Spark with a dataset of your choice

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# 1. Start Spark Session
spark = SparkSession.builder.appName("WineQualityClassification").getOrCreate()

# 2. Load dataset (replace with actual CSV path or URL)
wine_url = "/content/winequality-red.csv"  # Make sure this file is uploaded or available
data = spark.read.csv(wine_url, header=True, inferSchema=True, sep=';')

# 3. Index the target label column ('quality')
label_indexer = StringIndexer(inputCol="quality", outputCol="label")

# 4. Assemble features into a single vector
features = [
    "fixed acidity", "volatile acidity", "citric acid", "residual sugar",
    "chlorides", "free sulfur dioxide", "total sulfur dioxide", "density",
    "pH", "sulphates", "alcohol"
]
vector_assembler = VectorAssembler(inputCols=features, outputCol="features")

# 5. Create Logistic Regression model
lr_model = LogisticRegression(featuresCol="features", labelCol="label", maxIter=10)

# 6. Build pipeline
pipeline = Pipeline(stages=[label_indexer, vector_assembler, lr_model])

# 7. Train-test split
train_set, test_set = data.randomSplit([0.75, 0.25], seed=123)

# 8. Train the model
model = pipeline.fit(train_set)

# 9. Predict on test data
results = model.transform(test_set)

# 10. Evaluate model
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
acc = evaluator.evaluate(results)
print(f"Test Accuracy: {acc:.2f}")

# 11. Show predictions
results.select("features", "label", "prediction").show(5, truncate=False)

# 12. Stop Spark session
spark.stop()


Test Accuracy: 0.59
+----------------------------------------------------------+-----+----------+
|features                                                  |label|prediction|
+----------------------------------------------------------+-----+----------+
|[4.9,0.42,0.0,2.1,0.048,16.0,42.0,0.99154,3.71,0.74,14.0] |2.0  |2.0       |
|[5.0,0.74,0.0,1.2,0.041,16.0,46.0,0.99258,4.01,0.59,12.5] |1.0  |1.0       |
|[5.1,0.585,0.0,1.7,0.044,14.0,86.0,0.99264,3.56,0.94,12.9]|2.0  |1.0       |
|[5.2,0.32,0.25,1.8,0.103,13.0,50.0,0.9957,3.38,0.55,9.2]  |0.0  |0.0       |
|[5.2,0.645,0.0,2.15,0.08,15.0,28.0,0.99444,3.78,0.61,12.5]|1.0  |1.0       |
+----------------------------------------------------------+-----+----------+
only showing top 5 rows



In [4]:
# 2. Build  a Clustering Model with Spark with a dataset of your choice

# Step 1: Start Spark Session
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WineClustering").getOrCreate()

# Step 2: Load Dataset (Assumes winequality-red.csv is in same directory or uploaded in Colab)
df = spark.read.csv("winequality-red.csv", header=True, inferSchema=True, sep=';')

# Step 3: Assemble Features
from pyspark.ml.feature import VectorAssembler

feature_columns = df.columns
feature_columns.remove('quality')  # 'quality' is the label, we'll ignore it for clustering

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df).select("features")

# Step 4: Apply KMeans Clustering
from pyspark.ml.clustering import KMeans

kmeans = KMeans(featuresCol="features", k=3, seed=42)  # You can try different k values
model = kmeans.fit(data)
predictions = model.transform(data)

# Step 5: Show Cluster Assignments
predictions.show(10)

# Step 6: Evaluate with Silhouette Score
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction", metricName="silhouette")
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score = {silhouette:.2f}")

# Step 7: Stop Spark Session
spark.stop()


+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[7.4,0.7,0.0,1.9,...|         1|
|[7.8,0.88,0.0,2.6...|         2|
|[7.8,0.76,0.04,2....|         2|
|[11.2,0.28,0.56,1...|         2|
|[7.4,0.7,0.0,1.9,...|         1|
|[7.4,0.66,0.0,1.8...|         2|
|[7.9,0.6,0.06,1.6...|         2|
|[7.3,0.65,0.0,1.2...|         1|
|[7.8,0.58,0.02,2....|         1|
|[7.5,0.5,0.36,6.1...|         0|
+--------------------+----------+
only showing top 10 rows

Silhouette Score = 0.69


In [5]:
#Question 3
# Step 1: Import and Start Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, rand

spark = SparkSession.builder.appName("WineRecommendationEngine").getOrCreate()

# Step 2: Load Dataset
df_raw = spark.read.csv("winequality-red.csv", header=True, inferSchema=True, sep=';')

# Step 3: Simulate wineId and userId
df_with_ids = df_raw.withColumn("wineId", monotonically_increasing_id()) \
                    .withColumn("userId", ((rand() * 10).cast("int") + 1))  # Simulating 10 tasters

# Step 4: Create rating DataFrame
ratings = df_with_ids.select("userId", "wineId", "quality").withColumnRenamed("quality", "rating")
ratings.show(5)

# Step 5: Build ALS Model
from pyspark.ml.recommendation import ALS

als = ALS(
    userCol="userId",
    itemCol="wineId",
    ratingCol="rating",
    maxIter=10,
    regParam=0.1,
    rank=10,
    nonnegative=True,
    coldStartStrategy="drop"
)

model = als.fit(ratings)

# Step 6: Generate Top-N Recommendations
userRecs = model.recommendForAllUsers(5)
itemRecs = model.recommendForAllItems(5)

print("Top-5 wine recommendations for each user:")
userRecs.show(truncate=False)

print("Top-5 user recommendations for each wine:")
itemRecs.show(truncate=False)

# Step 7: Evaluate the Model
from pyspark.ml.evaluation import RegressionEvaluator

predictions = model.transform(ratings)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse:.2f}")

# Optional: Show predictions
print("Sample predictions:")
predictions.select("userId", "wineId", "rating", "prediction").show(10)

# Step 8: Stop Spark
spark.stop()


+------+------+------+
|userId|wineId|rating|
+------+------+------+
|     5|     0|     5|
|     9|     1|     5|
|     5|     2|     5|
|     4|     3|     6|
|     5|     4|     5|
+------+------+------+
only showing top 5 rows

Top-5 wine recommendations for each user:
+------+----------------------------------------------------------------------------------------------+
|userId|recommendations                                                                               |
+------+----------------------------------------------------------------------------------------------+
|10    |[{481, 7.9429326}, {1405, 6.9500666}, {1177, 6.9500666}, {1088, 6.9500666}, {1058, 6.9500666}]|
|1     |[{1120, 7.9423184}, {1090, 7.9423184}, {1061, 7.9423184}, {267, 7.9423184}, {1555, 6.9495287}]|
|2     |[{1549, 7.945396}, {1403, 7.945396}, {498, 7.945396}, {1417, 6.9522214}, {1167, 6.9522214}]   |
|3     |[{455, 7.940487}, {1584, 6.9479256}, {1323, 6.9479256}, {1160, 6.9479256}, {1150, 6.9479256}] 