In [None]:
from importlib import reload
#reload(Utilities)
#reload(clm)
# NOTE: To reload a class imported as, e.g., 
# from module import class
# One must call:
#   1. import module
#   2. reload module
#   3. from module import class

import sys, os
import re
import copy

import pandas as pd
import numpy as np
from pandas.api.types import is_numeric_dtype
from scipy import stats
import datetime
import time
from natsort import natsorted, ns
from packaging import version

import itertools

import pyodbc
#---------------------------------------------------------------------
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.backends.backend_pdf import PdfPages
import matplotlib.patches as mpatches
from matplotlib.lines import Line2D
import matplotlib.ticker as ticker
from matplotlib import dates
#---------------------------------------------------------------------
sys.path.insert(0, os.path.realpath('..'))
import Utilities_config
#-----
import CommonLearningMethods as clm
#-----
from MeterPremise import MeterPremise
#-----
from AMI_SQL import AMI_SQL
from AMINonVee_SQL import AMINonVee_SQL
from AMIEndEvents_SQL import AMIEndEvents_SQL
from AMIUsgInst_SQL import AMIUsgInst_SQL
from DOVSOutages_SQL import DOVSOutages_SQL
#-----
from GenAn import GenAn
from AMINonVee import AMINonVee
from AMIEndEvents import AMIEndEvents
from AMIUsgInst import AMIUsgInst
from DOVSOutages import DOVSOutages
#---------------------------------------------------------------------
sys.path.insert(0, Utilities_config.get_sql_aids_dir())
import Utilities_sql
import TableInfos
from TableInfos import TableInfo
from SQLElement import SQLElement
from SQLElementsCollection import SQLElementsCollection
from SQLSelect import SQLSelectElement, SQLSelect
from SQLFrom import SQLFrom
from SQLWhere import SQLWhereElement, SQLWhere
from SQLJoin import SQLJoin, SQLJoinCollection
from SQLGroupBy import SQLGroupByElement, SQLGroupBy
from SQLHaving import SQLHaving
from SQLOrderBy import SQLOrderByElement, SQLOrderBy
from SQLQuery import SQLQuery
from SQLQueryGeneric import SQLQueryGeneric
#---------------------------------------------------------------------
#sys.path.insert(0, os.path.join(os.path.realpath('..'), 'Utilities'))
sys.path.insert(0, Utilities_config.get_utilities_dir())
import Utilities
import Utilities_df
from Utilities_df import DFConstructType
import Utilities_dt
import Plot_Box_sns
import GrubbsTest

In [None]:
"""
In MeterPremise, can I create a function which will build mp_df_hist given mp_df_curr?
In DOVSOutages I need to build new build_mp_for_outg (and update its use throughout)
    APPARENTLY I already have this, build_active_MP_for_outages or one of other similar functions
    
I need to basically replace everything in DOVSOutages which uses build_mp_for_outg
"""


def build_active_MP_for_outages_df(
    df_outage, 
    prem_nb_col, 
    df_mp_curr=None, 
    df_mp_hist=None, 
    assert_all_PNs_found=True, 
    drop_inst_rmvl_cols=False, 
    outg_rec_nb_col='OUTG_REC_NB',  #TODO!!!!!!!!!!!!!!!!!!!!!!! what if index?!
    is_slim=False, 
    dt_on_ts_col='DT_ON_TS', 
    df_off_ts_full_col='DT_OFF_TS_FULL', 
    consolidate_PNs_batch_size=1000, 
    df_mp_serial_number_col='mfr_devc_ser_nbr', 
    df_mp_prem_nb_col='prem_nb', 
    df_mp_install_time_col='inst_ts', 
    df_mp_removal_time_col='rmvl_ts', 
    df_mp_trsf_pole_nb_col='trsf_pole_nb'
):
    r"""
    Similar to build_active_MP_for_outages, but driven by an outage DataFrame.

    For every outage in df_outage, MeterPremise.get_active_SNs_for_PNs_at_datetime_interval is called with the
    outage's premise numbers and its [power off, power on] interval.  The per-outage results are tagged with the
    outage record number (stored in a column named outg_rec_nb_col) and concatenated into a single DataFrame.
    -------------------------
    df_outage:
        Outage DataFrame.  Must contain prem_nb_col, dt_on_ts_col and df_off_ts_full_col.
    prem_nb_col:
        Column holding the premise number(s).
        is_slim=False ==> one premise number per row.
        is_slim=True  ==> one list of premise numbers per row.
    df_mp_curr/df_mp_hist:
        Forwarded to MeterPremise.build_mp_df_curr_hist_for_PNs; the returned 'mp_df_curr'/'mp_df_hist' entries
        are the ones used for all per-outage look-ups.
    assert_all_PNs_found:
        Forwarded to MeterPremise.build_mp_df_curr_hist_for_PNs only (the per-outage calls always use False).
    drop_inst_rmvl_cols:
        If True, df_mp_install_time_col and df_mp_removal_time_col are dropped from the result.
    outg_rec_nb_col:
        is_slim=False ==> column of df_outage to group by (groups with a negative value are skipped).
        is_slim=True  ==> only the name of the output column; the VALUES are taken from the index of df_outage.
    consolidate_PNs_batch_size:
        Forwarded to Utilities_df.consolidate_column_of_lists (is_slim=True only).
    df_mp_serial_number_col/df_mp_prem_nb_col/df_mp_trsf_pole_nb_col:
        Currently not used in the body of this function.
    -------------------------
    Returns:
        The concatenation of the per-outage DataFrames.
    """
    #-------------------------
    required_cols = (prem_nb_col, dt_on_ts_col, df_off_ts_full_col)
    assert(all(col in df_outage.columns for col in required_cols))
    #-------------------------
    # Collect every premise number appearing in df_outage (NaNs removed)
    if is_slim:
        all_PNs = Utilities_df.consolidate_column_of_lists(
            df=df_outage, 
            col=prem_nb_col, 
            sort=True,
            include_None=False,
            batch_size=consolidate_PNs_batch_size, 
            verbose=False
        )
    else:
        all_PNs = df_outage[prem_nb_col].unique().tolist()
    all_PNs = [pn for pn in all_PNs if pd.notna(pn)]
    #-------------------------
    # Build the current/historical meter premise DFs once, for all premise numbers
    mp_dfs = MeterPremise.build_mp_df_curr_hist_for_PNs(
        PNs=all_PNs, 
        mp_df_curr=df_mp_curr,
        mp_df_hist=df_mp_hist, 
        join_curr_hist=False, 
        addtnl_mp_df_curr_cols=None, 
        addtnl_mp_df_hist_cols=None, 
        assert_all_PNs_found=assert_all_PNs_found, 
        assume_one_xfmr_per_PN=True, 
        drop_approx_duplicates=True
    )
    df_mp_curr = mp_dfs['mp_df_curr']
    df_mp_hist = mp_dfs['mp_df_hist']
    #-------------------------
    def _active_SNs_for_outage(outg_rec_nb, PNs_for_outg, dt_off, dt_on):
        # NOTE: assume_one_xfmr_per_PN=True above in MeterPremise.build_mp_df_curr_hist_for_PNs,
        #       so does not need to be set again (i.e., assume_one_xfmr_per_PN=False below)
        active_df = MeterPremise.get_active_SNs_for_PNs_at_datetime_interval(
            PNs=PNs_for_outg,
            df_mp_curr=df_mp_curr, 
            df_mp_hist=df_mp_hist, 
            dt_0=dt_off,
            dt_1=dt_on,
            assume_one_xfmr_per_PN=False, 
            output_index=None,
            output_groupby=None, 
            assert_all_PNs_found=False
        )
        active_df[outg_rec_nb_col] = outg_rec_nb
        return active_df
    #-------------------------
    # A dict (instead of a list) is used only so repeated outg_rec_nbs can be detected
    dfs_by_outg = {}

    if is_slim:
        # One row per outage; the index value is used as the outage record number
        for outg_rec_nb_i, row_i in df_outage.iterrows():
            active_SNs_df_i = _active_SNs_for_outage(
                outg_rec_nb=outg_rec_nb_i, 
                PNs_for_outg=row_i[prem_nb_col], 
                dt_off=row_i[df_off_ts_full_col], 
                dt_on=row_i[dt_on_ts_col]
            )
            assert(outg_rec_nb_i not in dfs_by_outg)
            dfs_by_outg[outg_rec_nb_i] = active_SNs_df_i
    else:
        for outg_rec_nb_i, df_i in df_outage.groupby(outg_rec_nb_col):
            # Don't want to include outg_rec_nb_i=-2147483648
            if int(outg_rec_nb_i) < 0:
                continue
            # There should only be a single unique dt_on_ts and dt_off_ts_full for each outage
            n_dt_on  = df_i[dt_on_ts_col].nunique()
            n_dt_off = df_i[df_off_ts_full_col].nunique()
            if n_dt_on!=1 or n_dt_off!=1:
                print(f'outg_rec_nb_i = {outg_rec_nb_i}')
                print(f'df_i[dt_on_ts_col].nunique()       = {n_dt_on}')
                print(f'df_i[df_off_ts_full_col].nunique() = {n_dt_off}')
                print('CRASH IMMINENT!')
                assert(0)
            # Power on/off times for this outage
            dt_on_ts_i       = df_i[dt_on_ts_col].unique()[0]
            df_off_ts_full_i = df_i[df_off_ts_full_col].unique()[0]
            # Premise numbers for this outage, NaNs removed (mainly to catch PNs_i = [nan])
            PNs_i = [pn for pn in df_i[prem_nb_col].unique().tolist() if pd.notna(pn)]
            if not PNs_i:
                continue
            #-----
            active_SNs_df_i = _active_SNs_for_outage(
                outg_rec_nb=outg_rec_nb_i, 
                PNs_for_outg=PNs_i, 
                dt_off=df_off_ts_full_i, 
                dt_on=dt_on_ts_i
            )
            assert(outg_rec_nb_i not in dfs_by_outg)
            dfs_by_outg[outg_rec_nb_i] = active_SNs_df_i
    #-------------------------
    active_SNs_df = pd.concat(list(dfs_by_outg.values()))
    #-------------------------
    if drop_inst_rmvl_cols:
        active_SNs_df = active_SNs_df.drop(columns=[df_mp_install_time_col, df_mp_removal_time_col])
    #-------------------------
    return active_SNs_df

In [None]:
import shutil

def offset_int_tagged_files_in_dir(
    files_dir, 
    file_name_regex, 
    offset_int=None, 
    new_0_int=None, 
    new_dir=None, 
    file_name_glob=None, 
    copy_and_rename=False, 
    return_rename_summary=False, 
    verbose=False
):
    r"""
    Offset all of the files in files_dir by offset_int.
    The directory files_dir is expected to contain files of the form [file_idx_0, file_idx_1, ..., file_idx_n] where
        idx_0, idx_1, ..., idx_n are integers.
    The files can either be renamed using the offset_int argument OR the new_0_int argument, BUT NOT BOTH.
        For the case of offset_int:
            The files in this directory will be renamed [file_{idx_0+offset_int}, file_{idx_1+offset_int}, ..., file_{idx_n+offset_int}]
        For the case of new_0_int:
            The files in this directory will be renamed [file_{new_0_int}, file_{new_0_int+1}, ..., file_{new_0_int+n_files-1}]
    The files can simply be moved/renamed (copy_and_rename==False), or copied to the new directory (copy_and_rename==True and 
        new_dir not None)
    
    The full set of renames is planned and validated BEFORE any file is touched, and the renames are then executed in 
        an order which never overwrites a file still waiting to be renamed (so negative offsets work in place as well).
    Only the digits captured by file_name_regex are replaced; identical digits elsewhere in the file name are untouched.
    If the original tag is zero-padded (e.g., '007'), the new tag is zero-padded to the same width.
    A file whose new path equals its current path is left as is.
    -------------------------
    files_dir:
        The directory housing the files to be renamed
        
    file_name_regex:
        A regex pattern used to both identify the files to be renamed and to find the integer tag for each file.
        The pattern must match exactly once in each path, and the tag must lie in the file name (not the directory).
        NOTE: file_name_regex MUST have some sort of digit capture (e.g., contain '(\d*)'), and at most one capture group.
              e.g., for the case of end events, one would use file_name_regex = r'end_events_(\d*).csv'
              
    offset_int/new_0_int:
        These direct how the files will be renamed.  
        ONLY ONE OF THESE SHOULD BE USED ==> one should always be set to None and the other should be set to some int value
        offset_int:
            Take the identifier/tag ints, and simply shift them by offset_int.
        new_0_int:
            Start the identifier tags at new_0_int, and label from new_0_int to new_0_int + len(files in files_dir)-1
            
    new_dir:
        The directory to which the renamed files will be saved (created if it does not exist).
        Default value is None, which means the files will be saved in the input files_dir
              
    file_name_glob:
        Used in Utilities.find_all_paths to help find the paths to be renamed.  By default this is set to None, which is then
            changed to = '*', meaning the glob portion doesn't trim down the list of files at all, but returns all contained
            in the directory.  Therefore, file_name_regex does all of the work, which is fine and really as designed
            
    copy_and_rename:
        Directs whether to call rename or copy.
        copy_and_rename=False:
            Default behavior, which means the files will be renamed and replaced.
        copy_and_rename=True:
            The files will be copied and renamed, with the originals kept intact.  This is only possible if new_dir is not None
            and new_dir != files_dir (otherwise copy_and_rename is forced to False)
            
    return_rename_summary:
        If True, return a dict mapping each original path to its new path.  Otherwise, None is returned.
        
    verbose:
        If True, print each path together with the regex and the tag found.
        
    Raises AssertionError if the arguments are inconsistent, a tag cannot be found unambiguously, or a rename would
        collide with an existing file.
    """
    #-------------------------
    # Exclusive or for offset_int and new_0_int (meaning, one but not both must not be None)
    assert (offset_int is None) != (new_0_int is None), 'Exactly one of offset_int/new_0_int must be set'
    #-------------------------
    assert(os.path.isdir(files_dir))
    if file_name_glob is None:
        file_name_glob = '*'
    if new_dir is None:
        new_dir = files_dir
    #-------------------------
    # Copying only makes sense when the destination differs from the source directory
    if new_dir==files_dir:
        copy_and_rename = False
    #-------------------------
    if not os.path.exists(new_dir):
        os.makedirs(new_dir)
    #-------------------------
    paths = Utilities.find_all_paths(
        base_dir=files_dir, 
        glob_pattern=file_name_glob, 
        regex_pattern=file_name_regex
    )
    #-------------------------
    # For each path, find the tag AND where it sits inside the file name
    tagged_files = []
    for path in paths:
        file_name = os.path.basename(path)
        matches = list(re.finditer(file_name_regex, path))
        if verbose:
            print(path)
            print(file_name_regex)
            print([m.group(0) for m in matches])
            print()
        # Should have only been one tag found per path
        assert len(matches)==1, f'Expected exactly one match of {file_name_regex!r} in {path!r}, found {len(matches)}'
        match = matches[0]
        n_groups = match.re.groups
        assert n_groups<=1, f'file_name_regex must contain at most one capture group, found {n_groups}'
        # n_groups==1 ==> use the capture group; n_groups==0 ==> use the whole match
        tag_str = match.group(n_groups)
        assert tag_str, f'No integer tag captured by {file_name_regex!r} in {path!r}'
        tag_beg, tag_end = match.span(n_groups)
        # Shift the span from path coordinates to file name coordinates
        file_name_beg = len(path)-len(file_name)
        assert tag_beg>=file_name_beg, f'Tag found in the directory portion of {path!r}, not in the file name'
        tagged_files.append(dict(
            path=path, 
            file_name=file_name, 
            tag=int(tag_str), 
            tag_str=tag_str, 
            tag_beg=tag_beg-file_name_beg, 
            tag_end=tag_end-file_name_beg
        ))
    # Highest tag first (needed for the new_0_int labelling below, and it fixes the order of the returned summary)
    tagged_files = sorted(tagged_files, key=lambda x: x['tag'], reverse=True)
    #-------------------------
    # Plan every rename before touching anything
    n_files = len(tagged_files)
    rename_summary = {}
    rename_plan = []
    for i,file_i in enumerate(tagged_files):
        if offset_int is not None:
            new_tag_i = file_i['tag']+offset_int
        else:
            # Remember, sorted in descending order
            new_tag_i = n_files+new_0_int-(i+1)
        new_tag_str_i = str(new_tag_i)
        # Keep zero-padded tags zero-padded
        if len(file_i['tag_str'])>1 and file_i['tag_str'].startswith('0'):
            new_tag_str_i = new_tag_str_i.zfill(len(file_i['tag_str']))
        # Replace ONLY the captured tag, not every occurrence of its digits in the file name
        new_file_name_i = (
            file_i['file_name'][:file_i['tag_beg']] + 
            new_tag_str_i + 
            file_i['file_name'][file_i['tag_end']:]
        )
        new_path_i = os.path.join(new_dir, new_file_name_i)
        #-----
        assert(file_i['path'] not in rename_summary.keys())
        rename_summary[file_i['path']] = new_path_i
        rename_plan.append((file_i['path'], new_path_i, new_tag_i-file_i['tag']))
    #-------------------------
    # Validate the plan
    def _norm(p):
        return os.path.normcase(os.path.abspath(p))
    #-----
    srcs_norm = set(_norm(src) for src,_,_ in rename_plan)
    dsts_norm = [_norm(dst) for _,dst,_ in rename_plan]
    assert len(set(dsts_norm))==len(dsts_norm), 'Two or more files would be given the same new name'
    for _,dst,_ in rename_plan:
        if os.path.lexists(dst):
            # An existing destination is only acceptable if it is itself about to be moved out of the way
            assert (not copy_and_rename) and _norm(dst) in srcs_norm, f'Destination already exists: {dst}'
    #-------------------------
    # Execute the plan in an order that never lands on a file still waiting to be renamed:
    #   files whose tag does not decrease go first, highest tag first
    #   files whose tag decreases go last, lowest tag first
    # Files already sitting at their destination are skipped
    to_move = [x for x in rename_plan if _norm(x[0])!=_norm(x[1])]
    ordered = [x for x in to_move if x[2]>=0] + [x for x in reversed(to_move) if x[2]<0]
    for src,dst,_ in ordered:
        assert not os.path.lexists(dst), f'Destination already exists: {dst}'
        if copy_and_rename:
            shutil.copy(src=src, dst=dst)
        else:
            os.rename(src, dst)
    #-------------------------
    if return_rename_summary:
        return rename_summary
    
    
    
def offset_int_tagged_files_w_summaries_in_dir(
    files_dir, 
    file_name_regex, 
    offset_int=None, 
    new_0_int=None, 
    new_dir=None, 
    file_name_glob=None, 
    
    summary_files_dir=None,
    summary_file_name_regex=None,
    summary_file_name_glob=None, 
    summary_new_dir=None, 
    
    copy_and_rename=False, 
    return_rename_summary=False
):
    r"""
    Run offset_int_tagged_files_in_dir twice: once for the data files in files_dir and once for their 
      accompanying summary files, using the same offset_int/new_0_int/copy_and_rename for both.
    After both calls, the number of renamed data files is asserted to equal the number of renamed summary files.
    -------------------------
    files_dir, file_name_regex, offset_int, new_0_int, new_dir, file_name_glob, copy_and_rename:
        Forwarded unchanged to offset_int_tagged_files_in_dir for the data files.
        
    summary_files_dir:
        Directory housing the summary files.
        Default None ==> os.path.join(files_dir, 'summary_files')
        
    summary_file_name_regex:
        Regex for the summary files.
        Default None ==> file_name_regex with '_(\d*).csv' replaced by '_([0-9]*)_summary.json'
        
    summary_file_name_glob:
        Glob for the summary files.
        Default None ==> '*' if file_name_glob is None, 
                         otherwise file_name_glob with '_*.csv' replaced by '*_summary.json'
        
    summary_new_dir:
        Directory to which the renamed summary files are saved.
        Default None ==> os.path.join(files_dir, 'summary_files') if new_dir is None, 
                         otherwise os.path.join(new_dir, 'summary_files')
        NOTE: The default is built from files_dir/new_dir, NOT from summary_files_dir.
        
    return_rename_summary:
        If True, return the tuple (files_rename_summary, summaries_rename_summary).  Otherwise, None is returned.
    """
    #-------------------------
    if summary_files_dir is None:
        summary_files_dir = os.path.join(files_dir, 'summary_files')
    if summary_file_name_regex is None:
        # Raw string: '\d' in a normal string literal is an invalid escape sequence
        summary_file_name_regex = file_name_regex.replace(r'_(\d*).csv', '_([0-9]*)_summary.json')
    if summary_file_name_glob is None:
        if file_name_glob is None:
            summary_file_name_glob = '*'
        else:
            summary_file_name_glob = file_name_glob.replace('_*.csv', '*_summary.json')
    if summary_new_dir is None:
        if new_dir is None:
            summary_new_dir = os.path.join(files_dir, 'summary_files')
        else:
            summary_new_dir = os.path.join(new_dir, 'summary_files')
    #-------------------------
    # Data files
    files_rename_summary = offset_int_tagged_files_in_dir(
        files_dir=files_dir, 
        file_name_regex=file_name_regex, 
        offset_int=offset_int, 
        new_0_int=new_0_int, 
        new_dir=new_dir, 
        file_name_glob=file_name_glob, 
        copy_and_rename=copy_and_rename, 
        return_rename_summary=True
    )
    #-------------------------
    # Summary files
    summaries_rename_summary = offset_int_tagged_files_in_dir(
        files_dir=summary_files_dir, 
        file_name_regex=summary_file_name_regex, 
        offset_int=offset_int, 
        new_0_int=new_0_int, 
        new_dir=summary_new_dir, 
        file_name_glob=summary_file_name_glob, 
        copy_and_rename=copy_and_rename, 
        return_rename_summary=True
    )
    #-------------------------
    # NOTE: This check runs after both sets of files have already been renamed/copied
    assert(len(files_rename_summary)==len(summaries_rename_summary))
    if return_rename_summary:
        return (files_rename_summary, summaries_rename_summary)

In [None]:
def build_drct_subset_dir_from_end_events_dir(
    files_dir,
    save_dir, 
    drct_subset_type='drct_strict', 
    xfmr_equip_typ_nms_of_interest = ['TRANSFORMER, OH', 'TRANSFORMER, UG'], 
    file_path_glob = r'end_events_[0-9]*.csv', 
    file_path_regex = None, 
    outg_rec_nb_col='outg_rec_nb', 
    trsf_pole_nb_col = 'trsf_pole_nb', 
    batch_size=100, 
    verbose=True, 
    n_update=1, 
    cols_and_types_to_convert_dict=None, 
    to_numeric_errors='coerce', 
    assert_all_cols_equal=True    
):
    r"""
    From an EndEvents directory, save the subset of direct outage transformers to a new directory.
    The CSVs found in files_dir are read in batches of batch_size files; each non-empty batch is reduced to the 
      desired subset and written to save_dir as end_events_{n}.csv, with n counting the batches saved.
    NOTE: end_events_dfs must have trsf_pole_nb information!!!!!
            So, older versions, which don't include this information, will not work!
    NOTE: summary_files subdirectory will not be transferred. 
          So, if one needs that info, grab it from the original directory
    -------------------------
    files_dir:
        Directory searched (via Utilities.find_all_paths with file_path_glob and file_path_regex) for the input CSVs.
        If no paths are found, a message is printed and None is returned.
        
    save_dir:
        Directory to which the subset CSVs are saved.
        The function asserts that GenAn.get_next_summary_file_tag_int returns 0 for this directory.
        
    drct_subset_type:
        'drct':
            Keep rows where LOCATION_ID equals the transformer pole number.
        'drct_strict':
            As 'drct', AND EQUIP_TYP_NM must be in xfmr_equip_typ_nms_of_interest.
          
    xfmr_equip_typ_nms_of_interest:
        Used only for the case of drct_subset_type=='drct_strict'
        
    outg_rec_nb_col/trsf_pole_nb_col:
        Column names in the (lower-cased) end events DFs.
        If a name is not found in a batch, f'{name}_gpd_for_sql' is tried instead.
        This look-up is done independently for each batch.
        
    verbose/n_update:
        If verbose, print the batch set-up and a progress line every n_update batches.
        
    cols_and_types_to_convert_dict/to_numeric_errors/assert_all_cols_equal:
        Forwarded to GenAn.read_df_from_csv_batch.
    """
    #-------------------------
    assert(drct_subset_type in ['drct_strict', 'drct'])
    #-------------------------
    paths = Utilities.find_all_paths(
        base_dir=files_dir, 
        glob_pattern=file_path_glob, 
        regex_pattern=file_path_regex
    )
    if len(paths)==0:
        print(f'No paths found in files_dir = {files_dir}')
        return None
    paths=natsorted(paths)
    #-------------------------
    save_args = dict(
        save_to_file=True, 
        save_dir=save_dir, 
        save_name=r'end_events.csv', 
        save_summary=False, 
        index=True
    )
    #-----
    save_args = GenAn.prepare_save_args(save_args)
    save_args['offset_int'] = GenAn.get_next_summary_file_tag_int(save_args)
    # This is really intended to build new directories with subsets of data
    #   ==> offset_int should be 0
    assert(save_args['offset_int']==0)
    #-------------------------
    batch_idxs = Utilities.get_batch_idx_pairs(len(paths), batch_size)
    n_batches = len(batch_idxs)    
    if verbose:
        print(f'n_paths = {len(paths)}')
        print(f'batch_size = {batch_size}')
        print(f'n_batches = {n_batches}')
    #-------------------------
    # counter only advances for batches which are actually saved (empty input batches are skipped)
    counter = 0
    for i, batch_i in enumerate(batch_idxs):
        if verbose and (i+1)%n_update==0:
            print(f'{i+1}/{n_batches}')
        i_beg = batch_i[0]
        i_end = batch_i[1]
        #-----
        # NOTE: make_all_columns_lowercase=True because...
        #   EMR would return lowercase outg_rec_nb or outg_rec_nb_gpd_for_sql
        #   Athena maintains the original case, and does not convert to lower case,
        #     so it returns OUTG_REC_NB or OUTG_REC_NB_GPD_FOR_SQL
        end_events_df_i = GenAn.read_df_from_csv_batch(
            paths=paths[i_beg:i_end], 
            cols_and_types_to_convert_dict=cols_and_types_to_convert_dict, 
            to_numeric_errors=to_numeric_errors, 
            make_all_columns_lowercase=True, 
            assert_all_cols_equal=assert_all_cols_equal
        )
        if end_events_df_i.shape[0]==0:
            continue
        #-------------------------
        # Resolve the column names for THIS batch only.
        # The function arguments must not be overwritten here, otherwise a single batch needing the 
        #   _gpd_for_sql suffix would alter the look-up for every batch which follows.
        cols_b4_merge = end_events_df_i.columns.tolist()
        #-----
        outg_rec_nb_col_i = outg_rec_nb_col
        if outg_rec_nb_col_i not in cols_b4_merge:
            outg_rec_nb_col_i = f'{outg_rec_nb_col}_gpd_for_sql'
        assert(outg_rec_nb_col_i in cols_b4_merge)
        #-----
        trsf_pole_nb_col_i = trsf_pole_nb_col
        if trsf_pole_nb_col_i not in cols_b4_merge:
            trsf_pole_nb_col_i = f'{trsf_pole_nb_col}_gpd_for_sql'
        assert(trsf_pole_nb_col_i in cols_b4_merge)
        #-------------------------
        end_events_df_i = DOVSOutages.append_outg_info_to_df(
            df=end_events_df_i, 
            outg_rec_nb_idfr=outg_rec_nb_col_i, 
            contstruct_df_args=None, 
            build_sql_function = DOVSOutages_SQL.build_sql_outage, 
            build_sql_function_kwargs=dict(
                include_DOVS_EQUIPMENT_TYPES_DIM=True, 
                cols_of_interest=['OUTG_REC_NB', 'LOCATION_ID'], 
                select_cols_DOVS_EQUIPMENT_TYPES_DIM=['EQUIP_TYP_NM']
            )
        )
        #-------------------------
        # drct:        outage location is the meter's transformer pole
        # drct_strict: additionally, the equipment type must be one of xfmr_equip_typ_nms_of_interest
        is_drct_i = end_events_df_i['LOCATION_ID']==end_events_df_i[trsf_pole_nb_col_i]
        if drct_subset_type=='drct_strict':
            is_drct_i = is_drct_i & end_events_df_i['EQUIP_TYP_NM'].isin(xfmr_equip_typ_nms_of_interest)
        end_events_df_i = end_events_df_i[is_drct_i]
        #-------------------------
        # Drop the columns which were added from DOVS (via DOVSOutages.append_outg_info_to_df) and used to 
        #   identify drct or drct_strict types
        cols_added_i = [x for x in end_events_df_i.columns.tolist() if x not in cols_b4_merge]
        end_events_df_i = end_events_df_i.drop(columns=cols_added_i)
        #-------------------------
        save_args_i = copy.deepcopy(save_args)
        batch_int = counter + save_args_i['offset_int']
        save_args_i['save_name'] = Utilities.append_to_path(save_args_i['save_name'], appendix=f'_{batch_int}', 
                                                            ext_to_find=save_args_i['save_ext'], append_to_end_if_ext_no_found=True)
        # Call prepare_save_args again to compile save_path, save_summary_path, etc.                                          
        save_args_i = GenAn.prepare_save_args(save_args_i, make_save_dir_if_dne=True)        
        #-----
        end_events_df_i.to_csv(save_args_i['save_path'], index=save_args_i['index'])
        #-------------------------
        counter += 1


# -----------------------------------------------------------------------------------------------
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# -----------------------------------------------------------------------------------------------

In [None]:
#----------------------------------------------------------------------------------------------------
# VARIABLES TO BE SET BY USER!
#----------------------------------------------------------------------------------------------------
# save_dfs_to_file and read_dfs_from_file must not both be True (asserted in a later cell)
save_dfs_to_file   = False
read_dfs_from_file = True
save_end_events    = True

#-------------------------
# run_date is used to collect all results from a given acquisition run together.
# As such, run_date should be set to the first date of the acquisition run, and
#   SHOULD NOT be changed for each individual date in a run (which typically lasts
#   over the course of days/weeks)
run_date = '20231004'

#-------------------------
# date_0/date_1 bound the outage search; both also appear in the save directory name.
# The span date_1-date_0 must exceed 2*search_time_half_window (asserted in a later cell)
date_0 = '2023-04-01'
date_1 = '2023-09-30'
search_time_half_window = datetime.timedelta(days=31)

#--------------------------------------------------
# NOTE: below, states and opcos should be consistent!
#       i.e., e.g., if states='OH', then opcos should be 'oh' (or None, I suppose)
#-------------------------
# states used to 
#   (1) find transformers which suffered at least one outage from DOVS
#   (2) find all transformers from MeterPremise
# states can be:
#   - a single string, e.g. 'OH'
#   - a list of strings, e.g., ['OH', 'WV']
#   - None
# NOTE: states tend to be upper-case!
states=['OH']

#-------------------------
# opcos used with AMIEndEvents to
#  (1) find the premise numbers which recorded an event between date_0 and date_1.
#  (2) selection/acquisition of end_device_events
# opcos can be:
#   - a single string, e.g. 'oh'
#   - a list of strings, e.g., ['oh', 'tx']
#   - None
# NOTE: opcos tend to be lower-case!
# NOTE: Acceptable opcos appear to be: ['ap', 'im', 'oh', 'pso', 'swp', 'tx']
opcos='oh'

In [None]:
#----------------------------------------------------------------------------------------------------
#----------------------------------------------------------------------------------------------------
# DFs will be saved in save_dir_base
# Collection of end events files will be saved in os.path.join(save_dir_base, 'EndEvents')
save_dir_base = os.path.join(
    Utilities.get_local_data_dir(), 
    r'dovs_and_end_events_data', 
    run_date, 
    f"{date_0.replace('-','')}_{date_1.replace('-','')}", 
    'Outgs_Full'
)
#-------------------------
end_events_save_args = dict(
    save_to_file=save_end_events, 
    save_dir = os.path.join(save_dir_base, 'EndEvents'), 
    save_name=r'end_events.csv', 
    index=True
)
#-------------------------
print(f"save_dir_base = {save_dir_base}")
print('end_events_save_args')
for k,v in end_events_save_args.items():
    print(f"\t{k} : {v}")
#-------------------------
# Only create directories if something is actually going to be written.
# exist_ok=True makes the cell safe to re-run and avoids the check-then-create race of 
#   os.path.exists followed by os.makedirs
if save_dfs_to_file or save_end_events:
    os.makedirs(save_dir_base, exist_ok=True)
    #-----
    if save_end_events:
        os.makedirs(end_events_save_args['save_dir'], exist_ok=True)

In [None]:
#----------------------------------------------------------------------------------------------------
#----------------------------------------------------------------------------------------------------
# Sanity checks on the user settings.
# The two bool flags are summed, so the assertion fails only when both are True
assert(save_dfs_to_file+read_dfs_from_file <=1) # Should never both read and write!
# The date range must be wider than the full search window (twice the half window)
assert(pd.to_datetime(date_1)-pd.to_datetime(date_0) > 2*search_time_half_window)
#--------------------------------------------------
# Connections are only opened when the DFs are to be built from queries instead of read from file.
# NOTE: conn_outages and conn_aws are therefore undefined when read_dfs_from_file is True
if not read_dfs_from_file:
    conn_outages = Utilities.get_utldb01p_oracle_connection()
    conn_aws = Utilities.get_athena_prod_aws_connection()

# ---------------------------------------------------------------
# OUTAGES
# ---------------------------------------------------------------

In [None]:
#----------------------------------------------------------------------------------------------------
# Find outages between date_0 and date_1 for states
#----------------------------------------------------------------------------------------------------
start=time.time()
print('-----'*20+f'\nFinding outages between {date_0} and {date_1} for states={states}\n'+'-----'*10)
if read_dfs_from_file:
    # Re-use the pickle written by a previous run with save_dfs_to_file=True
    print(f"Reading df_outage_OG from file: {os.path.join(save_dir_base, 'df_outage_OG.pkl')}")
    df_outage_OG = pd.read_pickle(os.path.join(save_dir_base, 'df_outage_OG.pkl'))
else:
    # Build the SQL statement, print it for the record, and run it using conn_outages
    sql_outage_full = DOVSOutages_SQL.build_sql_std_outage(
        mjr_mnr_cause=None, 
        include_premise=True, 
        date_range=[date_0, date_1], 
        states=states
    ).get_sql_statement()
    #-----
    print(f'sql_outage_full:\n{sql_outage_full}\n\n')
    #-----
    # dtype fixes the types of the three numeric columns as they are read
    df_outage_OG = pd.read_sql_query(
        sql_outage_full, 
        conn_outages, 
        dtype={
            'CI_NB':np.int32, 
            'CMI_NB':np.float64, 
            'OUTG_REC_NB':np.int32
        }
    )
    if save_dfs_to_file:
        df_outage_OG.to_pickle(os.path.join(save_dir_base, 'df_outage_OG.pkl'))
#-----
# Quick summary: shape, number of distinct outages, and elapsed time in seconds
print(f"df_outage_OG.shape = {df_outage_OG.shape}")
print(f"# OUTG_REC_NBs     = {df_outage_OG['OUTG_REC_NB'].nunique()}")
print(f'\ntime = {time.time()-start}\n'+'-----'*20)

In [None]:
# Build (or read) df_mp_outg, the meter premise DF for the outages in df_outage_OG
if read_dfs_from_file:
    # No real reason to read in df_mp_outg_OG, as it's not used after df_mp_outg is built
    # df_mp_outg_OG = pd.read_pickle(os.path.join(save_dir_base, 'df_mp_outg_b4_dupl_rmvl.pkl'))
    df_mp_outg = pd.read_pickle(os.path.join(save_dir_base, 'df_mp_outg_full.pkl'))
else:
    # df_outage_OG is not slim here: PREMISE_NB holds one premise number per row
    df_mp_outg_OG = build_active_MP_for_outages_df(
        df_outage=df_outage_OG, 
        prem_nb_col='PREMISE_NB', 
        is_slim=False, 
        assert_all_PNs_found=False
    )
    #-----
    # Make sure the install/removal times are datetimes before the duplicate removal below
    df_mp_outg_OG['inst_ts'] = pd.to_datetime(df_mp_outg_OG['inst_ts'])
    df_mp_outg_OG['rmvl_ts'] = pd.to_datetime(df_mp_outg_OG['rmvl_ts'])
    #-----
    # Keep a copy of the DF as it was before the duplicate removal
    if save_dfs_to_file:
        df_mp_outg_OG.to_pickle(os.path.join(save_dir_base, 'df_mp_outg_b4_dupl_rmvl.pkl'))
    #-------------------------
    # Remove approximate duplicates, with OUTG_REC_NB as an additional groupby column
    df_mp_outg = MeterPremise.drop_approx_mp_duplicates(
        mp_df = df_mp_outg_OG.copy(), 
        fuzziness=pd.Timedelta('1 hour'), 
        assert_single_overlap=True, 
        addtnl_groupby_cols=['OUTG_REC_NB'], 
        gpby_dropna=False
    )
    #-----
    if save_dfs_to_file:
        df_mp_outg.to_pickle(os.path.join(save_dir_base, 'df_mp_outg_full.pkl'))

In [None]:
# Merge the outage data with the meter premise data, matching on (outage record number, premise number).
# A copy of df_outage_OG is passed in, so inplace=True cannot alter df_outage_OG itself
df_outage = DOVSOutages.merge_df_outage_with_mp(
    df_outage=df_outage_OG.copy(), 
    df_mp=df_mp_outg, 
    merge_on_outg=['OUTG_REC_NB', 'PREMISE_NB'], 
    merge_on_mp=['OUTG_REC_NB', 'prem_nb'], 
    cols_to_include_mp=None, 
    drop_cols = None, 
    rename_cols=None, 
    inplace=True
)

In [None]:
# Display the merged DF for a visual check
df_outage

In [None]:
# EQUIP_TYP_NMs which count as a transformer for the 'prim_strict' selection
xfmr_equip_typ_nms_of_interest = ['TRANSFORMER, OH', 'TRANSFORMER, UG']

# Row selections shared by the two subsets below
_is_prim = df_outage['LOCATION_ID']==df_outage['trsf_pole_nb']
_is_xfmr = df_outage['EQUIP_TYP_NM'].isin(xfmr_equip_typ_nms_of_interest)

# 'prim' (primary): the meters are connected directly to a transformer pole causing an outage
df_outage_prim = df_outage[_is_prim].copy()

# 'prim_strict' (primary strict): as 'prim', AND the equipment type causing the outage is a 
# transformer (exact EQUIP_TYP_NMs given in xfmr_equip_typ_nms_of_interest)
df_outage_prim_strict = df_outage[_is_prim & _is_xfmr].copy()

print(df_outage.shape)

# -----------------------------------------------------------------------------------------------
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# -----------------------------------------------------------------------------------------------

In [None]:
# Consolidated DFs built with the legacy (_OLD) method.
# NOTE: None of the *_slim_OLD DFs are referenced in the later cells visible in this notebook
df_outage_slim_OLD             = DOVSOutages.consolidate_df_outage_OLD(df_outage)
df_outage_prim_slim_OLD        = DOVSOutages.consolidate_df_outage_OLD(df_outage_prim)
df_outage_prim_strict_slim_OLD = DOVSOutages.consolidate_df_outage_OLD(df_outage_prim_strict)

In [None]:
# Consolidate the full, prim, and prim_strict outage DFs.
# Each call uses trsf_pole_nb as an additional groupby column and keeps OUTG_REC_NB as a 
#   regular column (set_outg_rec_nb_as_index=False); OUTG_REC_NB is read as a column further below.


df_outage_slim             = DOVSOutages.consolidate_df_outage(
    df_outage, 
    addtnl_grpby_cols=['trsf_pole_nb'], 
    set_outg_rec_nb_as_index=False
)
df_outage_prim_slim        = DOVSOutages.consolidate_df_outage(
    df_outage_prim, 
    addtnl_grpby_cols=['trsf_pole_nb'], 
    set_outg_rec_nb_as_index=False
)
df_outage_prim_strict_slim = DOVSOutages.consolidate_df_outage(
    df_outage_prim_strict, 
    addtnl_grpby_cols=['trsf_pole_nb'], 
    set_outg_rec_nb_as_index=False
)

# Save the DataFrames as pickle (.pkl) files if save_dfs_to_file = True

In [None]:
# Write the three full DFs and their three consolidated (slim) counterparts to save_dir_base as pickles
if save_dfs_to_file:
    df_outage.to_pickle(os.path.join(save_dir_base, 'df_outage.pkl'))
    df_outage_prim.to_pickle(os.path.join(save_dir_base, 'df_outage_prim.pkl'))
    df_outage_prim_strict.to_pickle(os.path.join(save_dir_base, 'df_outage_prim_strict.pkl'))
    #-----
    df_outage_slim.to_pickle(os.path.join(save_dir_base, 'df_outage_slim.pkl'))
    df_outage_prim_slim.to_pickle(os.path.join(save_dir_base, 'df_outage_prim_slim.pkl'))
    df_outage_prim_strict_slim.to_pickle(os.path.join(save_dir_base, 'df_outage_prim_strict_slim.pkl'))

# -----------------------------------------------------------------------------------------------
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# -----------------------------------------------------------------------------------------------

In [None]:
# df_outage_slim = pd.read_pickle(os.path.join(save_dir_base, 'df_outage_slim.pkl'))

In [None]:
# Set the search time information in df_outage_slim using search_time_half_window.
# NOTE: df_outage_slim is overwritten with the result, so re-running this cell feeds the already
#       processed DF back in.  (review: confirm set_search_time_in_outage_df is safe to apply twice)
df_outage_slim = DOVSOutages.set_search_time_in_outage_df(
    df_outage=df_outage_slim, 
    search_time_half_window=search_time_half_window
)

In [None]:
# Number of distinct outages in df_outage_slim
print(df_outage_slim['OUTG_REC_NB'].nunique())
# Number of premise numbers returned for df_outage_slim
print(len(DOVSOutages.get_prem_nbs_from_consolidated_df_outage(df_outage_slim)))

In [None]:
# Remove the names of the large, non-consolidated DFs; the cells below use only df_outage_slim.
# NOTE: After this cell, any cell above which uses these names cannot be re-run without 
#       first rebuilding them
del df_outage_OG
del df_outage
del df_outage_prim
del df_outage_prim_strict

In [None]:
#-------------------------
# Settings for the end events acquisition below
usg_split_to_CTEs=True
# kRunSqlQuery is the construct type handed to AMIEndEvents below
df_construct_type=DFConstructType.kRunSqlQuery
contstruct_df_args_end_events=None
# Passed to the SQL builder as df_args['addtnl_groupby_cols']
addtnl_groupby_cols=['OUTG_REC_NB', 'trsf_pole_nb']

# ['*'] is passed as cols_of_interest to the SQL builder
cols_of_interest_end_dev_event = ['*']
# batch_size, verbose and n_update are passed along in end_events_sql_function_kwargs
# batch_size=10
batch_size=30
verbose=True
n_update=1

In [None]:
# Display the columns requested for the end events query
cols_of_interest_end_dev_event

In [None]:
# Keyword arguments later passed to AMIEndEvents as build_sql_function_kwargs
# (with build_sql_function=AMIEndEvents_SQL.build_sql_end_events_for_outages).
# field_to_split names df_outage (i.e., df_outage_slim) as the collection to be split, and 
#   batch_size/verbose/n_update come from the settings cell above
end_events_sql_function_kwargs = dict(
    cols_of_interest=cols_of_interest_end_dev_event, 
    df_outage=df_outage_slim, 
    date_only=True, 
    split_to_CTEs=usg_split_to_CTEs, 
    join_mp_args=False, 
    df_args = dict(
        addtnl_groupby_cols=addtnl_groupby_cols, 
        mapping_to_ami={'PREMISE_NBS':'premise_nbs'}, 
        is_df_consolidated=True
    ), 
    field_to_split='df_outage', 
    field_to_split_location_in_kwargs=['df_outage'], 
    save_and_dump=True, 
    sort_coll_to_split=True,
    batch_size=batch_size, verbose=verbose, n_update=n_update
)
# Additional, nested kwargs: the schema/table to query and the opco(s) set by the user
addtnl_end_events_sql_function_kwargs = dict(
    build_sql_function_kwargs=dict(
        schema_name='meter_events', 
        table_name='events_summary_vw', 
        opco=opcos
    )
)
# Merge the two dicts (on a key clash, the entries of the second dict win)
end_events_sql_function_kwargs = {**end_events_sql_function_kwargs, 
                                  **addtnl_end_events_sql_function_kwargs}

In [None]:
# start=time.time()
# end_events = AMIEndEvents(
#     df_construct_type=df_construct_type, 
#     contstruct_df_args = contstruct_df_args_end_events, 
#     build_sql_function=AMIEndEvents_SQL.build_sql_end_events_for_outages, 
#     build_sql_function_kwargs=end_events_sql_function_kwargs, 
#     init_df_in_constructor=True, 
#     save_args=end_events_save_args
# )
# end_events_build_time = time.time()-start

In [None]:
start=time.time()

# Build end_events, retrying whenever the construction raises.
#   max_n_attempts:
#       Maximum number of attempts before the last exception is re-raised.
#       None ==> retry indefinitely (interrupt the kernel to stop)
#   retry_wait_seconds:
#       Pause between attempts, so a persistent failure does not become a tight loop 
#       repeatedly hitting the data source
max_n_attempts     = None
retry_wait_seconds = 5

n_attempts = 0
while True:
    n_attempts += 1
    try:
        end_events = AMIEndEvents(
            df_construct_type=df_construct_type, 
            contstruct_df_args = contstruct_df_args_end_events, 
            build_sql_function=AMIEndEvents_SQL.build_sql_end_events_for_outages, 
            build_sql_function_kwargs=end_events_sql_function_kwargs, 
            init_df_in_constructor=True, 
            save_args=end_events_save_args
        )
        break # stop the loop if the function completes sucessfully
    except Exception as e:
        print("Function errored out!", e)
        # The exception type and attempt number make a repeating (non-transient) failure easy to spot
        print(f"\tattempt {n_attempts}: {type(e).__name__}")
        if max_n_attempts is not None and n_attempts >= max_n_attempts:
            raise
        print("Retrying ... ")
        time.sleep(retry_wait_seconds)
        
end_events_build_time = time.time()-start
print(end_events_build_time)