In [0]:
# Widgets
# Clear widgets left over from a previous run so the defaults below take effect.
# This is deliberately best-effort: a failure here must not stop the notebook.
# Only ordinary errors are swallowed -- KeyboardInterrupt / SystemExit still
# propagate, which a bare `except:` would have hidden.
# NOTE(review): Databricks documents that a widget removed in one cell should be
# re-created in a different cell; if the widgets below do not appear, move the
# removeAll() call into its own cell.
try:
    dbutils.widgets.removeAll()
except Exception:
    pass

# Table location (catalog.schema.table)
dbutils.widgets.text("catalog", "workspace", "Catalog")
dbutils.widgets.text("schema",  "default",   "Schema")
dbutils.widgets.text("table",   "advertising","Table")

# Tuning knobs read by the following cells
dbutils.widgets.text("num_partitions", "12", "NPART (repartition by TV_bucket)")
dbutils.widgets.text("radio_threshold","20", "Radio >= threshold → 1")
dbutils.widgets.dropdown("join_mode", "broadcast", ["broadcast","shuffle"], "Join mode")

print("Set widgets, run next cell.")


Set widgets, run next cell.


In [0]:
from pyspark.sql import functions as F

# --- Notebook parameters, read from the widgets ------------------------------
CAT = dbutils.widgets.get("catalog")
SCH = dbutils.widgets.get("schema")
TAB = dbutils.widgets.get("table")
NPART = int(dbutils.widgets.get("num_partitions"))      # widget text -> int
RAD = float(dbutils.widgets.get("radio_threshold"))     # widget text -> float
JOIN_MODE = dbutils.widgets.get("join_mode")

# Three-part table name: catalog.schema.table
full = f"{CAT}.{SCH}.{TAB}"

# --- Source data: keep only the four columns used below ----------------------
source_table = spark.table(full)
df = source_table.select("TV", "radio", "newspaper", "sales")

# --- Derived features --------------------------------------------------------
# TV_bucket: floor(TV / 10) as an int, i.e. buckets 10 units of TV wide.
tv_bucket_expr = F.floor(F.col("TV") / 10).cast("int")
# is_high_radio: 1 when radio >= RAD, otherwise 0.
high_radio_expr = (F.col("radio") >= F.lit(RAD)).cast("int")

ads_feat = df.withColumn("TV_bucket", tv_bucket_expr)
ads_feat = ads_feat.withColumn("is_high_radio", high_radio_expr)

# --- Quick sanity checks -----------------------------------------------------
rows_per_bucket = ads_feat.groupBy("TV_bucket").count().orderBy("TV_bucket")
rows_per_bucket.show(10)

rows_per_flag = ads_feat.groupBy("is_high_radio").count()
rows_per_flag.show()

print("Rows:", df.count(), "| Columns:", df.columns)
display(ads_feat.limit(5))


+---------+-----+
|TV_bucket|count|
+---------+-----+
|        0|    8|
|        1|   11|
|        2|    7|
|        3|    5|
|        4|    5|
|        5|    5|
|        6|    7|
|        7|   10|
|        8|    5|
|        9|    7|
+---------+-----+
only showing top 10 rows
+-------------+-----+
|is_high_radio|count|
+-------------+-----+
|            1|  112|
|            0|   88|
+-------------+-----+

Rows: 200 | Columns: ['TV', 'radio', 'newspaper', 'sales']


TV,radio,newspaper,sales,TV_bucket,is_high_radio
230.1,37.8,69.2,22.1,23,1
44.5,39.3,45.1,10.4,4,1
17.2,45.9,69.3,9.3,1,1
151.5,41.3,58.5,18.5,15,1
180.8,10.8,58.4,12.9,18,0


In [0]:
from pyspark.sql import functions as F
from time import perf_counter

# Repartition the feature frame into NPART partitions keyed on TV_bucket
# (a key-based repartition forces a shuffle).
bucket_key = F.col("TV_bucket")
ads_rep = ads_feat.repartition(NPART, bucket_key)

# Run an action so that the timing below covers real work.
t0 = perf_counter()
c = ads_rep.count()
dt = perf_counter() - t0

# Tag every row with the id of the partition it lives in, then count rows per
# id. Only partitions that hold at least one row show up, so the number of
# result rows is the number of *non-empty* partitions.
rows_with_pid = ads_rep.withColumn("pid", F.spark_partition_id())
sizes = rows_with_pid.groupBy("pid").count().orderBy("pid")

non_empty = sizes.count()
# Clamp at zero in case more partitions are observed than were requested.
empties = NPART - non_empty if NPART > non_empty else 0

display(sizes)

summary_lines = (
    f"Count = {c} (took {dt:.3f}s)",
    f"Requested partitions (NPART) = {NPART}",
    f"Non-empty partitions observed = {non_empty}",
    f"Empty partitions = {empties}",
)
for summary_line in summary_lines:
    print(summary_line)



pid,count
0,16
1,27
2,8
3,55
4,9
5,17
6,33
7,11
8,6
9,14


Count = 200 (took 0.392s)
Requested partitions (NPART) = 12
Non-empty partitions observed = 11
Empty partitions = 1


In [0]:
from pyspark.sql import functions as F

# Tiny dimension: one "lo-hi" text label per TV_bucket.
# The label range is derived from the data rather than fixed at 0..19: any TV
# value of 200 or more lands in bucket 20+ (e.g. TV=230.1 -> bucket 23), and
# because the join below is a LEFT join those rows would otherwise silently
# end up with a NULL bucket_label.
DEFAULT_MAX_BUCKET = 19  # always emit at least the original 0..19 labels
observed_max_bucket = ads_feat.agg(F.max("TV_bucket")).first()[0]  # None if there are no non-NULL buckets
if observed_max_bucket is None:
    max_bucket = DEFAULT_MAX_BUCKET
else:
    max_bucket = max(int(observed_max_bucket), DEFAULT_MAX_BUCKET)
# NOTE(review): buckets below 0 (negative TV values) are still unlabeled -- confirm
# that TV is never negative in the source table.
labels = [(i, f"{i*10}-{i*10+9}") for i in range(0, max_bucket + 1)]
label_df = spark.createDataFrame(labels, ["TV_bucket","bucket_label"])

if JOIN_MODE=="broadcast":
    # Explicitly mark the small dimension for broadcast.
    joined = ads_feat.join(F.broadcast(label_df), "TV_bucket", "left")
    mode_note = "BroadcastHashJoin expected"
else:
    # Make RHS “too big to broadcast” so AQE can’t auto-broadcast it
    label_df_big = label_df.select(
        "TV_bucket","bucket_label",
        F.rpad(F.lit("x"), 1024*1024, "x").alias("pad")  # ~1MB/row → ~20MB or more in total
    )
    # The shuffle_hash hint requests a shuffle-based join for the padded side;
    # pad_len references the pad column in the joined result.
    joined = (ads_feat
              .join(label_df_big.hint("shuffle_hash"), "TV_bucket", "left")
              .withColumn("pad_len", F.length("pad")))
    mode_note = "Shuffle join expected (look for ShuffleExchange on both sides)"
    
def explain_to_str(df, mode="extended"):
    """Return, as a string, whatever ``df.explain(mode=mode)`` writes to stdout.

    Args:
        df: any object exposing ``explain(mode=...)`` that prints its plan.
        mode: explain mode forwarded unchanged to ``df.explain``.

    Returns:
        The captured stdout text (empty string if nothing was printed).
    """
    from contextlib import redirect_stdout
    from io import StringIO

    captured = StringIO()
    # Temporarily point stdout at the in-memory buffer while explain() prints.
    with redirect_stdout(captured):
        df.explain(mode=mode)
    return captured.getvalue()

from time import perf_counter

# Capture the extended plan text first, before the timed action runs.
plan = explain_to_str(joined, "extended")

# Time a full count of the joined frame.
t0 = perf_counter()
n = joined.count()
dt = perf_counter() - t0

print(mode_note, "| count=", n, f"| time={dt:.3f}s")
print(plan)  # In UI, collapse if too long

# Preview without the padding helper columns (they exist only in shuffle mode).
preview = joined.drop("pad", "pad_len").orderBy("TV_bucket").limit(10)
display(preview)


Shuffle join expected (look for ShuffleExchange on both sides) | count= 200 | time=0.522s
== Parsed Logical Plan ==
Project [TV_bucket#17470, TV#17464, radio#17465, newspaper#17466, sales#17467, is_high_radio#17472, bucket_label#17484, pad#17482, length(pad#17482) AS pad_len#17486]
+- Project [TV_bucket#17470, TV#17464, radio#17465, newspaper#17466, sales#17467, is_high_radio#17472, bucket_label#17484, pad#17482]
   +- Join LeftOuter, (cast(TV_bucket#17470 as bigint) = TV_bucket#17483L)
      :- Project [TV#17464, radio#17465, newspaper#17466, sales#17467, TV_bucket#17470, cast((radio#17465 >= 20.0) as int) AS is_high_radio#17472]
      :  +- Project [TV#17464, radio#17465, newspaper#17466, sales#17467, cast(FLOOR((TV#17464 / cast(10 as double))) as int) AS TV_bucket#17470]
      :     +- Project [TV#17464, radio#17465, newspaper#17466, sales#17467]
      :        +- SubqueryAlias workspace.default.advertising
      :           +- Relation workspace.default.advertising[TV#17464,radio#1

TV_bucket,TV,radio,newspaper,sales,is_high_radio,bucket_label
0,4.1,11.6,5.7,3.2,0,0-9
0,7.8,38.9,50.6,6.6,1,0-9
0,5.4,29.9,9.4,5.3,1,0-9
0,8.4,27.2,2.1,5.7,1,0-9
0,8.6,2.1,1.0,4.8,0,0-9
0,0.7,39.6,8.7,1.6,1,0-9
0,8.7,48.9,75.0,7.2,1,0-9
0,7.3,28.1,41.4,5.5,1,0-9
1,17.2,45.9,69.3,9.3,1,10-19
1,13.2,15.9,49.6,5.6,0,10-19


In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Pack the three spend columns into a single vector column and expose
# `sales` under the name `label`.
feature_cols = ["TV", "radio", "newspaper"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
assembled = assembler.transform(df)
ds = assembled.select("features", F.col("sales").alias("label"))

# 60/40 train/test split with a fixed seed.
train, test = ds.randomSplit([0.6, 0.4], seed=6012)

# Linear regression with both regularization parameters set to 0.0.
lr = LinearRegression(
    featuresCol="features",
    labelCol="label",
    regParam=0.0,
    elasticNetParam=0.0,
)
model = lr.fit(train)
pred = model.transform(test)

# Score the held-out predictions, one evaluator per metric (rmse first, then r2).
scores = {
    metric: RegressionEvaluator(
        labelCol="label", predictionCol="prediction", metricName=metric
    ).evaluate(pred)
    for metric in ("rmse", "r2")
}
rmse = scores["rmse"]
r2 = scores["r2"]

print(f"LR → RMSE={rmse:.3f}, R^2={r2:.3f}")
print("Coefficients [TV, radio, newspaper] =", [float(weight) for weight in model.coefficients])
print("Intercept =", float(model.intercept))
display(pred.select("label", "prediction").limit(10))


LR → RMSE=1.871, R^2=0.873
Coefficients [TV, radio, newspaper] = [0.045291614966318346, 0.1825709912779116, -0.0024798323708939756]
Intercept = 3.2574182443683912


label,prediction
1.6,10.497359087823336
5.5,8.61562682837682
5.7,8.598591124865782
4.8,4.027845382391449
7.3,10.412111294464848
8.7,11.779501842185988
8.0,10.879243045103657
6.6,7.001911173290617
7.6,7.772653672089056
9.5,11.51408980629538
