In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession
from statsmodels.tsa.statespace.sarimax import SARIMAX
from datetime import datetime
from pyspark.dbutils import DBUtils
import re

import pyodbc

import pandas as pd
import numpy as np

from pyspark.sql.types import ArrayType, StructType, StructField, StringType 
from pyspark.sql.functions import from_json, col, explode,collect_list

# Initialize Spark session (Databricks environment should have this pre-configured)
spark = SparkSession.builder.appName("Energy Consumption Prediction").getOrCreate()

dbutils = DBUtils(spark)
# Retrieve parameter passed to the notebook
# Get Forecasters input from Databricks task id
databrick_task_id = dbutils.widgets.get("DatabrickTaskID")

print(databrick_task_id)



2605


In [0]:

# Read data from SQL Server
server_name = "jdbc:sqlserver://esk-maz-sdb-san-dev-01.database.windows.net"
database_name = "ESK-MAZ-SDB-SAN-DEV-01"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_DBT = "dbo.DataBrickTasks"
table_UFM = "dbo.UserForecastMethod"

table_actual = "dbo.ActualData" 
table_version = "dbo.DimVersion"
table_forecast = "dw.ForecastActive"
user = "arul"
password = "aari@Singds.8734"



In [0]:
# Get Forecasters input from Databricks task id
# Define a SQL query to retrieve various forecasting details associated with a specific Databricks task.
# This includes forecast methods, customer details, regional data, and task execution status.


# ufm.Name
query = f"""
SELECT TOP 1
    ufm.StartDate,
    ufm.EndDate,
    ufm.Parameters,
    ufm.Region,
    ufm.Status,
    ufm.ForecastMethodID,
    ufm.UserForecastMethodID,
    ufm.JSONCustomer as CustomerJSON,
    ufm.varJSON,  
    dfm.Method,
    dbt.DatabrickID
FROM 
    [dbo].[DataBrickTasks] AS dbt
INNER JOIN 
    [dbo].[UserForecastMethod] AS ufm ON dbt.UserForecastMethodID = ufm.UserForecastMethodID
INNER JOIN 
    [dbo].[DimForecastMethod] AS dfm ON ufm.ForecastMethodID = dfm.ForecastMethodID

WHERE  dbt.DatabrickID={databrick_task_id} and ufm.ForecastMethodID = 2 and dbt.ExecutionStatus='In Progress' 

ORDER BY
    dbt.CreationDate

"""
print(query)

# WHERE ExecutionStatus IN ('In Progress')

# Read data using Spark SQL by setting up the database connection and executing the SQL query.
df = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("query", query) \
    .option("user", user) \
    .option("password", password) \
    .load()

# Assuming you need to convert the Spark DataFrame to a Pandas DataFrame
# If the resulting DataFrame from your query matches what you want to convert to Pandas, you can do so directly
# Extract specific fields from the DataFrame, convert them to a Pandas DataFrame, and store in variables.

Forecast_Method_Name = df.select("Method").toPandas().iloc[0]['Method']
Model_Parmeters=df.select("Parameters").toPandas().iloc[0]['Parameters']
UFMID=df.select("UserForecastMethodID").toPandas().iloc[0]['UserForecastMethodID']
# CustomerID=df.select("Customer").toPandas().iloc[0]['Customer']

StartDate=df.select("StartDate").toPandas().iloc[0]['StartDate']
EndDate=df.select("EndDate").toPandas().iloc[0]['EndDate']
DatabrickID=df.select("DatabrickID").toPandas().iloc[0]['DatabrickID']
Hyper_Parameters=df.select("Parameters").toPandas().iloc[0]['Parameters']





SELECT TOP 1
    ufm.StartDate,
    ufm.EndDate,
    ufm.Parameters,
    ufm.Customer,
    ufm.Region,
    ufm.Status,
    ufm.ForecastMethodID,
    ufm.UserForecastMethodID,
    ufm.JSONCustomer as CustomerJSON,
    ufm.varJSON,  
    dfm.Method,
    dbt.DatabrickID
FROM 
    [dbo].[DataBrickTasks] AS dbt
INNER JOIN 
    [dbo].[UserForecastMethod] AS ufm ON dbt.UserForecastMethodID = ufm.UserForecastMethodID
INNER JOIN 
    [dbo].[DimForecastMethod] AS dfm ON ufm.ForecastMethodID = dfm.ForecastMethodID

WHERE  dbt.DatabrickID=2605 and ufm.ForecastMethodID = 2 and dbt.ExecutionStatus='Failed' 

ORDER BY
    dbt.CreationDate




In [0]:
print(df)

DataFrame[StartDate: date, EndDate: date, Parameters: string, Customer: string, Region: string, Status: string, ForecastMethodID: int, UserForecastMethodID: int, CustomerJSON: string, varJSON: string, Method: string, DatabrickID: int]


In [0]:
print(Forecast_Method_Name,UFMID,DatabrickID,Hyper_Parameters)

SARIMA 693 2605 1 (1,1,1)


In [0]:
print(UFMID)

693


In [0]:

json_schema = ArrayType(StructType([
    StructField("CustomerID", StringType(), True)
]))


# df_cust.show()

if "CustomerJSON" in df.columns:
    json_schema = ArrayType(StructType([
        StructField("CustomerID", StringType(), True)
    ]))

    # Parse the JSON string into a column of arrays of structs
    df_cust = df.withColumn("ParsedJSON", from_json("CustomerJSON", json_schema))\
                .select(explode("ParsedJSON").alias("CustomerDetails"))\
                .select(col("CustomerDetails.CustomerID"))



    # Collect the IDs into a list
    multiple_customer_ids_list = df_cust.agg(collect_list("CustomerID")).first()[0]

    # Convert the list to a comma-separated string


 # Convert the list to a comma-separated string
    if multiple_customer_ids_list:
        multiple_customer_ids_list = ','.join(multiple_customer_ids_list)
    else:
        multiple_customer_ids_list = ''

    # Output the comma-separated IDs
    print("Comma-separated Customer IDs:")
    print(multiple_customer_ids_list)

else:
    print("Column 'CustomerJSON' does not exist in the dataframe.")


Comma-separated Customer IDs:
49,109,121,169,229,241,289,300,567


In [0]:
print(df.columns)
if "varJSON" in df.columns:
    print("True")
    VarJsonSchema=ArrayType(
                        StructType([
                                    StructField("VariableID",StringType(),True)
                                ])

                            )
    df_var=df.withColumn("ParsedVarJson",from_json("varJSON",VarJsonSchema))\
         .select(explode("ParsedVarJson").alias("SelectedVarList"))\
         .select(col("SelectedVarList.VariableID")).alias("VariableID")
    all_variables=df_var.agg(collect_list("VariableID")).first()[0] 
    all_variables=','.join(all_variables)

print(all_variables)

# Ensure all_variables is a list
if isinstance(all_variables, str):
    all_variables = all_variables.split(',')

['StartDate', 'EndDate', 'Parameters', 'Customer', 'Region', 'Status', 'ForecastMethodID', 'UserForecastMethodID', 'CustomerJSON', 'varJSON', 'Method', 'DatabrickID']
True
Peak,Standard


In [0]:

# Create DataFrame


# Sample AllVariables DataFrame
# all_variables = ["Peak"]

# Define the required columns mapping
columns_mapping = {
    frozenset(["PeakConsumption", "StandardConsumption", "OffPeakConsumption"]): ["ReportingMonth", "CustomerID", "OffpeakConsumption", "StandardConsumption", "PeakConsumption"],
    frozenset(["PeakConsumption", "StandardConsumption"]): ["ReportingMonth", "CustomerID", "StandardConsumption", "PeakConsumption"],
    frozenset(["PeakConsumption", "OffPeakConsumption"]): ["ReportingMonth", "CustomerID", "OffpeakConsumption", "PeakConsumption"],
    frozenset(["StandardConsumption", "OffPeakConsumption"]): ["ReportingMonth", "CustomerID", "OffpeakConsumption", "StandardConsumption"],
    frozenset(["PeakConsumption"]): ["ReportingMonth", "CustomerID", "PeakConsumption"],
    frozenset(["StandardConsumption"]): ["ReportingMonth", "CustomerID", "StandardConsumption"],
    frozenset(["OffPeakConsumption"]): ["ReportingMonth", "CustomerID", "OffpeakConsumption"]
}

# Convert AllVariables to a set for easy comparison
all_variables_set = set(all_variables)
print(all_variables_set)

# Find the matching key in the columns_mapping
matching_key = None
for key in columns_mapping.keys():
    # print(key)
    if key.issubset(all_variables_set):
        matching_key = key
        break

# Select the appropriate columns based on the matching key
if matching_key:
    selected_columns = columns_mapping[matching_key]

    print(selected_columns)
else:
    print("No matching columns found in AllVariables")

{'Standard', 'Peak'}
['ReportingMonth', 'CustomerID', 'StandardConsumption', 'PeakConsumption']


In [0]:
print(selected_columns[2:])

['StandardConsumption', 'PeakConsumption']


In [0]:
# Construct a SQL query to select all records from the actuals table for a specific customer.
query_act_cons = f"(SELECT * FROM {table_actual} WHERE CustomerID  IN ({multiple_customer_ids_list})) AS subquery"


# Read data from a JDBC source using the Spark SQL DataFrameReader. This operation involves connecting to the database
# and executing the query to fetch actual consumption data for the specified customer.
df = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", query_act_cons) \
    .option("user", user) \
    .option("password", password) \
    .load()

# Convert the Spark DataFrame to a Pandas DataFrame to use with time series analysis in Python.
pandas_df = df.select(*selected_columns).toPandas()
pandas_df['CustomerID'] = pandas_df['CustomerID'].astype(str)

#pandas_df['ReportingMonth'] = pd.to_datetime(pandas_df['ReportingMonth'])
pandas_df['ReportingMonth'] = pd.to_datetime(pandas_df['ReportingMonth']).dt.to_period('M').dt.to_timestamp()

print(query_act_cons)

(SELECT * FROM dbo.ActualData WHERE CustomerID  IN (49,109,121,169,229,241,289,300,567)) AS subquery


In [0]:
# Find the most recent reporting month in the data, which will be used to determine the starting point for forecasting.
Actuals_last_date = pandas_df['ReportingMonth'].max() 


# Generate a date range from the start date to the end date with a monthly frequency, starting on the first of each month.
# This range represents the forecast period.

forecast_dates = pd.date_range(start=StartDate, end=EndDate, freq='MS')[0:]
print(forecast_dates,f"Actual_last_date {Actuals_last_date}",f"No of month to predict for {len(forecast_dates)}")

DatetimeIndex(['2024-08-01', '2024-09-01', '2024-10-01', '2024-11-01',
               '2024-12-01', '2025-01-01', '2025-02-01', '2025-03-01',
               '2025-04-01', '2025-05-01', '2025-06-01', '2025-07-01',
               '2025-08-01', '2025-09-01', '2025-10-01', '2025-11-01',
               '2025-12-01', '2026-01-01', '2026-02-01', '2026-03-01',
               '2026-04-01', '2026-05-01', '2026-06-01', '2026-07-01'],
              dtype='datetime64[ns]', freq='MS') Actual_last_date 2024-03-01 00:00:00 No of month to predict for 24


In [0]:

# Calculate n_periods as the number of months between the last historical date
# and the last date you want to forecast
n_periods = len(forecast_dates)

print(f"Number of periods to forecast: {n_periods}")


Number of periods to forecast: 24


In [0]:
# validate if data is only processed for intended customer

unique_customers = pandas_df['CustomerID'].unique()
print(unique_customers)


['109' '121' '169' '229' '241' '289' '49']


In [0]:
# Initialize default parameter sets
parameter_set1 = (0, 0, 0)  # Default value for ARIMA order
parameter_set2 = (0, 0, 0, 0)  # Default value for SARIMA seasonal_order

if Forecast_Method_Name == 'SARIMA':
# Extract parameters for SARIMA

# This regular expression finds tuples within parentheses.
# Splitting the string to isolate order and seasonal_order
    order, seasonal_order= re.findall(r'\(.*?\)',Hyper_Parameters)   #['(1,1,1),(1,1,1,2)']
    print(f"(Order_Paramaters {order}  Seasonal_Paramaters {seasonal_order}")  

# Remove parentheses and split the parameters by commas to extract individual elements.
order = order.strip('()')
order_parameters = order.split(',')

seasonal_order=seasonal_order.strip('()')
seasonal_order_parameters = seasonal_order.split(',')



# Assign the values to the variables
p = int(order_parameters[0])
d = int(order_parameters[1])
q = int(order_parameters[2])


# Convert extracted string parameters into integers and assign them to corresponding variables. 
 
s_p = int(seasonal_order_parameters[0])
s_d = int(seasonal_order_parameters[1])
s_q = int(seasonal_order_parameters[2])
s_m = int(seasonal_order_parameters[3])

print(f"(Order_Paramaters {p,d,q}  Seasonal_Paramaters {s_p,s_d,s_q,s_m}")  


[0;31m---------------------------------------------------------------------------[0m
[0;31mValueError[0m                                Traceback (most recent call last)
File [0;32m<command-3474730922484391>, line 10[0m
[1;32m      3[0m parameter_set2 [38;5;241m=[39m ([38;5;241m0[39m, [38;5;241m0[39m, [38;5;241m0[39m, [38;5;241m0[39m)  [38;5;66;03m# Default value for SARIMA seasonal_order[39;00m
[1;32m      5[0m [38;5;28;01mif[39;00m Forecast_Method_Name [38;5;241m==[39m [38;5;124m'[39m[38;5;124mSARIMA[39m[38;5;124m'[39m:
[1;32m      6[0m [38;5;66;03m# Extract parameters for SARIMA[39;00m
[1;32m      7[0m 
[1;32m      8[0m [38;5;66;03m# This regular expression finds tuples within parentheses.[39;00m
[1;32m      9[0m [38;5;66;03m# Splitting the string to isolate order and seasonal_order[39;00m
[0;32m---> 10[0m     order, seasonal_order[38;5;241m=[39m re[38;5;241m.[39mfindall([38;5;124mr[39m[38;5;124m'[39m[38;5;124m\[39m[38;5;124

In [0]:
print(Forecast_Method_Name, n_periods,len(forecast_dates))

[0;31m---------------------------------------------------------------------------[0m
[0;31mValueError[0m                                Traceback (most recent call last)
File [0;32m<command-3474730922484391>, line 10[0m
[1;32m      3[0m parameter_set2 [38;5;241m=[39m ([38;5;241m0[39m, [38;5;241m0[39m, [38;5;241m0[39m, [38;5;241m0[39m)  [38;5;66;03m# Default value for SARIMA seasonal_order[39;00m
[1;32m      5[0m [38;5;28;01mif[39;00m Forecast_Method_Name [38;5;241m==[39m [38;5;124m'[39m[38;5;124mSARIMA[39m[38;5;124m'[39m:
[1;32m      6[0m [38;5;66;03m# Extract parameters for SARIMA[39;00m
[1;32m      7[0m 
[1;32m      8[0m [38;5;66;03m# This regular expression finds tuples within parentheses.[39;00m
[1;32m      9[0m [38;5;66;03m# Splitting the string to isolate order and seasonal_order[39;00m
[0;32m---> 10[0m     order, seasonal_order[38;5;241m=[39m re[38;5;241m.[39mfindall([38;5;124mr[39m[38;5;124m'[39m[38;5;124m\[39m[38;5;124

In [0]:


def automated_forecasts_for_all_types(data,  selected_columns,n_periods=n_periods,):
    if selected_columns is None:
        selected_columns = ["OffpeakConsumption", "StandardConsumption", "PeakConsumption"]

    all_forecasts = []

    # Determine which consumption types are in the selected columns
    consumption_types = ["OffpeakConsumption", "StandardConsumption", "PeakConsumption"]
    cons_types = [col for col in selected_columns if col in consumption_types]
    # print(cons_types)

    # Iterate through each unique customer ID in the dataset.
    for customer_id in data['CustomerID'].unique():
        customer_forecasts = {}

        # Ensure data is sorted by ReportingMonth
        customer_data = data[data['CustomerID'] == customer_id].sort_values('ReportingMonth')

        # Loop through each type of consumption to forecast individually.        
        for cons_type in cons_types:
            # Prepare the time series  by setting ReportingMonth as the index
            series = customer_data.set_index('ReportingMonth')[cons_type]

            forecast=None

            if Forecast_Method_Name == 'SARIMA':
                # Fit SARIMA model
                model = SARIMAX(series, order=(1,2,1), seasonal_order=(1,2,1,3))               
                #model = SARIMAX(series, order=(q,d,p), seasonal_order=(s_p,s_d,s_q,s_m))
                model_fit = model.fit(disp=False)
                forecast = model_fit.forecast(steps=n_periods)
                print("Forecast done with SARIMA model")

            # if forecast is not None:
            #  forecast[forecast < 0] = forecast*-1


            customer_forecasts[cons_type] = forecast
               # Construct a DataFrame to store the forecast results
        forecast_df_data = {
                                'ReportingMonth': forecast_dates,
                                'CustomerID': [customer_id] * n_periods
                            }
        for cons_type in cons_types:
            forecast_df_data[cons_type] = customer_forecasts.get(cons_type, [None] * n_periods)


        forecast_df = pd.DataFrame(forecast_df_data)        
        # Append each customer's forecast DataFrame to a list.
        all_forecasts.append(forecast_df)

    # Concatenate all individual forecast DataFrames into one.
    forecast_combined_df = pd.concat(all_forecasts, ignore_index=True)


    # Round the consumption columns if they are present
    for cons_type in cons_types:
        forecast_combined_df[cons_type] = forecast_combined_df[cons_type].round(2)
        # print(forecast_combined_df)

    # Return the combined forecast DataFrame.
    return forecast_combined_df

# Execute the forecasting function with the loaded data.
forecast_combined_df = automated_forecasts_for_all_types(pandas_df,selected_columns)


  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)


Forecast done with SARIMA model
Forecast done with SARIMA model
Forecast done with SARIMA model
Forecast done with SARIMA model


  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)


Forecast done with SARIMA model
Forecast done with SARIMA model
Forecast done with SARIMA model
Forecast done with SARIMA model


  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)


Forecast done with SARIMA model
Forecast done with SARIMA model
Forecast done with SARIMA model


  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)


Forecast done with SARIMA model
Forecast done with SARIMA model
Forecast done with SARIMA model


  self._init_dates(dates, freq)
  self._init_dates(dates, freq)


In [0]:

pd.set_option('display.max_columns', 50)
print(forecast_combined_df)


    ReportingMonth CustomerID  StandardConsumption  PeakConsumption
0       2024-08-01        109           3816157.73        777960.18
1       2024-09-01        109           4651832.54        812294.12
2       2024-10-01        109           5394481.60       -279202.47
3       2024-11-01        109           6421466.99        130907.30
4       2024-12-01        109           7673058.53         88173.13
..             ...        ...                  ...              ...
163     2026-03-01         49          50098868.35      -9789785.76
164     2026-04-01         49          55305727.22     -12610796.94
165     2026-05-01         49          60233932.89     -12485636.46
166     2026-06-01         49          65501238.78     -13357045.09
167     2026-07-01         49          71694190.73     -16522733.12

[168 rows x 4 columns]


In [0]:



# if (forecast_combined_df[selected_columns[2:]] < 0).any().any():
#         print("Negative values detected after transformation:")
#         print(forecast_combined_df[
#                                (forecast_combined_df['StandardConsumption'] < 0) |
#                                (forecast_combined_df['PeakConsumption'] < 0)])

In [0]:
print(forecast_combined_df)

    ReportingMonth CustomerID  OffpeakConsumption  StandardConsumption
0       2024-08-01        121        7.403130e+06         3.816158e+06
1       2024-09-01        121        6.414111e+06         4.651833e+06
2       2024-10-01        121        7.534321e+06         5.394482e+06
3       2024-11-01        121        9.307516e+06         6.421467e+06
4       2024-12-01        121        8.437162e+06         7.673059e+06
..             ...        ...                 ...                  ...
103     2027-03-01        241        9.880224e+07         1.292894e+08
104     2027-04-01        241        1.069110e+08         1.388569e+08
105     2027-05-01        241        1.161396e+08         1.477746e+08
106     2027-06-01        241        1.199529e+08         1.571062e+08
107     2027-07-01        241        1.290696e+08         1.679374e+08

[108 rows x 4 columns]


In [0]:
forecast_combined_df['UserForecastMethodID'] = UFMID

In [0]:
print(forecast_combined_df)

    ReportingMonth CustomerID  ...  StandardConsumption  UserForecastMethodID
0       2024-08-01        121  ...         3.816158e+06                   639
1       2024-09-01        121  ...         4.651833e+06                   639
2       2024-10-01        121  ...         5.394482e+06                   639
3       2024-11-01        121  ...         6.421467e+06                   639
4       2024-12-01        121  ...         7.673059e+06                   639
..             ...        ...  ...                  ...                   ...
103     2027-03-01        241  ...         1.292894e+08                   639
104     2027-04-01        241  ...         1.388569e+08                   639
105     2027-05-01        241  ...         1.477746e+08                   639
106     2027-06-01        241  ...         1.571062e+08                   639
107     2027-07-01        241  ...         1.679374e+08                   639

[108 rows x 5 columns]


In [0]:
print(forecast_combined_df.describe())

       OffpeakConsumption  StandardConsumption  UserForecastMethodID
count        1.080000e+02         1.080000e+02                 108.0
mean         4.690210e+07         5.838803e+07                 639.0
std          3.680347e+07         4.917561e+07                   0.0
min          6.414111e+06         3.816158e+06                 639.0
25%          1.552667e+07         1.573354e+07                 639.0
50%          3.608522e+07         4.362205e+07                 639.0
75%          7.243804e+07         9.263950e+07                 639.0
max          1.290696e+08         1.679374e+08                 639.0


In [0]:
forecast_combined_spark_df = spark.createDataFrame(forecast_combined_df)


In [0]:
# Define the properties for the database connection
write_url = "jdbc:sqlserver://esk-maz-sdb-san-dev-01.database.windows.net;databaseName=ESK-MAZ-SDB-SAN-DEV-01"
write_properties = {
    "user": "arul",
    "password": "aari@Singds.8734",
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Define the name of the target table
target_table_name = "dbo.ForecastFact"

# Write the DataFrame to the SQL table
forecast_combined_spark_df.write.jdbc(url=write_url, table=target_table_name, mode="append", properties=write_properties)



==============================           Projection     ====================================