Perform the necessary imports

In [None]:
import os
import psycopg2
import dotenv
import pandas

Establish a connection to the database

In [None]:
# Pull DB_* settings from a .env file (when one exists) into the process
# environment, then open the connection and cursor that the query cell uses.
# os.getenv returns None for any variable that is not set.
dotenv.load_dotenv()
_PARAM_ENV_VARS = (
    ("database", "DB_DATABASE"),
    ("user", "DB_USER"),
    ("password", "DB_PASSWORD"),
    ("host", "DB_HOST"),
    ("port", "DB_PORT"),
)
params = {keyword: os.getenv(env_var) for keyword, env_var in _PARAM_ENV_VARS}
conn = psycopg2.connect(**params)
cursor = conn.cursor()

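Retrieve the training data from the database: one row per one-minute time bucket, combining grid measurements, transaction price and fees, and appliance usage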

In [None]:
# Build the training table: one row per 1-minute bucket (TimescaleDB
# time_bucket_gapfill), assembled from these CTEs:
#   * _ts / _ts_bucketed: from `transactions`, a running total of
#     2 * transaction_fee ordered by created_at, then per bucket the average
#     transacted_price and the largest running fee total (fee_sum).
#   * _total_impedance: from the `grid_state` JSON in `grid_history`, per
#     snapshot the sum of transmission-line impedances
#     SQRT(resistance^2 + (length * inductance_per_meter)^2) plus the sum of
#     consumer resistances, averaged per bucket.
#     NOTE(review): this combines inductance directly with resistance rather
#     than a reactance (omega * L) — confirm that this is the intended model.
#   * _consumer_voltage / _transmission_line_voltage / _generator_voltage:
#     per snapshot the average non-zero oscilloscope amplitude, then averaged
#     per bucket.
#   * _appliance_data: per bucket the average `data` value of each appliance in
#     `appliance_data`, inner-joined on the bucket time and labelled with its
#     time of day (HH24:MI).
#   * _ts_data / _combined_table: grid measurements are LEFT JOINed onto the
#     price buckets. A NULL measurement is replaced by the previous bucket's
#     value (LAG, one bucket back only). Appliance data is matched on time of
#     day (day_time), not on the full timestamp.
#   * The final WHERE (_combined_table IS NOT NULL) keeps only rows in which
#     every column is non-NULL.
# The "-- SELECT * FROM <cte>; /*" fragments look like debugging hooks.
# Removing the leading "--" from one of them makes the query stop at that CTE.
# The "*/" markers on the last SQL line close the block comments this opens.
# NOTE(review): the column order of the final SELECT must match `colum_names`
# in the DataFrame cell.
cursor.execute(
""" 
WITH
_ts(created_at, time, transacted_price, fee_sum) AS (
	SELECT 
	    created_at
		, time_bucket_gapfill('1 minute', created_at) AS time
		, transacted_price
		, SUM(2*transaction_fee) OVER ( ORDER BY created_at
							           ROWS UNBOUNDED PRECEDING 
							          ) fee_sum
	FROM transactions
)  -- SELECT * FROM _ts; /*
, _ts_bucketed AS (
	SELECT 
		time
		, AVG(transacted_price) AS transacted_price
		, MAX(fee_sum) AS fee_sum
	FROM _ts 
	GROUP BY time
	ORDER BY time
) -- SELECT * FROM _ts_bucketed; /*
, _total_impedance AS (
	SELECT time, AVG(total_impedance) AS total_impedance FROM
		(SELECT time_bucket_gapfill('1 minute', tl.created_at) AS time, tl.tl_val+c.c_val AS total_impedance FROM 
		(SELECT created_at, SUM(transmission_line_impedance) AS tl_val 
			FROM (
				SELECT created_at, SQRT(transmission_line_resistance::float8*transmission_line_resistance::float8 + transmission_line_inductance::float8*transmission_line_inductance::float8) AS transmission_line_impedance
				FROM (
					SELECT created_at, transmission_line_resistance::float8, transmission_line_length::float8* transmission_line_inductance_per_meter::float8 as transmission_line_inductance
					FROM (
						SELECT created_at,
						jsonb_array_elements(jsonb_array_elements(grid_state->'circuits')->'loads')->'load_type'->'TransmissionLine'->'resistance' as transmission_line_resistance,
						jsonb_array_elements(jsonb_array_elements(grid_state->'circuits')->'loads')->'load_type'->'TransmissionLine'->'length' as transmission_line_length,
						jsonb_array_elements(jsonb_array_elements(grid_state->'circuits')->'loads')->'load_type'->'TransmissionLine'->'inductance_per_meter' as transmission_line_inductance_per_meter
						FROM grid_history
					)
				)
			) 
			WHERE transmission_line_impedance <> 0 GROUP BY created_at) AS tl 
			INNER JOIN (
				SELECT created_at, SUM(consumer_resistance) AS c_val
				FROM (
					SELECT created_at, (jsonb_array_elements(jsonb_array_elements(grid_state->'circuits')->'loads')->'load_type'->'Consumer'->'resistance')::float8 AS consumer_resistance
					FROM grid_history
				)
				WHERE consumer_resistance <> 0
				GROUP BY created_at
			) AS c 
			ON c.created_at = tl.created_at)
	GROUP BY time
	ORDER BY time
) -- SELECT * FROM _total_impedance; /*
, _consumer_voltage AS (
	SELECT time, AVG(consumer_voltage) AS consumer_voltage
	FROM
		(SELECT time_bucket_gapfill('1 minute', created_at) AS time, consumer_voltage
		FROM
			(SELECT created_at, AVG(voltage) AS consumer_voltage
				FROM (
					SELECT created_at,(jsonb_array_elements(jsonb_array_elements(grid_state->'circuits')->'loads')->'load_type'->'Consumer'->'voltage'->'oscilloscope_detail'->'amplitude')::float8 AS voltage
					FROM grid_history
				) 
				WHERE voltage <> 0 GROUP BY created_at
			)
		)
	GROUP BY time
	ORDER BY time
) -- SELECT * FROM _consumer_voltage; /*
, _transmission_line_voltage AS (
	SELECT time, AVG(transmission_line_voltage) AS transmission_line_voltage
	FROM
		(SELECT time_bucket_gapfill('1 minute', created_at) AS time, transmission_line_voltage
		FROM
			(SELECT created_at, AVG(voltage) AS transmission_line_voltage
				FROM (
					SELECT created_at,(jsonb_array_elements(jsonb_array_elements(grid_state->'circuits')->'loads')->'load_type'->'TransmissionLine'->'voltage'->'oscilloscope_detail'->'amplitude')::float8 AS voltage
					FROM grid_history
				) 
				WHERE voltage <> 0 GROUP BY created_at
			)
		)
	GROUP BY time
	ORDER BY time
) -- SELECT * FROM _transmission_line_voltage; /*
, _generator_voltage AS (
	SELECT time, AVG(generator_voltage) AS generator_voltage
	FROM
		(SELECT time_bucket_gapfill('1 minute', created_at) AS time, generator_voltage
		FROM
			(SELECT created_at, AVG(voltage) AS generator_voltage
				FROM (
					SELECT created_at, (jsonb_array_elements(jsonb_array_elements(grid_state->'circuits')->'generators')->'voltage'->'oscilloscope_detail'->'amplitude')::float8 AS voltage
					FROM grid_history
				) 
				WHERE voltage <> 0 GROUP BY created_at
			)
		)
	GROUP BY time
	ORDER BY time
) -- SELECT * FROM _generator_voltage; /*
, _appliance_data AS (
	WITH 
	_air_conditioner_data AS (
		SELECT time, AVG(air_conditioner) AS air_conditioner
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS air_conditioner
			FROM appliance_data
			WHERE appliance = 'air_conditioner' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _air_conditioner_data; /*
	, _air_purifier_data AS (
		SELECT time, AVG(air_purifier) AS air_purifier
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS air_purifier
			FROM appliance_data
			WHERE appliance = 'air_purifier' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _air_purifier_data; /*
	, _boiler_data AS (
		SELECT time, AVG(boiler) AS boiler
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS boiler
			FROM appliance_data
			WHERE appliance = 'boiler' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _boiler_data; /*
	, _coffee_data AS (
		SELECT time, AVG(coffee) AS coffee
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS coffee
			FROM appliance_data
			WHERE appliance = 'coffee' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _coffee_data; /*
	, _computer_data AS (
		SELECT time, AVG(computer) AS computer
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS computer
			FROM appliance_data
			WHERE appliance = 'computer' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _computer_data; /*
	, _dehumidifier_data AS (
		SELECT time, AVG(dehumidifier) AS dehumidifier
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS dehumidifier
			FROM appliance_data
			WHERE appliance = 'dehumidifier' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _dehumidifier_data; /*
	, _dishwasher_data AS (
		SELECT time, AVG(dishwasher) AS dishwasher
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS dishwasher
			FROM appliance_data
			WHERE appliance = 'dishwasher' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _dishwasher_data; /*
	, _dryer_data AS (
		SELECT time, AVG(dryer) AS dryer
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS dryer
			FROM appliance_data
			WHERE appliance = 'dryer' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _dryer_data; /*
	, _fan_data AS (
		SELECT time, AVG(fan) AS fan
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS fan
			FROM appliance_data
			WHERE appliance = 'fan' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _fan_data; /*
	, _freezer_data AS (
		SELECT time, AVG(freezer) AS freezer
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS freezer
			FROM appliance_data
			WHERE appliance = 'freezer' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _freezer_data; /*
	, _fridge_data AS (
		SELECT time, AVG(fridge) AS fridge
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS fridge
			FROM appliance_data
			WHERE appliance = 'fridge' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _fridge_data; /*
	, _internet_router_data AS (
		SELECT time, AVG(internet_router) AS internet_router
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS internet_router
			FROM appliance_data
			WHERE appliance = 'internet_router' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _internet_router_data; /*
	, _laptop_data AS (
		SELECT time, AVG(laptop) AS laptop
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS laptop
			FROM appliance_data
			WHERE appliance = 'laptop' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _laptop_data; /*
	, _micro_wave_oven_data AS (
		SELECT time, AVG(micro_wave_oven) AS micro_wave_oven
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS micro_wave_oven
			FROM appliance_data
			WHERE appliance = 'micro_wave_oven' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _micro_wave_oven_data; /*
	, _phone_charger_data AS (
		SELECT time, AVG(phone_charger) AS phone_charger
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS phone_charger
			FROM appliance_data
			WHERE appliance = 'phone_charger' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _phone_charger_data; /*
	, _printer_data AS (
		SELECT time, AVG(printer) AS printer
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS printer
			FROM appliance_data
			WHERE appliance = 'printer' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _printer_data; /*
	, _printer_3D_data AS (
		SELECT time, AVG(printer_3D) AS printer_3D
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS printer_3D
			FROM appliance_data
			WHERE appliance = 'printer_3D' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _printer_3D_data; /*
	, _radiator_data AS (
		SELECT time, AVG(radiator) AS radiator
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS radiator
			FROM appliance_data
			WHERE appliance = 'radiator' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _radiator_data; /*
	, _screen_data AS (
		SELECT time, AVG(screen) AS screen
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS screen
			FROM appliance_data
			WHERE appliance = 'screen' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _screen_data; /*
	, _solar_panel_data AS (
		SELECT time, AVG(solar_panel) AS solar_panel
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS solar_panel
			FROM appliance_data
			WHERE appliance = 'solar_panel' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _solar_panel_data; /*
	, _sound_system_data AS (
		SELECT time, AVG(sound_system) AS sound_system
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS sound_system
			FROM appliance_data
			WHERE appliance = 'sound_system' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _sound_system_data; /*
	, _tv_data AS (
		SELECT time, AVG(tv) AS tv
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS tv
			FROM appliance_data
			WHERE appliance = 'tv' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _tv_data; /*
	, _vacuum_data AS (
		SELECT time, AVG("vacuum") AS "vacuum"
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS "vacuum"
			FROM appliance_data
			WHERE appliance = 'vacuum' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _vacuum_data; /*
	, _washing_machine_data AS (
		SELECT time, AVG(washing_machine) AS washing_machine
		FROM (
			SELECT time_bucket_gapfill('1 minute', time) AS time, data AS washing_machine
			FROM appliance_data
			WHERE appliance = 'washing_machine' 
		)
		GROUP BY time
		ORDER BY time
	) -- SELECT * FROM _washing_machine_data; /*
	SELECT _tv_data.time
		, to_char(_tv_data.time, 'HH24:MI') AS day_time
		, _air_conditioner_data.air_conditioner
		, _air_purifier_data.air_purifier
		, _boiler_data.boiler
		, _coffee_data.coffee
		, _computer_data.computer
		, _dehumidifier_data.dehumidifier
		, _dishwasher_data.dishwasher
		, _dryer_data.dryer
		, _fan_data.fan
		, _freezer_data.freezer
		, _fridge_data.fridge
		, _internet_router_data.internet_router
		, _laptop_data.laptop
		, _micro_wave_oven_data.micro_wave_oven
		, _phone_charger_data.phone_charger
		, _printer_data.printer
		, _printer_3D_data.printer_3D
		, _radiator_data.radiator
		, _screen_data.screen
		, _solar_panel_data.solar_panel
		, _sound_system_data.sound_system
		, _tv_data.tv
		, _vacuum_data."vacuum"
		, _washing_machine_data.washing_machine
	FROM _air_conditioner_data
		JOIN _air_purifier_data ON _air_conditioner_data.time = _air_purifier_data.time
		JOIN _boiler_data ON _air_conditioner_data.time = _boiler_data.time
		JOIN _coffee_data ON _air_conditioner_data.time = _coffee_data.time
		JOIN _computer_data ON _air_conditioner_data.time = _computer_data.time
		JOIN _dehumidifier_data ON _air_conditioner_data.time = _dehumidifier_data.time 
		JOIN _dishwasher_data ON _air_conditioner_data.time = _dishwasher_data.time 
		JOIN _dryer_data ON _air_conditioner_data.time = _dryer_data.time 
		JOIN _fan_data ON _air_conditioner_data.time = _fan_data.time 
		JOIN _freezer_data ON _air_conditioner_data.time = _freezer_data.time 
		JOIN _fridge_data ON _air_conditioner_data.time = _fridge_data.time
		JOIN _internet_router_data ON _air_conditioner_data.time = _internet_router_data.time 
		JOIN _laptop_data ON _air_conditioner_data.time = _laptop_data.time 
		JOIN _micro_wave_oven_data ON _air_conditioner_data.time = _micro_wave_oven_data.time 
		JOIN _phone_charger_data ON _air_conditioner_data.time = _phone_charger_data.time
		JOIN _printer_data ON _air_conditioner_data.time = _printer_data.time 
		JOIN _printer_3D_data ON _air_conditioner_data.time = _printer_3D_data.time 
		JOIN _radiator_data ON _air_conditioner_data.time = _radiator_data.time 
		JOIN _screen_data ON _air_conditioner_data.time = _screen_data.time 
		JOIN _solar_panel_data ON _air_conditioner_data.time = _solar_panel_data.time 
		JOIN _sound_system_data ON _air_conditioner_data.time = _sound_system_data.time 
		JOIN _tv_data ON _air_conditioner_data.time = _tv_data.time 
		JOIN _vacuum_data ON _air_conditioner_data.time = _vacuum_data.time 
		JOIN _washing_machine_data ON _air_conditioner_data.time = _washing_machine_data.time
) -- SELECT * FROM _appliance_data; /*
, _ts_data AS (
	SELECT _ts_bucketed.time
		, to_char(_ts_bucketed.time, 'HH24:MI') AS day_time
		, fee_sum
		, total_impedance
		, LAG(total_impedance) OVER (
			ORDER BY _ts_bucketed.time ASC
		  ) lag_total_impedance
		, consumer_voltage
		, LAG(consumer_voltage) OVER (
			ORDER BY _ts_bucketed.time ASC
		  ) lag_consumer_voltage
		, transmission_line_voltage
		, LAG(transmission_line_voltage) OVER (
			ORDER BY _ts_bucketed.time ASC
		  ) lag_transmission_line_voltage
		, generator_voltage
		, LAG(generator_voltage) OVER (
			ORDER BY _ts_bucketed.time ASC
		  ) lag_generator_voltage
		, transacted_price
	FROM _ts_bucketed
		LEFT JOIN _total_impedance ON _ts_bucketed.time = _total_impedance.time
		LEFT JOIN _consumer_voltage ON _ts_bucketed.time = _consumer_voltage.time
		LEFT JOIN _transmission_line_voltage ON _ts_bucketed.time = _transmission_line_voltage.time
		LEFT JOIN _generator_voltage ON _ts_bucketed.time = _generator_voltage.time
)  -- SELECT * FROM _ts_data WHERE NOT (_ts_data IS NOT NULL); /*
, _combined_table AS (SELECT
	_ts_data.time --bucket
	, _ts_data.day_time
	, CAST(EXTRACT(HOUR FROM _ts_data.time) AS INTEGER) AS day_hour
	, CAST(EXTRACT(MINUTE FROM _ts_data.time) AS INTEGER) AS day_minute
	, fee_sum
	, COALESCE(total_impedance, lag_total_impedance) AS total_impedance
	, COALESCE(consumer_voltage, lag_consumer_voltage) AS consumer_voltage
	, COALESCE(transmission_line_voltage, lag_transmission_line_voltage) 
		AS transmission_line_voltage
	, COALESCE(generator_voltage, lag_generator_voltage) AS generator_voltage
	, transacted_price
	, air_conditioner
	, air_purifier
	, boiler
	, coffee
	, computer
	, dehumidifier
	, dishwasher
	, dryer
	, fan
	, freezer
	, fridge
	, internet_router
	, laptop
	, micro_wave_oven
	, phone_charger
	, printer
	, printer_3D
	, radiator
	, screen
	, solar_panel
	, sound_system
	, tv
	, "vacuum"
	, washing_machine
FROM _ts_data
	LEFT JOIN _appliance_data ON _ts_data.day_time = _appliance_data.day_time
)
SELECT 
	day_hour
	, day_minute
	, fee_sum
	, total_impedance
	, consumer_voltage
	, transmission_line_voltage
	, generator_voltage
	, transacted_price
	, air_conditioner
	, air_purifier
	, boiler
	, coffee
	, computer
	, dehumidifier
	, dishwasher
	, dryer
	, fan
	, freezer
	, fridge
	, internet_router
	, laptop
	, micro_wave_oven
	, phone_charger
	, printer
	, printer_3D
	, radiator
	, screen
	, solar_panel
	, sound_system
	, tv
	, "vacuum"
	, washing_machine
FROM _combined_table
WHERE (_combined_table IS NOT NULL);

--*/ --*/ ---*/ ----*/ ----*/
""")
# Load the whole result set into memory as a list of row tuples.
data = cursor.fetchall()

Close the database connection

In [None]:
# The result rows are already in `data`, so release the cursor first and then
# the connection that owns it.
for _db_handle in (cursor, conn):
    _db_handle.close()

Load the data into a dataframe

In [None]:
# Labels for the query result columns, in exactly the order of the final SQL
# SELECT. psycopg2 returns plain tuples, so order is all that matters here.
# Spelling "printer_3D" explicitly keeps the mixed case that SQL folds away.
_grid_and_market_columns = [
    "day_hour",
    "day_minute",
    "fee_sum",
    "total_impedance",
    "consumer_voltage",
    "transmission_line_voltage",
    "generator_voltage",
    "transacted_price",
]
_appliance_columns = [
    "air_conditioner",
    "air_purifier",
    "boiler",
    "coffee",
    "computer",
    "dehumidifier",
    "dishwasher",
    "dryer",
    "fan",
    "freezer",
    "fridge",
    "internet_router",
    "laptop",
    "micro_wave_oven",
    "phone_charger",
    "printer",
    "printer_3D",
    "radiator",
    "screen",
    "solar_panel",
    "sound_system",
    "tv",
    "vacuum",
    "washing_machine",
]
colum_names = _grid_and_market_columns + _appliance_columns
df = pandas.DataFrame(data, columns=colum_names)
df

Split the data into training and test sets

In [None]:
from sklearn.model_selection import train_test_split

# Fixed seed: without it every Restart & Run All draws a different split, so the
# reported score and the pickled model change from run to run.
SPLIT_RANDOM_STATE = 42

# Every DataFrame column except the target `transacted_price` is a feature.
features = ["day_hour", 
            "day_minute", 
            "fee_sum",  
            "total_impedance", 
            "consumer_voltage",
            "transmission_line_voltage", 
            "generator_voltage",  
            "air_conditioner",
            "air_purifier", 
            "boiler", 
            "coffee", 
            "computer", 
            "dehumidifier", 
            "dishwasher", 
            "dryer",
            "fan", 
            "freezer", 
            "fridge", 
            "internet_router", 
            "laptop", 
            "micro_wave_oven", 
            "phone_charger", 
            "printer", 
            "printer_3D", 
            "radiator", 
            "screen", 
            "solar_panel", 
            "sound_system", 
            "tv", 
            "vacuum", 
            "washing_machine"]
X = df[features]
y = df["transacted_price"]
# NOTE(review): rows are shuffled before splitting (the default), so test rows
# are interleaved in time with training rows. Confirm this is acceptable for
# time-bucketed data.
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=SPLIT_RANDOM_STATE)

Standardise the data

In [None]:
from sklearn.preprocessing import StandardScaler

# Zero-mean / unit-variance scaling, left unfitted here. It is fitted as the
# first step of the pipeline, so its statistics come from the training split
# only.
scaler = StandardScaler(with_mean=True, with_std=True)

Reduce the feature dimensionality with PCA

In [None]:
from sklearn.decomposition import PCA

# Project the 31 standardised features onto 21 principal components.
N_PCA_COMPONENTS = 21
# Fixed seed: depending on the data shape and the scikit-learn version, the
# "auto" svd_solver may choose randomized SVD, which is otherwise
# non-deterministic.
pca = PCA(n_components=N_PCA_COMPONENTS, random_state=42)

Create an ensemble model

In [None]:
from sklearn.linear_model import Ridge
from sklearn.tree import DecisionTreeRegressor
from sklearn.svm import SVR
from sklearn.ensemble import VotingRegressor

# Three different regressors whose predictions the VotingRegressor averages.
# positive=True forces non-negative Ridge coefficients; scikit-learn supports
# that only with the 'lbfgs' solver.
rr = Ridge(solver='lbfgs', positive=True)
# Shallow tree. random_state fixes how ties between equally good splits are
# broken, so repeated fits give the same tree.
dtr = DecisionTreeRegressor(max_depth=3, random_state=42)
svr = SVR()
voting_regressor = VotingRegressor(estimators=[("rr", rr), ("dtr", dtr), ("svr", svr)])

Create the pipeline

In [None]:
from sklearn.pipeline import Pipeline

# Scale -> PCA -> voting ensemble, fitted on the training split only. The step
# names are part of the pickled model's interface, so keep them stable.
_pipeline_steps = [
    ('scaler', scaler),
    ('pca', pca),
    ('voting_regressor', voting_regressor),
]
pipe = Pipeline(steps=_pipeline_steps)
pipe.fit(X_train, y_train)

In [None]:
# Score on the held-out split. Pipeline.score delegates to the final
# estimator's score, which is R^2 for scikit-learn regressors.
test_score = pipe.score(X_test, y_test)
test_score

Pickle the model

In [None]:
import pickle

# Serialise the fitted pipeline, relative to the working directory.
# Only load this file back in environments you trust: unpickling can execute
# arbitrary code.
MODEL_PATH = "price-model.pkl"
with open(MODEL_PATH, 'wb') as model_file:
    pickle.dump(pipe, model_file)

In [None]:
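# Example: reload the pickled pipeline for inference.
# NOTE: only unpickle files you trust — pickle.load can execute arbitrary code.
# import pickle
# from sklearn.pipeline import Pipeline
# 
# with open("/home/ruan/projects/Open-Electricity-Market/backend/ml-api/app/models/price/price-model.pkl", "rb") as f:
#     price_model: Pipeline = pickle.load(f)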

In [None]:
# y_test

In [None]:
# price_model.predict(X_test)

In [None]:
import os

from sklearn.utils import estimator_html_repr

# Save an HTML diagram of the fitted pipeline. By default the file goes to the
# working directory (next to price-model.pkl) rather than a machine-specific
# absolute path. Set PIPELINE_HTML_PATH to write it somewhere else. The file is
# written as UTF-8 so the result does not depend on the platform's locale
# encoding.
pipeline_html_path = os.getenv("PIPELINE_HTML_PATH", "pipeline.html")
with open(pipeline_html_path, "w", encoding="utf-8") as f:
    f.write(estimator_html_repr(pipe))