In [2]:
import pandas as pd
from pathlib import Path
import pyarrow as pa  # not yet needed, might need it later
import pyarrow.parquet as pq
import pyarrow.compute as pc  # not yet needed, might need it later.

## Reading Parquet-systems metrics metadata
First, look at the parquet metrics and see which ones you wish to study.

In [3]:
metrics_dir = Path('../../data/raw/parquet-metrics/')
# Note: We generally give a directory, rather than an individual file, to the next step.
metrics_pq = pq.ParquetDataset(metrics_dir)
metrics_df = metrics_pq.read().to_pandas()
metrics_df.columns

Index(['system_id', 'metric_id', 'sensor_name', 'common_name', 'raw_units',
       'units', 'calc_scale', 'calc_offset', 'calc_details',
       'aggregation_type', 'source_type', 'source_id', 'comments',
       'standard_name'],
      dtype='object')

In [4]:
metrics_df['sensor_name'].unique()

array(['dc_power', 'ac_power', 'ac_current', 'ac_voltage',
       'dc_pos_current', 'das_battery_voltage', 'poa_irradiance',
       'ambient_temp', 'module_temp_1', 'module_temp_2', 'module_temp_3',
       'inverter_temp', 'dc_pos_voltage', 'das_temp', 'inv1_ac_power',
       'inv1_dc_power', 'inv2_ac_power', 'inv2_dc_power', 'inv3_ac_power',
       'inv3_dc_power', 'inv4_ac_power', 'inv4_dc_power', 'inv5_ac_power',
       'inv5_dc_power', 'inv6_ac_power', 'inv6_dc_power', 'inv7_ac_power',
       'inv7_dc_power', 'inv1_dc_current', 'inv2_dc_current',
       'inv3_dc_current', 'inv4_dc_current', 'inv5_dc_current',
       'inv6_dc_current', 'inv7_dc_current', 'inv1_temp', 'inv2_temp',
       'inv3_temp', 'inv5_temp', 'inv6_temp', 'inv1_dc_voltage',
       'inv2_dc_voltage', 'inv3_dc_voltage', 'inv4_dc_voltage',
       'inv5_dc_voltage', 'inv6_dc_voltage', 'inv7_dc_voltage',
       'inv4_dc_temp', 'inv7_temp', 'ac_power_metered_kW', 'ac_frequency',
       'module_temp_F', 'ambient_temp_F'

In [5]:
metrics_df['common_name'].unique()

array(['DC power', 'AC power', 'AC current', 'AC voltage', 'DC current',
       'DC voltage battery', 'Irradiance POA', 'Temperature ambient',
       'Temperature module', 'Temperature inverter', 'DC voltage',
       'Temperature panel', 'Temperature other', 'AC other', 'Wind speed',
       'System status', 'Irradiance GHI', 'Humidity', 'Irradiance DNI',
       'AC current other', 'AC energy', 'Wind direction',
       'Temperature refdevice', 'DC current other',
       'Temperature enclosure', 'AC power other', 'PR', 'AC energy other',
       'Irradiance albedo', 'Irradiance other', 'DC current battery',
       'Battery other', 'DC voltage other', 'AC voltage other',
       'Temperature backsheet', 'DC energy other', 'DC power other',
       'System other'], dtype=object)

In [6]:
# collect power_factors
metrics_pf = metrics_df.loc[metrics_df.loc[:, 'sensor_name'].str.contains('factor')]
metrics_pf

Unnamed: 0,system_id,metric_id,sensor_name,common_name,raw_units,units,calc_scale,calc_offset,calc_details,aggregation_type,source_type,source_id,comments,standard_name
88,1200,4198,power_factor,AC other,-,-,1.0,0.0,,,,,,power_factor__4198
94,1201,2799,power_factor,AC other,-,-,1.0,0.0,,avg,,,,power_factor__2799
141,1202,2805,power_factor,AC other,-,-,1.0,0.0,,,,,,power_factor__2805
154,1203,2901,inv1_power_factor,AC other,-,-,1.0,0.0,,avg,,,,inv1_power_factor__2901
155,1203,2908,inv2_power_factor,AC other,-,-,1.0,0.0,,avg,,,,inv2_power_factor__2908
449,1276,3038,power_factor,AC other,-,-,1.0,0.0,,avg,,,,power_factor__3038
464,1277,3053,power_factor,AC other,-,-,1.0,0.0,,avg,,,,power_factor__3053
484,1278,3068,inv1_power_factor,AC other,-,-,1.0,0.0,,avg,,,,inv1_power_factor__3068
485,1278,3077,inv2_power_factor,AC other,-,-,1.0,0.0,,avg,,,,inv2_power_factor__3077
526,1284,957,power_factor,AC other,-,-,1.0,0.0,,avg,,,,power_factor__957


In [7]:
def get_filtered_metrics(system_id: int):
    '''Restrict the metrics to the site in question.'''
    return metrics_df.loc[metrics_df.loc[:,'system_id'] == system_id]

In [8]:
my_metrics = get_filtered_metrics(1200)
my_metrics

Unnamed: 0,system_id,metric_id,sensor_name,common_name,raw_units,units,calc_scale,calc_offset,calc_details,aggregation_type,source_type,source_id,comments,standard_name
51,1200,2751,ac_power,AC power,W,W,1.0,0.0,ac_power_metered_kW,avg,,,,ac_power__2751
52,1200,2752,dc_power,DC power,W,W,1.0,0.0,,avg,,,,dc_power__2752
53,1200,2757,inv1_ac_power,AC power,W,W,1.0,0.0,,avg,,,,inv1_ac_power__2757
54,1200,2758,inv1_dc_power,DC power,W,W,1.0,0.0,,avg,,,,inv1_dc_power__2758
55,1200,2762,inv2_ac_power,AC power,W,W,1.0,0.0,,avg,,,,inv2_ac_power__2762
56,1200,2763,inv2_dc_power,DC power,W,W,1.0,0.0,,avg,,,,inv2_dc_power__2763
57,1200,2767,inv3_ac_power,AC power,W,W,1.0,0.0,,avg,,,,inv3_ac_power__2767
58,1200,2768,inv3_dc_power,DC power,W,W,1.0,0.0,,avg,,,,inv3_dc_power__2768
59,1200,2772,inv4_ac_power,AC power,W,W,1.0,0.0,,avg,,,,inv4_ac_power__2772
60,1200,2773,inv4_dc_power,DC power,W,W,1.0,0.0,,avg,,,,inv4_dc_power__2773


For the current purposes, let's grab DC power, temperature, and irradiance
*However*, data is stored in the main parquet framework by metric_id,
so just manually choose the ids you want.

In [9]:
def get_metric_ids_and_names(system_id: int, selected_metrics,
                             return_type = 'dict_paired'):
    pruned_metrics = metrics_df.loc[
        (metrics_df.loc[:, 'system_id'] == system_id)
        & (metrics_df.loc[:, 'metric_id'].isin(selected_metrics))
    ]
    # basic error_checking
    if pruned_metrics.shape[0] == 0:
        raise ValueError(
            f'Mismatch between system_id {system_id} and metrics '
            + f'{selected_metrics}.\n No values returned!'
        )
    # grab metrics and names in the *same* order
    sorted_metrics = tuple(pruned_metrics['metric_id'])
    sorted_names = tuple(pruned_metrics['common_name'])
    if return_type == 'dict_paired':
        return {
            sorted_metrics[j]: sorted_names[j]
            for j in range(len(sorted_metrics))
        }
    elif return_type == 'dict_separate':
        return {
            'metrics': sorted_metrics,
            'names': sorted_names
        }
    elif return_type == 'tuples':
        return (sorted_metrics, sorted_names)
    else:
        raise ValueError('Not a valid return_type.')
    

## Now read the relevant data

We now read the actual dataset, using our metric_ids to filter.

In [11]:
def read_and_filter(system_id: int, selected_metrics,
                    name_change='add'):
    '''Grab the data,
    filter for only our metrics,
    name the metrics, 
    and save the data by-metric
    
    Parameters:
    system_id: int
        The system ID number you wish to replace.
    selected_metrics: iterable
        The metrics you want to choose
    name_change: str, any of "add", "drop", or "none"
        If "add", add on common_name to metric_id
        If "replace", replace metric_id with common_name
        If "none", do nothing.
    '''
    access_system_dir = Path(f'../../data/raw/systems/parquet/{system_id}/')
    current_pq = pq.ParquetDataset(
        access_system_dir,
        filters= [('metric_id', 'in', selected_metrics)])
    current_df = current_pq.read().to_pandas()
    # even with raw data, duplicates can happen!
    # only drop *complete* duplicates for now.
    current_df = current_df.drop_duplicates()
    if(name_change == 'add') or (name_change == 'replace'):
        # replace numbers with meaningful names
        correspondence_dict = get_metric_ids_and_names(
            system_id=system_id,
            selected_metrics=selected_metrics,
            return_type='dict_paired'
        )
        current_df['common_name'] = current_df['metric_id'].apply(
            lambda x: correspondence_dict[x]
        )
        # put metric name next to metric id.
        col_reorder = ['measured_on', 'utc_measured_on', 'metric_id', 'common_name', 'value']
        current_df = current_df[col_reorder]
        if name_change == 'replace':
            current_df.drop(
                columns='metric_id'
            )
    return current_df


Experiments to see what the different options' storage size is.

In [13]:
power_factor = read_and_filter(1200, [2754], 'add')

In [15]:
power_factor.loc[25:40, 'value']

25      56.0108
26      55.1223
27      54.4838
28      53.7838
29      53.3707
30    3472.5300
31    3472.5300
32    3472.5300
33    3472.5300
34    3472.5300
35    3472.5300
36    3472.5300
37      61.3418
38      61.3729
39      60.2054
40      56.9338
Name: value, dtype: float64

In [None]:
add_34 = read_and_filter(34,
                         [2694, 2695, 2679, 2686, 2687, 2688, 2689],
                         'add')
add_34.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2359161 entries, 0 to 2359160
Data columns (total 5 columns):
 #   Column           Dtype         
---  ------           -----         
 0   measured_on      datetime64[ns]
 1   utc_measured_on  datetime64[ns]
 2   metric_id        int32         
 3   metric_name      object        
 4   value            float64       
dtypes: datetime64[ns](2), float64(1), int32(1), object(1)
memory usage: 81.0+ MB


In [47]:
add_34 = add_34.astype({'metric_name':'str'})

In [48]:
add_34.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2359161 entries, 0 to 2359160
Data columns (total 5 columns):
 #   Column           Dtype         
---  ------           -----         
 0   measured_on      datetime64[ns]
 1   utc_measured_on  datetime64[ns]
 2   metric_id        int32         
 3   metric_name      object        
 4   value            float64       
dtypes: datetime64[ns](2), float64(1), int32(1), object(1)
memory usage: 81.0+ MB


In [23]:
replace_34 = read_and_filter(34,
                         [2694, 2695, 2679, 2686, 2687, 2688, 2689],
                         'replace')
replace_34.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2359161 entries, 0 to 2359160
Data columns (total 5 columns):
 #   Column           Dtype         
---  ------           -----         
 0   measured_on      datetime64[ns]
 1   utc_measured_on  datetime64[ns]
 2   metric_id        int32         
 3   value            float64       
 4   metric_name      object        
dtypes: datetime64[ns](2), float64(1), int32(1), object(1)
memory usage: 81.0+ MB


In [24]:
noname_34 = read_and_filter(34,
                         [2694, 2695, 2679, 2686, 2687, 2688, 2689],
                         'none')
noname_34.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2359161 entries, 0 to 2359160
Data columns (total 4 columns):
 #   Column           Dtype         
---  ------           -----         
 0   measured_on      datetime64[ns]
 1   utc_measured_on  datetime64[ns]
 2   metric_id        int32         
 3   value            float64       
dtypes: datetime64[ns](2), float64(1), int32(1)
memory usage: 63.0 MB


In [12]:
def save_to_parquet(df, system_id, save_dir, pivot_choices):
    save_path = Path(save_dir + f'{system_id}/')
    df.to_parquet(save_path,
                  partition_cols = pivot_choices,
                  index=None)


In [30]:
save_to_parquet(
    noname_34,
    '34',
    './temp/noname/',
    'metric_id'
)

In [None]:
save_to_parquet(
    replace_34,
    '34',
    './temp/replace/',
    'common_name'
)

In [None]:
save_to_parquet(
    add_34,
    '34',
    './temp/addname/',
    'common_name'
)

In [33]:
save_to_parquet(
    add_34,
    '34',
    './temp/addid/',
    'metric_id'
)

Result: Significant differences in opened Dataframe size; small differences in stored size.

As per suggestions, keep to 'none' for now.

## Let's put it all together

In [16]:
systems_cleaned = pd.read_csv('../../data/core/systems_cleaned.csv')
parquet_systems = systems_cleaned.loc[
    systems_cleaned.loc[:, 'is_lake_parquet_data']
]  # is already boolean!
irrad_parquet_systems = parquet_systems.loc[
    parquet_systems.loc[:, 'has_irrad_data']
]
my_irrad_parquet_indices = list(
    irrad_parquet_systems.system_id.values
)

In [152]:
j = 60
system_id = my_irrad_parquet_indices[j]
cur_mets = get_filtered_metrics(system_id=system_id)
cur_mets = cur_mets[['system_id', 'metric_id', 'sensor_name', 'common_name', 'units']]
cur_mets.loc[
    (cur_mets.loc[:, 'sensor_name'].str.contains('Pow'))
    | (cur_mets.loc[:, 'sensor_name'].str.contains('pow'))
    | (cur_mets.loc[:, 'sensor_name'].str.contains('temp'))
    | (cur_mets.loc[:, 'sensor_name'].str.contains('Temp'))
    | (cur_mets.loc[:, 'common_name'].str.contains('ambient'))
    | (cur_mets.loc[:, 'sensor_name'].str.contains('rrad'))
]

Unnamed: 0,system_id,metric_id,sensor_name,common_name,units
1590,4903,82702,AmbTemp_C_Avg,Temperature ambient,C
1593,4903,82689,CR1000Temp_C_Avg,Temperature enclosure,C
1609,4903,82740,InvTempHeatsink_C_Avg,Temperature inverter,C
1610,4903,82738,InvTempInternalAir_C_Avg,Temperature inverter,C
1611,4903,82739,InvTempInverterAir_C_Avg,Temperature inverter,C
1656,4903,82741,SEWSAmbientTemp_C_Avg,Temperature ambient,C
1657,4903,82742,SEWSModuleTemp_C_Avg,Temperature module,C
1670,4903,82798,TCTemps_C_Avg_1,Temperature other,C
1671,4903,82807,TCTemps_C_Avg_10,Temperature other,C
1672,4903,82808,TCTemps_C_Avg_11,Temperature other,C


In [None]:
my_metrics = []
where_to_store = '../../data/sorted_by_metric/systems/parquet/'

In [141]:
current_df = read_and_filter(system_id=system_id, selected_metrics=my_metrics, name_change='none')
save_to_parquet(current_df, system_id, where_to_store, ['metric_id'])

In [153]:
no_power_sites = [1294, 1295, 1296, 1297, 1298, 1299, 1301,
                  1302, 1303, 1304, 1309, 1311, 4901, 4902, 4903]