# Import data

In [1]:
import pandas as pd
import numpy as np

# Input and output file locations, relative to the notebook's working directory.
nfh_input_file_path = 'needforheat_raw_measurements.parquet'
remeha_input_file_path = 'remeha_export.parquet'
rhc_output_file_path = 'reducedheatcarb_raw_measurements.parquet'

# Display DataFrames with two decimals; the stored values keep their full precision.
pd.set_option('display.precision', 2)

import sys
# Make the sibling project directories importable so local helper modules
# (such as `plotter`, imported below) can be found.
sys.path.extend(['../data/', '../view/'])

%load_ext autoreload


%matplotlib inline
%matplotlib widget
import pylab as plt
import itertools
from plotter import Plot
from tqdm.notebook import tqdm

In [2]:
# Maps the unit suffix of a property name (the part after '__', e.g. 'degC' in
# 'temp_in__degC') to a matplotlib mathtext string for use in plot labels.
units_to_mathtext = {
    'ppm' : r'$ppm$',
    'kWh' : r'$kWh$',
    'kW' : r'$kW$',
    'm3' : r'$m^{3}$',
    'degC' : r'$°C$',
    'W' : r'$W$',
    'V' : r'$V$',
    '0' : r'$[-]$',  # dimensionless quantity
    'bool': r'$0 = False; 1 = True$',
    'min_1' : r'$min^{-1}$',
    'l_min_1' : r'$l\cdot min^{-1}$',
    # Watt per square metre. Follows the same suffix scheme as 'min_1' -> min^-1.
    # The previous r'$W\cdotm^{-1}$' was invalid mathtext ('\cdotm' is not a
    # command) and had the wrong exponent.
    'W_m_2' : r'$W\cdot m^{-2}$',
}
# Older name kept as an alias of the same dict for code that still uses it.
property_types = units_to_mathtext

In [3]:
%%time
# Attempt to read the Parquet file
try:
    df_nfh = pd.read_parquet(
        nfh_input_file_path, 
        engine='pyarrow',
        use_nullable_dtypes=True
        )
    print("File was successfully read without specifying compression codec.")
except Exception as e:
    print(f"Error reading file: {e}")


File was successfully read without specifying compression codec.
CPU times: user 1.25 s, sys: 131 ms, total: 1.38 s
Wall time: 1.16 s


In [4]:
# Summarise the NeedForHeat frame: index size, 'value' dtype and memory usage.
df_nfh.info()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 2814412 entries, (424197, 'device', 'twomes-p1-reader-firmware', Timestamp('2023-12-09 12:08:00+0100', tz='Europe/Amsterdam'), 'dsmr_version__0') to (410260, 'device', 'twomes-co2-occupancy-scd41-m5coreink-firmware', Timestamp('2024-04-01 02:00:00+0200', tz='Europe/Amsterdam'), 'temp_in__degC')
Data columns (total 1 columns):
 #   Column  Dtype 
---  ------  ----- 
 0   value   string
dtypes: string(1)
memory usage: 52.7+ MB


In [5]:
# Preview the NeedForHeat measurements (pandas truncates the display).
df_nfh

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,value
id,source_category,source_type,timestamp,property,Unnamed: 5_level_1
424197,device,twomes-p1-reader-firmware,2023-12-09 12:08:00+01:00,dsmr_version__0,-1.0
424197,device,twomes-p1-reader-firmware,2023-12-09 12:08:00+01:00,e_ret_hi_cum__kWh,1.936
424197,device,twomes-p1-reader-firmware,2023-12-09 12:08:00+01:00,e_ret_lo_cum__kWh,
424197,device,twomes-p1-reader-firmware,2023-12-09 12:08:00+01:00,e_use_hi_cum__kWh,0.000
424197,device,twomes-p1-reader-firmware,2023-12-09 12:08:00+01:00,e_use_lo_cum__kWh,-0.000
...,...,...,...,...,...
410260,device,twomes-co2-occupancy-scd41-m5coreink-firmware,2024-04-01 02:00:00+02:00,heartbeat,7
410260,device,twomes-co2-occupancy-scd41-m5coreink-firmware,2024-04-01 02:00:00+02:00,occupancy__p,4
410260,device,twomes-co2-occupancy-scd41-m5coreink-firmware,2024-04-01 02:00:00+02:00,onboarded__p,4
410260,device,twomes-co2-occupancy-scd41-m5coreink-firmware,2024-04-01 02:00:00+02:00,rel_humidity__0,0.835


In [6]:
%%time
# Attempt to read the Parquet file
try:
    df_remeha = pd.read_parquet(
        remeha_input_file_path, 
        engine='pyarrow',
        use_nullable_dtypes=True
        )
    print("File was successfully read without specifying compression codec.")
except Exception as e:
    print(f"Error reading file: {e}")


File was successfully read without specifying compression codec.
CPU times: user 3.66 s, sys: 1.11 s, total: 4.77 s
Wall time: 3.34 s


In [7]:
# Summarise the Remeha frame; its 'value' column is read as nullable Float64,
# whereas the NeedForHeat 'value' column is a string column.
df_remeha.info()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 36753631 entries, (404873, 'batch_import', 'remeha', Timestamp('2024-03-23 02:07:46.428000+0100', tz='Europe/Amsterdam'), 'temp_set__degC') to (495906, 'batch_import', 'remeha', Timestamp('2024-02-07 07:03:54.640000+0100', tz='Europe/Amsterdam'), 'temp_in__degC')
Data columns (total 1 columns):
 #   Column  Dtype  
---  ------  -----  
 0   value   Float64
dtypes: Float64(1)
memory usage: 677.5 MB


In [8]:
# Preview the Remeha measurements (pandas truncates the display).
df_remeha

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,value
id,source_category,source_type,timestamp,property,Unnamed: 5_level_1
404873,batch_import,remeha,2024-03-23 02:07:46.428000+01:00,temp_set__degC,18.0
404873,batch_import,remeha,2024-03-23 02:07:46.428000+01:00,dhw_temp_out__degC,-327.68
404873,batch_import,remeha,2024-03-23 02:07:46.428000+01:00,fan_rotations__min_1,0.0
404873,batch_import,remeha,2024-03-23 02:07:46.428000+01:00,temp_out__degC,-327.68
404873,batch_import,remeha,2024-03-23 02:07:46.428000+01:00,g_use_ch_inf_cum__kWh,5729.0
...,...,...,...,...,...
495906,batch_import,remeha,2024-02-07 07:03:54.640000+01:00,ch_set_fan_rotations_max__min_1,5900.0
495906,batch_import,remeha,2024-02-07 07:03:54.640000+01:00,g_use_ch_inf_cum__kWh,912.0
495906,batch_import,remeha,2024-02-07 07:03:54.640000+01:00,temp_ret__degC,20.3
495906,batch_import,remeha,2024-02-07 07:03:54.640000+01:00,ch_water_pump_speed__0,0.0


In [9]:
%%time
df_remeha['value'] = df_remeha['value'].astype(str)

CPU times: user 22.3 s, sys: 2.12 s, total: 24.4 s
Wall time: 24 s


In [10]:
# Confirm the dtype of 'value' after the conversion in the previous cell.
df_remeha.info()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 36753631 entries, (404873, 'batch_import', 'remeha', Timestamp('2024-03-23 02:07:46.428000+0100', tz='Europe/Amsterdam'), 'temp_set__degC') to (495906, 'batch_import', 'remeha', Timestamp('2024-02-07 07:03:54.640000+0100', tz='Europe/Amsterdam'), 'temp_in__degC')
Data columns (total 1 columns):
 #   Column  Dtype 
---  ------  ----- 
 0   value   object
dtypes: object(1)
memory usage: 642.4+ MB


## Merge NeedForHeat and Remeha measurements

In [11]:
%%time
df = pd.concat([df_nfh, df_remeha])

CPU times: user 11.7 s, sys: 2.26 s, total: 14 s
Wall time: 13.9 s


## Initial exploration: size, ids, start & stop times per id

In [12]:
# Summarise the merged frame (before deduplication).
df.info()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 39568043 entries, (424197, 'device', 'twomes-p1-reader-firmware', Timestamp('2023-12-09 12:08:00+0100', tz='Europe/Amsterdam'), 'dsmr_version__0') to (495906, 'batch_import', 'remeha', Timestamp('2024-02-07 07:03:54.640000+0100', tz='Europe/Amsterdam'), 'temp_in__degC')
Data columns (total 1 columns):
 #   Column  Dtype 
---  ------  ----- 
 0   value   object
dtypes: object(1)
memory usage: 687.1+ MB


In [13]:
# Row count before deduplication.
df.shape[0]

39568043

In [14]:
%%time
# deduplicate the measurements
df = df.reset_index().drop_duplicates().set_index(['id', 'source_category', 'source_type', 'timestamp', 'property']).sort_index()

CPU times: user 47.8 s, sys: 8.45 s, total: 56.3 s
Wall time: 56.2 s


In [15]:
# Row count after deduplication; equal to the earlier count means no exact
# duplicate rows were removed.
df.shape[0]

39568043

In [16]:
# Distinct home ids in the merged data.
list(df.index.unique(level='id'))

[401632,
 403603,
 404873,
 410260,
 412715,
 424197,
 429011,
 430062,
 434931,
 438708,
 440152,
 444964,
 449134,
 450051,
 450298,
 456638,
 458000,
 458852,
 478667,
 483173,
 487126,
 487289,
 491671,
 494233,
 495906]

In [17]:
# Number of distinct home ids.
len(df.index.unique(level='id'))

25

In [18]:
%%time
df.reset_index().groupby(['id', 'source_type'])['timestamp'].agg(['min', 'max'])

CPU times: user 6.43 s, sys: 1.58 s, total: 8.01 s
Wall time: 8 s


Unnamed: 0_level_0,Unnamed: 1_level_0,min,max
id,source_type,Unnamed: 2_level_1,Unnamed: 3_level_1
401632,enelogic,2024-01-01 00:00:00+01:00,2024-04-01 00:00:00+02:00
401632,remeha,2024-01-25 01:40:20.168000+01:00,2024-04-02 01:59:15.638000+02:00
401632,twomes-co2-occupancy-scd41-m5coreink-firmware,2024-02-05 14:29:00+01:00,2024-04-01 02:00:00+02:00
401632,twomes-p1-reader-firmware,2024-02-05 14:35:01+01:00,2024-04-01 02:00:00+02:00
403603,enelogic,2024-01-01 00:00:00+01:00,2024-04-01 00:00:00+02:00
...,...,...,...
494233,twomes-p1-reader-firmware,2024-01-14 14:00:00+01:00,2024-03-29 14:50:00+01:00
495906,enelogic,2023-12-11 00:00:00+01:00,2024-04-01 00:00:00+02:00
495906,remeha,2023-12-17 09:18:31.376000+01:00,2024-04-02 01:59:30.615000+02:00
495906,twomes-co2-occupancy-scd41-m5coreink-firmware,2024-01-17 14:09:00+01:00,2024-04-01 02:00:00+02:00


In [19]:
# Number of non-missing values in the merged frame.
df['value'].notna().sum()

39568043

In [20]:
# Check whether any measurement key (id, source_category, source_type, timestamp,
# property) occurs more than once, i.e. conflicting readings for the same key.
# `df.duplicated()` is not suitable here: it ignores the index and compares only
# the 'value' column, so it reports True merely because values repeat.
df.index.duplicated().any()

True

In [21]:
# Summarise the merged frame again after deduplication and index sorting.
df.info()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 39568043 entries, (401632, 'batch_import', 'remeha', Timestamp('2024-01-25 01:40:20.168000+0100', tz='Europe/Amsterdam'), 'boiler_status__str') to (495906, 'device', 'twomes-p1-reader-firmware', Timestamp('2024-04-01 02:00:00+0200', tz='Europe/Amsterdam'), 'heartbeat')
Data columns (total 1 columns):
 #   Column  Dtype 
---  ------  ----- 
 0   value   object
dtypes: object(1)
memory usage: 687.1+ MB


In [22]:
# Summary of 'value': because the column holds text, describe() reports
# count / unique / top / freq instead of numeric statistics.
df.describe()

Unnamed: 0,value
count,39568043.0
unique,312775.0
top,0.0
freq,7291399.0


In [23]:
# Preview the merged, sorted measurements (pandas truncates the display).
df

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,value
id,source_category,source_type,timestamp,property,Unnamed: 5_level_1
401632,batch_import,remeha,2024-01-25 01:40:20.168000+01:00,boiler_status__str,8.0
401632,batch_import,remeha,2024-01-25 01:40:20.168000+01:00,g_use_ch_inf_cum__kWh,11314.0
401632,batch_import,remeha,2024-01-25 01:40:20.168000+01:00,g_use_dhw_inf_cum__kWh,4064.0
401632,batch_import,remeha,2024-01-25 01:40:20.168000+01:00,temp_out__degC,-327.68
401632,batch_import,remeha,2024-01-25 01:40:20.168000+01:00,temp_ret__degC,67.5
...,...,...,...,...,...
495906,device,twomes-p1-reader-firmware,2024-04-01 01:59:31+02:00,e_use_hi_cum__kWh,6007.863
495906,device,twomes-p1-reader-firmware,2024-04-01 01:59:31+02:00,e_use_lo_cum__kWh,4658.435
495906,device,twomes-p1-reader-firmware,2024-04-01 01:59:31+02:00,meter_code__str,E0031
495906,device,twomes-p1-reader-firmware,2024-04-01 02:00:00+02:00,g_use_cum__m3,5020.112


In [24]:
# Distinct source categories.
list(df.index.unique(level='source_category'))

['batch_import', 'cloud_feed', 'device']

In [25]:
# Distinct source types (devices, cloud feeds, batch imports).
list(df.index.unique(level='source_type'))

['remeha',
 'enelogic',
 'twomes-co2-occupancy-scd41-m5coreink-firmware',
 'twomes-p1-reader-firmware']

In [26]:
# Distinct property names (the unit is encoded after '__').
list(df.index.unique(level='property'))

['boiler_status__str',
 'g_use_ch_inf_cum__kWh',
 'g_use_dhw_inf_cum__kWh',
 'temp_out__degC',
 'temp_ret__degC',
 'temp_sup__degC',
 'ch_set_fan_rotations_max__min_1',
 'ch_set_fan_rotations_min__min_1',
 'ch_water_pump_speed__0',
 'dhw_flow__l_min_1',
 'dhw_temp_out__degC',
 'fan_rotations__min_1',
 'gas_valve__str',
 'power_ch_max__kW',
 'temp_ch_sup_max__degC',
 'temp_in__degC',
 'temp_set__degC',
 'e_ret_hi_cum__kWh',
 'e_ret_lo_cum__kWh',
 'e_use_hi_cum__kWh',
 'e_use_lo_cum__kWh',
 'g_use_cum__m3',
 'e_ret_cum__kWh',
 'e_use_cum__kWh',
 'battery_voltage__V',
 'co2__ppm',
 'heartbeat',
 'occupancy__p',
 'onboarded__p',
 'rel_humidity__0',
 'dsmr_version__0',
 'meter_code__str']

In [27]:
# Number of rows per source type.
df.groupby(level='source_type').size()

source_type
enelogic                                            69215
remeha                                           36753631
twomes-co2-occupancy-scd41-m5coreink-firmware     1279811
twomes-p1-reader-firmware                         1465386
dtype: int64

In [28]:
# Number of rows per source category, source type and property.
df.groupby(level=['source_category', 'source_type', 'property']).size()

source_category  source_type                                    property                       
batch_import     remeha                                         boiler_status__str                 2244389
                                                                ch_set_fan_rotations_max__min_1    2148837
                                                                ch_set_fan_rotations_min__min_1    2148850
                                                                ch_water_pump_speed__0             2233095
                                                                dhw_flow__l_min_1                  2233105
                                                                dhw_temp_out__degC                 2233109
                                                                fan_rotations__min_1               2233115
                                                                g_use_ch_inf_cum__kWh              2244362
                                                

## Write to parquet file(s)

In [29]:
%%time 
df.to_parquet(rhc_output_file_path, index=True, engine='pyarrow')

CPU times: user 15.6 s, sys: 2.06 s, total: 17.7 s
Wall time: 17.7 s


In [31]:
# Disabled alternative: write one parquet file per home instead of CSV.
# NOTE: relies on `homes`, which is only defined in the next cell.
# %%time 
# for home_id in tqdm(homes):
#     df.xs(home_id, drop_level=False).to_parquet(f'{home_id}_raw_measurements.parquet', index=True, engine='pyarrow')

## Write to CSV file(s)

In [30]:
# Distinct home ids; the export below writes one file per home.
homes = list(df.index.unique(level='id'))

### Write raw measurements per home to zipped CSV files

In [32]:
%%time 
for home_id in tqdm(homes):
    try:
        # df_meas_home =  pd.read_parquet(
        #     f'{home_id}_raw_measurements.parquet', 
        #     engine='pyarrow',
        #     use_nullable_dtypes=True
        # )
        df.xs(home_id, drop_level=False).to_csv(
            f'{home_id}_raw_measurements.zip',
            encoding='utf-8',
            compression= dict(method='zip',
                              archive_name=f'{home_id}_raw_measurements.csv'),
            date_format='%Y-%m-%dT%H:%M:%S%z'
        )
    except FileNotFoundError as e:
        print(f"Error: {e}. Skipping file {home_id}_raw_measurements.parquet.")
        continue     


  0%|          | 0/25 [00:00<?, ?it/s]

CPU times: user 7min 58s, sys: 3.24 s, total: 8min 1s
Wall time: 8min 1s
