In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# ------------------------------
# 1️⃣ Load the per-reading feature table
# ------------------------------
df_prints = spark.table("toner_regression_features")

# ------------------------------
# 2️⃣ Rebuild the cumulative print count from the per-reading deltas
# ------------------------------
# Running total of delta_print_count inside each (deviceId, color) partition,
# accumulated in timestamp order.
# NOTE(review): with only orderBy set, Spark applies its default growing range
# frame, so rows sharing a timestamp receive the same total — confirm that
# timestamps are unique per (deviceId, color).
window_pc = Window.partitionBy("deviceId", "color").orderBy("timestamp")
running_print_total = F.sum("delta_print_count").over(window_pc)
df_with_prints = df_prints.withColumn("cumulative_print_count", running_print_total)

# ------------------------------
# 3️⃣ Shape the regression input: x = cumulative prints, y = toner %
# ------------------------------
# toner_pct_remaining is exposed under the name "typical" (the regression
# label); timestamp is carried along unchanged.
regression_input_cols = [
    F.col("deviceId"),
    F.col("color"),
    F.col("cumulative_print_count"),
    F.col("toner_pct_remaining").alias("typical"),
    F.col("timestamp"),
]
df_math_prints = df_with_prints.select(*regression_input_cols)

# ------------------------------
# 4️⃣ Compute regression stats per device & color (least squares)
# ------------------------------
# Sufficient statistics for an ordinary-least-squares line y = m*x + c with
# x = cumulative_print_count and y = typical, one row per (deviceId, color):
#   n, sum_x, sum_y, sum_xy, sum_x2
#
# Rows missing either x or y are dropped before aggregating. F.sum() skips
# nulls but F.count("*") counts every row, so an incomplete reading would
# otherwise inflate n (and sum_x / sum_x2) relative to sum_y / sum_xy and
# skew the fitted line. When no values are missing the result is unchanged.
x_col = F.col("cumulative_print_count")
y_col = F.col("typical")

df_stats_prints = (
    df_math_prints
    .dropna(subset=["cumulative_print_count", "typical"])
    .groupBy("deviceId", "color")
    .agg(
        F.count("*").alias("n"),
        F.sum(x_col).alias("sum_x"),
        F.sum(y_col).alias("sum_y"),
        F.sum(x_col * y_col).alias("sum_xy"),
        F.sum(x_col ** 2).alias("sum_x2")
    )
)

# ------------------------------
# 5️⃣ Compute slope (m) and intercept (c)
# ------------------------------
# Closed-form least squares from the per-group sums:
#   m = (n*sum_xy - sum_x*sum_y) / (n*sum_x2 - sum_x^2)
#   c = (sum_y - m*sum_x) / n
#
# The denominator is zero when a group has a single row or when every x is
# identical. The division is guarded so such groups get a NULL slope (and
# therefore a NULL intercept) regardless of spark.sql.ansi.enabled; without
# the guard, ANSI mode fails the whole query with a divide-by-zero error.
ols_numerator = F.col("n") * F.col("sum_xy") - F.col("sum_x") * F.col("sum_y")
ols_denominator = F.col("n") * F.col("sum_x2") - F.col("sum_x") ** 2

df_regression_prints = (
    df_stats_prints
    .withColumn(
        "m",
        # No otherwise(): rows with a zero denominator evaluate to NULL.
        F.when(ols_denominator != 0, ols_numerator / ols_denominator)
    )
    .withColumn(
        "c",
        (F.col("sum_y") - F.col("m") * F.col("sum_x")) / F.col("n")
    )
)

# ------------------------------
# 6️⃣ Predict Print Count at toner empty
# ------------------------------
# predicted_print_count is the x-intercept of y = m*x + c, i.e. the cumulative
# print count at which the fitted toner percentage reaches 0: x = -c / m.
#
# Only depletion fits (m < 0) with a positive intercept point are kept. The
# division is evaluated only for those fits, so a flat line (m == 0) cannot
# trigger a divide-by-zero error under ANSI mode; such rows are removed by the
# filter either way, so the surviving rows are the same as before.
is_depleting = F.col("m") < 0

df_prediction_prints = (
    df_regression_prints
    .withColumn(
        "predicted_print_count",
        F.when(is_depleting, -F.col("c") / F.col("m"))
    )
    .filter(
        is_depleting &
        (F.col("predicted_print_count") > 0)
    )
)

# ------------------------------
# 7️⃣ Most recent toner_pct_remaining for each device & color
# ------------------------------
# Order each (deviceId, color) partition newest-first and take the first
# toner value of that ordering for every row of the partition.
window_last = Window.partitionBy("deviceId", "color").orderBy(F.desc("timestamp"))
newest_toner_pct = F.first("toner_pct_remaining").over(window_last)
df_with_latest_toner = df_with_prints.withColumn(
    "latest_toner_pct_remaining", newest_toner_pct
)

# Collapse to one row per device & color
df_latest_toner = (
    df_with_latest_toner
    .select("deviceId", "color", "latest_toner_pct_remaining")
    .dropDuplicates()
)

# ------------------------------
# 8️⃣ Attach the latest toner % to each prediction
# ------------------------------
# Left join: every surviving prediction row is kept, matched on
# (deviceId, color) against the latest-toner table.
final_output_cols = [
    "deviceId",
    "color",
    "predicted_print_count",
    "latest_toner_pct_remaining",
]
df_final = (
    df_prediction_prints
    .join(df_latest_toner, on=["deviceId", "color"], how="left")
    .select(*final_output_cols)
)

# ------------------------------
# 9️⃣ Show the result table
# ------------------------------
df_final.display()


deviceId,color,predicted_print_count,latest_toner_pct_remaining
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,black,15355.558776606418,63.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,cyan,18326.339534580333,69.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,magenta,20830.184457505555,71.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,yellow,21867.754384084627,71.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5OTYwMA==,black,29210.774879009048,80.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5OTYwMA==,cyan,15174.796875000002,74.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5OTYwMA==,magenta,9601.023622047243,72.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5OTYwMA==,yellow,9231.731343283584,71.0
mn=QlA1MEM1NQ==:sn=NDMwMDYwOTcwMA==,black,22803.515427375347,46.0
mn=QlA1MEM1NQ==:sn=NDMwMDYwOTcwMA==,cyan,35255.33241111863,17.0


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# ------------------------------
# 1️⃣ Load the per-reading feature table
# ------------------------------
df1 = spark.table("toner_regression_features")

# ------------------------------
# 2️⃣ Rebuild the cumulative print count from the per-reading deltas
# ------------------------------
# Running total of delta_print_count per (deviceId, color), in timestamp order.
window_pc = (
    Window
    .partitionBy("deviceId", "color")
    .orderBy("timestamp")
)
cumulative_prints = F.sum("delta_print_count").over(window_pc)
df_pc = df1.withColumn("cumulative_print_count", cumulative_prints)

# ------------------------------
# 3️⃣ Build the ML input: label column + single-feature vector
# ------------------------------
# label = toner_pct_remaining; the only feature is cumulative_print_count.
# timestamp is carried through alongside the ML columns.
ml_base_cols = [
    F.col("deviceId"),
    F.col("color"),
    F.col("toner_pct_remaining").alias("label"),
    F.col("cumulative_print_count"),
    F.col("timestamp"),
]
df_ml_base = df_pc.select(*ml_base_cols)

# Packs cumulative_print_count into the vector column "features".
assembler1 = VectorAssembler(inputCols=["cumulative_print_count"], outputCol="features")

df_assembled = assembler1.transform(df_ml_base)
df_ml1 = df_assembled.select("deviceId", "color", "label", "features", "timestamp")

# ------------------------------
# 4️⃣ Train Linear Regression per device & color
# ------------------------------
# Fits label ~ features separately for every (deviceId, color) group and
# collects one tuple per kept group:
#   (deviceId, color, slope m, intercept c, predicted_print_count)
# where predicted_print_count = -c / m is the cumulative print count at which
# the fitted toner percentage reaches 0.
results = []

# Group sizes come from ONE aggregation instead of a separate count() job per
# group. Each Row still exposes r["deviceId"] and r["color"], plus r["count"].
pairs = df_ml1.groupBy("deviceId", "color").count().collect()

# The estimator carries no per-group configuration, so it is built once.
lr = LinearRegression(
    featuresCol="features",
    labelCol="label",
    fitIntercept=True
)

for r in pairs:
    if r["count"] < 2:  # need at least 2 points
        continue

    device = r["deviceId"]
    color = r["color"]

    # Null-safe equality so that a null key reported by the grouped count
    # selects its own rows rather than an empty frame.
    df_group = df_ml1.filter(
        F.col("deviceId").eqNullSafe(device) &
        F.col("color").eqNullSafe(color)
    )

    model = lr.fit(df_group)

    # Cast to plain Python floats so the tuples hold no numpy scalar types
    # when they are later turned into a DataFrame.
    m = float(model.coefficients[0])
    c = float(model.intercept)

    if m >= 0:  # only consider depletion models
        continue

    predicted_print_count = -c / m

    # Same rule as the closed-form cell: a non-positive intercept point is
    # not a meaningful print count, so the group is dropped.
    if predicted_print_count <= 0:
        continue

    results.append((device, color, m, c, predicted_print_count))

# ------------------------------
# 5️⃣ Turn the collected tuples into a prediction DataFrame
# ------------------------------
# Field order matches the tuple layout appended to `results`; every field is
# declared nullable.
schema1 = (
    StructType()
    .add("deviceId", StringType(), True)
    .add("color", StringType(), True)
    .add("slope_m", DoubleType(), True)
    .add("intercept_c", DoubleType(), True)
    .add("predicted_print_count", DoubleType(), True)
)

df_predictions = spark.createDataFrame(results, schema=schema1)

# ------------------------------
# 6️⃣ Most recent toner_pct_remaining for each device & color
# ------------------------------
# Newest reading first within each (deviceId, color); the first toner value of
# that ordering is attached to every row, then duplicates are collapsed.
window_last = Window.partitionBy("deviceId", "color").orderBy(F.desc("timestamp"))
latest_toner_expr = F.first("toner_pct_remaining").over(window_last)

df_latest_toner = (
    df_pc
    .withColumn("latest_toner_pct_remaining", latest_toner_expr)
    .select("deviceId", "color", "latest_toner_pct_remaining")
    .dropDuplicates()
)

# ------------------------------
# 7️⃣ Attach the latest toner % to each prediction
# ------------------------------
# Left join on (deviceId, color): every prediction row is kept.
prediction_output_cols = [
    "deviceId",
    "color",
    "predicted_print_count",
    "latest_toner_pct_remaining",
]
df_final = (
    df_predictions
    .join(df_latest_toner, on=["deviceId", "color"], how="left")
    .select(*prediction_output_cols)
)

# ------------------------------
# 8️⃣ Show the result table
# ------------------------------
df_final.display()


deviceId,color,predicted_print_count,latest_toner_pct_remaining
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,black,15355.55877660641,63.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,cyan,18326.33953458036,69.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,magenta,20830.184457505515,71.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,yellow,21867.7543840846,71.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5OTYwMA==,black,29210.77487900894,80.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5OTYwMA==,cyan,15174.796874999953,74.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5OTYwMA==,magenta,9601.023622047393,72.0
mn=QlA1MEM1NQ==:sn=NDMwMDY5OTYwMA==,yellow,9231.731343283407,71.0
mn=QlA1MEM1NQ==:sn=NDMwMDYwOTcwMA==,black,22803.51542737552,46.0
mn=QlA1MEM1NQ==:sn=NDMwMDYwOTcwMA==,cyan,35255.332411118616,17.0
