<a href="https://colab.research.google.com/github/TVRCharan-cbit/BDAassignment/blob/main/BDAassignment2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Build a Classification Model with Spark Using a Dataset of Your Choice

Setting up PySpark in Colab

In [1]:
!apt-get install openjdk-11-jdk -qq > /dev/null
!pip install -q pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PlantSurvival").getOrCreate()

Creating a synthetic dataset

In [10]:
from pyspark.sql import Row

# Hand-written toy data. Each tuple is (moisture, sunlight, temperature, survived);
# `survived` is the 0/1 label the classifier predicts.
data = [
    Row(moisture=m, sunlight=s, temperature=t, survived=y)
    for m, s, t, y in (
        (30, 5, 15, 1),
        (80, 9, 35, 0),
        (50, 6, 22, 1),
        (90, 4, 18, 0),
        (60, 8, 28, 1),
        (20, 3, 10, 0),
        (40, 7, 20, 1),
        (70, 2, 12, 0),
        (45, 6, 21, 1),
        (85, 10, 33, 0),
        (35, 5, 17, 1),
        (55, 4, 19, 1),
        (75, 3, 14, 0),
    )
]

df = spark.createDataFrame(data)
df.show()


+--------+--------+-----------+--------+
|moisture|sunlight|temperature|survived|
+--------+--------+-----------+--------+
|      30|       5|         15|       1|
|      80|       9|         35|       0|
|      50|       6|         22|       1|
|      90|       4|         18|       0|
|      60|       8|         28|       1|
|      20|       3|         10|       0|
|      40|       7|         20|       1|
|      70|       2|         12|       0|
|      45|       6|         21|       1|
|      85|      10|         33|       0|
|      35|       5|         17|       1|
|      55|       4|         19|       1|
|      75|       3|         14|       0|
+--------+--------+-----------+--------+



Preprocessing

In [11]:
from pyspark.ml.feature import VectorAssembler
# NOTE(review): LogisticRegression is imported here but the model trained below
# is a DecisionTreeClassifier.
from pyspark.ml.classification import LogisticRegression

# Spark ML estimators expect one vector column, so pack the three numeric
# predictors into "features" and keep only it plus the label.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["moisture", "sunlight", "temperature"],
)

assembled_df = (
    assembler
    .transform(df)
    .select("features", "survived")
)
assembled_df.show()


+----------------+--------+
|        features|survived|
+----------------+--------+
| [30.0,5.0,15.0]|       1|
| [80.0,9.0,35.0]|       0|
| [50.0,6.0,22.0]|       1|
| [90.0,4.0,18.0]|       0|
| [60.0,8.0,28.0]|       1|
| [20.0,3.0,10.0]|       0|
| [40.0,7.0,20.0]|       1|
| [70.0,2.0,12.0]|       0|
| [45.0,6.0,21.0]|       1|
|[85.0,10.0,33.0]|       0|
| [35.0,5.0,17.0]|       1|
| [55.0,4.0,19.0]|       1|
| [75.0,3.0,14.0]|       0|
+----------------+--------+



In [12]:
# Hold out roughly 25% of rows; the fixed seed makes the split repeatable.
# With only 13 rows the test set is tiny (2 rows in the recorded run), so any
# metric computed on it is illustrative only.
train, test = assembled_df.randomSplit(weights=[0.75, 0.25], seed=123)

In [13]:
from pyspark.ml.classification import DecisionTreeClassifier

# A single decision tree on the assembled features; every other
# hyper-parameter is left at Spark's default.
dt = DecisionTreeClassifier(featuresCol="features", labelCol="survived")
model = dt.fit(train)

In [14]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score the held-out rows and show each prediction next to its true label.
predictions = model.transform(test)
predictions.select("features", "survived", "prediction", "probability").show()

# Accuracy = fraction of rows where prediction == survived.
# MulticlassClassificationEvaluator with metricName="accuracy" computes exactly
# that (it handles 0/1 labels too). BinaryClassificationEvaluator would instead
# report areaUnderROC by default, which is not accuracy.
# NOTE: the test split is very small, so this figure is illustrative only.
evaluator = MulticlassClassificationEvaluator(
    labelCol="survived",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print(f"Decision Tree Test Accuracy: {accuracy:.2f}")

+---------------+--------+----------+-----------+
|       features|survived|prediction|probability|
+---------------+--------+----------+-----------+
|[50.0,6.0,22.0]|       1|       1.0|  [0.0,1.0]|
|[45.0,6.0,21.0]|       1|       1.0|  [0.0,1.0]|
+---------------+--------+----------+-----------+

Decision Tree Test Accuracy: 1.00


## Build a Clustering Model with Spark Using a Dataset of Your Choice

In [15]:
from pyspark.sql import SparkSession

# NOTE: getOrCreate() returns the session already started at the top of the
# notebook, so this appName most likely does not rename the running app.
spark = (
    SparkSession.builder
    .appName("CustomerClustering")
    .getOrCreate()
)

In [17]:
from pyspark.sql import Row

# Toy data. Each tuple is (food, electronics, entertainment), presumably
# per-customer spending in each category.
data = [
    Row(food=f, electronics=e, entertainment=t)
    for f, e, t in (
        (120, 300, 200),
        (80, 500, 150),
        (200, 150, 100),
        (300, 80, 120),
        (400, 100, 200),
        (100, 800, 300),
        (50, 900, 250),
        (280, 150, 100),
        (320, 120, 80),
        (70, 850, 260),
    )
]

df = spark.createDataFrame(data)
df.show()


+----+-----------+-------------+
|food|electronics|entertainment|
+----+-----------+-------------+
| 120|        300|          200|
|  80|        500|          150|
| 200|        150|          100|
| 300|         80|          120|
| 400|        100|          200|
| 100|        800|          300|
|  50|        900|          250|
| 280|        150|          100|
| 320|        120|           80|
|  70|        850|          260|
+----+-----------+-------------+



In [19]:
from pyspark.ml.feature import VectorAssembler

# Pack the three spending columns into the single "features" vector KMeans reads.
# The columns are left unscaled. They presumably share a unit (spend), so
# Euclidean distances compare them directly; consider StandardScaler otherwise.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["food", "electronics", "entertainment"],
)

assembled_df = (
    assembler
    .transform(df)
    .select("features")
)
assembled_df.show()

+-------------------+
|           features|
+-------------------+
|[120.0,300.0,200.0]|
| [80.0,500.0,150.0]|
|[200.0,150.0,100.0]|
| [300.0,80.0,120.0]|
|[400.0,100.0,200.0]|
|[100.0,800.0,300.0]|
| [50.0,900.0,250.0]|
|[280.0,150.0,100.0]|
| [320.0,120.0,80.0]|
| [70.0,850.0,260.0]|
+-------------------+



In [20]:
from pyspark.ml.clustering import KMeans

# Split the customers into k groups. The fixed seed keeps centroid
# initialisation, and therefore the cluster labels, reproducible.
# TODO: compare against k=3 as well.
kmeans = KMeans(k=2, seed=1, featuresCol="features")
model = kmeans.fit(assembled_df)

# Attach each row's assigned cluster id in a "prediction" column.
clusters = model.transform(assembled_df)
clusters.show()

+-------------------+----------+
|           features|prediction|
+-------------------+----------+
|[120.0,300.0,200.0]|         0|
| [80.0,500.0,150.0]|         0|
|[200.0,150.0,100.0]|         0|
| [300.0,80.0,120.0]|         0|
|[400.0,100.0,200.0]|         0|
|[100.0,800.0,300.0]|         1|
| [50.0,900.0,250.0]|         1|
|[280.0,150.0,100.0]|         0|
| [320.0,120.0,80.0]|         0|
| [70.0,850.0,260.0]|         1|
+-------------------+----------+



In [21]:
# One centroid per cluster, in the same column order as the assembled features.
centers = model.clusterCenters()
print("Cluster Centers:")
for cluster_id, centroid in enumerate(centers):
    label = f"Cluster {cluster_id}"
    print(f"{label}: {centroid}")


Cluster Centers:
Cluster 0: [242.85714286 200.         135.71428571]
Cluster 1: [ 73.33333333 850.         270.        ]


## Build a Recommendation Engine with Spark Using a Dataset of Your Choice


In [22]:
from pyspark.sql import SparkSession

# NOTE: getOrCreate() returns the session already started at the top of the
# notebook, so this appName most likely does not rename the running app.
spark = (
    SparkSession.builder
    .appName("MovieRecommender")
    .getOrCreate()
)

In [23]:
from pyspark.sql import Row

# Explicit ratings as (user_id, movie_id, rating) triples; ratings are floats.
ratings_data = [
    Row(user_id=u, movie_id=m, rating=r)
    for u, m, r in (
        (1, 101, 4.0),
        (1, 102, 3.5),
        (1, 103, 2.0),
        (2, 101, 5.0),
        (2, 104, 3.0),
        (2, 105, 4.5),
        (3, 102, 4.0),
        (3, 103, 3.0),
        (3, 105, 4.0),
        (4, 101, 2.5),
        (4, 104, 4.5),
    )
]

df = spark.createDataFrame(ratings_data)
df.show()


+-------+--------+------+
|user_id|movie_id|rating|
+-------+--------+------+
|      1|     101|   4.0|
|      1|     102|   3.5|
|      1|     103|   2.0|
|      2|     101|   5.0|
|      2|     104|   3.0|
|      2|     105|   4.5|
|      3|     102|   4.0|
|      3|     103|   3.0|
|      3|     105|   4.0|
|      4|     101|   2.5|
|      4|     104|   4.5|
+-------+--------+------+



In [24]:
from pyspark.ml.recommendation import ALS

# Explicit-feedback matrix factorisation over (user_id, movie_id, rating).
# - coldStartStrategy="drop": rows whose user/item was unseen at fit time are
#   dropped from transform() output instead of yielding NaN predictions.
# - nonnegative=True constrains the latent factors to be >= 0.
# - seed is pinned so re-runs reproduce the same factors. Without it, PySpark
#   derives the default seed from hash() of the class name, which is salted
#   per Python process and so changes across kernel restarts.
# NOTE: rank=10 exceeds the number of users (4) and movies (5) here, so the
# model is heavily over-parameterised; regParam is what keeps it in check.
als = ALS(
    userCol="user_id",
    itemCol="movie_id",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    rank=10,
    maxIter=10,
    regParam=0.1,
    seed=42,
)

model = als.fit(df)

In [25]:
# In-sample check: this scores the same ratings the model was fit on, so it
# shows fit quality, not how well unseen ratings would be predicted.
predictions = model.transform(df)
predictions.show(n=20)

+-------+--------+------+----------+
|user_id|movie_id|rating|prediction|
+-------+--------+------+----------+
|      1|     101|   4.0| 3.8921983|
|      1|     102|   3.5|   3.40597|
|      1|     103|   2.0| 2.0751464|
|      2|     101|   5.0|  4.821928|
|      2|     104|   3.0| 3.0272722|
|      3|     102|   4.0|  3.948411|
|      3|     103|   3.0|  2.816791|
|      3|     105|   4.0| 3.9935288|
|      4|     101|   2.5|  2.545436|
|      4|     104|   4.5|   4.34013|
|      2|     105|   4.5| 4.4708824|
+-------+--------+------+----------+



In [26]:
# Top 3 movies per user, ranked by predicted rating. Movies the user already
# rated are not filtered out (e.g. user 1 is recommended 101 and 102).
user_recs = model.recommendForAllUsers(numItems=3)
user_recs.show(truncate=False)

+-------+-----------------------------------------------------+
|user_id|recommendations                                      |
+-------+-----------------------------------------------------+
|1      |[{101, 3.8921983}, {105, 3.549892}, {102, 3.40597}]  |
|2      |[{101, 4.821928}, {105, 4.4708824}, {102, 4.269059}] |
|3      |[{105, 3.9935286}, {102, 3.948411}, {101, 3.8892796}]|
|4      |[{104, 4.34013}, {101, 2.545436}, {102, 2.3206692}]  |
+-------+-----------------------------------------------------+

