To connect to the Dask dashboard use ssh portforwarding: http://distributed.dask.org/en/latest/web.html#id1  

# Imports

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib as mpl
mpl.style.use('default')
import glob
import dask
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, progress, fire_and_forget
from dask import delayed
from tqdm import tqdm

#%matplotlib inline

# Setting up LocalCluster & run it
Might not be needed, but experienced that `processes=False, n_workers=1` are the best options to have.

In [2]:
#cluster = LocalCluster(processes=False, n_workers=8)
cluster = LocalCluster(processes=False, n_workers=1)
#cpu_worker = cluster.workers[0]
#cpu_worker.name = 'cpu'
#cpu_worker.set_resources(CPU=90)

client=Client(cluster, processes=True)

In [3]:
cluster

In [4]:
client

0,1
Client  Scheduler: inproc://145.52.169.135/8416/1  Dashboard: http://localhost:8787/status,Cluster  Workers: 1  Cores: 8  Memory: 17.02 GB


# Defining functions

In [26]:
def read_weather_data():
    """
    Reads in the weather Pandas DataFrame.
    :return: Pandas DataFrame
    """
    # Check if UTC to gmt+1 conversion is being handled correctly
    weather = pd.read_csv('F://datc//opschaler//weather_data//knmi_10_min_raw_data//output//df_combined_uncleaned.csv',
                          delimiter='\t', comment='#',
                          parse_dates=['datetime'])
    weather = weather.set_index(['datetime'])
    weather = weather.astype('float32')
    return weather


def smartmeter_data():
    """
    Reads in the file paths and dwelling id's of the smartmeter data.
    :return: file_paths, dwelling_ids, both as lists.
    """
    path = 'F://datc//opschaler//smartmeter_data//'
    file_paths = np.array(glob.glob(path + "*.csv"))

    print('Detected %s smartmeter_data files.' % len(file_paths))
    dwelling_ids = np.array(list((map(lambda x: x[-15:-4], file_paths))))

    return file_paths, dwelling_ids


def reduce_memory(df):
    """
    Reduces memory footprint of the input dataframe.
    Changes float64 columns to float32 dtype.
    """
    columns = df.columns
    memory_before = df.memory_usage(deep=False).sum() / 2**30 # convert bytes to GB

    for column in columns:
        if df[column].dtype == 'float64':
            df[column] = df[column].astype('float32')
        
    #memory_after = df.memory_usage(deep=False).sum() / 2**30 # convert bytes to GB
    #print('Memory uasge reduced from %.3f GB to %.3f GB' % (memory_before, memory_after))
    
    return df


@delayed(nout=2)
def clean_prepare_smart_gas(file_path, dwelling_id):
    """
    Input is a dwelling_id.csv file.
    Output are cleaned & prepared dataframes (smart, gas).

    :param file_path: path to 'dwelling_id.csv' file
    :return: Smart and gas Pandas DataFrames
    """
    df = pd.read_csv(file_path, delimiter=';', header=0)
    df = df.rename(index=str, columns={'Timestamp': 'datetime', 'gasTimestamp': 'datetime'})

    smart = df.iloc[:, :7]
    gas = df.iloc[:, 7:]
    
    del df
    
    try:
        smart['datetime'] = pd.to_datetime(smart['datetime'])
        gas['datetime'] = pd.to_datetime(gas['datetime'])
    except:
        print('datetime column contains non-datetime values')
        smart = clean_datetime(smart)
        gas = clean_datetime(gas)
        smart['datetime'] = pd.to_datetime(smart['datetime'])
        gas['datetime'] = pd.to_datetime(gas['datetime'])

    smart = smart.set_index(['datetime'])
    gas = gas.set_index(['datetime'])

    smart = reduce_memory(smart)
    gas = reduce_memory(gas)

    return smart, gas


@delayed
def clean_datetime(df):
    """
    TODO: Speed up the function
    Input should be a df with a column called 'datetime'.
    This function checks wether a row in the df.datetime column can be parsed to a Pandas datetime object,
    by trying pd.to_datetime() on it.
    If it fails it will replace that row with np.nan().
    Finally this function will return the df with the NaN rows dropped.
    It only drops the row if the datetime column contains a NaN.

    :param df: Pandas DataFrame containing a datetime column called 'datetime'.
    :return: Pandas DataFrame
    """
    for i in range(len(df)):
        try:
            pd.to_datetime(df.datetime[i])
        except ValueError:
            print('-----')
            print('ValueError at index = %s' % i)
            print(df.datetime[i])
            df.datetime = df.datetime.replace(df.datetime[i], np.nan)
    df = df.dropna(subset=['datetime'])
    return df


@delayed(nout=2)
def resample_dfs(smart, gas):
    smart = smart.resample('10s').mean()
    gas = gas.resample('H').mean()
    
    return smart, gas


@delayed
def create_hour_df(smart, gas, weather, dwelling_id):    
    # resample to original sample rates, just to be sure
    smart = smart.resample('10s').mean()
    gas = gas.resample('H').mean()
    
    # resample smart df to one hour, define what to do per column
    to_mean = ['ePower', 'ePowerReturn']
    to_last = ['eMeter', 'eMeterReturn', 'eMeterLow', 'eMeterLowReturn']
    
    # Resample columns by said methods, this will not resample the index yet.
    smart[to_mean] = smart[to_mean].resample('H').mean() # take mean of values
    smart[to_last] = smart[to_last].resample('H').last() # take 'last' known value, see pandas documentation for more info
    # ^ Vectors validates baldiri's wishes
    
    # Resample the complete df to H, including indices.
    smart = smart.resample('H').mean()
    
    # Down sample weather df to one hour
    weather = weather.resample('H').mean()
    
    # Combine gas, smart, weather into one df
    df_hour = pd.merge(smart, gas, left_index=True, right_index=True)
    df_hour = pd.merge(df_hour, weather, left_index=True, right_index=True)
    
    # Add a dwelling id column
    df_hour['dwelling'] = dwelling_id
    
    return df_hour


@delayed
def create_10s_df(smart, gas, weather, dwelling_id):
    gas = gas.resample('10s').ffill()  # Up sample gas to 10s by forward filling the values
    # Calculate gasPower column, is this the right way? Or should we ffill it?
    # Currently this code makes it so there is one gasPower value per hour
    gas['gasPower'] = gas['gasMeter'].diff()

    weather = weather.resample('10s').ffill()  # forward fill because the raw data is the 10 minute mean
    
    # Combine gas, smart, weather into one df
    df_10s = pd.merge(smart, gas, left_index=True, right_index=True)
    df_10s = pd.merge(df_10s, weather, left_index=True, right_index=True)
    df_10s['dwelling'] = dwelling_id # add a dwelling id column
    
    return df_10s


# Can't get this to work delayed because of object oriented stuff
def plot_nans(df, dwelling_id, resample_to):
    """
    Create a heatmap of the NaNs in the input DataFrame.
    :param df: Pandas DataFrame
    :param df: String to resample to, for example '1T' or 'H'
    :param dwelling_id: String
    :return: Seaborn heatmap as a Figure
    """
    
    df = df.isnull()
    # Downsample to make all data visible
    df = df.resample(resample_to).sum()  # Downsample to make small NaNs visible
    df = df.apply(lambda x: x > 0, 1)  # Replace values >0 with 1

    # Reindex datetimes
    # https://stackoverflow.com/questions/41046630/set-time-formatting-on-a-datetime-index-when-plotting-pandas-series
    try:
        df.index = df.index.to_period('D')
    except:
        print('plot_nans could not set df.index.to_period')

    # Plot heatmap
    n = int(len(df)*0.1)  # Choose amount of yticklabels to show
    
    #fig = plt.figure(clear=True)
    
    try:
        ax = sns.heatmap(df, cmap='Reds', square=False, vmin=0, cbar=False, yticklabels=n*2, cbar_kws={})
    except TypeError:
        print('plot_nans ValueError')
        ax = sns.heatmap(df, cmap='Reds', square=False, vmin=0, cbar=False, cbar_kws={})

    # Set cbar ticks manually
    #cbar = fig.collections[0].colorbar
    #cbar.set_ticks([0, 1])
    #cbar.set_ticklabels(['Not NaN', 'NaN'])

    # Correct layout
    ax.invert_yaxis()
    ax.tick_params(axis='x', rotation=90)
    ax.tick_params(axis='y', rotation=0)
    ax.set(xlabel='Column [-]', ylabel='Index [-]')
    ax.set_title('Dwelling ID: '+dwelling_id)
    
    fig = ax.get_figure()
    
    #.tight_layout()
    #fig.show()
    #print('Saving heatmap')
    #fig.savefig('F://datc//opschaler//nan_information//figures//' + dwelling_id + '.png', dpi=1200)
    #savefig crashes dask
    
    return fig


@delayed
def df_nan_checker(df, threshold_percentage):
    """
    TODO: Parellalize, as in one column per core/worker?
    Checks each column in the input dataframe for NaNs.
    Outputs the amount of NaNs behind each other, including the start and stop index, per column as a sublist.
    For example when the dataframe has three columns.
    Output is in the form of:
    [[column_one_info], [column_two_info], [column_three_info]]
    With the column_..._info being in the form of:
    [start_index, stop_index, amount_of_NaNs]

    :param df: Pandas DataFrame
    :param threshold_percentage: Filter output based on NaN streaks being larger than x % of the total length of the dataframe.
    :return: Pandas DataFrame
    """
    columns = df.columns
    df = df.isnull()
    output = []
    length = len(columns)
    
    
    @delayed
    def check_rows(df, column_name):
        column_info = []
        temp = []
        x = False

        for j, value in enumerate(df[column_name]):
            if x == False and value == True:
                temp.append(df.index[j])
                x = True
            elif x == True and value == True:
                temp.append(df.index[j])
            elif x == True and value == False:
                column_info.append(temp)
                temp = []
                x = False

        lengths = []

        for array in column_info:
            lengths.append([array[0], array[-1], len(array)])

        return lengths

    
    for i in range(length):
        lengths = check_rows(df, columns[i])
        output.append(lengths)
    
    @delayed
    def list_to_df(output):
        # Convert df_info to a readable dataframe instead of list

        """
        Row per column from the 'output' list
        Columns: start-index, stop-index, NaN streak
        """

        df_info = pd.DataFrame(columns=['Column name', 'Start index', 'Stop index', 'Amount of NaNs'])
        length = len(output)
        column_names = []
        starts = []
        stops = []
        amounts = []

        for column in range(length):
            #print('At iteration %s of %s' % (column, length))
            for i in range(len(output[column])):
                column_names.append(df.columns[column])
                starts.append(output[column][i][0])
                stops.append(output[column][i][1])
                amounts.append(output[column][i][2])

        #print('Appending NaN info to df')
        # Convert list to pd series
        column_names = pd.Series(column_names)
        starts = pd.Series(starts)
        stops = pd.Series(stops)
        amounts = pd.Series(amounts)
        # Append pd series to a column
        df_info['Column name'] = column_names.values
        df_info['Start index'] = starts.values
        df_info['Stop index'] = stops.values
        df_info['Amount of NaNs'] = amounts.values

        percentage = (df_info['Amount of NaNs'] / len(df)) * 100
        df_info.drop(df_info[percentage < threshold_percentage].index, inplace=True)
        return df_info

    df_info = list_to_df(output)
    
    return df_info


def save_df_unprocessed(df, dwelling_id):
    """
    Save unprocessed dataframe.
    :param df: Pandas DataFrame
    :param dwelling_id: String
    :return: None
    """
    dir = 'F://datc//opschaler//combined_gas_smart_weather_dfs//unprocessed//'
    df.to_csv(dir + dwelling_id + '.csv', sep='\t', index=True)
    print('Saved unprocessed df: %s' % dwelling_id)
    return


@delayed
def drop_nan_streaks_above_threshold(df, df_nan_table, thresholds):
    """
    Drops NaN streaks from the df when they are larger then the threshold value.
    This function also inputs df_nan_table because it already has been made in the smart_gas_nan_checker.
    :param df: Pandas DataDrame to process NaNs off
    :param df_nan_table: NaN info Pandas DataFrame of the input df
    :param thresholds: Dictionary {'column_name':column_threshold}, column_threshold has to be an integer.
    :return: Pandas DataFrame
    """
    df_nan_table = df_nan_table.compute()

    # Check for NaN streaks > threshold and drop them from the df
    length = len(df_nan_table['Amount of NaNs'])
    #print('df_nan_table length: %s' % length)

    indices_to_drop = []
    for i, amount in enumerate(df_nan_table['Amount of NaNs']):
        selected_column = df_nan_table['Column name'][i]
        try:
            if amount > thresholds[selected_column]:
                start_index = (df_nan_table['Start index'][i])
                stop_index = (df_nan_table['Stop index'][i])
                indices = df[start_index:stop_index].index
                #print('Enumeration %s of %s | From \t %s \t to \t %s | column %s | NaN streak length: %s'
                #      % (i, length, start_index, stop_index, selected_column, (len(indices))))
                try:
                    indices_to_drop += indices
                except:
                    print('Could not add indices to indices_to_drop list')
            else:
                #print('amount < threshold')
                pass
        except:
            #print('No threshold detected for %s' % selected_column)
            pass

    #print('Dropping NaN streaks > threshold')
    l1 = len(df)
    df = df.drop(indices_to_drop)
    l2 = len(df)
    #print('Removed %s rows' % (l1-l2))
    return df


def save_df_processed(df, dwelling_id):
    """
    Save interpolated dataframe.
    :param df: Pandas DataFrame
    :param dwelling_id: String
    :return: None
    """
    dir = 'F://datc//opschaler//combined_gas_smart_weather_dfs//processed//'
    df.to_csv(dir + dwelling_id + '.csv', sep='\t', index=True)
    print('Saved processed df: %s' % dwelling_id)
    return

def save_nan_table(nan_table, dwelling_id):
    """
    Save nan table
    :param df: Pandas DataFrame
    :param dwelling_id: String
    :return: None
    """
    dir = 'F://datc//opschaler//nan_information//'
    nan_table.to_csv(dir + dwelling_id + '.csv', sep='\t', index=True)
    print('Saved nan table from: %s' % dwelling_id)
    return

def save_nan_fig(fig, dwelling_id):
    """
    Save nan fig
    :param df: Pandas DataFrame
    :param dwelling_id: String
    :return: None
    """
    dir = 'F://datc//opschaler//nan_information//figures//'
    #fig.tight_layout()
    #fig.tight_layout
    fig.savefig(dir + dwelling_id + '.png', dpi=300)
    #print('Saved nan fig from: %s' % dwelling_id)
    return


# Main loop

In [None]:
%%time

client.restart()

weather = read_weather_data()
weather_rs = weather.resample('10min').mean()

file_paths, dwelling_ids = smartmeter_data()

#file_paths = file_paths[:5]
#dwelling_ids = dwelling_ids[:5]

dfs_hour = []
dfs_10s = []
dfs_nan_table_10s = []
dfs_nan_table_hour = []

nan_figs_10s = []
nan_figs_hour = []

dfs_10s_partly_processed = []
dfs_hour_partly_processed = []

smarts = []
gass = []


for i, path in enumerate(file_paths):
    dwelling_id = dwelling_ids[i]
    
    smart, gas = clean_prepare_smart_gas(path, dwelling_id)
    
    # client.persist: Start computing these variables and keep them in memory
    smart = smart.persist()
    gas = gas.persist()

    smart, gas = resample_dfs(smart, gas)
    
    smart = smart.persist()
    gas = gas.persist()
    
    df_hour = create_hour_df(smart, gas, weather, dwelling_id)
    df_10s = create_10s_df(smart, gas, weather, dwelling_id)
    
    df_hour = df_hour.persist()
    df_10s = df_10s.persist()
    
    #Slow, plus low cpu usage...
    #nan_fig_10s = plot_nans(df_10s, dwelling_id+' 10s sample rate', '1T')
    #nan_fig_hour = plot_nans(df_hour, dwelling_id+' one hour sample rate', 'H')
    
    #df_nan_table_10s = df_nan_checker(df_10s, 0)
    #df_nan_table_hour = df_nan_checker(df_hour, 0)
    
    #df_nan_table_10s = df_nan_table_10s.persist()
    #df_nan_table_hour = df_nan_table_hour.persist()
    
    #thresholds_10s = {'eMeter': 6, 'ePower': 6, 'gasMeter': 72, 'T': 36, 'Q': 18}
    #df_10s_partly_processed = drop_nan_streaks_above_threshold(df_10s, df_nan_table_10s, thresholds_10s)
    #df_10s_partly_processed = df_10s_partly_processed.persist()
    
    #thresholds_hour = {'gasMeter': 4, 'T': 4, 'Q': 4}
    #df_hour_partly_processed = drop_nan_streaks_above_threshold(df_hour, df_nan_table_hour, thresholds_hour)
    #df_hour_partly_processed = df_hour_partly_processed.persist()
    
    dfs_hour.append(df_hour)
    dfs_10s.append(df_10s)
    #dfs_nan_table_10s.append(df_nan_table_10s)
    #dfs_nan_table_hour.append(df_nan_table_hour)
    
    #nan_figs_10s.append(nan_fig_10s)
    #nan_figs_hour.append(nan_fig_hour)
    
    #dfs_10s_partly_processed.append(df_10s_partly_processed)
    #dfs_hour_partly_processed.append(df_hour_partly_processed)

# Compute stuff

In [None]:
dfs_10s_results = []
dfs_hour_results = []

dfs_nan_table_10s_results = []
dfs_nan_table_hour_results = []

dfs_10s_partly_processed_results = []
dfs_hour_partly_processed_results = []

nan_figs_10s_results = []
nan_figs_hour_results = []

for i in range(len(dfs_10s)):
    dfs_10s_results.append(client.compute(dfs_10s[i].compute()))
    dfs_hour_results.append(client.compute(dfs_hour[i].compute()))
    
    #dfs_nan_table_10s_results.append(client.compute(dfs_nan_table_10s[i].compute()))
    #dfs_nan_table_hour_results.append(client.compute(dfs_nan_table_hour[i].compute()))
    
    #dfs_10s_partly_processed_results.append(client.compute(dfs_10s_partly_processed[i].compute()))
    #dfs_hour_partly_processed_results.append(client.compute(dfs_hour_partly_processed[i].compute()))

# Save stuff

In [12]:
# Save unprocessed dfs

zz = []
for i in range(len(dfs_10s)):
    dwelling_id = dwelling_ids[i]
    df = dfs_10s[i].compute()
    z = client.submit(save_df_unprocessed, df, dwelling_id+'_10s')
    zz.append(z) # This makes it run in parallel?
    print('Finished saving %s' % i)
    
zz = []
for i in range(len(dfs_hour)):
    dwelling_id = dwelling_ids[i]
    df = dfs_hour[i].compute()
    z = client.submit(save_df_unprocessed, df, dwelling_id+'_hour')
    zz.append(z) # This makes it run in parallel?
    print('Finished saving %s' % i)

Finished saving 0
Finished saving 1
Saved unprocessed df: P01S01W0001_10s
Finished saving 0
Saved unprocessed df: P01S01W0000_hour
Finished saving 1
Saved unprocessed df: P01S01W0001_hour
Saved unprocessed df: P01S01W0000_10s


In [13]:
# Save processed dfs

zz = []
for i in range(len(dfs_10s_partly_processed)):
    dwelling_id = dwelling_ids[i]
    df = dfs_10s_partly_processed[i].compute()
    z = client.submit(save_df_processed, df, dwelling_id+'_10s')
    zz.append(z) # This makes it run in parallel?
    print('Finished saving %s' % i)

zz = []
for i in range(len(dfs_hour_partly_processed)):
    dwelling_id = dwelling_ids[i]
    df = dfs_hour_partly_processed[i].compute()
    z = client.submit(save_df_processed, df, dwelling_id+'_hour')
    zz.append(z) # This makes it run in parallel?
    print('Finished saving %s' % i)

Finished saving 0
Finished saving 1
Saved processed df: P01S01W0001_10s
Finished saving 0
Saved processed df: P01S01W0000_hour
Saved processed df: P01S01W0000_10s
Finished saving 1
Saved processed df: P01S01W0001_hour


In [14]:
# Save nan tables

zz = []
for i in range(len(dfs_10s_partly_processed)):
    dwelling_id = dwelling_ids[i]
    df = dfs_nan_table_10s_results[i].result()
    z = client.submit(save_nan_table, df, dwelling_id+'_10s')
    zz.append(z) # This makes it run in parallel?
    print('Finished saving %s' % i)

zz = []
for i in range(len(dfs_hour_partly_processed)):
    dwelling_id = dwelling_ids[i]
    df = dfs_nan_table_hour_results[i].result()
    z = client.submit(save_nan_table, df, dwelling_id+'_hour')
    zz.append(z) # This makes it run in parallel?
    print('Finished saving %s' % i)

Finished saving 0
Finished saving 1
Saved nan table from: P01S01W0000_10s
Saved nan table from: P01S01W0001_10s
Finished saving 0
Saved nan table from: P01S01W0000_hour
Finished saving 1
Saved nan table from: P01S01W0001_hour


# Create & save NaN figures

In [None]:
# Save nan figures, non-daks way. Using dask gives errors

for i in tqdm(range(len(dfs_10s_results))):
    plt.close()
    dwelling_id = dwelling_ids[i]
    df_hour = dfs_hour_results[i]
    fig = plot_nans(df_hour, dwelling_id+' one hour sample rate', 'H')
    fig.tight_layout()
    
    dir = 'F://datc//opschaler//nan_information//figures//'
    fig.savefig(dir + dwelling_id + '_hour.png', dpi=300)

    
for i in tqdm(range(len(dfs_10s_results))):
    plt.close()
    dwelling_id = dwelling_ids[i]
    df_10s = dfs_10s_results[i]
    fig = plot_nans(df_10s, dwelling_id+' 10s sample rate', '1T')
    fig.tight_layout()
    
    dir = 'F://datc//opschaler//nan_information//figures//'
    fig.savefig(dir + dwelling_id+ '_10s.png', dpi=300)



  0%|                                                                                           | 0/56 [00:00<?, ?it/s]

  2%|█▍                                                                                 | 1/56 [00:00<00:20,  2.70it/s]

  4%|██▉                                                                                | 2/56 [00:00<00:18,  2.87it/s]

  5%|████▍                                                                              | 3/56 [00:01<00:20,  2.62it/s]

  7%|█████▉                                                                             | 4/56 [00:01<00:25,  2.07it/s]

  9%|███████▍                                                                           | 5/56 [00:02<00:24,  2.07it/s]

 11%|████████▉                                                                          | 6/56 [00:03<00:26,  1.87it/s]

 12%|██████████▍                                                                        | 7/56 [00:04<00:28,  1.72it/s]

 14%|███████████▊             

In [47]:
figs[2]

IndexError: list index out of range

Saved nan fig from: P01S01W0000


# Save nan figures

zz = []
for i in range(len(nan_figs_10s_results)):
    dwelling_id = dwelling_ids[i]
    fig = nan_figs_10s_results[i].result()
    z = client.submit(save_nan_fig, fig, dwelling_id+'_10s')
    zz.append(z) # This makes it run in parallel?
    print('Finished saving %s' % i)

zz = []
for i in range(len(nan_figs_hour_results)):
    dwelling_id = dwelling_ids[i]
    fig = nan_figs_hour_results[i].result()
    z = client.submit(save_nan_fig, fig, dwelling_id+'_hour')
    zz.append(z) # This makes it run in parallel?
    print('Finished saving %s' % i)

In [None]:
%matplotlib inline