## Import and basic formatting

In [1]:
from pyspark.sql import SparkSession

# List the names of the entries under the "Files" path using the Hadoop
# FileSystem API reached through the session's JVM gateway.
# NOTE(review): `spark` is not created in this cell; presumably the notebook
# runtime provides it — confirm.
hadoop_fs = spark._jvm.org.apache.hadoop.fs
fs = hadoop_fs.FileSystem.get(spark._jsc.hadoopConfiguration())

files_dir = hadoop_fs.Path("Files")
for entry in fs.listStatus(files_dir):
    # Print only the final path component (the file name).
    print(entry.getPath().getName())

StatementMeta(, 84af411d-25f0-45c7-b872-40427c4cd8e4, 3, Finished, Available, Finished)

output.xlsx


In [2]:
import pandas as pd

# Load the workbook with header=None so that no row is consumed as column
# names; the sheet can then be inspected exactly as stored.
raw = pd.read_excel("/lakehouse/default/Files/output.xlsx", header=None)

# Preview the first 20 rows to locate the real header row.
raw.head(n=20)

StatementMeta(, 84af411d-25f0-45c7-b872-40427c4cd8e4, 4, Finished, Available, Finished)

Unnamed: 0,0,1,2
0,DateTime,Production,Consumption
1,2011-12-31 23:00:00,15223,15493
2,2012-01-01 00:00:00,13650,15332
3,2012-01-01 01:00:00,12858,15098
4,2012-01-01 02:00:00,12555,14840
5,2012-01-01 03:00:00,12096,14591
6,2012-01-01 04:00:00,11956,14543
7,2012-01-01 05:00:00,11817,14604
8,2012-01-01 06:00:00,11670,14780
9,2012-01-01 07:00:00,11112,14889


In [3]:
# Read the workbook again, this time taking the column names from the first
# row of the sheet. `header` and `skiprows` are spelled out so they are easy
# to adjust if the sheet ever gains leading metadata rows.
pdf = pd.read_excel(
    "/lakehouse/default/Files/output.xlsx", skiprows=0, header=0
)

StatementMeta(, 84af411d-25f0-45c7-b872-40427c4cd8e4, 5, Finished, Available, Finished)

In [4]:
# Preview the first five rows of the parsed frame.
pdf.head(n=5)

StatementMeta(, 84af411d-25f0-45c7-b872-40427c4cd8e4, 6, Finished, Available, Finished)

Unnamed: 0,DateTime,Production,Consumption
0,2011-12-31 23:00:00,15223,15493
1,2012-01-01 00:00:00,13650,15332
2,2012-01-01 01:00:00,12858,15098
3,2012-01-01 02:00:00,12555,14840
4,2012-01-01 03:00:00,12096,14591


In [6]:
# Make sure the DateTime column holds pandas datetimes, then strip any
# time-zone information via tz_localize(None).
pdf["DateTime"] = pd.to_datetime(pdf["DateTime"]).dt.tz_localize(None)

StatementMeta(, 84af411d-25f0-45c7-b872-40427c4cd8e4, 8, Finished, Available, Finished)

In [7]:
# Preview the first five rows after the DateTime conversion.
pdf.head(n=5)

StatementMeta(, 84af411d-25f0-45c7-b872-40427c4cd8e4, 9, Finished, Available, Finished)

Unnamed: 0,DateTime,Production,Consumption
0,2011-12-31 23:00:00,15223,15493
1,2012-01-01 00:00:00,13650,15332
2,2012-01-01 01:00:00,12858,15098
3,2012-01-01 02:00:00,12555,14840
4,2012-01-01 03:00:00,12096,14591


In [8]:
# Sanity check: earliest and latest timestamps present in the data.
(
    pdf["DateTime"].min(),
    pdf["DateTime"].max(),
)

StatementMeta(, 84af411d-25f0-45c7-b872-40427c4cd8e4, 10, Finished, Available, Finished)

(Timestamp('2011-12-31 23:00:00'), Timestamp('2025-07-29 21:00:00'))

In [11]:
from pyspark.sql.functions import to_date, hour

StatementMeta(, 84af411d-25f0-45c7-b872-40427c4cd8e4, 13, Finished, Available, Finished)

## SQL table creation

In [12]:
# Convert the pandas frame to a Spark DataFrame and derive two helper
# columns from DateTime: the calendar date and the hour of day.
df = (
    spark.createDataFrame(pdf)
    .withColumn("Date", to_date("DateTime"))
    .withColumn("Hour", hour("DateTime"))
)

# Persist as the "energy_operations" table, replacing any existing contents.
# NOTE(review): the overwriteSchema option is presumably there so that a
# changed column layout replaces the stored schema as well — confirm against
# the table format in use.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("energy_operations")
)

StatementMeta(, 84af411d-25f0-45c7-b872-40427c4cd8e4, 14, Finished, Available, Finished)

In [13]:
# Sanity check: print the number of rows stored in the saved table.
spark.sql(
    "SELECT COUNT(*) FROM energy_operations"
).show()

StatementMeta(, 84af411d-25f0-45c7-b872-40427c4cd8e4, 15, Finished, Available, Finished)

+--------+
|count(1)|
+--------+
|  119015|
+--------+



In [14]:
# Print a preview of the rows stored in the saved table.
spark.sql(
    "SELECT * FROM energy_operations"
).show()

StatementMeta(, 84af411d-25f0-45c7-b872-40427c4cd8e4, 16, Finished, Available, Finished)

+-------------------+----------+-----------+----------+----+
|           DateTime|Production|Consumption|      Date|Hour|
+-------------------+----------+-----------+----------+----+
|2017-02-02 13:00:00|     24202|      19992|2017-02-02|  13|
|2017-02-02 14:00:00|     24272|      20074|2017-02-02|  14|
|2017-02-02 15:00:00|     24332|      20254|2017-02-02|  15|
|2017-02-02 16:00:00|     24465|      20276|2017-02-02|  16|
|2017-02-02 17:00:00|     23785|      20006|2017-02-02|  17|
|2017-02-02 18:00:00|     23433|      19719|2017-02-02|  18|
|2017-02-02 19:00:00|     22718|      19404|2017-02-02|  19|
|2017-02-02 20:00:00|     22377|      19017|2017-02-02|  20|
|2017-02-02 21:00:00|     21320|      18356|2017-02-02|  21|
|2017-02-02 22:00:00|     19655|      17540|2017-02-02|  22|
|2017-02-02 23:00:00|     17447|      16825|2017-02-02|  23|
|2017-02-03 00:00:00|     16741|      16464|2017-02-03|   0|
|2017-02-03 01:00:00|     16494|      16157|2017-02-03|   1|
|2017-02-03 02:00:00|   

## Dimension tables (dim_date, dim_hour) for BI reporting

In [15]:
from pyspark.sql.functions import col, year, month, dayofmonth, date_format, weekday
from pyspark.sql import functions as F

# Build a date dimension: one row per distinct calendar date found in the
# "energy_operations" table, enriched with year / month / day / weekday
# attributes, and saved as the "dim_date" table.
df = spark.read.table("energy_operations")

date_df = (
    df.select(F.to_date("DateTime").alias("Date"))
    .distinct()
    .withColumn("Year", F.year("Date"))
    .withColumn("MonthNumber", F.month("Date"))
    # "MMMM" / "MMM" format the month as full / abbreviated text.
    .withColumn("Month", F.date_format("Date", "MMMM"))
    .withColumn("MonthShort", F.date_format("Date", "MMM"))
    .withColumn("Day", F.dayofmonth("Date"))
    # NOTE(review): per the PySpark docs weekday() is 0-based starting on
    # Monday, so +1 would give 1 = Monday .. 7 = Sunday — confirm against the
    # runtime's Spark version.
    .withColumn("WeekdayNumber", F.weekday("Date") + 1)
    .withColumn("Weekday", F.date_format("Date", "EEEE"))
    .withColumn("YearMonth", F.date_format("Date", "yyyy-MM"))
)

# Replace any existing contents of the dimension table.
(
    date_df.write
    .mode("overwrite")
    .saveAsTable("dim_date")
)


StatementMeta(, 84af411d-25f0-45c7-b872-40427c4cd8e4, 17, Finished, Available, Finished)

In [1]:
from pyspark.sql import functions as F

# Build an hour-of-day dimension: one row per hour 0..23 with a zero-padded
# "HH:00" text label, saved as the "dim_hour" table.
hour_df = (
    spark.range(24)
    .withColumnRenamed("id", "Hour")
    .withColumn("HourLabel", F.format_string("%02d:00", F.col("Hour")))
)

# Replace any existing contents of the dimension table.
(
    hour_df.write
    .mode("overwrite")
    .saveAsTable("dim_hour")
)


StatementMeta(, ee1b6d67-ad7f-4393-8ddd-e7eee9b724a7, 3, Finished, Available, Finished)