In [0]:
from pyspark.sql.functions import row_number, col, to_timestamp, min as spark_min, unix_timestamp, expr, from_unixtime
from pyspark.sql.window import Window

from pyspark.sql.types import (
    StructType, StructField,
    StringType, LongType, IntegerType,
    BooleanType, ArrayType, TimestampType, DoubleType
)

from sklearn.linear_model import LinearRegression
import numpy as np, pandas as pd

In [0]:
# Explicit schema for the toner-usage CSV export. Fields are declared in the
# same order as before; everything is nullable. Timestamps and the numeric
# supply attributes ("typical", "capacity") are read as plain strings here,
# only printCount and tonerNumber are read as integers.
toner_usage_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("deviceId", StringType()),
        ("dealerId", StringType()),
        ("L1DealerId", StringType()),
        ("L2DealerId", StringType()),
        ("L3DealerId", StringType()),
        ("divisionId", StringType()),
        ("servicingDealerId", StringType()),
        ("customerId", StringType()),
        ("modelName", StringType()),
        ("serialNumber", StringType()),
        ("timestamp", StringType()),
        ("lastSuppliesUpdate", StringType()),
        ("relatedGroupId", StringType()),
        ("type", StringType()),
        ("description", StringType()),
        ("state", StringType()),
        ("color", StringType()),
        ("typical", StringType()),
        ("unit", StringType()),
        ("capacity", StringType()),
        ("printCount", IntegerType()),
        ("tonerNumber", IntegerType()),
        ("tagIds", StringType()),
        ("forecastingList", StringType()),
    ]
])

# Load the CSV with the schema above; the first line of the file is a header.
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .schema(toner_usage_schema)
    .load("/Volumes/workspace/default/forecast/Toner Usage.csv")
)


In [0]:
# Reduce the raw frame to the columns used for forecasting and give them
# usable types: the timestamp string is parsed, "typical" becomes a double.
# Rows missing any of deviceId / color / timestamp / typical are discarded
# (printCount is allowed to be null), and only positive "typical" values stay.
df_base = (
    df
    .select(
        "deviceId",
        "color",
        to_timestamp(col("timestamp")).alias("timestamp"),
        col("typical").cast("double").alias("typical"),
        col("printCount").cast("int").alias("printCount"),
    )
    .dropna(subset=["deviceId", "color", "timestamp", "typical"])
    .filter(col("typical") > 0)
)

# Quick sanity check: first rows in chronological order per supply, plus types.
df_base.orderBy("deviceId", "color", "timestamp").show(n=10, truncate=False)
df_base.printSchema()

In [0]:
# One unordered window per (deviceId, color) pair, used to find each pair's
# earliest reading.
w = Window.partitionBy("deviceId", "color")

# t0                = earliest timestamp within the pair
# minutesSinceStart = elapsed time since t0, in minutes (difference of epoch
#                     seconds divided by 60)
df_time = (
    df_base
    .withColumn("t0", spark_min("timestamp").over(w))
    .withColumn(
        "minutesSinceStart",
        (unix_timestamp(col("timestamp")) - unix_timestamp(col("t0"))) / 60,
    )
)

df_time.select(
    "deviceId", "color", "timestamp",
    "minutesSinceStart", "typical", "printCount",
).display()


In [0]:
from pyspark.ml.feature import VectorAssembler

# Wraps the single "typical" column into an ML vector column named "features".
# NOTE(review): none of the later cells visible in this notebook read
# "features" -- confirm whether the assembler step is still needed.
assembler = VectorAssembler(outputCol="features", inputCols=["typical"])

# Regression input: the per-pair time axis, both target columns, and t0, plus
# the assembled "features" column.
df_lr = assembler.transform(
    df_time.select(
        "deviceId", "color", "minutesSinceStart",
        "typical", "printCount", "t0",
    )
)

df_lr.select(
    "deviceId", "color", "minutesSinceStart", "features", "typical",
).display()


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from sklearn.linear_model import LinearRegression
import pandas as pd

# Output schema of the per-group toner regression: the two grouping keys as
# strings followed by the fitted line's parameters as doubles.
schema = StructType(
    [StructField(key_name, StringType()) for key_name in ("deviceId", "color")]
    + [StructField(param_name, DoubleType()) for param_name in ("slope", "intercept")]
)

def forecast_toner(pdf: pd.DataFrame) -> pd.DataFrame:
    """Fit a straight line to one group's "typical" values over time.

    Args:
        pdf: Rows of a single group. Must contain the columns "deviceId",
            "color", "minutesSinceStart" and "typical".

    Returns:
        A one-row frame with columns deviceId, color, slope, intercept,
        where typical ~= intercept + slope * minutesSinceStart. Groups with
        fewer than two rows produce an empty frame with the same columns.
    """
    result_columns = ["deviceId", "color", "slope", "intercept"]

    # A line cannot be fitted through fewer than two observations.
    if len(pdf) < 2:
        return pd.DataFrame([], columns=result_columns)

    elapsed_minutes = pdf[["minutesSinceStart"]].values  # 2-D: one feature column
    toner_level = pdf["typical"].values

    model = LinearRegression()
    model.fit(elapsed_minutes, toner_level)

    # The grouping keys are taken from the group's first row.
    fitted = {
        "deviceId": pdf["deviceId"].iloc[0],
        "color": pdf["color"].iloc[0],
        "slope": model.coef_[0],
        "intercept": model.intercept_,
    }
    return pd.DataFrame([fitted])

# Fit one toner regression per (deviceId, color).
# Only the four columns forecast_toner reads are passed into applyInPandas, so
# the unused t0 / printCount columns and the ML vector column "features" are
# not serialized out to the Python workers for every group.
# NOTE(review): vector (UDT) columns may not be convertible for pandas
# functions on some Spark versions -- pruning them here sidesteps that; confirm
# against the runtime in use.
final_predictions_df = (
    df_lr
    .select("deviceId", "color", "minutesSinceStart", "typical")
    .groupBy("deviceId", "color")
    .applyInPandas(forecast_toner, schema)
    .sort("deviceId", "color")
)

display(final_predictions_df)

In [0]:
from pyspark.sql.functions import expr

# Add the predicted time at which "typical" reaches zero.
# The fitted model is typical = intercept + slope * minutesSinceStart, so the
# zero crossing lies at -intercept / slope minutes after the group's first
# reading (minutesSinceStart == 0), not after the latest one.
# The crossing is only a depletion estimate when the level is falling
# (slope < 0). A flat series (slope == 0) would divide by zero, and a rising
# series does not deplete at all; both cases yield NULL instead.
final_predictions_df = final_predictions_df.withColumn(
    "minutesUntilDepletion",
    expr("CASE WHEN slope < 0 THEN -intercept / slope END")
)

display(final_predictions_df.select(
    "deviceId",
    "color",
    "slope",
    "intercept",
    "minutesUntilDepletion"
))

In [0]:
from pyspark.sql.functions import col, min as spark_min, unix_timestamp, from_unixtime

# Start of the time axis per (deviceId, color). "t0" is taken from df_lr and
# reduced with min() so that each pair contributes exactly one row.
df_start = df_lr.groupBy("deviceId", "color").agg(
    spark_min("t0").alias("startTimestamp")
)

# Turn the regression's offset (minutes after the start) into a wall-clock
# value: epoch seconds of the start plus the offset converted to seconds.
# from_unixtime renders the result as a formatted string column.
df_forecast = (
    final_predictions_df
    .join(df_start, on=["deviceId", "color"], how="inner")
    .withColumn(
        "predictedDepletionTimestamp",
        from_unixtime(
            unix_timestamp(col("startTimestamp")) + col("minutesUntilDepletion") * 60
        ),
    )
)

display(
    df_forecast.select(
        "deviceId", "color", "minutesUntilDepletion", "predictedDepletionTimestamp",
    )
)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from sklearn.linear_model import LinearRegression
import pandas as pd

# Output schema of the per-group printCount regression: grouping keys as
# strings, fitted line parameters as doubles.
printcount_schema = StructType(
    [StructField(key_name, StringType()) for key_name in ("deviceId", "color")]
    + [StructField(param_name, DoubleType()) for param_name in ("slope", "intercept")]
)

def forecast_printcount(pdf: pd.DataFrame) -> pd.DataFrame:
    """Fit a straight line to one group's printCount over time.

    Rows whose printCount (or minutesSinceStart) is missing are ignored.
    printCount is not part of the upstream dropna subset, so null counts can
    reach this function as NaN, and the regression rejects NaN targets.

    Args:
        pdf: Rows of a single group. Must contain the columns "deviceId",
            "color", "minutesSinceStart" and "printCount".

    Returns:
        A one-row frame with columns deviceId, color, slope, intercept,
        where printCount ~= intercept + slope * minutesSinceStart. Groups
        with fewer than two usable rows produce an empty frame with the same
        columns.
    """
    # Keep only observations that can take part in the fit.
    usable = pdf.dropna(subset=["minutesSinceStart", "printCount"])

    # Checked after filtering: a line needs two complete observations.
    if len(usable) < 2:
        return pd.DataFrame([], columns=["deviceId", "color", "slope", "intercept"])

    X = usable[["minutesSinceStart"]].values
    y = usable["printCount"].values
    lr = LinearRegression()
    lr.fit(X, y)
    return pd.DataFrame([{
        "deviceId": pdf["deviceId"].iloc[0],
        "color": pdf["color"].iloc[0],
        "slope": lr.coef_[0],
        "intercept": lr.intercept_
    }])

# Fit one printCount regression per (deviceId, color).
# Only the four columns forecast_printcount reads are passed into
# applyInPandas, so the unused t0 / typical columns and the ML vector column
# "features" are not serialized out to the Python workers for every group.
final_printcount_df = (
    df_lr
    .select("deviceId", "color", "minutesSinceStart", "printCount")
    .groupBy("deviceId", "color")
    .applyInPandas(forecast_printcount, printcount_schema)
    .sort("deviceId", "color")
)

display(final_printcount_df)

# Join with final_predictions_df to get minutesUntilDepletion, then evaluate
# the printCount line at that moment. After the join the only slope/intercept
# columns present are the printCount ones, because just the keys and
# minutesUntilDepletion are taken from final_predictions_df.
# NOTE(review): despite its name, predictedRemainingPrintCount is the fitted
# printCount at the predicted depletion time, not a difference from the latest
# reading -- confirm the intended meaning.
df_remaining = (
    final_printcount_df
    .join(
        final_predictions_df.select("deviceId", "color", "minutesUntilDepletion"),
        ["deviceId", "color"],
        "inner"
    )
    .withColumn(
        "predictedRemainingPrintCount",
        col("intercept") + col("slope") * col("minutesUntilDepletion")
    )
)

display(df_remaining.select(
    "deviceId",
    "color",
    "minutesUntilDepletion",
    "predictedRemainingPrintCount"
))

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Attach both forecasts to every cleaned reading. Left joins keep readings of
# pairs for which no forecast row exists.
final_table = (
    df_base
    .join(
        df_forecast.select("deviceId", "color", "predictedDepletionTimestamp"),
        on=["deviceId", "color"],
        how="left",
    )
    .join(
        df_remaining.select("deviceId", "color", "predictedRemainingPrintCount"),
        on=["deviceId", "color"],
        how="left",
    )
)

# Newest reading first within each (deviceId, color) pair.
w = Window.partitionBy("deviceId", "color").orderBy(col("timestamp").desc())

# Keep only the row ranked first per pair; the helper rank column is dropped.
final_table_one_row = (
    final_table
    .withColumn("rn", row_number().over(w))
    .filter(col("rn") == 1)
    .drop("rn")
)

display(
    final_table_one_row.select(
        "deviceId", "timestamp", "color", "typical", "printCount",
        "predictedDepletionTimestamp", "predictedRemainingPrintCount",
    )
)