**This jupyter notebook demonstrates how to:**

1. Create a multivariate timeseries using different datasets:
 - categorical data from lift stations;
 - Weather stations data;
 - Alarm events;
 - Instrument data (timeseries containing current from pumps and level);
 - eletric meters timeseries;
 - daily totalizers timeseries.

2. Some data preparation:
 - creation of timeseries from events dataset;
 - cleaning instrument or aquisition errors;
 - convertion (eg.: str to float);
 - removal of deactivation periods from lift stations.

**Mount google drive to access files**

In [1]:
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


In [2]:
#Important defines:
#total number of lift stations
NUMBER_OF_LIFT_STATIONS = 30 

#number of pumps per lift stations
PUMPS_PER_LIFT_STATION  = [2,2,3,3,3,1,2,3,2,2,3,3,2,2,2,
                           2,3,2,2,2,2,2,2,2,2,2,2,2,2,2]

#google drive folder where datasets are stored
DATASETS_DIR = r'/content/gdrive/My Drive/datasets/'                         


In [3]:
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import datetime

**Load lift stations register**

This data frame contains characteristics and specifications about sewage lift stations

Some columns (listed above) contains float values with comma instead of dot. 

In [4]:
lift_station_register = pd.read_csv(DATASETS_DIR + r'sewage_pumping_stations_base.csv')

#list of columns to transform into float, changing comma for dot
columns_to_change_to_float = ['utmx', 'utmy', 'nivel_max','vazao_recalque', 
                              'altura_manometrica','extensao_recalque',
                              'potencia_instalada']

for column in columns_to_change_to_float:
  try:
    lift_station_register[column] = lift_station_register[column].str.replace(',', '.').astype(float)
  except:
    print(f'Erro changing type of column: {column}')
    continue


**Weather stations data**

Files where downloaded from http://www.inmet.gov.br/portal/index.php?r=bdmep/bdmep

Weather data are related to Sewage Lift Stations events because because part of the rainwater is improperly connected to the sewer network.

This dataset is composed by data from 9 weather stations wich are located in Belo Horizonte region. They are near sewage lift stations analysed at this master degree work.

Each weather station file contains daily precipitation between January 1, 2010 and December 31, 2018.


In [6]:
#load weather stations register containing ids from all weather stations
weather_stations = pd.read_csv(DATASETS_DIR + r'cadastro_estacoes.csv')

#load 9 weather stations datasets
weather_stations_data_list = []
for i,row in weather_stations.iterrows():
    #load the dataset from each weather station 
    #files names uses the id from each weather station
    weather_station_data = pd.read_csv(DATASETS_DIR + r'estacao_{0}.csv'.format(row['id']),delimiter=';')
    #set the datetime index
    weather_station_data['datahora'] = pd.to_datetime(weather_station_data['Data'],format='%d/%m/%Y')
    weather_station_data.index = pd.to_datetime(weather_station_data['datahora'])
    #change the precipitation column name preparing for concatenation
    new_column_name = f"WeatherStation_{row['id']}"
    weather_station_data = weather_station_data.rename(columns = {'Precipitacao':new_column_name})
    #append each weather station dataset to a list for letter concatenation
    weather_stations_data_list.append(weather_station_data[['datahora',new_column_name]])

#contatenate weather station datasets    
weather_stations_datas = pd.concat(weather_stations_data_list, axis=1)
weather_stations_datas = weather_stations_datas.drop(columns=['datahora',])
#fill values na with 0
weather_stations_datas = weather_stations_datas.fillna(0)

**Intervals with operation interruption from some lift stations**




In [7]:
#some sewage lift station were deactivated after a specific date 
#the dict below helps to get correct operation interval
lifting_operation_intervals = {}
lifting_operation_intervals[3] = {'ini':pd.to_datetime(datetime.date(2011,1,1)) ,
                            'fim':pd.to_datetime(datetime.date(2013,4,8))}
lifting_operation_intervals[4] = {'ini':pd.to_datetime(datetime.date(2011,1,1)) ,
                            'fim':pd.to_datetime(datetime.date(2013,3,21))}
lifting_operation_intervals[5] = {'ini':pd.to_datetime(datetime.date(2011,1,1)) ,
                            'fim':pd.to_datetime(datetime.date(2013,3,28))}
lifting_operation_intervals[6] = {'ini':pd.to_datetime(datetime.date(2011,1,1)) ,
                            'fim':pd.to_datetime(datetime.date(2012,7,4))}
lifting_operation_intervals[7] = {'ini':pd.to_datetime(datetime.date(2011,1,1)) ,
                            'fim':pd.to_datetime(datetime.date(2016,11,8))}
lifting_operation_intervals[18] = {'ini':pd.to_datetime(datetime.date(2011,1,1)) ,
                             'fim':pd.to_datetime(datetime.date(2017,8,30))}
lifting_operation_intervals[24] = {'ini':pd.to_datetime(datetime.date(2011,1,1)) ,
                             'fim':pd.to_datetime(datetime.date(2016,11,27))}
lifting_operation_intervals[26] = {'ini':pd.to_datetime(datetime.date(2011,1,1)) ,
                             'fim':pd.to_datetime(datetime.date(2017,6,17))}
lifting_operation_intervals[27] = {'ini':pd.to_datetime(datetime.date(2011,1,1)) ,
                             'fim':pd.to_datetime(datetime.date(2016,12,8))}

**The function bellow helps to access alarm events with a generic content**

The function returns alarm and event data removing specific tags related to sewage lift stations.

In [8]:
def get_alarm_data(lift_station_index):
    """Function to return alarm data.

    Get alarms and events from dataset changing names and messages to 
    standardize the dataframe result.

    This function needs two global variables already declared: 
        lift_station_register: a dataframe containing information and 
          specifications from lift stations
        lifting_operation_intervals: a dictionary containing operating intervals 
          for sewage pumping stations

    Args:
        lift_station_index (int): index from the sewage lift station.

    Returns:
        dataframe: alarms dataframe with

    """
    #read alarm files
    alarms_file = DATASETS_DIR + r'alarms_utr_{:>02}w.csv'.format(lift_station_index)

    #verify if lift station has a operation interval to use just data 
    #during its operation period.
    try:
        start = lifting_operation_intervals[lift_station_index]['ini']
        end   = lifting_operation_intervals[lift_station_index]['fim']
        print(f'{lift_station_register.iloc[lift_station_index - 1].nome}:{ini} a {fim}')
    except:
        start = pd.to_datetime(datetime.date(2011,1,1))
        end   = pd.to_datetime(datetime.date(2018,7,31))
        pass    
    
    alarms = None
    first = True
    #read file with chunks to avoid Memory error
    reader = pd.read_csv(alarms_file, chunksize=50000)
    for chunk in reader:
        temp_regs = chunk
        temp_regs['INTIME_']  = pd.to_datetime(temp_regs['INTIME_'])
        temp_regs['OUTTIME_'] = pd.to_datetime(temp_regs['OUTTIME_'])    

        #create the dataframe at the first chunk and append later 
        if first:
            alarms = temp_regs
            first = False
        else:
            alarms = alarms.append(temp_regs)

    #filter data between end and start dates
    alarms = alarms[(alarms.INTIME_ >= start) & (alarms.INTIME_ <= end)] 
    #create index for datetime
    alarms.index = pd.to_datetime(alarms['INTIME_'])

    #set tag name for sewage level signal
    tag_level      = f'LI01-{lift_station_index}'

    #replaces tags and values ​​by generic content to facilitate
    #treatment of data
    for pump_index in range(1,4):
        tag_pump          = 'B{:>02}-{}'.format(pump_index,lift_station_index)
        new_tag_pump      = f'pump{pump_index}'
        tag_lift_station  = 'UTR_{:>02}'.format(lift_station_index)
        tag_eletric       = 'MGE{:>02}-{}'.format(pump_index,lift_station_index)
        new_tag_eletric   = f'eletrica_bomba{pump_index}'

        #change alarms messages with generic contents
        alarms.loc[alarms.TAG == tag_pump,    'TAG']  = new_tag_pump
        alarms.loc[alarms.TAG == tag_lift_station,      'TAG']  = 'lift_station'
        alarms.loc[alarms.TAG == tag_level,    'TAG']  = 'level'
        alarms.loc[alarms.TAG == tag_eletric, 'TAG']  = new_tag_eletric

        for fase in ['R', 'S', 'T', 'RS', 'RT', 'ST']:
          messages = ['Corrente Baixa fase {} - Normalizado'.format(fase),
                      'Corrente Baixa fase {}'.format(fase),
                      'Corrente Alta fase {} - Normalizado'.format(fase),
                      'Corrente Alta fase {}'.format(fase),
                      'Tensão {} Alta - Normalizado'.format(fase),
                      'Tensão {} Alta'.format(fase),
                      'Tensão {} Baixa - Normalizado'.format(fase),
                      'Tensão {} Baixa'.format(fase),]
          for message in messages:
            new_message = message.replace(' {}'.format(fase),'')
            alarms.loc[alarms.MESSAGE == message, 'MESSAGE']  = new_message


    return alarms

**The function bellow helps to access instruments data with a generic content**

The function returns instrument data (current for pump, level and timestamp) removing specific tags related to sewage lift stations. 

In [9]:
def get_instrument_data(lift_station_index):
    """Function to return instrument data.

    Get alarms and events from dataset changing names and messages to 
    standardize the dataframe result.

    This function needs two global variables already declared: 
        lift_station_register: a dataframe containing information and 
          specifications from lift stations
        lifting_operation_intervals: a dictionary containing operating intervals 
          for sewage pumping stations

    Args:
        lift_station_index (int): index from the sewage lift station.

    Returns:
        dataframe: instrument dataframe level and pump current values

    """

    #verify if lift station has a operation interval to use just data 
    #during its operation period.
    try:
        start = lifting_operation_intervals[lift_station_index]['ini']
        end   = lifting_operation_intervals[lift_station_index]['fim']
        print(f'{lift_station_register.iloc[lift_station_index - 1].nome}:{ini} a {fim}')
    except:
        start = pd.to_datetime(datetime.date(2011,1,1))
        end   = pd.to_datetime(datetime.date(2018,7,31))
        pass

    #read dataframe from file 
    data_file   = DATASETS_DIR + r'tab_inst_utr_{:>02}.csv'.format(lift_station_index)  
    instrument_data   = pd.read_csv(data_file)  
    instrument_data['TIMESTAMP'] = pd.to_datetime(instrument_data['TIMESTAMP'])
    instrument_data = instrument_data.sort_values(by=['TIMESTAMP'])
    instrument_data = instrument_data[(instrument_data.TIMESTAMP >= start) & 
                                      (instrument_data.TIMESTAMP <= end)]
    instrument_data.index = pd.to_datetime(instrument_data['TIMESTAMP'])

    #rename level column name
    column_level_name = 'LI01_{}'.format(lift_station_index)
    instrument_data = instrument_data.rename(columns={column_level_name: 'level',})


    #get limits for level signal
    limits = {'max':3*lift_station_register.iloc[lift_station_index - 1]['nivel_max'],
              'min':0.0}    
    
    #cleaning level value errors
    #if less than 0, starts with 0
    instrument_data.loc[instrument_data.level < 0, 'level'] = 0.0
    
    #set invalid level value at a column
    instrument_data['level_inv_max'] = 0
    instrument_data.loc[instrument_data.level > limits['max'], 'level_inv_max'] = 1
    
    #change column names with generic content 
    for pump_index in range(1,4):
        tag_current = 'IB%02d_%d' % (pump_index,lift_station_index) 
        new_tag_current     = f'current{pump_index}'
        new_tag_current_lim = f'corrente{pump_index}'
        new_tag_pump        = f'bomba{pump_index}'

        #if the pump does not exists, set current signal with 0
        if tag_current in instrument_data.columns:
          instrument_data = instrument_data.rename(columns={tag_current: new_tag_current,})
        else:
          instrument_data[new_tag_current] = 0.0

        #get limits for current signal  
        limits = {'max':3*lift_station_register.iloc[lift_station_index - 1][f'{new_tag_current_lim}_max'],
                   'min':0}
        
        #initialize current value with 0 
        if lift_station_register.iloc[lift_station_index - 1][new_tag_pump] == 0:
          instrument_data[new_tag_current] = 0.0          
        else:
          #if less than 0, starts with 0
          instrument_data.loc[instrument_data[new_tag_current] < limits['min'], 
                              new_tag_current] = 0.0          
          
        #set invalid level value at a column
        column_inv_max_current = f'{new_tag_current_lim}_inv_max'
        instrument_data[column_inv_max_current] = 0
        instrument_data.loc[instrument_data[new_tag_current] > limits['max'], 
                            column_inv_max_current] = 1          
    
    #filter columns to be returned and adjust timestamp at 5 minute intervals
    data_columns = ['level','level_inv_max', 'current1','current2','current3', 
                    'corrente1_inv_max','corrente2_inv_max','corrente3_inv_max']

    #resample with 5 minutes interval
    instrument_data = instrument_data.resample('5T').first()[data_columns]
    instrument_data['TIMESTAMP'] = instrument_data.index
    instrument_data['TIMESTAMP_fim'] = instrument_data['TIMESTAMP'] + np.timedelta64(300, 's')
    instrument_data['DATA'] = instrument_data.index.date

    return instrument_data




**The function bellow returns sewage lift station datasets**

The function returns alarm, instrument, energy meter and totalizations data related to sewage lift stations

In [10]:
def get_datasets(lift_station_index):
    """Return alarm, instrument, energy meter and totalizations data.

    Args:
        lift_station_index (int): index from the sewage lift station.

    Returns:
        dataframe: alarms dataframe
        dataframe: instrument data dataframe (level and pump currents)
        list: eletric meters dataframes
        dataframe: totalizers data

    """
    #get alarms dataframe
    alarms = get_alarm_data(lift_station_index)

    #get instrument dataframe
    instrument_data = get_instrument_data(lift_station_index)

    #get eletric meter files and dataframe
    number_of_pumps = PUMPS_PER_LIFT_STATION[lift_station_index - 1]
    eletric_meter_files   = [DATASETS_DIR + r'mge_utr_{:>02}_{}.csv'.format(
        lift_station_index, 
        pump_index) for pump_index in range(1, number_of_pumps + 1)]
    eletric_meters    = [pd.read_csv(meter_file) for meter_file in eletric_meter_files]

    #totalizers by day dataframe
    totday_file = DATASETS_DIR + r'tab_totdia_utr_{:>02}.csv'.format(lift_station_index)
    totday  =  pd.read_csv(totday_file)  
    totday['TIMESTAMP'] = pd.to_datetime(totday['TIMESTAMP'])
    totday = totday.sort_values(by=['TIMESTAMP'])
    
    return alarms, instrument_data, eletric_meters, totday

In [13]:
def end_of_overflow_test(row, column_name, limits):
    """test for the end of sewage overflow event

    Args:
        row: dataframe row containg values.
        column_name: name of the column to.
        limits: gives min and max limits for level

    Returns:
        bool: result of the test (True: the event ended)

    """
    # set the end of the overflow event if a nan value is found or 
    # if the level value is half the max limit
    if np.isnan(row[column_name]) | (row[column_name] < limits['max']/2):
      return True
    return False
  
def end_pump_activate_test(row, column_name, limits):
    """test for the end of pump activation

    Args:
        row: dataframe row containg values.
        column_name: name of the column to.
        limits: gives min and max limits for level

    Returns:
        bool: result of the test (True: the event ended)

    """
    # set the end of the pump activation if a nan value is found or 
    # if the pump current is zero    
    if np.isnan(row[column_name]) | (row[column_name] == 0):
      return True
    return False  


def end_maintenance_test(row, column_name, limits):
    """test for the end of maintenance

    Args:
        row: dataframe row containg values.
        column_name: name of the column to.
        limits: gives min and max limits for level

    Returns:
        bool: result of the test (True: the event ended)

    """
    # set the end of the maintenance event if a nan value is found or 
    # if the pump current is more than zero      
    if np.isnan(row[column_name]) | (row[column_name] > 0):
      return True
    return False
  

In [14]:
def create_timeseries_from_events(
    events, 
    multivar, 
    label, 
    func_test, 
    column_name, 
    limits):
    """create timeseries from events adding aditional information into the
    multivariate timeseries

    Args:
        events: events dataframe which will be used to create timeseries
        multivar: original multivariate timeseries
        label: label from the event that will be used to create timeseries
        func_test: a function that will be user to check the end of the event
        column_name: name of the column that will be used with the func_test


    Returns:
        None

    """

    label_contagem  = f'{label}_contagem'
    label_duracao   = f'{label}_duracao'
    label_ocorreu   = f'{label}_ocorreu'
    label_ocorrendo = f'{label}_ocorrendo'    
    multivar[label_contagem] = 0.0
    multivar[label_duracao]  = 0.0
    multivar[label_ocorreu]  = 0   
    multivar[label_ocorrendo]  = 0
    
    # scans entire dataframe filling counters and events duration
    row_iterator = events.iterrows()

    for i, row in row_iterator:
        # returns time series that are related to the event period
        multivar_regs = multivar[((multivar.TIMESTAMP_fim > row['INTIME_']) & 
                                  (multivar.TIMESTAMP <= row['INTIME_'])) |
                                 ((multivar.TIMESTAMP_fim > row['OUTTIME_']) & 
                                  (multivar.TIMESTAMP <= row['OUTTIME_'])) |
                                 ((multivar.TIMESTAMP <= row['OUTTIME_']) & 
                                  (multivar.TIMESTAMP >= row['INTIME_']))
                                ]
        # scans records adding count and duration associated with the event
        for index, row_var in multivar_regs.iterrows():
          # check for the event end
          if func_test(row_var, column_name, limits):
            continue
          
          inicio = row_var['TIMESTAMP'] 
          fim    = row_var['TIMESTAMP_fim']
          if inicio < row['INTIME_']:
            inicio = row['INTIME_']
          if fim > row['OUTTIME_']:
            fim = row['OUTTIME_']
          multivar.at[index, label_contagem]  = row_var[label_contagem] + 1
          multivar.at[index, label_ocorrendo] = 1          
          multivar.at[index, label_duracao] = row_var[label_duracao] + (fim - inicio)/np.timedelta64(1,'s')
          # marks the occurrence of event only at the first index 
          # of the returned records 
          if index == 0:
            multivar.at[index, label_ocorreu] = 1
    
    # calculate cumulative sum for event duration and count
    multivar[f'{label_contagem}_cum'] = multivar[label_contagem].cumsum(axis = 0)
    multivar[f'{label_duracao}_cum'] = multivar[label_duracao].cumsum(axis = 0)    
    multivar[f'{label_ocorreu}_cum'] = multivar[label_ocorreu].cumsum(axis = 0)    
    multivar[f'{label_ocorrendo}_cum'] = multivar[label_ocorrendo].cumsum(axis = 0)  
    

In [17]:
def get_data_and_clean(lift_station, debug = False):
    """get all datasets and call additional functions to make some 
    data preparation and transformation

    Args:
        lift_station: lift station index
        debug: enable printing debug information


    Returns:
        multivariate dataframe containg all needed information

    """  
    # get datasets that will be used
    alarms, multivar, mges, totdia = get_datasets(lift_station)
    # if debug, print deactivation intervals
    if debug:
        invalid_dates = set(multivar[(multivar.level_inv_max == 1) | 
                                     (multivar.corrente1_inv_max == 1) | 
                                     (multivar.corrente2_inv_max == 1) | 
                                     (multivar.corrente3_inv_max == 1) ].index.date)
        # print information about intervals containing invalid instrument data 
        if len(invalid_dates) > 0:
            print(f'lift_station {lift_station} :{len(invalid_dates)}    ' + \
                  f'---   {len(set(multivar[(multivar.corrente1_inv_max == 1)].index.date))}    ' + \
                  f'---   {len(set(multivar[(multivar.corrente2_inv_max == 1)].index.date))}    ' + \
                  f'---   {len(set(multivar[(multivar.corrente3_inv_max == 1)].index.date))}')
  
    # query for sewage overflow events
    if debug:
        print(f'{datetime.datetime.now()} - Lift station {lift_station}: processing sewage overflow events')  
    mask = (alarms['MESSAGE'] == 'Limite Alto - Normalizado') & \
           (alarms['INTIME_'] <= alarms['OUTTIME_'])
    overflow_events = alarms.loc[mask][['INTIME_','OUTTIME_']]
  
    # use level transmitter limits
    limits = {'max':lift_station_register.iloc[elevatoria - 1]['nivel_max'],
              'min':0.0}  
    
    # process overflow events
    create_timeseries_from_events(
        overflow_events, 
        multivar, 
        'extravasao', 
        end_of_overflow_test, 
        'level', 
        limits)
    
    # iterate over pumps creating time series for pump events 
    # (activation and maintenance)
    for mge in range(1,4):
        novo_tag_bomba    = f'bomba{mge}'
        tag_corrente      = f'current{mge}'
        max_current_tag   = f'corrente{mge}_max'
    
        # obtain pump current limits
        limits = {'max':lift_station_register.iloc[lift_station - 1][max_current_tag],
                  'min':0.0}
    
        if debug:    
            print(f'{datetime.datetime.now()} - Lift station {lift_station}: processing pump activation {novo_tag_bomba}')
        
        #  query for pump activation events
        mask = (alarms['MESSAGE'] == 'Bomba Desligou') & \
               (alarms['INTIME_'] <= alarms['OUTTIME_']) & \
               (alarms.TAG == novo_tag_bomba)
        events = alarms.loc[mask][['INTIME_','OUTTIME_']]

        # process pump activation events
        create_timeseries_from_events(
            events, 
            multivar, 
            f'acc_bomba{mge}', 
            end_pump_activate_test, 
            tag_corrente, 
            limits)
        
        if debug:
            print(f'{datetime.datetime.now()} - Lift station {lift_station}: processing pump maintenance {novo_tag_bomba}')

        #  query for maintenance events            
        mask = (alarms['MESSAGE'] == 'Bomba saiu de Manutenção') & \
               (alarms['INTIME_'] <= alarms['OUTTIME_']) & \
               (alarms.TAG == novo_tag_bomba)
        events = alarms.loc[mask][['INTIME_','OUTTIME_']]
        
        # process pump maintenance events
        create_timeseries_from_events(
            events, 
            multivar, 
            f'mnt_bomba{mge}', 
            end_maintenance_test, 
            tag_corrente, 
            limits)
 
    return multivar


In [None]:
#varre todas as elevatorias gerando arquivos com time series compiladas e com as devidas limpezas 
for lift_station in range(1,31):
    multivar = get_data_and_clean(lift_station, True)
    timeseries_file = DATASETS_DIR + f'timeseries_lift_station_{lift_station}_.csv'
    multivar.to_csv(timeseries_file)