<a href="https://colab.research.google.com/github/dwhite12/Colab-Notebooks/blob/main/Uplight_Assessment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#install requirements
!sudo apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.7/spark-3.5.7-bin-hadoop3.tgz
!tar xf spark-3.5.7-bin-hadoop3.tgz
!pip install -q findspark
!pip install --upgrade pyspark
!pip install py4j
!pip install numpy

0% [Working]            Hit:1 https://cli.github.com/packages stable InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (185.1                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (185.1                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Connected                                                                               Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:7 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:8 http://arch



In [None]:
#create spark session
import os
import sys
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.7-bin-hadoop3"

import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from pyspark.testing.utils import assertDataFrameEqual as ADF
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession \
        .builder \
        .appName("Energy Usage Pipeline") \
        .getOrCreate()

In [None]:
#Read the provided CSV from a local file path
#Validate the schema (all columns have expected types). Handle or log invalid records.
valid_schema = T.StructType([
    T.StructField("customer_id", T.IntegerType(), True),
    T.StructField("usage_kwh", T.DoubleType(), False),
    T.StructField("start_time", T.StringType(), True),
    T.StructField("_corrupt_record", T.StringType(), True)
])

#log invalid records using permissive mode
df = spark.read \
    .schema(valid_schema) \
    .option("header", "true") \
    .option("mode", "PERMISSIVE") \
    .format("csv") \
    .schema(valid_schema) \
    .load("/energy_usage.csv")

#df = spark.read.load("/content/energy_usage.csv", format="csv", header="true", schema=valid_schema)
df.show()
df.printSchema()

+-----------+---------+--------------------+--------------------+
|customer_id|usage_kwh|          start_time|     _corrupt_record|
+-----------+---------+--------------------+--------------------+
|        123|      5.1|2023-05-12T01:00:...|                NULL|
|        456|      4.3|2023-05-12T01:30:...|                NULL|
|        123|      2.2|2023-05-12T02:00:...|                NULL|
|        789|     NULL|2023-05-11T23:00:...|                NULL|
|       NULL|      1.1|2023-05-12T01:00:...|abc,1.1,2023-05-1...|
|        123|      NaN|2023-05-12T03:00:...|                NULL|
|        456|      8.9|2023-05-12T06:00:...|                NULL|
+-----------+---------+--------------------+--------------------+

root
 |-- customer_id: integer (nullable = true)
 |-- usage_kwh: double (nullable = true)
 |-- start_time: string (nullable = true)
 |-- _corrupt_record: string (nullable = true)



In [None]:
#Transform the data:
  #Convert `start_time` to UTC datetime.
df = df.withColumn("start_time", F.col("start_time").cast("timestamp"))
time_offset = "-07:00"
df_utc = df.withColumn("start_time_utc", F.to_utc_timestamp(F.col("start_time"), time_offset))
df_utc.show()

+-----------+---------+-------------------+--------------------+-------------------+
|customer_id|usage_kwh|         start_time|     _corrupt_record|     start_time_utc|
+-----------+---------+-------------------+--------------------+-------------------+
|        123|      5.1|2023-05-12 08:00:00|                NULL|2023-05-12 15:00:00|
|        456|      4.3|2023-05-12 08:30:00|                NULL|2023-05-12 15:30:00|
|        123|      2.2|2023-05-12 09:00:00|                NULL|2023-05-12 16:00:00|
|        789|     NULL|2023-05-12 06:00:00|                NULL|2023-05-12 13:00:00|
|       NULL|      1.1|2023-05-12 08:00:00|abc,1.1,2023-05-1...|2023-05-12 15:00:00|
|        123|      NaN|2023-05-12 10:00:00|                NULL|2023-05-12 17:00:00|
|        456|      8.9|2023-05-12 13:00:00|                NULL|2023-05-12 20:00:00|
+-----------+---------+-------------------+--------------------+-------------------+



In [None]:
#Aggregate each customer's total `usage_kwh` per day (UTC).
#filter out corrupt records
df_usage_clean = df_utc.filter(df_utc._corrupt_record.isNull())
#fill NULL and NA with 0; you could also drop these depending on the use case
df_usage_clean = df_usage_clean.na.fill(value=0)
df_usage_clean.show()
df_usage_agg = df_usage_clean.groupBy(F.to_date(F.col("start_time_utc"), "MM/dd/yyyy").alias("date"), "customer_id").sum("usage_kwh")
df_usage_agg.show()

+-----------+---------+-------------------+---------------+-------------------+
|customer_id|usage_kwh|         start_time|_corrupt_record|     start_time_utc|
+-----------+---------+-------------------+---------------+-------------------+
|        123|      5.1|2023-05-12 08:00:00|           NULL|2023-05-12 15:00:00|
|        456|      4.3|2023-05-12 08:30:00|           NULL|2023-05-12 15:30:00|
|        123|      2.2|2023-05-12 09:00:00|           NULL|2023-05-12 16:00:00|
|        789|      0.0|2023-05-12 06:00:00|           NULL|2023-05-12 13:00:00|
|        123|      0.0|2023-05-12 10:00:00|           NULL|2023-05-12 17:00:00|
|        456|      8.9|2023-05-12 13:00:00|           NULL|2023-05-12 20:00:00|
+-----------+---------+-------------------+---------------+-------------------+

+----------+-----------+--------------+
|      date|customer_id|sum(usage_kwh)|
+----------+-----------+--------------+
|2023-05-12|        789|           0.0|
|2023-05-12|        123|           7.3|

In [None]:
#Output the cleaned & aggregated data as a Parquet or Delta file to a specified output path.
file_output = "/energy_usage.parquet"
df_usage_agg.write.parquet(file_output)

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/energy_usage.parquet already exists. Set mode as "overwrite" to overwrite the existing path.

In [None]:
#**Testing:** Add one unit test for your transformation logic.
#def test_transformation_logic(spark):
    #'''
   # Test conversion to UTC and aggregation
    #'''
test_rows = [(1, 4.2, "2025-09-01T12:45:00-07:00"),
             (1, 2.3, "2025-09-01T19:29:00-07:00"),
             (1, 6.4, "2025-09-02T13:12:00-07:00"),
             (2, 3.7, "2025-09-02T08:33:00-07:00"),
             (3, 8.3, "2025-09-02T14:22:00-07:00"),
             (4, 3.1, "2025-09-02T16:42:00-07:00"),
             (4, 1.2, "2025-09-02T21:30:00-07:00"),
             (5, 5.0, "2025-09-02T22:19:00-07:00")
            ]

test_columns = T.StructType(
    [T.StructField("customer_id", T.IntegerType(), False),
     T.StructField("usage_kwh", T.DoubleType(), False),
     T.StructField("start_time", T.StringType(), False)
     ])

test_df = spark.createDataFrame(data=test_rows, schema=test_columns)
test_df.show()

#expected output
expected_rows = [("2025-09-02", 1, 6.5),
                 ("2025-09-02", 2, 3.7),
                 ("2025-09-03", 1, 6.4),
                 ("2025-09-03", 3, 8.3),
                 ("2025-09-03", 4, 4.3),
                 ("2025-09-03", 5, 5.0)
                 ]

expected_columns = T.StructType(
    [T.StructField("date", T.StringType(), False),
     T.StructField("customer_id", T.IntegerType(), False),
     T.StructField("sum_usage_kwh", T.DoubleType(), False)
    ])

expected_df = spark.createDataFrame(data=expected_rows, schema=expected_columns)
expected_df = expected_df.withColumn("date", F.col("date").cast("date"))
expected_df.show()

+-----------+---------+--------------------+
|customer_id|usage_kwh|          start_time|
+-----------+---------+--------------------+
|          1|      4.2|2025-09-01T12:45:...|
|          1|      2.3|2025-09-01T19:29:...|
|          1|      6.4|2025-09-02T13:12:...|
|          2|      3.7|2025-09-02T08:33:...|
|          3|      8.3|2025-09-02T14:22:...|
|          4|      3.1|2025-09-02T16:42:...|
|          4|      1.2|2025-09-02T21:30:...|
|          5|      5.0|2025-09-02T22:19:...|
+-----------+---------+--------------------+

+----------+-----------+-------------+
|      date|customer_id|sum_usage_kwh|
+----------+-----------+-------------+
|2025-09-02|          1|          6.5|
|2025-09-02|          2|          3.7|
|2025-09-03|          1|          6.4|
|2025-09-03|          3|          8.3|
|2025-09-03|          4|          4.3|
|2025-09-03|          5|          5.0|
+----------+-----------+-------------+



In [None]:
#Transform the data:
  #Convert `start_time` to UTC datetime.
test_df = test_df.withColumn("start_time", F.col("start_time").cast("timestamp"))
time_offset = "-07:00"
test_df_utc = test_df.withColumn("start_time_utc", F.to_utc_timestamp(F.col("start_time"), time_offset))
test_df_utc.show()

test_df_agg = test_df_utc.groupBy(F.to_date(F.col("start_time_utc"), \
                                            "MM/dd/yyyy").alias("date"), "customer_id").sum("usage_kwh") \
                                            .orderBy("date", "customer_id")
test_df_agg.show()

+-----------+---------+-------------------+-------------------+
|customer_id|usage_kwh|         start_time|     start_time_utc|
+-----------+---------+-------------------+-------------------+
|          1|      4.2|2025-09-01 19:45:00|2025-09-02 02:45:00|
|          1|      2.3|2025-09-02 02:29:00|2025-09-02 09:29:00|
|          1|      6.4|2025-09-02 20:12:00|2025-09-03 03:12:00|
|          2|      3.7|2025-09-02 15:33:00|2025-09-02 22:33:00|
|          3|      8.3|2025-09-02 21:22:00|2025-09-03 04:22:00|
|          4|      3.1|2025-09-02 23:42:00|2025-09-03 06:42:00|
|          4|      1.2|2025-09-03 04:30:00|2025-09-03 11:30:00|
|          5|      5.0|2025-09-03 05:19:00|2025-09-03 12:19:00|
+-----------+---------+-------------------+-------------------+

+----------+-----------+--------------+
|      date|customer_id|sum(usage_kwh)|
+----------+-----------+--------------+
|2025-09-02|          1|           6.5|
|2025-09-02|          2|           3.7|
|2025-09-03|          1|       

In [None]:
#ADF(test_df_agg, expected_df)
print(pyspark.__version__)

NameError: name 'pyspark' is not defined