In [7]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train-test split
train_data, test_data = df_transformed.randomSplit([0.8, 0.2], seed=42) 
                                                    
# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

# Predictions
predictions = model.transform(test_data)
predictions.show()

# Evaluation
evaluator = RegressionEvaluator(labelCol='Target', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
r2 = evaluator.evaluate(predictions, {evaluator.metricName : 'r2'})

print("RMSE : ", rmse)
print("R2 : ", r2)

25/11/25 10:27:33 WARN Instrumentation: [7e6e3561] regParam is zero, which might cause numerical instability and overfitting.


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009
+---+-------+------+--------+-----------------+
| ID|Feature|Target|Features|       prediction|
+---+-------+------+--------+-----------------+
|  2|   10.0|  25.0|  [10.0]|             25.0|
|  4|   20.0|  35.0|  [20.0]|34.99999999999999|
+---+-------+------+--------+-----------------+

RMSE :  5.0242958677880805e-15
R2 :  1.0


In [13]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# Example dataset
data = [(1, Vectors.dense(2.0, 3.0), 0), (2, Vectors.dense(1.0, 5.0), 1), (3, Vectors.dense(2.5, 4.5), 1), (4, Vectors.dense(3.0, 6.0), 0)]
columns = ['ID', 'Features', 'Label']
df = spark.createDataFrame(data, columns)

# Train logistic regression model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Display coefficients and summary
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

                                                                                

Coefficients: [-12.262057929559646,4.087352266612518]
Intercept: 11.568912727035112


In [15]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

# Example dataset
data = [(1, Vectors.dense(1.0, 1.0)), (2, Vectors.dense(5.0, 5.0)), (3, Vectors.dense(10.0, 10.0)), (4, Vectors.dense(15.0, 15.0))]
columns = ['ID', 'Features']
df = spark.createDataFrame(data, columns)

# Train KMeans clustering model
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Show cluster centers
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')
for c in centers:
    print(c)

                                                                                

Cluster Centers: [array([5.33333333, 5.33333333]), array([15., 15.])]
[5.33333333 5.33333333]
[15. 15.]


In [3]:
# Homework
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.appName("AmazonML").getOrCreate()

# Load dataset
df = spark.read.csv("amazon.csv", header=True, inferSchema=True)

# Cleaning numeric columns
numeric_cols = ["discounted_price", "actual_price", "discount_percentage", "rating", "rating_count"]


df_clean = df
df_clean = df_clean.withColumn("discounted_price", regexp_replace("discounted_price", "[₹,]", "").cast("float"))
df_clean = df_clean.withColumn("actual_price", regexp_replace("actual_price", "[₹,]", "").cast("float"))
df_clean = df_clean.withColumn("discount_percentage", regexp_replace("discount_percentage", "%", "").cast("float"))
df_clean = df_clean.withColumn("rating", col("rating").cast("float"))
df_clean = df_clean.withColumn("rating_count", col("rating_count").cast("float"))

df_clean.show(5)

25/11/25 12:54:52 WARN Utils: Your hostname, rayfal resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
25/11/25 12:54:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/25 12:54:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|product_id|        product_name|            category|discounted_price|actual_price|discount_percentage|rating|rating_count|        about_product|             user_id|           user_name|           review_id|        review_title|      review_content|            img_link|        product_link|
+----------+--------------------+--------------------+----------------+------------+-------------------+------+------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|B07JW9H4J1|Wayona Nylon Brai...|Computers&Accesso...|           399.0|      1099.0|               64.0|   4.2|       

In [9]:
# Homework (Build a classification model + evaluate perfomance)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler

df_log = df_clean.select("rating", "rating_count").na.drop()
df_log = df_log.withColumn("Label", (df_log.rating >= 4).cast("int"))

assembler = VectorAssembler(
    inputCols=["rating_count"],
    outputCol="Features"
)
df_log2 = assembler.transform(df_log)

train, test = df_log2.randomSplit([0.8, 0.2], seed=42)

lr = LogisticRegression(featuresCol="Features", labelCol="Label")
model = lr.fit(train)

pred = model.transform(test)

evaluator = BinaryClassificationEvaluator(labelCol="Label")

print("AUC:", evaluator.evaluate(pred))

AUC: 0.6607407407407407


In [12]:
# Homework (Explore hyperparameter tuning using crossvalidation)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.0, 0.1, 0.01])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())

evaluator = BinaryClassificationEvaluator(labelCol="Label")

cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3)

cvModel = cv.fit(train)

print("Best Model Params:", cvModel.bestModel)
print("Best regParam:", cvModel.bestModel._java_obj.getRegParam())
print("Best elasticNetParam:", cvModel.bestModel._java_obj.getElasticNetParam())

Best Model Params: LogisticRegressionModel: uid=LogisticRegression_7b1495cf2fe1, numClasses=2, numFeatures=1
Best regParam: 0.0
Best elasticNetParam: 0.0
