In [0]:
# Number of copies of each df row; the cross join below yields len(df) * N rows.
N = 266667

# Single-column helper frame ("rid") used only to multiply the rows of df.
rid_range = spark.range(N).select(col("id").alias("rid"))
big_df_800k = df.crossJoin(rid_range).drop("rid")

row_count = big_df_800k.count()
print("Rows:", row_count)

# Run the benchmark on the enlarged frame and show the metrics as the cell output.
bench_800k = run_benchmark(big_df_800k)
bench_800k


Rows: 800001


{'rows': 800001,
 'cols': 3,
 'unique_cities': 3,
 'avg_age': 20.333333333333332,
 'stats_sec': 1.3527014255523682,
 'prep_sec': 0.0005958080291748047,
 'lr_sec': 2.427687406539917,
 'kmeans_sec': 3.416335105895996,
 'classif_sec': 8.515571355819702,
 'total_sec': 15.71289610862732}

In [0]:
# Number of copies of each df row; the cross join below yields len(df) * N rows.
N = 133334

# Single-column helper frame ("rid") used only to multiply the rows of df.
rid_range = spark.range(N).select(col("id").alias("rid"))
big_df_400k = df.crossJoin(rid_range).drop("rid")

row_count = big_df_400k.count()
print("Rows:", row_count)

# Run the benchmark on the enlarged frame and show the metrics as the cell output.
bench_400k = run_benchmark(big_df_400k)
bench_400k


Rows: 400002


{'rows': 400002,
 'cols': 3,
 'unique_cities': 3,
 'avg_age': 20.333333333333332,
 'stats_sec': 2.0248830318450928,
 'prep_sec': 0.0007207393646240234,
 'lr_sec': 2.488649606704712,
 'kmeans_sec': 3.353914976119995,
 'classif_sec': 7.114344120025635,
 'total_sec': 14.982517957687378}

In [0]:
# Number of copies of each df row; the cross join below yields len(df) * N rows.
N = 66667

# Single-column helper frame ("rid") used only to multiply the rows of df.
rid_range = spark.range(N).select(col("id").alias("rid"))
big_df_200k = df.crossJoin(rid_range).drop("rid")

row_count = big_df_200k.count()
print("Rows:", row_count)

# Run the benchmark on the enlarged frame and show the metrics as the cell output.
bench_200k = run_benchmark(big_df_200k)
bench_200k


Rows: 200001


{'rows': 200001,
 'cols': 3,
 'unique_cities': 3,
 'avg_age': 20.333333333333332,
 'stats_sec': 1.405022144317627,
 'prep_sec': 0.0005502700805664062,
 'lr_sec': 2.5804924964904785,
 'kmeans_sec': 3.010709285736084,
 'classif_sec': 7.290122985839844,
 'total_sec': 14.2869131565094}

In [0]:
# Run the benchmark on big_df and show the returned metrics dict as the cell output.
# NOTE: relies on run_benchmark and big_df already being defined by other cells.
bench_result = run_benchmark(big_df)
bench_result


{'rows': 600000,
 'cols': 3,
 'unique_cities': 3,
 'avg_age': 20.333333333333332,
 'stats_sec': 1.4642035961151123,
 'prep_sec': 0.0005917549133300781,
 'lr_sec': 2.6159207820892334,
 'kmeans_sec': 3.531095504760742,
 'classif_sec': 8.11721420288086,
 'total_sec': 15.729032039642334}

In [0]:
import time
from pyspark.sql.functions import when, array
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import LogisticRegression

def run_benchmark(input_df):
    """Time a fixed sequence of Spark statistics and ML stages on ``input_df``.

    Stages (each timed separately, in this order):
      * stats   - row count, column count, distinct ``city`` count, mean ``age``
      * prep    - assemble ``age`` into a ``features`` vector column
      * lr      - LinearRegression with ``age`` as label (80/20 split, seed 42)
      * kmeans  - KMeans with k=2, seed 1, on ``features``
      * classif - LogisticRegression on the label ``age >= 21`` (80/20 split, seed 42)

    Args:
        input_df: Spark DataFrame that has the columns ``name``, ``age`` and
            ``city``. ``age`` is fed to VectorAssembler and cast to double, so
            it is expected to be numeric.

    Returns:
        dict with the keys ``rows``, ``cols``, ``unique_cities``, ``avg_age``
        followed by the durations in seconds: ``stats_sec``, ``prep_sec``,
        ``lr_sec``, ``kmeans_sec``, ``classif_sec`` and ``total_sec``.
    """
    # Imported locally so the function does not depend on another notebook
    # cell having imported `col` into the global namespace.
    from pyspark.sql.functions import col

    # perf_counter is monotonic, so durations cannot be skewed by wall-clock
    # adjustments the way time.time() differences can.
    clock = time.perf_counter

    times = {}
    total_start = clock()

    # --- stats: three separate Spark actions on the input frame ---
    t0 = clock()
    rows = input_df.count()
    cols = len(input_df.columns)
    unique_cities = input_df.select("city").distinct().count()
    avg_age = input_df.selectExpr("avg(age) as avg_age").collect()[0]["avg_age"]
    times["stats_sec"] = clock() - t0

    # --- prep: feature assembly ---
    # NOTE: transform()/select() only build a lazy plan here; no Spark action
    # runs, so prep_sec reflects plan construction and the real assembly cost
    # is paid inside the later stages.
    t1 = clock()
    assembler = VectorAssembler(inputCols=["age"], outputCol="features")
    ml_df = assembler.transform(input_df).select("features", "age", "name", "city")
    times["prep_sec"] = clock() - t1

    # --- lr: linear regression, fit on 80% and score the remaining 20% ---
    t2 = clock()
    lr_df = ml_df.select("features", col("age").cast("double").alias("label"))
    train_lr, test_lr = lr_df.randomSplit([0.8, 0.2], seed=42)
    lr = LinearRegression(featuresCol="features", labelCol="label")
    lr_model = lr.fit(train_lr)
    # count() forces the predictions to be computed so scoring is timed too.
    _ = lr_model.transform(test_lr).count()
    times["lr_sec"] = clock() - t2

    # --- kmeans: fit and assign clusters on the full feature column ---
    t3 = clock()
    kmeans = KMeans(k=2, seed=1, featuresCol="features")
    km_model = kmeans.fit(ml_df.select("features"))
    _ = km_model.transform(ml_df.select("features")).count()
    times["kmeans_sec"] = clock() - t3

    # --- classif: logistic regression on the binary label (age >= 21) ---
    t4 = clock()
    cls_df = ml_df.select("features", when(col("age") >= 21, 1).otherwise(0).cast("double").alias("label"))
    train_c, test_c = cls_df.randomSplit([0.8, 0.2], seed=42)
    logreg = LogisticRegression(featuresCol="features", labelCol="label")
    cls_model = logreg.fit(train_c)
    _ = cls_model.transform(test_c).count()
    times["classif_sec"] = clock() - t4

    times["total_sec"] = clock() - total_start

    # Dataset facts first, then the per-stage timings in execution order.
    result = {
        "rows": rows,
        "cols": cols,
        "unique_cities": unique_cities,
        "avg_age": avg_age,
        **times
    }
    return result

# Benchmark big_df with the function defined above; the bare name on the last
# line makes the returned dict the cell output.
# NOTE: big_df is not defined in this cell; it must already exist in the session.
bench_result = run_benchmark(big_df)
bench_result


{'rows': 600000,
 'cols': 3,
 'unique_cities': 3,
 'avg_age': 20.333333333333332,
 'stats_sec': 1.6856682300567627,
 'prep_sec': 0.0005981922149658203,
 'lr_sec': 3.949824094772339,
 'kmeans_sec': 5.010276794433594,
 'classif_sec': 13.1082124710083,
 'total_sec': 23.754586219787598}

In [0]:
from pyspark.sql.functions import monotonically_increasing_id, col

# Number of copies of each df row; the cross join below yields len(df) * N rows.
N = 200000

# Single-column helper frame ("rid") used only to multiply the rows of df.
rid_range = spark.range(N).select(col("id").alias("rid"))
big_df = df.crossJoin(rid_range).drop("rid")

row_count = big_df.count()
print("Rows in big_df:", row_count)

# Preview a handful of rows rather than the whole enlarged frame.
display(big_df.limit(5))


Rows in big_df: 600000


name,age,city
Ali,20.0,Gaza
Sara,22.0,Rafah
Omar,19.0,Khanyounis
Ali,20.0,Gaza
Sara,22.0,Rafah


In [0]:
from pyspark.sql.functions import array
from pyspark.ml.fpm import FPGrowth
import time

# One "transaction" per row: the items are that row's name and city values.
fp_df = df.select(array("name", "city").alias("items"))

display(fp_df)

# Timed region: estimator construction plus fit().
start = time.time()

fp = FPGrowth(itemsCol="items", minSupport=0.2, minConfidence=0.2)

# NOTE(review): the recorded output of this cell ends in a SparkException
# raised by this fit() call; the traceback is truncated, so the cause is not
# visible here. Nothing below this line ran in that recorded execution.
fp_model = fp.fit(fp_df)
fp_time = time.time() - start

itemsets = fp_model.freqItemsets
rules = fp_model.associationRules

# Frequent itemsets first, then the association rules derived from them.
display(itemsets)
display(rules)

print(f"FPGrowth Time: {fp_time:.4f} seconds")



items
"List(Ali, Gaza)"
"List(Sara, Rafah)"
"List(Omar, Khanyounis)"


[0;31m---------------------------------------------------------------------------[0m
[0;31mSparkException[0m                            Traceback (most recent call last)
File [0;32m<command-7214644775981576>, line 20[0m
[1;32m     12[0m start [38;5;241m=[39m time[38;5;241m.[39mtime()
[1;32m     14[0m fp [38;5;241m=[39m FPGrowth(
[1;32m     15[0m     itemsCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mitems[39m[38;5;124m"[39m,
[1;32m     16[0m     minSupport[38;5;241m=[39m[38;5;241m0.2[39m,
[1;32m     17[0m     minConfidence[38;5;241m=[39m[38;5;241m0.2[39m
[1;32m     18[0m )
[0;32m---> 20[0m fp_model [38;5;241m=[39m fp[38;5;241m.[39mfit(fp_df)
[1;32m     21[0m fp_time [38;5;241m=[39m time[38;5;241m.[39mtime() [38;5;241m-[39m start
[1;32m     23[0m [38;5;66;03m# Frequent itemsets[39;00m

File [0;32m/databricks/python_shell/lib/dbruntime/MLWorkloadsInstrumentation/_pyspark.py:30[0m, in [0;36m_create_patch_function.<locals>.patched_m

In [0]:
from pyspark.sql.functions import when
from pyspark.ml.classification import LogisticRegression
import time

# Binary label as a double: 1.0 when age >= 21, otherwise 0.0.
label_expr = when(ml_df.age >= 21, 1).otherwise(0).cast("double")
cls_df = ml_df.withColumn("label", label_expr)

display(cls_df)

# Keep only the model inputs, then split 80/20 with a fixed seed.
train, test = cls_df.select("features", "label").randomSplit(weights=[0.8, 0.2], seed=42)

# Timed region: estimator construction plus fit() on the training split.
start = time.time()
lr_cls = LogisticRegression(featuresCol="features", labelCol="label")
cls_model = lr_cls.fit(train)
cls_time = time.time() - start

# Score the held-out split (not included in the reported time).
predictions = cls_model.transform(test)

display(predictions)

print(f"Classification Time: {cls_time:.4f} seconds")


features,age,label
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""20.0""]}",20.0,0.0
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""22.0""]}",22.0,1.0
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""19.0""]}",19.0,0.0


features,label,rawPrediction,probability,prediction
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""22.0""]}",1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""Infinity"",""-Infinity""]}","{""type"":""1"",""size"":null,""indices"":null,""values"":[""1.0"",""0.0""]}",0.0


Classification Time: 1.3216 seconds


In [0]:
from pyspark.ml.clustering import KMeans
import time

# Timed region: estimator construction plus fit() on ml_df.
start = time.time()
kmeans = KMeans(k=2, seed=1, featuresCol="features")
kmeans_model = kmeans.fit(ml_df)
kmeans_time = time.time() - start

# Assign each row of ml_df to a cluster (not included in the reported time).
clusters = kmeans_model.transform(ml_df)

display(clusters)

print(f"KMeans Time: {kmeans_time:.4f} seconds")


features,age,prediction
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""20.0""]}",20.0,0
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""22.0""]}",22.0,1
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""19.0""]}",19.0,0


KMeans Time: 3.8737 seconds


In [0]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import time

# Use the age column as the regression target by renaming it to "label".
lr_df = ml_df.withColumnRenamed("age", "label")

# 80/20 split with a fixed seed.
train, test = lr_df.randomSplit(weights=[0.8, 0.2], seed=42)

# Timed region: estimator construction plus fit() on the training split.
start = time.time()
lr = LinearRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train)
lr_time = time.time() - start

# Score the held-out split and measure RMSE against its labels.
predictions = lr_model.transform(test)

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print(f"Linear Regression Time: {lr_time:.4f} seconds")
print(f"RMSE: {rmse}")


Linear Regression Time: 2.1464 seconds
RMSE: 0.0


In [0]:
from pyspark.ml.feature import VectorAssembler

# Pack the single "age" column into a vector column named "features".
assembler = VectorAssembler(inputCols=["age"], outputCol="features")

# Keep only the assembled vector and the original age value.
assembled = assembler.transform(df)
ml_df = assembled.select("features", "age")
display(ml_df)


features,age
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""20.0""]}",20.0
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""22.0""]}",22.0
"{""type"":""1"",""size"":null,""indices"":null,""values"":[""19.0""]}",19.0


In [0]:
# Print df's column names and types, then render at most its first 5 rows.
# NOTE: df is not defined in this cell; it must already exist in the session.
df.printSchema()
display(df.limit(5))


root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- city: string (nullable = true)



name,age,city
Ali,20.0,Gaza
Sara,22.0,Rafah
Omar,19.0,Khanyounis


In [0]:
# Read back the beginning of the saved aggregation results file to check what
# was written; the second argument (1000) caps how much of the file is returned.
dbutils.fs.head(
    "/Volumes/workspace/default/datasets/results/aggregation_results.json",
    1000
)


'{\n    "algorithm": "TimeBasedAggregation",\n    "metrics": {\n        "average_age": 20.333333333333332,\n        "min_age": 19.0,\n        "max_age": 22.0\n    },\n    "execution_time_seconds": 1.3182177543640137\n}'

In [0]:
from pyspark.sql import functions as F
import json
import time

# The timed region covers reading the CSV, the cast and the aggregation job.
start_time = time.time()

path = "/Volumes/workspace/default/datasets/sample.csv"
df = spark.read.csv(path, header=True)

# Without schema inference every CSV column is read as string, so cast age.
df = df.withColumn("age", df["age"].cast("double"))

age_metrics = [
    F.avg("age").alias("average_age"),
    F.min("age").alias("min_age"),
    F.max("age").alias("max_age"),
]
# A global aggregation returns exactly one row; take it as a Row object.
aggregations = df.agg(*age_metrics).collect()[0]

execution_time = time.time() - start_time

result_agg = {
    "algorithm": "TimeBasedAggregation",
    "metrics": {
        key: aggregations[key]
        for key in ("average_age", "min_age", "max_age")
    },
    "execution_time_seconds": execution_time,
}

# Save the results
out_dir = "/Volumes/workspace/default/datasets/results"
dbutils.fs.put(
    f"{out_dir}/aggregation_results.json",
    json.dumps(result_agg, indent=4),
    overwrite=True,
)

result_agg


Wrote 208 bytes.


{'algorithm': 'TimeBasedAggregation',
 'metrics': {'average_age': 20.333333333333332,
  'min_age': 19.0,
  'max_age': 22.0},
 'execution_time_seconds': 5.431603670120239}

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql import functions as F
import json
import time

# The timed region covers reading the CSV, feature assembly, fit() and the
# (lazy) transform() call; collecting the predictions happens after it.
start_time = time.time()

path = "/Volumes/workspace/default/datasets/sample.csv"
df = spark.read.csv(path, header=True)

# Without schema inference every CSV column is read as string, so cast age.
df = df.withColumn("age", df["age"].cast("double"))

assembler = VectorAssembler(inputCols=["age"], outputCol="features")

# "age" becomes both the single feature and (renamed) the regression label.
data = assembler.transform(df).withColumnRenamed("age", "label")

dt = DecisionTreeRegressor(featuresCol="features", labelCol="label", maxDepth=3)
model = dt.fit(data)

# Predictions are made on the same rows the tree was trained on.
predictions = model.transform(data)

execution_time = time.time() - start_time

# Bring the per-row results to the driver as a list of plain dicts.
prediction_rows = predictions.select(
    "name",
    F.col("prediction").alias("predicted_age"),
    "label",
)
results = prediction_rows.toPandas().to_dict(orient="records")

result_dt = {
    "algorithm": "DecisionTreeRegression",
    "results": results,
    "max_depth": 3,
    "execution_time_seconds": execution_time,
}

out_dir = "/Volumes/workspace/default/datasets/results"
dbutils.fs.put(
    f"{out_dir}/decision_tree_results.json",
    json.dumps(result_dt, indent=4),
    overwrite=True,
)

result_dt


Wrote 466 bytes.


{'algorithm': 'DecisionTreeRegression',
 'results': [{'name': 'Ali', 'predicted_age': 20.0, 'label': 20.0},
  {'name': 'Sara', 'predicted_age': 22.0, 'label': 22.0},
  {'name': 'Omar', 'predicted_age': 19.0, 'label': 19.0}],
 'max_depth': 3,
 'execution_time_seconds': 5.536059856414795}

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import functions as F
import json
import time

# The timed region covers reading the CSV, feature assembly, fit() and the
# (lazy) transform() call; collecting the predictions happens after it.
start_time = time.time()

path = "/Volumes/workspace/default/datasets/sample.csv"
df = spark.read.csv(path, header=True)

# Without schema inference every CSV column is read as string, so cast age.
df = df.withColumn("age", df["age"].cast("double"))

assembler = VectorAssembler(inputCols=["age"], outputCol="features")

# "age" becomes both the single feature and (renamed) the regression label.
data = assembler.transform(df).withColumnRenamed("age", "label")

lr = LinearRegression(featuresCol="features", labelCol="label")
model = lr.fit(data)

# Predictions are made on the same rows the model was trained on.
predictions = model.transform(data)

execution_time = time.time() - start_time

# Bring the per-row results to the driver as a list of plain dicts.
prediction_rows = predictions.select(
    "name",
    F.col("prediction").alias("predicted_age"),
    "label",
)
results = prediction_rows.toPandas().to_dict(orient="records")

result_lr = {
    "algorithm": "LinearRegression",
    "results": results,
    # tolist() turns the coefficient vector into a JSON-serialisable list.
    "coefficients": model.coefficients.tolist(),
    "intercept": model.intercept,
    "execution_time_seconds": execution_time,
}

out_dir = "/Volumes/workspace/default/datasets/results"
dbutils.fs.put(
    f"{out_dir}/linear_regression_results.json",
    json.dumps(result_lr, indent=4),
    overwrite=True,
)

result_lr


Wrote 579 bytes.


{'algorithm': 'LinearRegression',
 'results': [{'name': 'Ali',
   'predicted_age': 19.999999999999982,
   'label': 20.0},
  {'name': 'Sara', 'predicted_age': 22.000000000000096, 'label': 22.0},
  {'name': 'Omar', 'predicted_age': 18.999999999999922, 'label': 19.0}],
 'coefficients': [1.0000000000000582],
 'intercept': -1.1821308240946304e-12,
 'execution_time_seconds': 2.10819935798645}

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.sql import functions as F
import json
import time

# The timed region covers reading the CSV, feature assembly, fit() and the
# (lazy) transform() call; collecting the assignments happens after it.
start_time = time.time()

path = "/Volumes/workspace/default/datasets/sample.csv"
df = spark.read.csv(path, header=True)

# Without schema inference every CSV column is read as string, so cast age.
df = df.withColumn("age", df["age"].cast("double"))

assembler = VectorAssembler(inputCols=["age"], outputCol="features")
features_df = assembler.transform(df)

kmeans = KMeans(k=2, seed=42, featuresCol="features")
model = kmeans.fit(features_df)

# Cluster assignments for the same rows the model was fitted on.
predictions = model.transform(features_df)

execution_time = time.time() - start_time

# Bring the per-row cluster assignments to the driver as a list of plain dicts.
assignment_rows = predictions.select("name", "age", "prediction")
clusters = assignment_rows.toPandas().to_dict(orient="records")

result_kmeans = {
    "algorithm": "KMeans",
    "k": 2,
    "clusters": clusters,
    "execution_time_seconds": execution_time,
}

out_dir = "/Volumes/workspace/default/datasets/results"
dbutils.fs.put(
    f"{out_dir}/kmeans_results.json",
    json.dumps(result_kmeans, indent=4),
    overwrite=True,
)

result_kmeans


Wrote 419 bytes.


{'algorithm': 'KMeans',
 'k': 2,
 'clusters': [{'name': 'Ali', 'age': 20.0, 'prediction': 1},
  {'name': 'Sara', 'age': 22.0, 'prediction': 0},
  {'name': 'Omar', 'age': 19.0, 'prediction': 1}],
 'execution_time_seconds': 6.665626764297485}

In [0]:
import json
import time
from pyspark.sql import functions as F

# The timed region covers reading the CSV and every statistics job below.
start_time = time.time()

path = "/Volumes/workspace/default/datasets/sample.csv"
# No schema inference is requested, so every column is read as a string.
df = spark.read.option("header", "true").csv(path)

rows = df.count()

cols = len(df.columns)

# Column name -> Spark type name.
dtypes = dict(df.dtypes)

# Count null-or-empty cells for ALL columns in a single aggregation job rather
# than launching one filter+count job per column. when() without otherwise()
# yields NULL for non-matching rows and count() skips NULLs, so each aggregate
# is exactly the number of rows where the cell is null or "".
null_count_exprs = [
    F.count(F.when(F.col(c).isNull() | (F.col(c) == ""), 1)).alias(c)
    for c in df.columns
]
# agg() needs at least one expression; a frame with no columns has nothing to count.
null_counts = df.agg(*null_count_exprs).collect()[0] if null_count_exprs else {}

# Percentage of null/empty cells per column; 0 for an empty dataset to avoid
# dividing by zero.
null_percent = {
    c: (null_counts[c] / rows) * 100 if rows else 0
    for c in df.columns
}

# Kept as distinct().count() per column: unlike countDistinct(), this counts
# NULL as one distinct value, which is the behaviour the saved results rely on.
unique_counts = {c: df.select(c).distinct().count() for c in df.columns}

execution_time = time.time() - start_time

result_stats = {
    "dataset_path": path,
    "statistics": {
        "rows": rows,
        "columns": cols,
        "data_types": dtypes,
        "null_percentages": null_percent,
        "unique_value_counts": unique_counts
    },
    "execution_time_seconds": execution_time
}

# Make sure the output directory exists before writing the JSON report.
out_dir = "/Volumes/workspace/default/datasets/results"
dbutils.fs.mkdirs(out_dir)
dbutils.fs.put(
    f"{out_dir}/descriptive_stats.json",
    json.dumps(result_stats, indent=4),
    overwrite=True
)

result_stats


Wrote 535 bytes.


{'dataset_path': '/Volumes/workspace/default/datasets/sample.csv',
 'statistics': {'rows': 3,
  'columns': 3,
  'data_types': {'name': 'string', 'age': 'string', 'city': 'string'},
  'null_percentages': {'name': 0.0, 'age': 0.0, 'city': 0.0},
  'unique_value_counts': {'name': 3, 'age': 3, 'city': 3}},
 'execution_time_seconds': 4.42710542678833}

In [0]:
import json
import time

# The timed region covers reading the CSV and the row-count job.
start_time = time.time()

df = spark.read.csv(
    "/Volumes/workspace/default/datasets/sample.csv",
    header=True,
)

column_names = df.columns
stats = {
    "rows": df.count(),
    "columns": len(column_names),
    "column_names": column_names,
}

execution_time = time.time() - start_time

result = {"statistics": stats, "execution_time_seconds": execution_time}

# Make sure the output directory exists before writing the JSON report.
dbutils.fs.mkdirs("/Volumes/workspace/default/datasets/results")

dbutils.fs.put(
    "/Volumes/workspace/default/datasets/results/output.json",
    json.dumps(result, indent=4),
    overwrite=True,
)

result


Wrote 214 bytes.


{'statistics': {'rows': 3,
  'columns': 3,
  'column_names': ['name', 'age', 'city']},
 'execution_time_seconds': 1.3308570384979248}