In [0]:
#reading row data
df_raw = spark.read.csv(
    "/Volumes/workspace/default/volume/Toner Usage.csv",
    header=True,
    inferSchema=True
)
df_raw.show()
df_raw.printSchema()

+--------------------+--------+----------+----------+----------+----------+-----------------+----------+--------------+------------+--------------------+--------------------+--------------------+----+-------------+-----+-------+-------+-------+--------+----------+-----------+--------------------+---------------+
|            deviceId|dealerId|L1DealerId|L2DealerId|L3DealerId|divisionId|servicingDealerId|customerId|     modelName|serialNumber|           timestamp|  lastSuppliesUpdate|      relatedGroupId|type|  description|state|  color|typical|   unit|capacity|printCount|tonerNumber|              tagIds|forecastingList|
+--------------------+--------+----------+----------+----------+----------+-----------------+----------+--------------+------------+--------------------+--------------------+--------------------+----+-------------+-----+-------+-------+-------+--------+----------+-----------+--------------------+---------------+
|mn=QlA1MEM1NQ==:s...|  840928|      null|      null|     

In [0]:
#adding schema to data
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, LongType, TimestampType
)
schema = StructType([
    StructField("deviceId", StringType(), True),
    StructField("dealerId", IntegerType(), True),
    StructField("L1DealerId", StringType(), True),
    StructField("L2DealerId", StringType(), True),
    StructField("L3DealerId", StringType(), True),
    StructField("divisionId", StringType(), True),
    StructField("servicingDealerId", StringType(), True),
    StructField("customerId", IntegerType(), True),
    StructField("modelName", StringType(), True),
    StructField("serialNumber", LongType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("lastSuppliesUpdate", TimestampType(), True),
    StructField("relatedGroupId", StringType(), True),
    StructField("type", IntegerType(), True),
    StructField("description", StringType(), True),
    StructField("state", StringType(), True),
    StructField("color", StringType(), True),
    StructField("typical", IntegerType(), True),
    StructField("unit", StringType(), True),
    StructField("capacity", IntegerType(), True),
    StructField("printCount", IntegerType(), True),
    StructField("tonerNumber", IntegerType(), True),
    StructField("tagIds", StringType(), True),
    StructField("forecastingList", StringType(), True)
])

df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("/Volumes/workspace/default/volume/Toner Usage.csv")


In [0]:
#droping null columns
df1=df.drop('L1DealerID','L2DealerId','L3DealerID','divisionId','servicingDealerId')
df2=df1.dropna()
df2.printSchema()

root
 |-- deviceId: string (nullable = true)
 |-- dealerId: integer (nullable = true)
 |-- customerId: integer (nullable = true)
 |-- modelName: string (nullable = true)
 |-- serialNumber: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- lastSuppliesUpdate: timestamp (nullable = true)
 |-- relatedGroupId: string (nullable = true)
 |-- type: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- state: string (nullable = true)
 |-- color: string (nullable = true)
 |-- typical: integer (nullable = true)
 |-- unit: string (nullable = true)
 |-- capacity: integer (nullable = true)
 |-- printCount: integer (nullable = true)
 |-- tonerNumber: integer (nullable = true)
 |-- tagIds: string (nullable = true)
 |-- forecastingList: string (nullable = true)



In [0]:
#creating delta table
spark.sql("CREATE DATABASE IF NOT EXISTS iot")

(
    df2
    .write
    .format("delta")
    .mode("append")
    .option("overwriteSchema", "true")
    .saveAsTable("iot.device_supplies")
)


In [0]:

#reading 
spark.sql("SELECT * FROM iot.device_supplies").display()


deviceId,dealerId,customerId,modelName,serialNumber,timestamp,lastSuppliesUpdate,relatedGroupId,type,description,state,color,typical,unit,capacity,printCount,tonerNumber,tagIds,forecastingList
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-10-13T15:11:58.662Z,2025-10-13T14:34:31.000Z,110d917d-32ea-486b-b091-d38c7602293e,1,Yellow Toner,ok,yellow,92,percent,100,78,1,"""[""""55ebaeca-89ef-4e3c-b56c-e7863143d286""""]""",
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-10-09T18:14:01.325Z,2025-10-09T17:33:33.000Z,110d917d-32ea-486b-b091-d38c7602293e,1,Yellow Toner,ok,yellow,93,percent,100,66,1,"""[""""55ebaeca-89ef-4e3c-b56c-e7863143d286""""]""",
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-10-02T21:08:51.511Z,2025-10-02T20:30:08.000Z,110d917d-32ea-486b-b091-d38c7602293e,1,Magenta Toner,ok,magenta,96,percent,100,37,1,"""[""""55ebaeca-89ef-4e3c-b56c-e7863143d286""""]""",
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-10-18T00:12:42.752Z,2025-10-17T23:34:29.000Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,black,90,percent,100,929,1,"""[""""55ebaeca-89ef-4e3c-b56c-e7863143d286""""]""",
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-12T06:09:17.389Z,2025-11-12T06:09:17.389Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,black,71,percent,100,3914,1,,[]
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-10-27T18:12:17.315Z,2025-10-27T17:35:18.000Z,110d917d-32ea-486b-b091-d38c7602293e,1,Cyan Toner,ok,cyan,83,percent,100,1408,1,"""[""""55ebaeca-89ef-4e3c-b56c-e7863143d286""""]""",
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-03T18:14:31.711Z,2025-11-03T17:35:13.000Z,110d917d-32ea-486b-b091-d38c7602293e,1,Yellow Toner,ok,yellow,79,percent,100,2666,1,"""[""""55ebaeca-89ef-4e3c-b56c-e7863143d286""""]""",
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-21T06:06:58.508Z,2025-11-21T06:06:58.508Z,110d917d-32ea-486b-b091-d38c7602293e,1,Magenta Toner,ok,magenta,74,percent,100,3982,1,,[]
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-21T06:06:58.508Z,2025-11-21T06:06:58.508Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,black,67,percent,100,4741,1,,[]
mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-15T06:06:32.254Z,2025-11-15T06:06:32.254Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,black,69,percent,100,4382,1,,[]


In [0]:
#checking for nulls
from pyspark.sql import functions as F

df_transformed = (
    df2
    .filter(
        F.col("timestamp").isNotNull() &
        F.col("typical").isNotNull() &
        F.col("printCount").isNotNull()
    )
)
df_transformed.count()


686

In [0]:
#changing datatype of timestamp
df_transformed = df_transformed.withColumn(
    "timestamp_sec",
    F.col("timestamp").cast("long")
)



In [0]:
df_transformed = df_transformed.orderBy("deviceId", "description", "timestamp")
df_transformed.show(5)

+--------------------+--------+----------+--------------+------------+--------------------+-------------------+--------------------+----+-----------+-----+-----+-------+-------+--------+----------+-----------+--------------------+---------------+-------------+
|            deviceId|dealerId|customerId|     modelName|serialNumber|           timestamp| lastSuppliesUpdate|      relatedGroupId|type|description|state|color|typical|   unit|capacity|printCount|tonerNumber|              tagIds|forecastingList|timestamp_sec|
+--------------------+--------+----------+--------------+------------+--------------------+-------------------+--------------------+----+-----------+-----+-----+-------+-------+--------+----------+-----------+--------------------+---------------+-------------+
|mn=QlA1MEM1NQ==:s...|  840928|   9281566|SHARP BP-50C55|  4300693700|2025-09-24 21:12:...|2025-09-24 20:41:48|110d917d-32ea-486...|   1|Black Toner|   ok|black|     99|percent|     100|         0|          1|        

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

window_spec = Window.partitionBy(
    "deviceId", "color", "typical", "timestamp"
).orderBy(col("lastSuppliesUpdate").desc_nulls_last())

df_dedup = df_transformed.withColumn(
    "rn", row_number().over(window_spec)
).filter(col("rn") == 1).drop("rn")
df_dedup.distinct().count()


686

In [0]:
#droping duplicates
df_dedup = df_transformed.dropDuplicates(
    ["deviceId", "color", "typical", "timestamp"]
)
df_dedup.printSchema()

root
 |-- deviceId: string (nullable = true)
 |-- dealerId: integer (nullable = true)
 |-- customerId: integer (nullable = true)
 |-- modelName: string (nullable = true)
 |-- serialNumber: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- lastSuppliesUpdate: timestamp (nullable = true)
 |-- relatedGroupId: string (nullable = true)
 |-- type: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- state: string (nullable = true)
 |-- color: string (nullable = true)
 |-- typical: integer (nullable = true)
 |-- unit: string (nullable = true)
 |-- capacity: integer (nullable = true)
 |-- printCount: integer (nullable = true)
 |-- tonerNumber: integer (nullable = true)
 |-- tagIds: string (nullable = true)
 |-- forecastingList: string (nullable = true)
 |-- timestamp_sec: long (nullable = true)



In [0]:
#converting data to vector 
from pyspark.ml.feature import VectorAssembler

time_assembler = VectorAssembler(
    inputCols=["timestamp_sec"],
    outputCol="features"
)

time_df = time_assembler.transform(df_dedup)


In [0]:
#using linear regression model
from pyspark.ml.regression import LinearRegression

lr_time = LinearRegression(
    featuresCol="features",
    labelCol="typical"
)

time_model = lr_time.fit(time_df)


In [0]:
#finding slope and intercept
a = time_model.coefficients[0]
b = time_model.intercept
#typical=a*timestamp+b

In [0]:
#doing prediction for end date 
predicted_end_time = -b / a


In [0]:
#printing the end date it converts numeric to date displayed
from pyspark.sql.functions import from_unixtime, lit

spark.range(1).select(
    from_unixtime(lit(predicted_end_time)).alias("predicted_end_date")
).show(truncate=False)



+-------------------+
|predicted_end_date |
+-------------------+
|2026-07-28 23:09:08|
+-------------------+



In [0]:
#finding end date without linear regression model
#finding min timestamp
from pyspark.sql.functions import min, col

min_time = df_dedup.select(min("timestamp")).collect()[0][0]
from pyspark.sql.functions import unix_timestamp

df_safe = df_dedup.withColumn(
    "x",
    (unix_timestamp("timestamp") - unix_timestamp(lit(min_time))) / 86400
).withColumnRenamed("typical", "y")


In [0]:
#calculating 
from pyspark.sql.functions import sum, col, count

stats = df_safe.select(
    count("*").alias("n"),
    sum("x").alias("sum_x"),
    sum("y").alias("sum_y"),
    sum(col("x") * col("y")).alias("sum_xy"),
    sum(col("x") * col("x")).alias("sum_x2")
).collect()[0]


In [0]:
#summation value
n = stats["n"]
sum_x = stats["sum_x"]
sum_y = stats["sum_y"]
sum_xy = stats["sum_xy"]
sum_x2 = stats["sum_x2"]
#slope and intercept 
m_pc = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)
c_pc = (sum_y - m_pc * sum_x) / n


In [0]:
#prediction y=0 because it typical will zero in end date 
x_end_days = -c_pc/ m_pc
x_end_days


393.2919578906791

In [0]:
#printing the end date 
from pyspark.sql.functions import date_add, lit, col
spark.createDataFrame(
    [(x_end_days,)],
    ["days"]
).withColumn(
    "predicted_end_date",
    date_add(
        lit(min_time).cast("date"),
        col("days").cast("int")
    )
).show(truncate=False)


+-----------------+------------------+
|days             |predicted_end_date|
+-----------------+------------------+
|393.2919578906791|2026-07-28        |
+-----------------+------------------+



In [0]:
from pyspark.sql.functions import min

min_time_df = (
    df_dedup
    .groupBy("color")
    .agg(min("timestamp").alias("min_time"))
)
min_time_df.show(truncate=False)

+-------+-----------------------+
|color  |min_time               |
+-------+-----------------------+
|magenta|2025-06-30 16:08:43.031|
|cyan   |2025-06-30 16:08:43.031|
|black  |2025-06-30 16:08:43.031|
|yellow |2025-06-30 16:08:43.031|
+-------+-----------------------+



In [0]:
from pyspark.sql.functions import unix_timestamp, lit

df_scaled = (
    df_dedup
    .join(min_time_df, on="color")
    .withColumn(
        "x",
        (unix_timestamp("timestamp") - unix_timestamp("min_time")) / 86400.0
    )
    .withColumnRenamed("typical", "y")
)
df_scaled.display()

color,deviceId,dealerId,customerId,modelName,serialNumber,timestamp,lastSuppliesUpdate,relatedGroupId,type,description,state,y,unit,capacity,printCount,tonerNumber,tagIds,forecastingList,timestamp_sec,min_time,x
black,mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-12-16T14:00:29.180Z,2025-12-16T14:00:29.180Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,63,percent,100,5437,1,"""[""""55ebaeca-89ef-4e3c-b56c-e7863143d286""""]""",[],1765893629,2025-06-30T16:08:43.031Z,168.91094907407407
black,mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-25T06:06:21.424Z,2025-11-25T06:06:21.424Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,66,percent,100,4813,1,,[],1764050781,2025-06-30T16:08:43.031Z,147.5816898148148
black,mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-21T06:06:58.508Z,2025-11-21T06:06:58.508Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,67,percent,100,4741,1,,[],1763705218,2025-06-30T16:08:43.031Z,143.58211805555555
black,mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-19T06:06:32.701Z,2025-11-19T06:06:32.701Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,68,percent,100,4557,1,,[],1763532392,2025-06-30T16:08:43.031Z,141.58181712962963
black,mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-15T06:06:32.254Z,2025-11-15T06:06:32.254Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,69,percent,100,4382,1,,[],1763186792,2025-06-30T16:08:43.031Z,137.58181712962963
black,mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-12T06:09:17.389Z,2025-11-12T06:09:17.389Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,71,percent,100,3914,1,,[],1762927757,2025-06-30T16:08:43.031Z,134.58372685185185
black,mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-06T18:12:19.305Z,2025-11-06T17:35:11.000Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,72,percent,100,3749,1,"""[""""55ebaeca-89ef-4e3c-b56c-e7863143d286""""]""",,1762452739,2025-06-30T16:08:43.031Z,129.08583333333334
black,mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-05T00:12:13.072Z,2025-11-04T23:35:12.000Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,73,percent,100,3518,1,"""[""""55ebaeca-89ef-4e3c-b56c-e7863143d286""""]""",,1762301533,2025-06-30T16:08:43.031Z,127.33576388888888
black,mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-04T18:14:42.381Z,2025-11-04T17:35:12.000Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,74,percent,100,3285,1,"""[""""55ebaeca-89ef-4e3c-b56c-e7863143d286""""]""",,1762280082,2025-06-30T16:08:43.031Z,127.08748842592593
black,mn=QlA1MEM1NQ==:sn=NDMwMDY5MzcwMA==,840928,9281566,SHARP BP-50C55,4300693700,2025-11-04T00:14:27.453Z,2025-11-03T23:35:13.000Z,110d917d-32ea-486b-b091-d38c7602293e,1,Black Toner,ok,75,percent,100,3141,1,"""[""""55ebaeca-89ef-4e3c-b56c-e7863143d286""""]""",,1762215267,2025-06-30T16:08:43.031Z,126.33731481481482


In [0]:
from pyspark.sql.functions import sum, count, col
regression_stats = (
    df_scaled
    .groupBy("color")
    .agg(
        count("*").alias("n"),
        sum("x").alias("sum_x"),
        sum("y").alias("sum_y"),
        sum(col("x") * col("y")).alias("sum_xy"),
        sum(col("x") * col("x")).alias("sum_x2"),
        min("min_time").alias("min_time")
    )
)


In [0]:
result = (
    regression_stats
    .withColumn(
        "m",
        (col("n") * col("sum_xy") - col("sum_x") * col("sum_y")) /
        (col("n") * col("sum_x2") - col("sum_x") * col("sum_x"))
    )
    .withColumn(
        "c",
        (col("sum_y") - col("m") * col("sum_x")) / col("n")
    )
    .withColumn(
        "x_end_days",
        -col("c") / col("m")
    )
)


In [0]:
from pyspark.sql.functions import date_add

final_prediction = (
    result
    .withColumn(
        "predicted_end_date",
        date_add(
            col("min_time").cast("date"),
            col("x_end_days").cast("int")
        )
    )
    .select("color", "predicted_end_date")
)
final_prediction.display()



color,predicted_end_date
black,2006-07-06
yellow,2026-06-16
magenta,2026-04-09
cyan,2026-03-08


In [0]:
#print count
df_pc = (
    df_dedup
    .select("color", col("typical").cast("double").alias("x"),
            col("printCount").cast("double").alias("y"))
)


In [0]:
from pyspark.sql.functions import sum, count, col
pc_stats = (
    df_pc
    .groupBy("color")
    .agg(
        count("*").alias("n"),
        sum("x").alias("sum_x"),
        sum("y").alias("sum_y"),
        sum(col("x") * col("y")).alias("sum_xy"),
        sum(col("x") * col("x")).alias("sum_x2")
    )
)


In [0]:
pc_result = (
    pc_stats
    .withColumn(
        "m",
        (col("n") * col("sum_xy") - col("sum_x") * col("sum_y")) /
        (col("n") * col("sum_x2") - col("sum_x") * col("sum_x"))
    )
    .withColumn(
        "c",
        (col("sum_y") - col("m") * col("sum_x")) / col("n")
    )
)


In [0]:
printcount_prediction = (
    pc_result
    .select(
        "color",
        col("c").alias("predicted_print_count")
    )
)


In [0]:
printcount_prediction.show(truncate=False)


+-------+---------------------+
|color  |predicted_print_count|
+-------+---------------------+
|black  |28258.793472606274   |
|cyan   |21509.604253743615   |
|magenta|20427.127209616934   |
|yellow |20398.33497590219    |
+-------+---------------------+



In [0]:
#printcount without model
df_PC = (
    df_dedup
    .withColumnRenamed("printCount","y") 
    .withColumnRenamed("typical", "x")
)
df_PC.printSchema()

root
 |-- deviceId: string (nullable = true)
 |-- dealerId: integer (nullable = true)
 |-- customerId: integer (nullable = true)
 |-- modelName: string (nullable = true)
 |-- serialNumber: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- lastSuppliesUpdate: timestamp (nullable = true)
 |-- relatedGroupId: string (nullable = true)
 |-- type: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- state: string (nullable = true)
 |-- color: string (nullable = true)
 |-- x: integer (nullable = true)
 |-- unit: string (nullable = true)
 |-- capacity: integer (nullable = true)
 |-- y: integer (nullable = true)
 |-- tonerNumber: integer (nullable = true)
 |-- tagIds: string (nullable = true)
 |-- forecastingList: string (nullable = true)
 |-- timestamp_sec: long (nullable = true)



In [0]:

from pyspark.sql import functions as F

df_casted= (
    df_PC
    .withColumn("x", F.col("x").cast("long"))
    .withColumn("y", F.col("y").cast("long"))
)

df_casted.printSchema()
from pyspark.sql import functions as F

df_xy = (
    df_casted
    .select("x", "y")
    .where(F.col("x").isNotNull() & F.col("y").isNotNull())
)

df_xy.printSchema()

root
 |-- deviceId: string (nullable = true)
 |-- dealerId: integer (nullable = true)
 |-- customerId: integer (nullable = true)
 |-- modelName: string (nullable = true)
 |-- serialNumber: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- lastSuppliesUpdate: timestamp (nullable = true)
 |-- relatedGroupId: string (nullable = true)
 |-- type: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- state: string (nullable = true)
 |-- color: string (nullable = true)
 |-- x: long (nullable = true)
 |-- unit: string (nullable = true)
 |-- capacity: integer (nullable = true)
 |-- y: long (nullable = true)
 |-- tonerNumber: integer (nullable = true)
 |-- tagIds: string (nullable = true)
 |-- forecastingList: string (nullable = true)
 |-- timestamp_sec: long (nullable = true)

root
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)



In [0]:
stats = df_xy.agg(
    F.mean("x").alias("x_mean"),
    F.mean("y").alias("y_mean")
).collect()[0]

x_mean = stats["x_mean"]
y_mean = stats["y_mean"]


In [0]:
agg = df_xy.agg(
    F.sum((F.col("x") - x_mean) * (F.col("y") - y_mean)).alias("numerator"),
    F.sum((F.col("x") - x_mean) ** 2).alias("denominator")
).collect()[0]

m = agg["numerator"] / agg["denominator"]
c = y_mean - m * x_mean

print("Slope:", m)
print("Intercept:", c)


Slope: -172.73825771637135
Intercept: 22650.624469060574


In [0]:
given_typical = 30

predicted_printCount = m * given_typical + c

print("Predicted printCount:", int(predicted_printCount))


Predicted printCount: 17468


In [0]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression


In [0]:
df_model = (
    df_dedup
    .select("typical", "printCount")
    .dropna()
    .where(
        (F.col("typical") >= 0) &
        (F.col("printCount") >= 0)
    )
    .withColumn("typical", F.col("typical").cast("double"))
    .withColumn("printCount", F.col("printCount").cast("double"))
)

df_model.printSchema()


root
 |-- typical: double (nullable = true)
 |-- printCount: double (nullable = true)



In [0]:
assembler = VectorAssembler(
    inputCols=["typical"],   # X
    outputCol="features"
)

df_features = assembler.transform(df_model)


In [0]:
lr = LinearRegression(
    featuresCol="features",
    labelCol="printCount",
    fitIntercept=True
)

lr_model = lr.fit(df_features)


In [0]:
print("Coefficient (slope):", lr_model.coefficients[0])
print("Intercept:", lr_model.intercept)


Coefficient (slope): -171.39468851129104
Intercept: 22604.830392695003


In [0]:
predictions = lr_model.transform(df_features)

predictions.select(
    "typical",
    "printCount",
    F.col("prediction").alias("predicted_printCount")
).show(10)


+-------+----------+--------------------+
|typical|printCount|predicted_printCount|
+-------+----------+--------------------+
|   99.0|       0.0|    5636.75623007719|
|   97.0|       4.0|   5979.545607099772|
|   95.0|      17.0|   6322.334984122355|
|   94.0|     286.0|   6493.729672633646|
|   93.0|     432.0|   6665.124361144937|
|   92.0|     586.0|   6836.519049656228|
|   91.0|     839.0|   7007.913738167519|
|   90.0|     929.0|    7179.30842667881|
|   89.0|    1408.0|   7350.703115190101|
|   88.0|    1484.0|   7522.097803701392|
+-------+----------+--------------------+
only showing top 10 rows


In [0]:
new_df = spark.createDataFrame(
    [(30.0,)],
    ["typical"]
)

new_features = assembler.transform(new_df)

lr_model.transform(new_features).select(
    "typical",
    F.col("prediction").alias("predicted_printCount")
).show()


+-------+--------------------+
|typical|predicted_printCount|
+-------+--------------------+
|   30.0|   17462.98973735627|
+-------+--------------------+

