<a href="https://colab.research.google.com/github/Adithya0503/BDA_ASSIGNMENT_2/blob/main/BDA_2_Assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import when

# Start Spark session
spark = SparkSession.builder.appName("IrisBinaryClassification").getOrCreate()

# Sample Iris data (sepal_length, sepal_width, petal_length, petal_width, species)
data = [
    (5.1, 3.5, 1.4, 0.2, "setosa"),
    (4.9, 3.0, 1.4, 0.2, "setosa"),
    (7.0, 3.2, 4.7, 1.4, "versicolor"),
    (6.4, 3.2, 4.5, 1.5, "versicolor"),
    (6.3, 3.3, 6.0, 2.5, "virginica"),
    (5.8, 2.7, 5.1, 1.9, "virginica"),
    (5.0, 3.6, 1.4, 0.2, "setosa"),
    (6.9, 3.1, 5.4, 2.1, "virginica"),
    (6.0, 2.2, 4.0, 1.0, "versicolor"),
    (5.1, 2.5, 3.0, 1.1, "versicolor")
]

columns = ["sepal_length", "sepal_width", "petal_length", "petal_width", "species"]

# Create DataFrame
df = spark.createDataFrame(data, schema=columns)

# Binary label: Setosa -> 0, Others -> 1
df = df.withColumn("label", when(df.species == "setosa", 0).otherwise(1))

# Assemble features
assembler = VectorAssembler(
    inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
    outputCol="features"
)
df_prepared = assembler.transform(df).select("features", "label")

# Split dataset
train_data, test_data = df_prepared.randomSplit([0.7, 0.3], seed=123)

# Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(train_data)

# Make predictions
predictions = model.transform(test_data)
predictions.select("features", "label", "prediction", "probability").show()

# Evaluate model
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(predictions)
print(f"Test AUC: {auc:.2f}")

# Stop Spark session
spark.stop()


+-----------------+-----+----------+--------------------+
|         features|label|prediction|         probability|
+-----------------+-----+----------+--------------------+
|[6.3,3.3,6.0,2.5]|    1|       1.0|[9.09928379531368...|
|[5.8,2.7,5.1,1.9]|    1|       1.0|[6.46539029609226...|
+-----------------+-----+----------+--------------------+

Test AUC: 1.00


In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Start Spark session
spark = SparkSession.builder.appName("InventoryClustering").getOrCreate()

# Simulated dataset: (weight_kg, volume_m3)
data = [
    (10.0, 0.5),
    (12.0, 0.6),
    (30.0, 1.2),
    (32.0, 1.3),
    (45.0, 2.0),
    (48.0, 2.1),
    (100.0, 5.5),
    (102.0, 5.6),
    (105.0, 5.8),
    (110.0, 6.0),
    (15.0, 0.7),
    (16.0, 0.8),
    (55.0, 2.4),
    (60.0, 2.7),
    (120.0, 6.5)
]

columns = ["weight_kg", "volume_m3"]
df = spark.createDataFrame(data, schema=columns)

# Assemble features
assembler = VectorAssembler(inputCols=columns, outputCol="features")
df_features = assembler.transform(df).select("features")

# KMeans clustering with 3 clusters
kmeans = KMeans(k=3, seed=42)
model = kmeans.fit(df_features)
predictions = model.transform(df_features)

# Show results
predictions.show(truncate=False)

# Print cluster centers
print("Cluster Centers:")
for center in model.clusterCenters():
    print(center)

# Stop Spark session
spark.stop()


+-----------+----------+
|features   |prediction|
+-----------+----------+
|[10.0,0.5] |1         |
|[12.0,0.6] |1         |
|[30.0,1.2] |2         |
|[32.0,1.3] |2         |
|[45.0,2.0] |2         |
|[48.0,2.1] |2         |
|[100.0,5.5]|0         |
|[102.0,5.6]|0         |
|[105.0,5.8]|0         |
|[110.0,6.0]|0         |
|[15.0,0.7] |1         |
|[16.0,0.8] |1         |
|[55.0,2.4] |2         |
|[60.0,2.7] |2         |
|[120.0,6.5]|0         |
+-----------+----------+

Cluster Centers:
[107.4    5.88]
[13.25  0.65]
[45.    1.95]


In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Start Spark session
spark = SparkSession.builder.appName("MovieRecommendation").getOrCreate()

# New dataset: user_id, movie_id, rating
data = [
    (1, 201, 4.0),
    (1, 202, 3.5),
    (1, 203, 5.0),
    (2, 201, 5.0),
    (2, 204, 2.5),
    (2, 205, 3.0),
    (3, 202, 4.0),
    (3, 203, 4.5),
    (3, 206, 3.0),
    (4, 204, 4.5),
    (4, 205, 4.0),
    (4, 206, 5.0),
    (5, 201, 2.0),
    (5, 203, 3.5),
    (5, 205, 5.0),
]

columns = ["user_id", "movie_id", "rating"]

# Create DataFrame
df = spark.createDataFrame(data, schema=columns)

# Split into training and test sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=123)

# ALS model
als = ALS(
    userCol="user_id",
    itemCol="movie_id",
    ratingCol="rating",
    coldStartStrategy="drop",  # Drop NaNs
    nonnegative=True,
    implicitPrefs=False,  # Using explicit ratings
    rank=10,
    maxIter=10,
    regParam=0.1
)

# Train the model
model = als.fit(train_data)

# Predict ratings on test data
predictions = model.transform(test_data)
predictions.show()

# Evaluate using RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Test RMSE: {rmse:.2f}")

# Recommend top 3 movies for each user
user_recs = model.recommendForAllUsers(3)
user_recs.show(truncate=False)

# Stop Spark session
spark.stop()

+-------+--------+------+----------+
|user_id|movie_id|rating|prediction|
+-------+--------+------+----------+
|      1|     203|   5.0|   1.73391|
|      3|     202|   4.0| 1.9175711|
|      4|     204|   4.5|  2.062612|
+-------+--------+------+----------+

Test RMSE: 2.64
+-------+------------------------------------------------------+
|user_id|recommendations                                       |
+-------+------------------------------------------------------+
|1      |[{201, 3.950053}, {202, 3.4318361}, {206, 3.1784315}] |
|2      |[{201, 4.802418}, {202, 4.1709914}, {206, 3.7882853}] |
|3      |[{203, 4.351847}, {205, 3.8526382}, {206, 3.0097258}] |
|4      |[{206, 4.8505797}, {201, 4.0603642}, {205, 4.0125313}]|
|5      |[{205, 4.7558384}, {203, 3.509558}, {206, 3.3225448}] |
+-------+------------------------------------------------------+

