In [0]:
# Set up OAuth client-credentials access to the ADLS account (bus01prod02) that holds the pilot data.
storage_account = "adlsweudpbus01prod02"

# Client secret and client id are read from the "credentials" secret scope.
key = dbutils.secrets.get(scope="credentials", key="SP-Password")
id = dbutils.secrets.get(scope="credentials", key="clientId")  # NOTE(review): shadows the builtin id()

# Every ABFS setting is keyed by "<setting prefix><account>.dfs.core.windows.net".
account_host = storage_account + ".dfs.core.windows.net"
oauth_settings = {
    "fs.azure.account.auth.type.": "OAuth",
    "fs.azure.account.oauth.provider.type.": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id.": id,
    "fs.azure.account.oauth2.client.secret.": key,
    "fs.azure.account.oauth2.client.endpoint.": "https://login.microsoftonline.com/505cca53-5750-4134-9501-8d52d5df3cd1/oauth2/token",
}
for setting_prefix, setting_value in oauth_settings.items():
    spark.conf.set(setting_prefix + account_host, setting_value)

%run /Workspace/Repos/kroeper@tbdir.net/Utils/Storage_connection

import sys
sys.path.insert(0,"/Workspace/Repos/Misc/Utils/")

from storageConnection import *
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql import types
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import when



# Connection helper; its sql_rd member is used at the end of the notebook to write the result table.
conn = StorageConnection('adls_ctp', 'sql_rd')

# Read the Delta table with the pilot data into `df`.
path = "abfss://al-ctp-pilot-contracts@adlsweudpbus01prod02.dfs.core.windows.net/data/"
delta_reader = spark.read.format("delta")
df = delta_reader.load(path)

In [0]:
# Devices included in the analysis
device_ids = [
    'ctp-0020008236',
    'ctp-0020008009',
    'ctp-0020008214',
    'ctp-0020006799',
    'ctp-0020007916',
    'ctp-0020007025',
    'ctp-0020006875',
    'ctp-0020007843',
    'ctp-0020007885',
    'ctp-0020007877',
    'ctp-0020008119',
    'ctp-0440008057',
]

# Evaluation period (compared against SignalDate, both bounds inclusive)
start_date = '2022-12-01'
end_date = '2023-11-30'

# Signal IDs, kept as strings
prec_sig = '243'                         # [NACT = 0, ACTIVE = 1, DESD = 2, SNA = 3]
ElHeatPwrReq_Cval_CCU = '4626'           # [-1600, 1612.75] KW / SNA = 65535
HVAC_BrkResist_CompHeat_Stat = '5661'    # 0 == Battery was heated / 1 == Cabin was heated --> prec done? / charging done?
Status_Plugin_Charging = '239'           # [NACT = 0, ACTIVE = 1, ERR = 2, SNA = 3]
#Battery_Temperature = '318'
Outside_Temperature = '4378'
Inside_Temperature = '219'
Standstill_id = '23'
BatteryTemperature_24V1 = '6049'
#BatteryTemperature_24V2 = '5130'

#MomentaryChargePower = '264'        # momentary available energy that can be input to reach SOC = 100%
#MomentaryDischargePower = '265'     # momentary available energy that can be used --> 264 & 265 both proportional to SOC
HV_CHRG_PWR = '275'

In [0]:
# Exploratory look at the battery-temperature signals of the selected devices in the evaluation period.
# Only BatteryTemperature_24V1 is defined in the configuration cell; BatteryTemperature_24V2 and
# Battery_Temperature are commented out there, so referencing them by name raised a NameError.
# Their IDs are taken from those commented-out definitions.
battery_temperature_signal_ids = [
    BatteryTemperature_24V1,
    '5130',  # BatteryTemperature_24V2
    '318',   # Battery_Temperature
]
(
    df
    .filter(F.col('SignalID').isin(battery_temperature_signal_ids))
    .filter((F.col('SignalDate') >= start_date) & (F.col('SignalDate') <= end_date))
    .filter(F.col('DeviceID').isin(device_ids))
    .display()
)

In [0]:
# Keep only the necessary signals and devices within the evaluation period, then pivot the
# signals into columns. One list drives both the row filter and the pivot so they cannot drift apart.
signal_ids = [
    prec_sig,
    Status_Plugin_Charging,
    ElHeatPwrReq_Cval_CCU,
    HVAC_BrkResist_CompHeat_Stat,
    HV_CHRG_PWR,
    BatteryTemperature_24V1,
    Outside_Temperature,
    Inside_Temperature,
    Standstill_id,
]

# No orderBy here: the following groupBy does not preserve row order, and all rows of a group
# share the same SignalTimestamp, so a global sort only adds an expensive shuffle.
df = (
    df
    .filter(F.col('SignalID').isin(signal_ids))
    .filter((F.col('SignalDate') >= start_date) & (F.col('SignalDate') <= end_date))
    .filter(F.col('DeviceID').isin(device_ids))
    .select('DeviceID', 'CustomerName', 'SignalTimestamp', 'SignalID', 'SignalValue')
)

# One row per device/customer/timestamp with one column per signal ID.
# Passing the pivot values explicitly fixes the column set and order.
df = (
    df
    .groupBy('DeviceID', 'CustomerName', 'SignalTimestamp')
    .pivot('SignalID', signal_ids)
    .agg(F.first('SignalValue'))
)

In [0]:
# Inspect the pivoted frame (one column per signal ID).
df.display()


In [0]:
# Running frame per device: all rows from the start of the partition up to the current row,
# ordered by timestamp.
window = (
    Window.partitionBy("DeviceID")
    .orderBy("SignalTimestamp")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Forward fill: carry the last non-null value of each status signal onto the following rows.
status_signals = (
    Standstill_id,
    prec_sig,                       # preconditioning
    Status_Plugin_Charging,         # plug-in charging
    HVAC_BrkResist_CompHeat_Stat,   # heating status
)
for status_signal in status_signals:
    df = df.withColumn(status_signal, F.last(status_signal, ignorenulls=True).over(window))



In [0]:
# Previous-row values per device; an event starts on the row where the signal is 1 and the
# previous row is either not 1 or missing.
window2 = Window.partitionBy("DeviceID").orderBy('SignalTimestamp')
df = df.withColumn('lag_243', F.lag(prec_sig).over(window2))
df = df.withColumn('lag_23', F.lag(Standstill_id).over(window2))

# Start-of-event conditions. `column == None` evaluates to NULL in Spark (never True), so the
# missing-previous-row case has to be tested with isNull(); otherwise an event that begins on
# the first row of a device never receives an ID.
prec_started = (
    ((F.col('lag_243') != 1) | F.col('lag_243').isNull())
    & (F.col(prec_sig) == 1)
)
standstill_started = (
    ((F.col('lag_23') != 1) | F.col('lag_23').isNull())
    & (F.col(Standstill_id) == 1)
)

# Create the IDs: the running sum of the signal at the start row serves as the event ID
# (`window` is the running per-device frame defined in the forward-fill cell).
df = df.withColumn('Prec_ID', F.when(prec_started, F.sum(prec_sig).over(window)))
df = df.withColumn('Standstill_ID', F.when(standstill_started, F.sum(Standstill_id).over(window)))

# Forward fill the IDs onto the remaining rows of each event.
df = df.withColumn('Standstill_ID', F.last('Standstill_ID', ignorenulls=True).over(window))
df = df.withColumn('Prec_ID', F.last('Prec_ID', ignorenulls=True).over(window))

# Keep only rows with active preconditioning and active plug-in charging
# (uses the prec_sig constant instead of a hard-coded '243').
df = df.filter((F.col(prec_sig) == 1) & (F.col(Status_Plugin_Charging) == 1))

# Previous charging power per device, used to detect a change of sign between consecutive rows.
df = df.withColumn('lag_275', F.lag(HV_CHRG_PWR).over(window2).cast('float'))

# Flag rows where the charging power changed sign relative to the previous row.
turned_positive = (F.col(HV_CHRG_PWR) > 0) & (F.col('lag_275') < 0)
turned_negative = (F.col(HV_CHRG_PWR) < 0) & (F.col('lag_275') > 0)
df = df.withColumn('SignChanged', F.when(turned_positive | turned_negative, 1).otherwise(0))

#df = df.filter(~(F.col('4626') == 'null'))


In [0]:
# Inspect the rows kept by the preconditioning/plug-in-charging filter, with the ID and SignChanged columns.
df.display()

In [0]:
from pyspark.sql.functions import unix_timestamp
df = (
    df
    # HVAC power is cast to float so it can be used in the calculation
    .withColumn(ElHeatPwrReq_Cval_CCU, F.col(ElHeatPwrReq_Cval_CCU).cast('float'))
    # month column, used to find the overall energy consumption of a vehicle in a given month
    .withColumn('month', F.month(F.col('SignalTimestamp')))
)

# Values to find out:
# - average HVAC energy consumption
# - number of sign changes
# - whether energy was consumed
# - month of a preconditioning event
# - the consumption time
# - whether preconditioning was done for the cabin or the battery

# First and last timestamp of every standstill, attached to each of its rows.
# NOTE(review): rows were already restricted to active preconditioning + plug-in charging,
# so these bounds only span the remaining rows of a standstill — confirm this is intended.
win = Window.partitionBy("DeviceID", "CustomerName", "Standstill_ID")

df = (
    df
    .withColumn('End Time Standstill', F.max("SignalTimestamp").over(win))
    .withColumn('Start Time Standstill', F.min("SignalTimestamp").over(win))
)

In [0]:
# Inspect the frame with the month and standstill start/end columns added.
df.display()

In [0]:
# Collapse every preconditioning event (one Prec_ID within one Standstill_ID of a device) into one row.
# NOTE(review): if the pivoted signal columns are strings, F.min/F.max on the temperature columns
# compare lexicographically rather than numerically — confirm the column dtypes.
preconditioning_start = F.min('SignalTimestamp')
preconditioning_end = F.max('SignalTimestamp')

df = df.groupBy('DeviceID', 'CustomerName', 'Standstill_ID', 'Prec_ID').agg(
    F.avg(ElHeatPwrReq_Cval_CCU).alias('avg_energyConsumption(KW)'),
    preconditioning_start.alias('start time Preconditioning'),
    preconditioning_end.alias('end time Preconditioning'),
    F.sum('SignChanged').alias('EnergyConsumed_byBool'),
    F.avg(HV_CHRG_PWR).alias('EnergyConsumed_byVal'),
    F.last('month').alias('month'),
    # difference of two unix timestamps -> duration in seconds
    (F.unix_timestamp(preconditioning_end) - F.unix_timestamp(preconditioning_start)).alias('consumption_time Preconditioning'),
    F.sum(F.when(F.col(HVAC_BrkResist_CompHeat_Stat) == 0, 1).otherwise(0)).alias("num_Battery_Consumption"),
    F.sum(F.when(F.col(HVAC_BrkResist_CompHeat_Stat) == 1, 1).otherwise(0)).alias("num_Cabin_Consumption"),
    F.min('Start Time Standstill').alias('Start Time Standstill'),
    F.max('End Time Standstill').alias('End Time Standstill'),
    F.max(BatteryTemperature_24V1).alias('Battery_EndTemperature'),
    F.min(BatteryTemperature_24V1).alias('Battery_StartTemperature'),
    F.avg(Outside_Temperature).alias('Avg_OutsideTemperature'),
    F.min(Inside_Temperature).alias('Start_InsideTemperature'),
    F.max(Inside_Temperature).alias('End_InsideTemperature'),
)

# Set the average power to zero if it is null: null means the signal sent no value during the
# preconditioning event, i.e. no energy consumption happened.
df = df.withColumn(
    'avg_energyConsumption(KW)',
    F.coalesce(F.col('avg_energyConsumption(KW)'), F.lit(0.0)),
)

# Energy = average power * duration. The duration is in seconds (unix_timestamp difference),
# so kW*s has to be divided by 3600 to obtain kWh; dividing by 60 only yields kW*min.
SECONDS_PER_HOUR = 3600
df = df.withColumn(
    'Energy_Amount(KWh)',
    (F.col('consumption_time Preconditioning') * F.col('avg_energyConsumption(KW)')) / SECONDS_PER_HOUR,
)

In [0]:
#sometimes preconditioning happens but no energy is consumed. Usually these events happen when preconditioning lasts for a short time (eg. ID #63 for ctp-0020006875 (Hamburger Hochbahn AG)) 

#consumption_time is the difference of two unix timestamps, i.e. it is in seconds (not minutes) -> it is used to calculate Energy_Amount, so converting kW*s to kWh requires dividing by 3600

#I didn't apply a weighted average because signal 4626 (ElHeatPwrReq_Cval_CCU) sends data every 30 s +- 2 s

#sample data from 2023-04-01 to 2023-08-30

#I used signal 239 which is the "Status Plugin-Charging" Signal to verify if the charging was done or not  

#I extracted the month to be able to calculate the entire energy consumed for each vehicle in a specific month

In [0]:
# Inspect the per-event aggregates.
df.display()

In [0]:
# Remove the events in which charging did not happen: the average charging power is missing
# or lies strictly between -1 and 0, and no change of sign was counted.
avg_charge_power = F.col('EnergyConsumed_byVal')
negligible_charge_power = avg_charge_power.isNull() | ((avg_charge_power < 0) & (avg_charge_power > -1))
no_sign_change = F.col('EnergyConsumed_byBool') == 0

df = df.filter(~(negligible_charge_power & no_sign_change))

In [0]:
# Inspect the events remaining after the charging filter.
df.display()

In [0]:
# Discard events whose preconditioning duration exceeds the cut-off, then show what is left.
max_consumption_time = 500000  # same unit as 'consumption_time Preconditioning'
df = df.where(F.col('consumption_time Preconditioning') <= max_consumption_time)
df.display()

In [0]:
# Write the event table through the sql_rd connection.
# NOTE(review): mode='append' presumably adds the same rows again on every re-run of this cell — confirm.
conn.sql_rd.write_table(
    data=df,
    table_name='rd.CTP_Sales_Preconditioning_Consumption',
    mode='append',
    truncate=False,
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-3220495237065401>, line 2[0m
[1;32m      1[0m conn[38;5;241m.[39msql_rd[38;5;241m.[39mwrite_table(data[38;5;241m=[39mdf,table_name[38;5;241m=[39m[38;5;124m'[39m[38;5;124mrd.CTP_Sales_Preconditioning_Consumption[39m[38;5;124m'[39m,mode[38;5;241m=[39m[38;5;124m'[39m[38;5;124mappend[39m[38;5;124m'[39m,truncate[38;5;241m=[39m[38;5;28;01mFalse[39;00m)
[0;32m----> 2[0m conn[38;5;241m.[39msql_rd[38;5;241m.[39mexecute_query()

[0;31mTypeError[0m: StorageConnection.sql_rd.execute_query() missing 1 required positional argument: 'query'