In [0]:
# Free-text widget holding the name of the table to analyse.
dbutils.widgets.text("table_name", "sales_dataset_big", "Table Name")

# Dropdown widget labelled "ML Job"; defaults to "linear_regression".
ml_job_choices = ["linear_regression", "kmeans", "decision_tree", "classification"]
dbutils.widgets.dropdown("ml_type", "linear_regression", ml_job_choices, "ML Job")


In [0]:
# Read the current value of both notebook widgets in one pass.
table_name, ml_type = (
    dbutils.widgets.get(widget_key) for widget_key in ("table_name", "ml_type")
)

# Echo the selections so the run records which inputs were used.
print("Selected table:", table_name)
print("Selected ML job:", ml_type)


Selected table: sales_dataset_big
Selected ML job: decision_tree


In [0]:
# Load the table selected in the "table_name" widget instead of a hard-coded
# name, so changing the widget actually changes the data being analysed.
df = spark.table(table_name)
display(df)


order_id,product,category,price,quantity,region,total_sales
1,Desk,Electronics,245,9,North America,2205
2,Chair,Furniture,831,5,Europe,4155
3,Desk,Furniture,1059,5,South America,5295
4,Phone,Furniture,248,1,Africa,248
5,Tablet,Furniture,416,5,North America,2080
6,Monitor,Accessories,974,3,Africa,2922
7,Desk,Furniture,560,2,Europe,1120
8,Chair,Electronics,395,8,South America,3160
9,Laptop,Electronics,1440,3,Africa,4320
10,Desk,Accessories,461,2,Africa,922


In [0]:
# Dataset shape: the row count runs a Spark job, the column count is metadata.
column_names = df.columns
num_rows = df.count()
num_columns = len(column_names)

print("Number of rows:", num_rows)
print("Number of columns:", num_columns)
print("Column names:", column_names)


Number of rows: 100000
Number of columns: 7
Column names: ['order_id', 'product', 'category', 'price', 'quantity', 'region', 'total_sales']


In [0]:
# Alias Spark's aggregate on import so it does not shadow Python's builtin
# `sum` for the rest of the notebook session.
from pyspark.sql.functions import col, sum as spark_sum

# One output row: each column holds the number of NULLs found in the
# same-named input column (isNull -> 0/1 -> summed).
null_count_exprs = [
    spark_sum(col(c).isNull().cast("int")).alias(c) for c in df.columns
]
missing_values = df.select(null_count_exprs)

missing_values.show()


+--------+-------+--------+-----+--------+------+-----------+
|order_id|product|category|price|quantity|region|total_sales|
+--------+-------+--------+-----+--------+------+-----------+
|       0|      0|       0|    0|       0|     0|          0|
+--------+-------+--------+-----+--------+------+-----------+



In [0]:
# Minimum and maximum of the three numeric columns, one row per statistic.
numeric_columns = ["price", "quantity", "total_sales"]
min_max_df = df.select(*numeric_columns).summary("min", "max")

min_max_df.show()


+-------+-----+--------+-----------+
|summary|price|quantity|total_sales|
+-------+-----+--------+-----------+
|    min|  100|       1|        100|
|    max| 1499|       9|      13491|
+-------+-----+--------+-----------+



In [0]:
# Alias Spark's aggregate on import so it does not shadow Python's builtin
# `sum` for the rest of the notebook session.
from pyspark.sql.functions import sum as spark_sum

# Total of `total_sales` per category.
sales_by_category = df.groupBy("category").agg(
    spark_sum("total_sales").alias("total_sales_by_category")
)
sales_by_category.show()


+-----------+-----------------------+
|   category|total_sales_by_category|
+-----------+-----------------------+
|Electronics|              134172454|
|  Furniture|              134193617|
|Accessories|              131578753|
+-----------+-----------------------+



In [0]:
from pyspark.ml.feature import VectorAssembler

# Pack the two predictor columns into a single vector column named "features".
regression_inputs = ["price", "quantity"]
assembler = VectorAssembler(inputCols=regression_inputs, outputCol="features")

# Keep only the feature vector and the regression target.
assembled_for_lr = assembler.transform(df)
lr_data = assembled_for_lr.select("features", "total_sales")


In [0]:
# 80/20 train/test split of the regression data; the seed is fixed for repeatability.
lr_split_weights = [0.8, 0.2]
train_data, test_data = lr_data.randomSplit(lr_split_weights, seed=42)


In [0]:
from pyspark.ml.regression import LinearRegression

# Linear regression estimator: predicts total_sales from the "features" vector.
lr = LinearRegression(featuresCol="features", labelCol="total_sales")

# Fit on the training split only.
lr_model = lr.fit(train_data)


In [0]:
# Report the parameters of the fitted linear model.
coefficients, intercept = lr_model.coefficients, lr_model.intercept
print("Coefficients:", coefficients)
print("Intercept:", intercept)


Coefficients: [5.0090144973574064,799.7085988593947]
Intercept: -4006.4243574653883


In [0]:
# `lr_model.summary` describes the fit on the TRAINING split only.
summary = lr_model.summary
print("RMSE:", summary.rootMeanSquaredError)
print("R2:", summary.r2)

# Also score the held-out split, which was created above but never used;
# these are the numbers that reflect generalisation.
test_summary = lr_model.evaluate(test_data)
print("Test RMSE:", test_summary.rootMeanSquaredError)
print("Test R2:", test_summary.r2)


RMSE: 1050.8074369209826
R2: 0.8839056387899269


In [0]:
# Clustering uses all three numeric columns as the feature vector.
clustering_inputs = ["price", "quantity", "total_sales"]
assembler_km = VectorAssembler(inputCols=clustering_inputs, outputCol="features")

# Only the assembled vector is kept for clustering.
assembled_for_km = assembler_km.transform(df)
km_data = assembled_for_km.select("features")


In [0]:
from pyspark.ml.clustering import KMeans

# K-means with three clusters and a fixed seed.
kmeans = KMeans(k=3, seed=1, featuresCol="features")

# Fit on the full assembled dataset (no train/test split for clustering).
kmeans_model = kmeans.fit(km_data)


In [0]:
# Cluster centres, one array per cluster, in the order the model returns them.
centers = kmeans_model.clusterCenters()

# Print each centre next to its cluster index.
for i in range(len(centers)):
    center = centers[i]
    print(f"Cluster {i} center:", center)


Cluster 0 center: [ 582.32016805    3.57996941 1600.87777584]
Cluster 1 center: [1.23079695e+03 7.73657000e+00 9.43606002e+03]
Cluster 2 center: [ 929.44462422    5.85717458 5027.37438533]


In [0]:
# Decision-tree features: the same two predictors as the linear regression.
tree_inputs = ["price", "quantity"]
assembler_dt = VectorAssembler(inputCols=tree_inputs, outputCol="features")

# Keep the feature vector and the regression target.
assembled_for_dt = assembler_dt.transform(df)
dt_data = assembled_for_dt.select("features", "total_sales")


In [0]:
# 80/20 train/test split of the decision-tree data; the seed is fixed for repeatability.
dt_split_weights = [0.8, 0.2]
train_dt, test_dt = dt_data.randomSplit(dt_split_weights, seed=42)


In [0]:
from pyspark.ml.regression import DecisionTreeRegressor

# Regression tree limited to depth 5, predicting total_sales from "features".
dt = DecisionTreeRegressor(featuresCol="features", labelCol="total_sales", maxDepth=5)

# Fit on the training split only.
dt_model = dt.fit(train_dt)


In [0]:
# Score the held-out split; transform() adds a "prediction" column.
dt_predictions = dt_model.transform(test_dt)

# Preview actual vs. predicted values for the first five rows.
dt_preview = dt_predictions.select("total_sales", "prediction")
dt_preview.show(5)


+-----------+-----------------+
|total_sales|       prediction|
+-----------+-----------------+
|        100|287.0758649437266|
|        200|575.8005965061781|
|        200|575.8005965061781|
|        300|700.2373660030628|
|        400|700.2373660030628|
+-----------+-----------------+
only showing top 5 rows


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE evaluator comparing the "prediction" column against "total_sales".
evaluator = RegressionEvaluator(
    labelCol="total_sales", predictionCol="prediction", metricName="rmse"
)

# Evaluate the decision tree on its test-split predictions.
rmse_dt = evaluator.evaluate(dt_predictions)
print("Decision Tree RMSE:", rmse_dt)


Decision Tree RMSE: 505.6354850765355


In [0]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


In [0]:
from pyspark.ml.feature import StringIndexer

# Encode the string "category" column as a numeric "label" column.
indexer = StringIndexer(inputCol="category", outputCol="label")

# Learn the category -> index mapping from df, then apply it to df.
indexer_model = indexer.fit(df)
df_labeled = indexer_model.transform(df)


In [0]:
# Classification features: all three numeric columns.
# Note: this re-binds `assembler`, previously the linear-regression assembler.
classification_inputs = ["price", "quantity", "total_sales"]
assembler = VectorAssembler(inputCols=classification_inputs, outputCol="features")

# Keep the feature vector and the indexed category label.
assembled_for_clf = assembler.transform(df_labeled)
final_data = assembled_for_clf.select("features", "label")


In [0]:
# 80/20 split of the classification data with a fixed seed.
# Note: this re-binds train_data/test_data, previously the linear-regression split.
clf_split_weights = [0.8, 0.2]
train_data, test_data = final_data.randomSplit(clf_split_weights, seed=42)


In [0]:
# Logistic regression predicting the indexed category from "features".
lr_classifier = LogisticRegression(featuresCol="features", labelCol="label")

# Note: this re-binds `lr_model`, previously the fitted linear-regression model.
lr_model = lr_classifier.fit(train_data)


In [0]:
# Score the classification test split; transform() adds a "prediction" column.
predictions = lr_model.transform(test_data)

# Preview actual vs. predicted labels for the first five rows.
label_preview = predictions.select("label", "prediction")
label_preview.show(5)



+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       2.0|
|  1.0|       2.0|
|  1.0|       2.0|
|  0.0|       2.0|
|  0.0|       2.0|
+-----+----------+
only showing top 5 rows


In [0]:
# Accuracy evaluator comparing "prediction" against "label".
# Note: this re-binds `evaluator`, previously the regression RMSE evaluator.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)

# Evaluate the classifier on its test-split predictions.
accuracy = evaluator.evaluate(predictions)
print("Classification Accuracy:", accuracy)


Classification Accuracy: 0.33733021543360364


In [0]:
# Row/column counts in the shared (result_type, metric, value) layout.
# Values are stringified so they line up with the other result frames.
basic_stats_schema = ["result_type", "metric", "value"]
basic_stats_rows = [
    ("basic_statistics", metric_label, str(metric_value))
    for metric_label, metric_value in (
        ("Number of rows", num_rows),
        ("Number of columns", num_columns),
    )
]
basic_stats = spark.createDataFrame(basic_stats_rows, basic_stats_schema)


In [0]:
# Unpivot the single-row NULL-count frame into (metric, value) rows.
# The stack() arguments are built from the frame's actual columns rather than
# a hard-coded list of seven names, so this keeps working when the
# "table_name" widget points at a table with a different schema.
missing_columns = missing_values.columns


def _stack_pair(column_name):
    """Return the "'name', `name`" fragment for one column of the stack() call."""
    # Escape characters that would terminate the SQL string literal / identifier.
    literal = column_name.replace("\\", "\\\\").replace("'", "\\'")
    identifier = column_name.replace("`", "``")
    return f"'{literal}', `{identifier}`"


stack_arguments = ", ".join(_stack_pair(name) for name in missing_columns)

missing_long = missing_values.selectExpr(
    "'missing_values' as result_type",
    f"stack({len(missing_columns)}, {stack_arguments}) as (metric, value)",
)


In [0]:
# Reshape the min/max summary into (result_type, metric, value):
# metric is the statistic name, value is a single string listing all three columns.
min_max_exprs = [
    "'min_max' as result_type",
    "summary as metric",
    "concat('price=', price, ', quantity=', quantity, ', total_sales=', total_sales) as value",
]
min_max_long = min_max_df.selectExpr(*min_max_exprs)


In [0]:
# By this point `predictions`, `lr_model`, `train_data` and `test_data` have
# all been re-bound by the classification cells, so using `predictions` here
# would label logistic-regression class predictions as "linear_regression".
# Rebuild the linear-regression predictions from objects that are still
# intact: the `lr` estimator and `lr_data`, with the same 80/20 split and seed.
lr_train, lr_test = lr_data.randomSplit([0.8, 0.2], seed=42)
lr_predictions = lr.fit(lr_train).transform(lr_test)

# NOTE(review): metric and value both carry the prediction (as in the
# decision-tree results); confirm whether metric should be the actual value.
lr_results = lr_predictions.selectExpr(
    "'linear_regression' as result_type",
    "cast(prediction as string) as metric",
    "cast(prediction as string) as value"
)


In [0]:
# Decision-tree predictions in the shared (result_type, metric, value) layout.
# Both metric and value hold the prediction rendered as a string.
dt_result_exprs = [
    "'decision_tree' as result_type",
    "cast(prediction as string) as metric",
    "cast(prediction as string) as value",
]
dt_results = dt_predictions.selectExpr(*dt_result_exprs)


In [0]:
# Classification output in the shared layout: metric is the true label,
# value is the predicted label, both rendered as strings.
class_result_exprs = [
    "'classification' as result_type",
    "cast(label as string) as metric",
    "cast(prediction as string) as value",
]
class_results = predictions.selectExpr(*class_result_exprs)




In [0]:
# One row per cluster centre; the centre array is stored as its string form.
kmeans_rows = [
    ("kmeans", f"cluster_{i}", str(center)) for i, center in enumerate(centers)
]

kmeans_schema = ["result_type", "metric", "value"]
kmeans_df = spark.createDataFrame(kmeans_rows, kmeans_schema)


In [0]:
def clean(big_df):
    """Project a result frame onto the three shared columns, all cast to string.

    Args:
        big_df: object exposing ``selectExpr`` with ``result_type``, ``metric``
            and ``value`` columns (Spark DataFrames in this notebook).

    Returns:
        Whatever ``big_df.selectExpr`` returns for the three cast expressions,
        in the order result_type, metric, value.
    """
    shared_columns = ("result_type", "metric", "value")
    cast_exprs = [f"cast({name} as string) as {name}" for name in shared_columns]
    return big_df.selectExpr(*cast_exprs)



In [0]:
# Stack every result frame, in this order, into one long table. Each frame is
# normalised with clean() first so unionByName sees identical string columns.
result_frames = [
    basic_stats,
    missing_long,
    min_max_long,
    lr_results,
    dt_results,
    class_results,
    kmeans_df,
]

final_results = clean(result_frames[0])
for result_frame in result_frames[1:]:
    final_results = final_results.unionByName(clean(result_frame))


In [0]:
# Reuse the clean() helper instead of repeating its three cast expressions;
# the projection (result_type, metric, value as strings) is unchanged.
final_results_str = clean(final_results)

In [0]:
# Render the combined results table (all columns are strings).
display(final_results_str)


result_type,metric,value
basic_statistics,Number of rows,100000
basic_statistics,Number of columns,7
missing_values,order_id,0
missing_values,product,0
missing_values,category,0
missing_values,price,0
missing_values,quantity,0
missing_values,region,0
missing_values,total_sales,0
min_max,min,"price=100, quantity=1, total_sales=100"


In [0]:
import time
from pyspark.sql import Row

# Number of clusters (simulation). NOTE: each value is applied only to
# spark.sql.shuffle.partitions; the actual cluster is not resized.
cluster_sizes = [1, 2, 4, 8]

results = []
baseline_time = None  # T1: run time of the first configuration in the list

shuffle_key = "spark.sql.shuffle.partitions"
# Remember the session setting so the benchmark does not leave it changed.
original_shuffle_partitions = spark.conf.get(shuffle_key)

try:
    for n in cluster_sizes:
        spark.conf.set(shuffle_key, str(n))

        # perf_counter is monotonic, unlike the wall clock used by time.time().
        start = time.perf_counter()
        df.count()
        exec_time = time.perf_counter() - start

        # Take the first measurement as the baseline instead of requiring
        # n == 1, so the list may start at any size without a None division.
        if baseline_time is None:
            baseline_time = exec_time

        # Guard against a zero-length measurement.
        speedup = baseline_time / exec_time if exec_time > 0 else float("inf")
        efficiency = speedup / n

        results.append(
            Row(
                cluster_size=n,
                execution_time_sec=round(exec_time, 4),
                speedup=round(speedup, 4),
                efficiency=round(efficiency, 4)
            )
        )
finally:
    # Restore the original setting even if a run fails part-way through.
    spark.conf.set(shuffle_key, original_shuffle_partitions)

# Convert the collected rows to a DataFrame.
performance_df = spark.createDataFrame(results)

# Display the final result.
display(performance_df)


cluster_size,execution_time_sec,speedup,efficiency
1,0.3726,1.0,1.0
2,0.3285,1.1341,0.5671
4,0.2454,1.5184,0.3796
8,0.2794,1.3335,0.1667
