# LIBS

!pip install import_ipynb
!pip install awswrangler
!pip install tensorflow
#!pip install tensorflow==2.11.0 
!pip install keras
!pip install fastparquet

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
plt.style.use('datalobsterplot.mplstyle')
plt.rcParams["figure.figsize"] = (20,5)
import awswrangler as wr


## COSMETICS

In [2]:
# CONFIGURE PANDAS TO DISPLAY FULL TEXT IN CELLS
pd.set_option('display.max_colwidth', None)

In [3]:
# GET RID OF GRAY SPACES

from IPython.display import display, HTML, clear_output
# Remove Jupyter's top padding and side gutters so plots and wide tables use the full page width.
_notebook_css = (
    '<style>'
    '#notebook { padding-top:0px !important; } '
    '.container { width:100% !important; } '
    '.end_space { min-height:0px !important; } '
    '</style>'
)
display(HTML(_notebook_css))

## ML LIBs

In [4]:
import keras
from keras import layers
from keras.models import Sequential
from keras.layers import Dense,Input, Conv1D, MaxPooling1D, UpSampling1D
from keras.layers import LSTM, Input, Dropout
from keras.layers import RepeatVector
from keras.layers import TimeDistributed
from keras.regularizers import l1, l2

2023-09-12 14:59:05.895250: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [14]:
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split

In [15]:
from tensorflow.keras.models import Model
import tensorflow as tf

In [16]:
from scipy.special import expit

In [17]:
#Save Model
import tempfile
import boto3 as bt
import joblib
#Save plot
import io
#
from tqdm import tqdm
#
from IPython.display import clear_output
#
from statistics import mean
#
from datetime import datetime
import time

# READ DATA

In [18]:
# GET THE DATA USING YEAR MONTH DAY - GET CERTAIN DAYS OF 1 MONTH (DEPRECATED!!)
def read_days_files(path,year,month,days,time_column,file_format=".parquet",use_columns=["tm","m_rms","a_rms","l"]):
    """
    DEPRECATED: read_dates_files is the maintained variant.

    Read every parquet file stored under the given days of one month and return them as a
    single frame sorted by time.

    Arguments:
        path: S3 prefix template with three "{}" placeholders filled with (year, month, day)
        year, month: values substituted verbatim (no zero padding is applied here)
        days: iterable of day values, substituted verbatim
        time_column: column used to sort the combined frame
        file_format: unused, kept for signature compatibility
        use_columns: columns passed to pd.read_parquet
    Returns:
        The concatenated, time-sorted frame with a fresh RangeIndex, or an empty DataFrame
        when no file was found.
    """
    # days are visited in order; every object listed under a day's prefix is read
    frames=[
        pd.read_parquet(file_path,columns=use_columns)
        for day in days
        for file_path in wr.s3.list_objects(path.format(year,month,day))
    ]
    if not frames:
        return pd.DataFrame([])
    return pd.concat(frames).sort_values(time_column).reset_index(drop=True)

In [20]:
# READ DATA PARQUETS 


def read_dates_files(path,dates,time_column,file_format=".parquet",use_columns=["tm","m_rms","a_rms","l"]):
    """
    Read all parquet files stored under each of the given dates and return one time-sorted frame.

    Arguments:
        path: S3 prefix template ending with /{}/{}/{}/, filled with zero-padded (year, month, day)
        dates: iterable of anything pd.to_datetime accepts
        time_column: column used to sort the combined frame
        file_format: unused, kept for signature compatibility
        use_columns: columns passed to pd.read_parquet (None reads every column)
    Returns:
        The concatenated frame sorted by time_column with a fresh RangeIndex, or an empty
        DataFrame when nothing could be read. Files that fail to load are reported with print
        and skipped.
    """
    frames=[]
    for raw_date in dates:
        stamp=pd.to_datetime(raw_date)
        year,month,day=(str(part).zfill(2) for part in (stamp.year,stamp.month,stamp.day))

        for file_path in wr.s3.list_objects(path.format(year,month,day)):
            try:
                frames.append(pd.read_parquet(file_path,columns=use_columns))
            except Exception as E:
                # the printed Turkish text means "file could not be read"
                print("Dosya okunamadı "+"_".join([year,month,day]))
                print(E)

    if not frames:
        return pd.DataFrame([])
    return pd.concat(frames).sort_values(time_column).reset_index(drop=True)

In [8]:
# READ DATA BETWEEN START AND END (THIS ONE IS USED)

def read_spesific_files(path,start_end_df,use_cols,time_column):
    """
    Read the rows that fall strictly inside each (start, end) interval of start_end_df.

    For every interval, all days from start.date() to end.date() are read with
    read_dates_files and then filtered to start < time_column < end (both bounds exclusive).

    Arguments:
        path: S3 prefix template ending with /{}/{}/{}/ for year, month and day
        start_end_df: DataFrame with Timestamp columns "start" and "end", one interval per row
        use_cols: columns to load (None loads every column); must include time_column,
            otherwise every interval is skipped
        time_column: datetime column used for filtering and sorting
    Returns:
        One frame with all intervals, sorted by time_column with a fresh RangeIndex, or an
        empty DataFrame when no interval produced data.
    """
    frames=[]
    for start,end in zip(start_end_df.start,start_end_df.end):
        day_range=pd.date_range(start.date(),end.date())
        interval_df=read_dates_files(path,day_range,time_column,file_format=".parquet",use_columns=use_cols)
        if time_column not in interval_df.columns:
            continue
        inside=(end>interval_df[time_column])&(interval_df[time_column]>start)
        frames.append(interval_df.loc[inside])

    if not frames:
        return pd.DataFrame([])
    return pd.concat(frames).sort_values(time_column).reset_index(drop=True)

# General

In [21]:
# USED TO WRITE THE MODELS, 

def get_s3_Bucketname_and_Key(path):
    """
    Split an "s3://bucket/key..." URI into its parts.

    Arguments:
        path: S3 URI such as "s3://bucket/dir/file.h5"
    Returns:
        (bucket_name, key, file_name): key is everything after the bucket (may end with "/"),
        file_name is the last "/"-separated segment ("" when the path ends with "/").
    """
    segments=path.split("/")   # ["s3:", "", bucket, key parts...]
    bucket_name=segments[2]
    object_key="/".join(segments[3:])
    file_name=segments[-1]
    return bucket_name,object_key,file_name

In [1]:
def get_existing_subasset_train_cols(cols,path,start_end_df,time_column):
    """
    Keep only the requested training columns that exist in every non-empty interval.

    Each row of start_end_df is read in full (all columns) with read_spesific_files. A column
    survives only if it is present in every interval that returned data; empty intervals
    are ignored.

    Arguments:
        cols: list of column names we want to train on
        path: S3 prefix template ending with /{}/{}/{}/
        start_end_df: DataFrame with "start" and "end" columns
        time_column: datetime column used by read_spesific_files
    Returns:
        The surviving columns in the order of cols. When every column exists everywhere, the
        result equals cols.
    """
    surviving=cols.copy()
    for row in range(len(start_end_df)):
        interval_df=read_spesific_files(path,start_end_df.iloc[row:row+1],use_cols=None,time_column=time_column)
        if len(interval_df)==0:
            continue
        available=interval_df.columns
        surviving=[col for col in cols if col in available and col in surviving]
    return surviving

#### Linear Reconstructed Columns

In [15]:
def change_shape(np_arr):
    """
    Drop the trailing singleton axis of a (rows, cols, 1) array.

    Arguments:
        np_arr: array whose first two axes are kept; the remaining axes must have size 1
    Returns:
        The array reshaped to (rows, cols).
    """
    n_rows,n_cols=np_arr.shape[0],np_arr.shape[1]
    return np_arr.reshape((n_rows,n_cols))

In [16]:
def reconstructed_array_to_signal_mean(X_,window_size=128):
    """
    Rebuild one signal from overlapping reconstructed windows by averaging each position.

        XXXXXXXX
         XXXXXXXX
          XXXXXXXX
             |
             mean of all window values that cover this position

    Window i covers output positions i .. i+window_size-1, so the result has
    len(X_)+window_size-1 samples. Each position is divided by the number of windows that
    actually cover it. Near the edges that is fewer than window_size. It is also fewer in
    the middle when there are fewer windows than window_size.

    Arguments:
        X_: sequence of windows, each of length window_size (e.g. a (n_windows, window_size) array)
            [
                [T1,T2,T3]
                [T2,T3,T4]
                [T3,T4,T5]
                ...
            ]
        window_size: length of each window
    Returns:
        1-D float array of length len(X_)+window_size-1. It is all zeros when X_ is empty.
    """
    n_windows=len(X_)
    total_len=n_windows+window_size-1
    signal=np.zeros(total_len)
    coverage=np.zeros(total_len)
    for i in range(n_windows):
        signal[i:i+window_size]+=X_[i]
        coverage[i:i+window_size]+=1
    # Count-based division is correct for any n_windows. A closed-form "i+1 / window_size /
    # len-i" divisor is wrong when n_windows < window_size. Uncovered positions (only possible
    # for an empty X_) keep their 0.
    np.divide(signal,coverage,out=signal,where=coverage>0)
    return signal

In [None]:
def reconstructed_array_to_signal_variance(X_,window_size=128):
    """
    Rebuild a per-position variance signal from overlapping reconstructed windows.

        XXXXXXXX
         XXXXXXXX
          XXXXXXXX
             |
             np.var of all window values that cover this position

    Window i covers output positions i .. i+window_size-1, so the result has
    len(X_)+window_size-1 samples. The first and last window_size-1 positions are covered by
    fewer than window_size windows. They are set to 0 rather than reported with a
    smaller-sample variance.

    Arguments:
        X_: sequence of windows, each indexable with window_size elements
            [
                [T1,T2,T3]
                [T2,T3,T4]
                [T3,T4,T5]
                ...
            ]
        window_size: length of each window
    Returns:
        np.array with one variance per position (edges zeroed).
    """
    n_windows=len(X_)
    total_len=n_windows+window_size-1

    per_position=[]
    for pos in range(total_len):
        # windows k with k <= pos <= k+window_size-1, visited in increasing k
        first=max(0,pos-window_size+1)
        last=min(n_windows-1,pos)
        covering_values=[X_[k][pos-k] for k in range(first,last+1)]
        per_position.append(np.var(covering_values))

    for offset in range(window_size-1):
        per_position[offset]=0
        per_position[total_len-1-offset]=0
    return np.array(per_position)

#### VISUALIZATION

In [28]:
def plot_multi_cols(df,cols,time_column):   # DEPRECATED
    """
    Plot each of the given columns against time, one figure per column.

    Arguments:
        df: the dataframe (left unmodified)
        cols: the columns that we want to plot
        time_column: column of df used as the x axis

    Returns:
       No return value, shows the plots.
    """
    # Index a derived frame by time instead of assigning df.index. The original code replaced
    # the caller's index and then reset it with drop=True, so the caller lost its own index.
    time_indexed=df.set_index(time_column,drop=False)
    for col in cols:
        time_indexed[col].plot(label=col)
        plt.legend()
        plt.show()

In [27]:
def plot_twinx(data1,data2,timedata,title,data1_ylabel,data2_ylabel,is_save_plot,plot_vertical_line,save_bucket=None,save_key=None,save_format="pdf",vertical_line_points=None,x_label='time (s)'):
    """
    Draw two bar series on a shared x axis with separate left and right y axes.

    Arguments:
        data1: bar heights for the left axis (first colour of the style cycle)
        data2: bar heights for the right axis (second colour of the style cycle)
        timedata: x positions of the bars
        title: figure title
        data1_ylabel: y label of the left axis
        data2_ylabel: y label of the right axis
        is_save_plot: if True, the figure is uploaded with save_plot before being shown
        plot_vertical_line: if True, draw a vertical line at each x in vertical_line_points
        save_bucket: S3 bucket used when is_save_plot is True
        save_key: S3 key used when is_save_plot is True
        save_format: file format passed to save_plot (default "pdf")
        vertical_line_points: x coordinates of the vertical lines. The first is drawn blue,
            the rest red. None means there are no lines to draw.
        x_label: label of the shared x axis (default 'time (s)')

    Returns:
       No return value, shows the plot and optionally saves it.
    """
    colors=plt.rcParams['axes.prop_cycle'].by_key()['color']
    fig,ax1=plt.subplots()

    left_color=colors[0]
    ax1.bar(timedata,data1,color=left_color,width=0.99)
    ax1.set_xlabel(x_label)
    ax1.set_ylabel(data1_ylabel,color=left_color)
    ax1.tick_params(axis='y',labelcolor=left_color)

    ax2=ax1.twinx()
    right_color=colors[1]
    ax2.set_ylabel(data2_ylabel,color=right_color)
    ax2.bar(timedata,data2,color=right_color)
    ax2.tick_params(axis='y',labelcolor=right_color)

    # Guard against the default vertical_line_points=None, which used to raise TypeError on len().
    if plot_vertical_line==True and vertical_line_points is not None:
        for i,x_point in enumerate(vertical_line_points):
            line_color='b' if i==0 else 'r'
            plt.axvline(x=x_point,color=line_color,label='axvline - full height')

    fig.suptitle(title,fontweight="bold")
    if is_save_plot==True:
        save_plot(save_bucket,save_key,save_format)

    plt.show()

In [26]:
def save_plot(bucket,key,save_format):
    """
    Upload the current matplotlib figure to S3. The plot must already be drawn.

    Arguments:
        bucket: S3 bucket name
        key: object key to write
        save_format: format passed to plt.savefig (e.g. "pdf", "png", "svg")

    Returns:
       No return value, writes the figure to the bucket.
    """
    # "image/<format>" is wrong for several formats (PDF is application/pdf, SVG is
    # image/svg+xml, JPEG is image/jpeg). Map those explicitly and keep the old guess for the rest.
    known_content_types={
        "pdf":"application/pdf",
        "svg":"image/svg+xml",
        "jpg":"image/jpeg",
        "jpeg":"image/jpeg",
        "tif":"image/tiff",
        "eps":"application/postscript",
        "ps":"application/postscript",
    }
    content_type=known_content_types.get(str(save_format).lower(),'image/{}'.format(save_format))

    print(bucket,key)
    img_data=io.BytesIO()
    plt.savefig(img_data,format=save_format)
    img_data.seek(0)
    s3=bt.resource('s3')
    s3.Bucket(bucket).put_object(Body=img_data,ContentType=content_type,Key=key)

# Save Model and Scaler

In [1]:
def save_all_models_info_totable(table_path,train_info_dict,cust_features_dict):
    """
    Prepend one row describing a newly trained model to the model-info parquet table.

    The existing table is read and any missing info columns are added (filled with None).
    The new row is built from the keys listed in
    train_info_dict["features_for_model_info_table"] (after merging cust_features_dict into
    train_info_dict, which mutates the caller's dict). "train_cols" is stored "-"-joined.
    Every index is shifted by one so the new row gets index 0 and sorts first. The table is
    then written back to table_path.

    Arguments:
        table_path: parquet table to update in place
        train_info_dict: training information; updated with cust_features_dict
        cust_features_dict: customer information such as location, asset, subasset

    Returns:
        The newly inserted row as a one-row DataFrame.
    """
    info_table=pd.read_parquet(table_path)

    missing_cols=[col for col in train_info_dict["features_for_model_info_table"] if col not in info_table.columns]
    info_table[missing_cols]=None

    train_info_dict.update(cust_features_dict)
    new_row={key:train_info_dict[key] for key in train_info_dict["features_for_model_info_table"]}
    new_row["train_cols"]="-".join(train_info_dict["train_cols"])

    # the new row starts at -1 so that, after shifting everything by +1, it becomes row 0
    updated=pd.concat([info_table,pd.DataFrame(new_row,index=[-1])])
    updated.index=updated.index+1
    updated=updated.sort_index()

    updated.to_parquet(table_path)
    return updated.iloc[0:1]

In [42]:
def save_model_with_info(train_info_dict):
    """
    Persist everything needed to reuse a trained model, under one S3 "tools" directory.

    Run after training. The directory is
    tools_path_prefix + cust_path + model_name + "/", and it receives:
      - loss_history.<plot_save_format>: plot of train_info_dict["loss_history"]
      - the model and the scaler, serialised with joblib by save_model_or_scaler_s3 (the
        ".h5"/".gz" suffixes come from create_model_and_scaler_name; the payload is a joblib dump)
      - train_data_info.parquet: the trained time intervals
      - train_cols.parquet: the trained columns
      - time_column.parquet: the name of the time column

    Arguments:
        train_info_dict: training parameters, model, scaler and customer/path information

    Returns:
       (train_info_dict, cust_features_dict): the dict updated with names and "tools_path",
       and the customer features.
    """
    train_info_dict,cust_features_dict=create_model_and_scaler_name(train_info_dict)

    tools_path=(train_info_dict["tools_path_prefix"]
                +cust_features_dict["cust_path"]
                +train_info_dict["model_name"]+"/")
    train_info_dict["tools_path"]=tools_path

    bucket_name,key_prefix,_=get_s3_Bucketname_and_Key(tools_path)
    plot_format=train_info_dict["plot_save_format"]
    plot_key=key_prefix+"loss_history."+plot_format
    loss_history=train_info_dict["loss_history"]
    # NOTE(review): validation loss is not tracked yet, so the training loss fills both axes
    plot_twinx(loss_history,loss_history,range(len(loss_history)),"loss-validation_loss","val_loss_history","loss_history",is_save_plot=True,save_bucket=bucket_name,save_key=plot_key,save_format=plot_format,plot_vertical_line=False)

    model_dir=train_info_dict["tools_path"]
    save_model_or_scaler_s3(train_info_dict["model"],model_dir,train_info_dict["model_name"])
    save_model_or_scaler_s3(train_info_dict["scaler"],model_dir,train_info_dict["scaler_name"])
    train_info_dict["train_data_info"].to_parquet(model_dir+"train_data_info.parquet")
    pd.DataFrame(train_info_dict["train_cols"],columns=["train_cols"]).to_parquet(model_dir+"train_cols.parquet")
    pd.DataFrame([train_info_dict["time_column"]],columns=["time_column"]).to_parquet(model_dir+"time_column.parquet")

    return train_info_dict,cust_features_dict

In [2]:
def create_model_and_scaler_name(train_info_dict):
    """
    Derive customer features from the read path and build the model and scaler file names.

    The customer features (cust, country, location, asset, subasset, cust_path) are merged
    into train_info_dict. The names are then the "_"-joined values of the keys listed in
    train_info_dict["model_name_content_inorder"], suffixed with "_model.h5" and "_scaler.gz".

    Arguments:
        train_info_dict: must contain "read_path", "cust_list" and "model_name_content_inorder"

    Returns:
       (train_info_dict, cust_features_dict): the dict (mutated in place) now holding
       "model_name" and "scaler_name", plus the customer features.

    Raises:
        ValueError: if none of train_info_dict["cust_list"] occurs in train_info_dict["read_path"].
    """
    cust_features_dict=get_cust_features(train_info_dict["read_path"],train_info_dict["cust_list"])
    # Check before merging. The original merged first, so dict.update(None) raised an opaque
    # TypeError and the later type check could never take effect.
    if not isinstance(cust_features_dict,dict):
        raise ValueError("none of the customers {} was found in read_path {}".format(train_info_dict["cust_list"],train_info_dict["read_path"]))
    train_info_dict.update(cust_features_dict)

    # a dict (not a list) so that repeated keys in model_name_content_inorder appear once, as before
    train_features_dict={key:train_info_dict[key] for key in train_info_dict["model_name_content_inorder"]}
    base_name="_".join(str(train_feature) for train_feature in train_features_dict.values())

    train_info_dict.update({"model_name":base_name+"_model"+".h5","scaler_name":base_name+"_scaler"+".gz"})
    return train_info_dict,cust_features_dict

In [1]:
def get_cust_features(path,cust_list=["nurol","kutahya-ser","celik-halat"]):
    """
    Extract customer hierarchy information from a data path.

    The customers are tried in order. For the first one found as a substring of path, the
    path is cut at that position. Up to five "/"-separated segments are then mapped to
    cust, country, location, asset and subasset.

    Arguments:
        path: path containing .../<cust>/<country>/<location>/<asset>/<subasset>/...
        cust_list: customer names to look for, in priority order

    Returns:
       A dict with those keys (fewer if the path is shorter) plus "cust_path", the same
       segments each followed by "/". Returns None when no customer name occurs in path.
    """
    feature_names=["cust","country","location","asset","subasset"]
    for cust in cust_list:
        start=path.find(cust)
        if start==-1:
            continue
        segments=path[start:].split("/")[:5]
        features=dict(zip(feature_names,segments))
        features["cust_path"]="".join(segment+"/" for segment in features.values())
        return features
    return None

In [6]:
def save_model_or_scaler_s3(model_or_scaler,save_path,model_or_scaler_name):
    """
    Serialise an object with joblib and upload it to S3 at save_path + model_or_scaler_name.

    This is best-effort: any failure is printed and swallowed, and nothing is raised. On
    success the cell output is cleared before the confirmation is printed.

    Arguments:
        model_or_scaler: object to serialise (model or fitted scaler)
        save_path: S3 directory URI, ending with "/"
        model_or_scaler_name: file name appended to the directory key

    Returns:
        No return value.
    """
    try:
        s3_client=bt.client('s3')
        bucket_name,key_prefix,_=get_s3_Bucketname_and_Key(save_path)
        object_key=key_prefix+model_or_scaler_name
        with tempfile.TemporaryFile() as tmp:
            joblib.dump(model_or_scaler,tmp)
            tmp.seek(0)
            payload=tmp.read()
            s3_client.put_object(Body=payload,Bucket=bucket_name,Key=object_key)
        clear_output()
        print("Successfully Saved to {}".format(save_path))
    except Exception as E:
        print(E)
        print("Unsuccessfully Saved")

In [6]:
def read_model_or_scaler_s3(model_or_scaler_read_path,model_or_scaler_name):
    """
    Download a joblib-serialised model or scaler from S3 and load it.

    This is best-effort: on failure the error is printed and None is returned.
    SECURITY: joblib.load unpickles the payload and can execute arbitrary code, so only read
    from buckets you trust.

    Arguments:
        model_or_scaler_read_path: S3 directory URI of the model, ending with "/"
        model_or_scaler_name: file name inside that directory
    Returns:
        The loaded object, or None if it could not be read.
    """
    try:
        s3_client=bt.client('s3')
        read_path_s3_bucket_name,read_path_s3_key,_=get_s3_Bucketname_and_Key(model_or_scaler_read_path)
        model_or_scaler_s3_key=read_path_s3_key+model_or_scaler_name
        with tempfile.TemporaryFile() as fp:
            s3_client.download_fileobj(Fileobj=fp,Bucket=read_path_s3_bucket_name,Key=model_or_scaler_s3_key)
            fp.seek(0)
            model_or_scaler=joblib.load(fp)
        clear_output()
        print("Successfully Read")
        return model_or_scaler
    except Exception as E:
        # Was a bare except that also swallowed KeyboardInterrupt and hid the cause.
        # Print the error, as save_model_or_scaler_s3 does.
        print(E)
        print("Unsuccessfully Read")
        return None

In [299]:
def delete_model_or_scaler_s3(model_or_scaler_read_path_and_name): #(UNTESTED)
    """
    Delete one model or scaler object from S3.

    This is best-effort: on failure the error is printed and nothing is raised.

    Arguments:
        model_or_scaler_read_path_and_name: full S3 URI of the file, i.e. the model directory AND file name
    Returns:
        Returns nothing.
    """
    try:
        s3_client=bt.client('s3')
        s3_bucket_name,s3_key,_=get_s3_Bucketname_and_Key(model_or_scaler_read_path_and_name)
        s3_client.delete_object(Bucket=s3_bucket_name,Key=s3_key)
        print("Successfully Deleted")
    except Exception as E:
        # Was a bare except that hid the reason; report it like the save function does.
        print(E)
        print("Unsuccessfully Deleted")

# Train Funcs

In [1]:
def train_model_generic_on_batches(train_info_dict):    # must be modified if non aut encoder
    """
    Train the autoencoder in train_info_dict["model"], streaming the training data from S3.

    Each epoch walks over every interval (row) of train_info_dict["train_data_info"]:
      1. read the interval with read_spesific_files,
      2. optionally apply a time-based rolling aggregation ("rolling_window_time" with
         "rolling_window_func") to the training columns,
      3. split it into "train_df_segment_num" segments and window each one with
         create_segment_data, shuffling the windows if "shuffle" is True,
      4. call train_model_on_batch for every batch (inputs are also the targets).
    The mean batch loss of the epoch is appended to loss_history and the history is re-plotted.

    Early stopping: a counter starts at train_info_dict["early_stopping_patience"] and is
    decremented whenever an epoch's mean loss is higher than the previous epoch's. Training
    stops when the counter reaches 0. It is never reset, so it counts worsening epochs in
    total, not consecutive ones.

    Arguments:
        train_info_dict: all information of the model and training
    Returns:
        train_info_dict: the dictionary as returned by train_model_on_batch
        loss_history: mean training loss per completed epoch
        val_loss_history: empty list (validation loss is not computed yet)
        all_loss: empty list, not used
    Raises:
        ValueError: if an epoch produced no training batch at all (e.g. every interval was
            empty), because its mean loss is undefined.
    """
    early_stopping_patience=train_info_dict["early_stopping_patience"]
    loss_history=[]
    val_loss_history=[]
    all_loss=[]

    for epoch in range(train_info_dict["n_epochs"]):
        print(epoch)
        batch_loss_history=[]
        for i in tqdm(range(len(train_info_dict["train_data_info"]))):
            train_df=read_spesific_files(train_info_dict["read_path"],start_end_df=train_info_dict["train_data_info"].iloc[i:i+1],use_cols=train_info_dict["use_cols"],time_column=train_info_dict["time_column"])
            if len(train_df)==0:
                continue

            if train_info_dict["rolling_window_time"] is not None:
                # time-based rolling windows need a DatetimeIndex, so index by time temporarily
                train_df.index=train_df[train_info_dict["time_column"]]
                train_cols_func_dict={column:[train_info_dict["rolling_window_func"]] for column in train_info_dict["train_cols"]}
                # NOTE(review): agg with a dict of lists returns MultiIndex columns; confirm this
                # .loc assignment aligns them onto the flat train_cols as intended
                train_df.loc[:,train_info_dict["train_cols"]]=train_df[train_info_dict["train_cols"]].rolling(train_info_dict["rolling_window_time"]).agg(train_cols_func_dict)
                train_df.columns=train_df.columns.get_level_values(0)
                train_df.reset_index(drop=True,inplace=True)

            train_df_segment_step=int(len(train_df)/train_info_dict["train_df_segment_num"])

            for df_segment_order in range(train_info_dict["train_df_segment_num"]):
                X_normal,batch_num=create_segment_data(train_df,df_segment_order,train_df_segment_step,train_info_dict)
                if train_info_dict["shuffle"]==True:
                    np.random.shuffle(X_normal)
                for batch_order in range(batch_num):
                    train_info_dict,batch_loss=train_model_on_batch(X_normal,batch_order,train_info_dict) #X_normal = x normal data CHANGE IF NOT AN AUTOENCODER
                    batch_loss_history.append(batch_loss)

        if not batch_loss_history:
            # statistics.mean([]) would fail with a cryptic StatisticsError (a ValueError subclass)
            raise ValueError("epoch {} produced no training batches; check train_data_info, read_path and use_cols".format(epoch))
        loss_history.append(mean(batch_loss_history))

        clear_output(wait=True)
        plot_twinx(loss_history,loss_history,range(len(loss_history)),"loss-validation_loss","val_loss_history","loss_history",is_save_plot=False,plot_vertical_line=False)

        if epoch!=0:
            if loss_history[epoch]>loss_history[epoch-1]:
                early_stopping_patience-=1
            if early_stopping_patience==0:
                break
    return train_info_dict,loss_history,val_loss_history,all_loss

In [49]:
def train_model_on_batch(X_toTrain,batch_order,train_info_dict): # UPDATE FOR NON AUTOENCODERS
    """
    Run one optimisation step of the autoencoder on a single batch of windows.

    Arguments:
        X_toTrain: windowed training data, (windows, window_size, channels) as produced by
            create_segment_data
        batch_order: batch index; rows [batch_order*batch_size, (batch_order+1)*batch_size) are used
        train_info_dict: must contain "batch_size", "model_input_shape_len" and "model"
    Returns:
        train_info_dict: the same dictionary, unchanged
        batch_loss: the value returned by model.train_on_batch for this batch
    """
    batch_size=train_info_dict["batch_size"]
    train_batch=X_toTrain[batch_order*batch_size:(batch_order+1)*batch_size]

    # Models with a 2-D input expect (windows, window_size). Reshape the batch actually fed to
    # the model: reshaping X_toTrain after slicing left the batch 3-D.
    if train_info_dict["model_input_shape_len"]==2:
        train_batch=train_batch.reshape(train_batch.shape[0],train_batch.shape[1])

    # autoencoder: the target is the input itself
    batch_loss=train_info_dict["model"].train_on_batch(train_batch,train_batch)
    return train_info_dict,batch_loss

# PREPARE DATA

In [31]:
def get_data(df,train_info_dict):
    """
    Turn a slice of raw data into scaled, gap-split windows for the model.

    Steps:
      1. If train_info_dict["sample_time"] is set, resample on time_column: numeric columns
         use train_info_dict["sample_time_func"], object columns use "sum".
      2. Drop rows with NaN in any training column.
      3. Scale the training columns with train_info_dict["scaler"] (expected to be fitted).
         On failure a preview and the error are printed, and the data stays unscaled.
      4. Split at time gaps and window each piece with get_divided_sequences.
    The input frame is not modified.

    Arguments:
        df: raw data containing time_column and the training columns
        train_info_dict: needs "sample_time", "sample_time_func", "time_column",
            "train_cols", "scaler" and everything get_divided_sequences needs
    Returns:
        (X_, X_list, sequence_df_list, divide_idx_list) from get_divided_sequences
    """
    time_column=train_info_dict["time_column"]
    train_cols=train_info_dict["train_cols"]

    if train_info_dict["sample_time"] is not None:
        numeric_cols=list(df.select_dtypes(include=np.number).columns)
        object_cols=list(df.select_dtypes(include=np.object_).columns)
        cols_func_dict={column:[train_info_dict["sample_time_func"]] for column in numeric_cols}
        cols_func_dict.update({column:["sum"] for column in object_cols})

        # Set the index on a new frame. The old in-place set_index/reset_index mutated the
        # caller's df and discarded its index.
        df_sampled=df.set_index(time_column).resample(train_info_dict["sample_time"]).agg(cols_func_dict)
        df_sampled.columns=df_sampled.columns.get_level_values(0)
        df_sampled=df_sampled.reset_index()
    else:
        df_sampled=df.copy(deep=True)
    df_sampled.dropna(subset=train_cols,inplace=True)

    try:
        df_sampled[train_cols]=train_info_dict["scaler"].transform(df_sampled[train_cols])
    except Exception as exception:
        # Report why scaling failed. A bare except hid the cause, and printing the whole
        # frame flooded the output.
        print(df_sampled.head())
        print("we cant scale data")
        print(exception)

    X_,X_list,sequence_df_list,divide_idx_list=get_divided_sequences(df_sampled,train_info_dict)
    return X_,X_list,sequence_df_list,divide_idx_list


In [None]:
def get_divided_sequences(df_sampled,train_info_dict):
    """
    Split a frame at time gaps and window each contiguous piece.

    Gap positions come from find_divide_idx. The list is wrapped with the sentinels -1 and
    99999999999 so that piece k spans rows divide_idx_list[k]+1 .. divide_idx_list[k+1].
    Pieces shorter than window_size produce no windows and are dropped.

    Arguments:
        df_sampled: frame with time_column and the training columns
        train_info_dict: needs "window_size", "sample_time", "time_column" and "train_cols"
    Returns:
        X_: all windows concatenated, shape (n, window_size, len(train_cols)); empty with that
            trailing shape when nothing could be windowed
        X_list: windows of each kept piece
        sequence_df_list: the kept pieces (rows of df_sampled), parallel to X_list
        divide_idx_list: gap indices including both sentinels
    """
    window_size=train_info_dict["window_size"]
    train_cols=train_info_dict["train_cols"]

    gap_idxs=find_divide_idx(df_sampled,window_size=window_size,sample_time=train_info_dict["sample_time"],time_column=train_info_dict["time_column"])
    divide_idx_list=[-1,*gap_idxs,99999999999]
    print("divided points idxs")
    print(divide_idx_list[1:-1])

    X_list=[]
    sequence_df_list=[]
    for start_idx,end_idx in zip(divide_idx_list[:-1],divide_idx_list[1:]):
        sequence=df_sampled.iloc[start_idx+1:end_idx+1]
        windows=rolling_window(sequence[train_cols],window_size=window_size,features_num=len(train_cols))
        if len(windows)==0:
            continue
        X_list.append(windows)
        sequence_df_list.append(sequence)

    if X_list:
        X_=np.concatenate(X_list)
    else:
        # empty, but with the model's input shape so downstream code can still use it
        X_=np.array([]).reshape((0,window_size,len(train_cols)))

    return X_,X_list,sequence_df_list,divide_idx_list

In [14]:
def rolling_window(array, window_size,features_num ,stride=1):
    """
    Cut a (rows, features) table into overlapping windows.

    Arguments:
        array: anything np.array converts to a 2-D (rows, features) array, e.g. a DataFrame
        window_size: rows per window
        features_num: columns per window (the first features_num columns are used)
        stride: keep every stride-th window
    Returns:
        A new (independent, writable) array of shape
        (ceil((rows-window_size+1)/stride), window_size, features_num),
        or [] when there are fewer rows than window_size.
    """
    if len(array) < window_size:
        return []
    values=np.array(array)
    # (rows-window_size+1, 1, window_size, features_num) -> drop the singleton column-window axis
    windows=np.lib.stride_tricks.sliding_window_view(values,(window_size,features_num))[:,0]
    # fancy indexing copies, so the result does not alias `values`
    return windows[np.arange(0,windows.shape[0],stride)]

In [None]:
def to_sequences_X(x, window_size=128):
    """
    Build every length-window_size window of a DataFrame or Series with a Python loop.

    Arguments:
        x: pandas DataFrame or Series (positional slicing via .iloc)
        window_size: rows per window
    Returns:
        np.array of shape (len(x)-window_size+1, window_size[, columns]); shape (0,) when
        x is shorter than window_size.
    """
    n_windows=len(x)-(window_size-1)
    return np.array([x.iloc[start:start+window_size].values for start in range(n_windows)])

In [None]:
def find_divide_idx(df,window_size,sample_time,time_column):
    """
    Find the rows after which the time series has a gap longer than one window.

    The window duration is window_size * step, where step is pd.Timedelta(sample_time) when
    sample_time is given. Otherwise step is the average spacing between consecutive rows:
    (last - first) / (len(df) - 1). That spacing is printed.

    Arguments:
        df: frame with time_column in ascending order
        window_size: number of samples per window
        sample_time: resampling period (anything pd.Timedelta accepts) or None
        time_column: time column (datetime, or numeric when sample_time is None)
    Returns:
        List of int positions i such that time[i+1] - time[i] > window duration.
        Empty for frames with fewer than two rows.
    """
    if len(df)<2:
        # no consecutive pair to compare; also avoids indexing an empty frame below
        return []

    if sample_time is not None:
        window_time=pd.Timedelta(sample_time)*window_size
    else:
        # n rows span n-1 intervals, so the average spacing is span/(n-1), not span/n
        span=df[time_column].iloc[-1]-df[time_column].iloc[0]
        window_time=(span/(len(df)-1))*window_size

    print("average time diff between rows "+str(window_time/window_size))

    gaps=np.diff(np.asarray(df[time_column]))
    threshold=window_time.to_timedelta64() if isinstance(window_time,pd.Timedelta) else window_time
    return np.flatnonzero(gaps>threshold).tolist()

In [30]:
def create_segment_data(train_df,df_segment_order,train_df_segment_step,train_info_dict):
    """
    Window one segment of a training frame and compute how many batches it yields.

    Segment k covers rows [k*step, (k+1)*step). The last segment (k == train_df_segment_num-1,
    when that key is present) extends to the end of train_df. As a result, the remainder rows
    left by the integer step are trained on too, including when the frame is shorter than
    the number of segments.

    Arguments:
        train_df: training frame for one interval
        df_segment_order: segment index k
        train_df_segment_step: rows per segment
        train_info_dict: settings for get_data plus "batch_size" (and optionally "train_df_segment_num")
    Returns:
        X_: float32 windows of the segment
        batch_num: ceil(len(X_) / batch_size)
    """
    segment_start=df_segment_order*train_df_segment_step
    is_last_segment=df_segment_order==train_info_dict.get("train_df_segment_num",0)-1
    segment_end=len(train_df) if is_last_segment else segment_start+train_df_segment_step
    train_df_segment=train_df.iloc[segment_start:segment_end]

    X_,_,_,_=get_data(train_df_segment,train_info_dict)
    X_=np.asarray(X_).astype('float32')
    batch_num=int(np.ceil(len(X_)/train_info_dict["batch_size"]))
    return X_,batch_num

# TEST FUNCS

In [15]:
def get_model_predict_errors(X,reconstructed_X,cols,loss_type):
    """
    Compute the per-window reconstruction error for every training column.

    Arguments:
        X: model input, 2-D (windows, window_size) or 3-D (windows, window_size, channels)
           with one channel per entry of cols
        reconstructed_X: model output, same shape as X
        cols: training column names in channel order
        loss_type: "mse" or "mae"
    Returns:
        DataFrame with one column "<col>_ae_<loss_type>" per column and one row per window.
        For 2-D input every column receives the same (single-channel) error. The frame is
        empty when loss_type is unsupported.
    """
    # Validate once up front. An unknown loss_type used to surface only as an accidental
    # NameError, and the printed hint said the opposite of what it meant.
    if loss_type=="mse":
        loss_fn=tf.keras.losses.mse
    elif loss_type=="mae":
        loss_fn=tf.keras.losses.mae
    else:
        print("unsupported loss_type {!r}: only 'mae' and 'mse' are supported".format(loss_type))
        return pd.DataFrame([])

    df=pd.DataFrame([])
    for i,col in enumerate(cols):
        try:
            if len(X.shape)==2:
                predicted_error=loss_fn(reconstructed_X,X)
            else:
                predicted_error=loss_fn(reconstructed_X[:,:,i],X[:,:,i])
            df["{}_ae_{}".format(col,loss_type)]=predicted_error
        except Exception as exception:
            print(exception)
            print("could not compute the {} error for column {}".format(loss_type,col))

    return df

In [22]:
def get_monolithic_anomaly_df(X_list,sequence_df_list,train_info_dict):
    """
    Build an anomaly frame for every contiguous piece and stack them.

    Arguments:
        X_list: windows of each piece (as returned by get_divided_sequences)
        sequence_df_list: the matching raw rows of each piece, same order as X_list
        train_info_dict: settings passed to create_anomaly_df
    Returns:
        The concatenation of the per-piece anomaly frames, or an empty DataFrame when
        X_list is empty.
    """
    anomaly_dfs=[
        create_anomaly_df(X_=X_piece,sequence_df=sequence_df_list[j],train_info_dict=train_info_dict)
        for j,X_piece in enumerate(X_list)
    ]
    if not anomaly_dfs:
        return pd.DataFrame([])
    return pd.concat(anomaly_dfs)

In [17]:
def linear_sequenced_columns(X_,cols,window_size):
    """
    Turn overlapping reconstructed windows back into one averaged signal per column.

    Arguments:
        X_: reconstructed windows, 3-D (windows, window_size, channels) with one channel per
            column, or 2-D (windows, window_size), in which case every column gets the same signal
        cols: column names in channel order
        window_size: window length
    Returns:
        DataFrame with one column per name in cols, each of length len(X_)+window_size-1.
    """
    reconstructed=pd.DataFrame([])
    is_multichannel=len(X_.shape)==3
    for channel,col in enumerate(cols):
        if is_multichannel:
            channel_windows=change_shape(X_[:,:,channel:channel+1])
            signal=reconstructed_array_to_signal_mean(channel_windows,window_size)
        else:
            signal=reconstructed_array_to_signal_mean(X_,window_size)
        reconstructed[col]=signal
    return reconstructed

In [2]:
def create_anomaly_df(X_,sequence_df,train_info_dict):
    """Score ``X_`` with the model and attach errors and reconstructions to ``sequence_df``.

    NOTE(review): ``create_anomaly_df`` is defined again later in this notebook
    (that version stores mean/variance reconstructions); the cell executed last
    wins.

    Steps:
    1. Take the time stamps of the first ``len(sequence_df) - window_size + 1``
       rows of ``sequence_df`` (presumably one per window in ``X_`` — TODO
       confirm against how the windows are built).
    2. If ``model_input_shape_len == 2``, reshape ``X_`` to
       ``(X_.shape[0], X_.shape[1])``, then run ``model.predict``.
    3. Attach per-column errors, named with ``error_column_template``, to those
       time stamps and outer-merge them onto ``sequence_df`` on the time column.
    4. Store the flattened reconstructions under the names produced by
       ``reconstructed_column_template``.

    ``train_info_dict`` keys read: window_size, time_column,
    model_input_shape_len, model, train_cols, loss_type,
    error_column_template, reconstructed_column_template.

    Returns
    -------
    pandas.DataFrame
    """
    window_size=train_info_dict["window_size"]
    time_column=train_info_dict["time_column"]
    train_cols=train_info_dict["train_cols"]
    loss_type=train_info_dict["loss_type"]

    # L rows produce L - window_size + 1 windows. The former
    # ``[:-window_size+1]`` slice became ``[:0]`` (no rows) for window_size == 1;
    # clamp at 0 so sequences shorter than a window still yield no rows.
    n_windows=max(len(sequence_df)-window_size+1,0)
    anomaly_df=pd.DataFrame(sequence_df.iloc[:n_windows][time_column])
    if train_info_dict["model_input_shape_len"]==2:
        X_=X_.reshape(X_.shape[0],X_.shape[1])
    reconstructions_=train_info_dict["model"].predict(X_)

    predicted_errors_df=get_model_predict_errors(X_,reconstructions_,train_cols,loss_type)
    lineared_reconstructed_columns=linear_sequenced_columns(reconstructions_,train_cols,window_size)

    predicted_error_column_names=[train_info_dict["error_column_template"].format(col,loss_type) for col in train_cols]
    anomaly_df=anomaly_df.reset_index(drop=True)
    # Assigning a DataFrame to a list of column names pairs the columns by position.
    anomaly_df[predicted_error_column_names]=predicted_errors_df

    anomaly_df=pd.merge(left=sequence_df,right=anomaly_df,how="outer",on=time_column)
    anomaly_df[[train_info_dict["reconstructed_column_template"].format(col) for col in train_cols]]=lineared_reconstructed_columns
    return anomaly_df

In [1]:
def get_train_info_dict(one_model_info_table):
    """Build the inference configuration dict for a single trained model.

    The first record of ``one_model_info_table`` supplies the base settings
    (it must contain ``tools_path``, ``model_name`` and ``scaler_name``). The
    model and scaler are loaded with ``read_model_or_scaler_s3``, and three
    parquet files are read from ``tools_path``. The record is then extended
    with the keys model, scaler, train_cols (list), time_column, use_cols
    (train columns followed by the time column) and train_data_info.

    NOTE(review): ``tools_path`` is concatenated directly with file names, so
    it presumably has to end with a path separator — confirm.
    """
    info=one_model_info_table.to_dict('records')[0]
    tools_path=info["tools_path"]

    loaded_model=read_model_or_scaler_s3(tools_path,info["model_name"])
    loaded_scaler=read_model_or_scaler_s3(tools_path,info["scaler_name"])
    column_names=list(pd.read_parquet(tools_path+"train_cols.parquet").values.ravel())
    time_col=pd.read_parquet(tools_path+"time_column.parquet").values.ravel()[0]
    data_info=pd.read_parquet(tools_path+"train_data_info.parquet")

    info["model"]=loaded_model
    info["scaler"]=loaded_scaler
    info["train_cols"]=column_names
    info["time_column"]=time_col
    info["use_cols"]=column_names+[time_col]
    info["train_data_info"]=data_info
    return info

In [21]:
def find_cust_subassets_models_info(read_path,model_info_with_wanted_features,cust_features_column_names, cust_list):
    """Return the model-info rows whose feature columns match a customer's features.

    ``get_cust_features(read_path, cust_list=cust_list)`` supplies a dict whose
    values are paired, in order, with ``cust_features_column_names``. A row is
    kept only when it matches every requested column.

    NOTE(review): the pairing relies on the dict's value order lining up with
    ``cust_features_column_names``. The dict also holds ``"cust_path"``, which
    is harmless only if it comes after the feature values — confirm.

    Returns
    -------
    tuple
        ``(matching_rows, cust_features_dict["cust_path"])``.
    """
    cust_features_dict=get_cust_features(read_path, cust_list=cust_list)
    # Count, per row, how many requested features match; a row qualifies when
    # the count equals the number of requested columns. Starting from a zero
    # Series (instead of ``sum([])`` == 0, a plain int) keeps the mask a boolean
    # Series even when no feature columns are given. In that case every row is
    # returned, where ``.loc[True]`` used to fail.
    match_count=pd.Series(0,index=model_info_with_wanted_features.index)
    for feature_key,feature_value in zip(cust_features_column_names,cust_features_dict.values()):
        match_count=match_count+(model_info_with_wanted_features[feature_key]==feature_value)
    cust_subasset_models_info=model_info_with_wanted_features.loc[match_count==len(cust_features_column_names)]

    return cust_subasset_models_info,cust_features_dict["cust_path"]

In [196]:
def create_anomaly_df(X_,sequence_df,train_info_dict):
    """Score ``X_`` with the model and attach errors and reconstructions to ``sequence_df``.

    Steps:
    1. Take the time stamps of the first ``len(sequence_df) - window_size + 1``
       rows of ``sequence_df`` (presumably one per window in ``X_`` — TODO
       confirm against how the windows are built).
    2. If ``model_input_shape_len == 2``, reshape ``X_`` to
       ``(X_.shape[0], X_.shape[1])``, then run ``model.predict``.
    3. Attach per-column errors, named with ``error_column_template``, to those
       time stamps and outer-merge them onto ``sequence_df`` on the time column.
    4. Append the mean/variance reconstruction signals from
       ``linear_sequenced_columns`` as extra columns, aligned on the index.

    ``train_info_dict`` keys read: window_size, time_column,
    model_input_shape_len, model, train_cols, loss_type,
    error_column_template, reconstructed_column_template.

    Returns
    -------
    pandas.DataFrame
    """
    window_size=train_info_dict["window_size"]
    time_column=train_info_dict["time_column"]
    train_cols=train_info_dict["train_cols"]
    loss_type=train_info_dict["loss_type"]

    # L rows produce L - window_size + 1 windows. The former
    # ``[:-window_size+1]`` slice became ``[:0]`` (no rows) for window_size == 1;
    # clamp at 0 so sequences shorter than a window still yield no rows.
    n_windows=max(len(sequence_df)-window_size+1,0)
    anomaly_df=pd.DataFrame(sequence_df.iloc[:n_windows][time_column])
    if train_info_dict["model_input_shape_len"]==2:
        X_=X_.reshape(X_.shape[0],X_.shape[1])
    reconstructions_=train_info_dict["model"].predict(X_)

    predicted_errors_df=get_model_predict_errors(X_,reconstructions_,train_cols,loss_type)
    lineared_reconstructed_columns=linear_sequenced_columns(reconstructions_,train_cols,window_size,train_info_dict["reconstructed_column_template"])

    predicted_error_column_names=[train_info_dict["error_column_template"].format(col,loss_type) for col in train_cols]
    anomaly_df=anomaly_df.reset_index(drop=True)
    # Assigning a DataFrame to a list of column names pairs the columns by position.
    anomaly_df[predicted_error_column_names]=predicted_errors_df

    anomaly_df=pd.merge(left=sequence_df,right=anomaly_df,how="outer",on=time_column)
    anomaly_df=pd.concat([anomaly_df,lineared_reconstructed_columns],axis=1)
    return anomaly_df

In [163]:
def linear_sequenced_columns(X_,cols,window_size,reconstructed_column_template):
    """Flatten windowed reconstructions into mean and variance signals per column.

    For a 3-D ``X_`` the slice ``X_[:, :, i:i+1]`` is passed through
    ``change_shape`` for ``cols[i]``. For any other rank, ``X_`` is used as-is
    and the same signals are stored under every column name.
    ``reconstructed_array_to_signal_mean`` and
    ``reconstructed_array_to_signal_variance`` convert the windowed array into
    the two output signals.

    Returns
    -------
    pandas.DataFrame
        For each ``col`` (in order), the columns
        ``reconstructed_column_template.format(col) + "_mean"`` and
        ``... + "_variance"``.
    """
    df=pd.DataFrame([])
    shared_signals=None
    for i,col in enumerate(cols):
        if len(X_.shape)==3:
            column_windows=change_shape(X_[:,:,i:i+1])
            signal_mean=reconstructed_array_to_signal_mean(column_windows,window_size)
            signal_variance=reconstructed_array_to_signal_variance(column_windows,window_size)
        else:
            # The non-3-D input does not depend on the column, so compute the
            # signals once and reuse them rather than recomputing per column.
            if shared_signals is None:
                shared_signals=(
                    reconstructed_array_to_signal_mean(X_,window_size),
                    reconstructed_array_to_signal_variance(X_,window_size),
                )
            signal_mean,signal_variance=shared_signals
        base_name=reconstructed_column_template.format(col)
        df[base_name+"_mean"]=signal_mean
        df[base_name+"_variance"]=signal_variance

    return df