In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=3ec55ed3420eefb0c5b6ba7ab0fde5f4b42611e80a3e7168107bd5a05aad6eab
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, monotonically_increasing_id, split, substring, array_contains, to_timestamp, lit, array, expr
from pyspark.sql.window import Window

# Initialize SparkSession (getOrCreate reuses an already-active session)
spark = SparkSession.builder.getOrCreate()

# Load Actual Silo Readings
df_actuals = spark.read.csv("/content/silo_actuals (1).csv", sep=",", header=True, inferSchema=True)

# Load Reference Tons from historical daily
df_historical = spark.read.csv("/content/historical_averages (1).csv", sep=",", header=True, inferSchema=True)

# Every day of June 2023 as an MM-dd-yyyy string, the format the join on
# "date" below relies on.
days_list = [f"06-{day:02d}-2023" for day in range(1, 31)]

# Create a single-column DataFrame with the days list
df_days = spark.createDataFrame([(day,) for day in days_list], ["date"])

# Right join so that every calendar day is present; days with no actual
# reading come out with a null silo_wt_in_tons.
df_actuals = df_actuals.join(df_days, on="date", how="right")

# Reference value used for days without a reading. `sum` here is
# pyspark.sql.functions.sum (it shadows the builtin in this notebook).
# NOTE(review): this is the total of ALL average_tons rows applied to each
# missing day - confirm that is the intended per-day reference.
reference_tons = df_historical.select(sum("average_tons")).collect()[0][0]

# Fill only the missing readings. The previous fillna(0).replace(0, ...) chain
# also overwrote genuine 0-ton readings, and the trailing withColumnRenamed
# targeted a column that never exists on this DataFrame.
# reference_tons is None when average_tons has no non-null values; fillna
# rejects None, so the nulls are left in place in that case.
if reference_tons is not None:
    df_actuals = df_actuals.fillna(reference_tons, subset=["silo_wt_in_tons"])

# Calculate the month-to-date running total of tons for each day.
# Ordering by the parsed date keeps the sequence chronological even if the day
# list is later extended beyond a single month (string order of MM-dd-yyyy is
# only chronological within one month and year).
# There is no partitionBy, so Spark evaluates this window on a single
# partition; acceptable for one month of daily rows.
window = (
    Window
    .orderBy(to_timestamp(col("date"), "MM-dd-yyyy"))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df_actuals = df_actuals.withColumn("mtd_running_total_tons", sum("silo_wt_in_tons").over(window))

# Save the output as CSV (Spark writes a directory of part files at this path)
df_actuals.write.csv("/content/output1_reference.csv", mode="overwrite", header=True)

# Show up to 31 rows, i.e. every day of the month
df_actuals.show(31)


+----------+---------------+----------------------+
|      date|silo_wt_in_tons|mtd_running_total_tons|
+----------+---------------+----------------------+
|06-01-2023|            415|                   415|
|06-02-2023|            415|                   830|
|06-03-2023|            415|                  1245|
|06-04-2023|            415|                  1660|
|06-05-2023|            415|                  2075|
|06-06-2023|            415|                  2490|
|06-07-2023|            415|                  2905|
|06-08-2023|            415|                  3320|
|06-09-2023|            415|                  3735|
|06-10-2023|            415|                  4150|
|06-11-2023|            415|                  4565|
|06-12-2023|            415|                  4980|
|06-13-2023|            415|                  5395|
|06-14-2023|            415|                  5810|
|06-15-2023|            415|                  6225|
|06-16-2023|            415|                  6640|
|06-17-2023|