In [None]:
def arange(
        arg1: float,
        arg2: float or None = None,
        arg3: float or None = None,
        arg4: bool or None = None):
    
    '''
    Realization of simple range (based on np.arange) with protection from 
    float large decimals, e.g. 1.100000000009 except 1.1)
    
    default:
        arg1 - start
        arg2 - stop
        arg3 - step
        arg4 - endpoint (if True: 'stop' value included in range; if False: 'stop' value not included in range)

    variations:
        arange(arg1) -> range(start=0, stop=arg1, step=1, endpoint=False)
        
        arange(arg1, arg2):
            arange(float, float) -> (start=arg1, stop=arg2, step=1, endpoint=False)
            arange(float, bool) -> range(start=0, stop=arg1, step=1, endpoint=arg2)
            
            
        arange(arg1, arg2, arg3):
            arange(float, float, float) -> (start=arg1, stop=arg2, step=arg3, endpoint=False)
            arange(float, float, bool) -> range(start=arg1, stop=arg2, step=1, endpoint=arg3)
            
        arange(arg1, arg2, arg3, arg4):
            arange(float, float, float, bool) -> range(start=arg1, stop=arg2, step=arg3, endpoint=arg4)

    dependencies:
        libraries: numpy, decimal, numbers
    '''

    # list of argument values
    arg_values = locals().values()

    # create list with decimals of arguments values
    round_idxs = []
    for i in arg_values:
        if (isinstance(i, numbers.Number) and not
            isinstance(i, bool)):
            decimals = decimal.Decimal(str(i)).as_tuple().exponent
            round_idxs.append(abs(decimals))
    # find maximum number of decimals - 
    # all values would be round to it later to avoid X.XXXXXXXXXX float
    round_dec = max(round_idxs)
    
    # True/False marker if result should be all integers
    is_int = False

    # if only one argument: arange(arg1)
    if ((arg1 is not None) & (arg2 is None) &
        (arg3 is None) & (arg4 is None)):
        # equivalent (start=0, stop=arg1, step=1, endpoint=False)
        start = 0
        stop = arg1
        # return empty array if start and stop equals
        if start == stop:
            arr = np.empty(0)
            return arr
        step = 1
        endpoint = False
        # rememeber decimal number of stop variable
        round_dec_for_stop = decimal.Decimal(str(stop)).as_tuple()
        round_dec_for_stop = abs(round_dec_for_stop.exponent)
        
        if isinstance(arg1, int):
            is_int = True

    # if two arguments: arange(arg1, arg2)
    if ((arg1 is not None) & (arg2 is not None) &
        (arg3 is None) & (arg4 is None)):
        
        # if second argument boolean: arange(number1, True)
        if isinstance(arg2, bool):
            # equivalent (start=0, stop=arg1, step=1, endpoint=arg2)
            start = 0
            stop = arg1
            step = 1
            endpoint = arg2
            # rememeber decimal number of stop variable
            round_dec_for_stop = decimal.Decimal(str(stop)).as_tuple()
            round_dec_for_stop = abs(round_dec_for_stop.exponent)
        # if second argument not boolean: arange(number1, number2)
        else:
            # equivalent (start=arg1, stop=arg2, step=1, endpoint=False)
            start = arg1
            stop = arg2
            # return empty array if start and stop equals
            if start == stop:
                arr = np.empty(0)
                return arr
            step = 1
            endpoint = False
            # rememeber decimal number of stop variable
            round_dec_for_stop = decimal.Decimal(str(stop)).as_tuple()
            round_dec_for_stop = abs(round_dec_for_stop.exponent)

        if isinstance(arg1, int) & isinstance(arg2, int):
            is_int = True

    # if three arguments: arange(arg1, arg2, arg3)
    if ((arg1 is not None) & (arg2 is not None) &
        (arg3 is not None) & (arg4 is None)):
        # if third argument boolean: arange(number1, number2, True)
        if isinstance(arg3, bool):
            # equivalent (start=arg1, stop=arg2, step=1, endpoint=arg3)
            start = arg1
            stop = arg2
            # return empty array if start and stop equals
            if start == stop:
                arr = np.empty(0)
                return arr
            step = 1
            endpoint = arg3
            # rememeber decimal number of stop variable
            round_dec_for_stop = decimal.Decimal(str(stop)).as_tuple()
            round_dec_for_stop = abs(round_dec_for_stop.exponent)
        # if third argument not boolean: arange(number1, number2, number3)
        else:
            # equivalent (start=arg1, stop=arg2, step=arg3, endpoint=False)
            start = arg1
            stop = arg2
            # return empty array if start and stop equals
            if start == stop:
                arr = np.empty(0)
                return arr
            step = arg3
            endpoint = False
            # rememeber decimal number of stop variable
            round_dec_for_stop = decimal.Decimal(str(stop)).as_tuple()
            round_dec_for_stop = abs(round_dec_for_stop.exponent)

        if (isinstance(arg1, int) & isinstance(arg2, int) &
               isinstance(arg3, int)):
            is_int = True

    # if all arguments: arange(arg1, arg2, arg4, True)
    if ((arg1 is not None) & (arg2 is not None) &
        (arg3 is not None) & (arg4 is not None)):
        # equivalent (start=arg1, stop=arg2, step=arg3, endpoint=arg4)
        start = arg1
        stop = arg2
        # return empty array if start and stop equals
        if start == stop:
            arr = np.empty(0)
            return arr
        step = arg3
        endpoint = arg4
        # rememeber decimal number of stop variable
        round_dec_for_stop = decimal.Decimal(str(stop)).as_tuple()
        round_dec_for_stop = abs(round_dec_for_stop.exponent)

        if (isinstance(arg1, int) & isinstance(arg2, int) &
            isinstance(arg3, int)):
            is_int = True

    # arr = step * np.arange(start/step, stop/step)
    arr = np.arange(start, stop, step)
    # round array to avoid X.XXXXXXXXXXXX float
    arr = np.around(arr, decimals=round_dec)
    # if last value of arr plus step equals to stop it concatenates to arr
    last_value = arr[-1]
    # also round this value to avoid X.XXXXXXXXXXXX float (number decimals as in stop variable)
    last_value_plus_step = np.around(last_value+step, round_dec_for_stop)
    if endpoint and last_value_plus_step==stop:
        arr = np.concatenate([arr,[stop]])
    if is_int:
        arr = np.around(arr, decimals=0)
        arr = arr.astype(int)

    return arr

In [None]:
# smooth data

def smoothed(x, y=None, n=300, k=3, return_type='df', datetime_index=False):
    '''
    Smooth data for plots with an interpolating B-spline

    Arguments:
    x: pd.DataFrame, pd.Series or array-like with the values to smooth;
       for pandas objects the index is used as the x coordinate,
       otherwise positions 0..len(x)-1 are used
    y: not used (kept for backward compatibility)
    n: number of evenly spaced points in the smoothed result
    k: degree of the spline
    return_type:
        - if 'df' - returns pd.DataFrame indexed by the new x values
        - if 'array' - returns x_new, y_new
        - if 'dict' - returns dict with {'x': x_new, 'y': y_new}
    datetime_index: if True, the index of x is replaced by positions for
        fitting and the result is labelled with n evenly spaced
        timestamps between the first and the last index value

    If x is a pd.DataFrame, every column is smoothed; 'array' returns the
    new index and the first smoothed column, any other return_type
    returns the pd.DataFrame.

    Raises:
    ValueError if x is not a pd.DataFrame and return_type is unknown

    Libraries:
    numpy, pandas, scipy.interpolate (make_interp_spline)
    '''

    time_range = None
    if datetime_index:
        time_range = pd.date_range(
            start=x.index[0], end=x.index[-1], periods=n)
        # fit against plain positions; timestamps are restored on output
        x = x.reset_index(drop=True)

    if isinstance(x, pd.DataFrame):
        x_index = x.index
        x_new = np.linspace(x_index.min(), x_index.max(), n)
        df = pd.DataFrame(index=x_new, columns=x.columns)
        for col in x.columns:
            spl = scipy.interpolate.make_interp_spline(x_index, x[col], k=k)
            df[col] = spl(x_new)
        if datetime_index:
            df.index = time_range
        if return_type == 'array':
            return np.array(df.index), np.array(df.iloc[:, 0])
        return df

    if isinstance(x, pd.Series):
        var_name = x.name
        x_values = x.index
        y_values = x.copy()
    else:
        var_name = 'variable'
        y_values = np.asarray(x)
        x_values = np.arange(len(y_values))

    # n points between the smallest and the largest x value
    x_new = np.linspace(x_values.min(), x_values.max(), n)
    spl = scipy.interpolate.make_interp_spline(x_values, y_values, k=k)
    y_new = spl(x_new)

    x_out = time_range if datetime_index else x_new

    if return_type == 'dict':
        return {'x': x_out, 'y': y_new}
    if return_type == 'array':
        return x_out, y_new
    if return_type == 'df':
        return pd.DataFrame(data=y_new, index=x_out, columns=[var_name])
    raise ValueError("'return_type' must be 'df', 'array' or 'dict'")

In [None]:
# saturate and alpha colors

def saturate_color(color_rgb, saturation=0.75):
    '''
    Scale the HLS saturation of an RGB color by 'saturation'

    color_rgb: sequence whose first three items are the R, G, B channels
    Returns: (r, g, b) tuple with the same hue and lightness
    '''
    red, green, blue = color_rgb[0], color_rgb[1], color_rgb[2]
    hue, lightness, sat = colorsys.rgb_to_hls(red, green, blue)
    return colorsys.hls_to_rgb(hue, lightness, saturation*sat)


def alpha_color(color, alpha):
    '''
    Blend every channel of 'color' towards 1 (white):
    alpha=1 keeps the color, alpha=0 gives all ones
    '''
    fade = 1 - alpha
    return tuple(channel + (1 - channel) * fade for channel in color)


def saturate_palette(palette, saturation=0.75):
    '''
    Apply saturate_color() to every color of the palette;
    returns a list of the resulting colors
    '''
    result = []
    for color in palette:
        result.append(saturate_color(color, saturation=saturation))
    return result


def alpha_palette(palette, alpha=0.90):
    '''
    Apply alpha_color() to every color of the palette;
    returns a list of the resulting colors
    '''
    result = []
    for color in palette:
        result.append(alpha_color(color, alpha=alpha))
    return result

In [None]:
# check if there are NaNs in df

def is_nan(df):
    '''
    Return the rows of DataFrame that contain at least one NaN.
    If there are no such rows, print a message and return None
    '''
    rows_with_nan = df[df.isna().any(axis=1)]
    if rows_with_nan.shape[0] > 0:
        return rows_with_nan
    print("No NaN values in DataFrame")

In [None]:
# save-load

def loadit(name, dir='files'):
    '''
    Load object from '{dir}/{name}.pkl' (counterpart of saveit)

    name: file name without the '.pkl' extension
    dir: directory with the file (with or without trailing separator)
    '''
    import os

    # NOTE: unpickling can execute arbitrary code -
    # load only files from trusted sources
    path = os.path.join(dir, f'{name}.pkl')
    return pd.read_pickle(path)

def saveit(file, name, dir='files'):
    '''
    Pickle object 'file' to '{dir}/{name}.pkl'

    file: any picklable object
    name: file name without the '.pkl' extension
    dir: target directory; created (with parents) if it does not exist
    '''
    # create dir (and missing parents) if it does not exist
    os.makedirs(dir, exist_ok=True)
    # 'with' guarantees the handle is closed even if pickling fails
    with open(f'{dir}/{name}.pkl', 'wb') as filehandler:
        pickle.dump(file, filehandler)
    print(f"File '{name}.pkl' saved in directory '{dir}'")

def savefig(name, dir='img', format='png', dpi=100, transparent=True,  figure=None, **kwargs):
    '''
    Saves figure as '{name}.{format}' into 'img/' dir or its subdirectory

    name: file name without extension
    dir: None or 'img' - save into 'img'; any other value - into 'img/{dir}'
    format, dpi, transparent, **kwargs: passed to figure.savefig()
    figure: figure to save; if None, the module-level variable 'fig' is used
    '''
    if figure is None:
        # falls back to the global 'fig' (NameError if it is not defined)
        figure = fig
    if dir is None:
        dir = 'img'
    elif dir != 'img':
        dir = f'img/{dir}'
    # create dir with missing parents ('img' itself may not exist yet)
    os.makedirs(dir, exist_ok=True)
    figure.savefig(
        f'{dir}/{name}.{format}',
        transparent=transparent,
        bbox_inches='tight',
        dpi=dpi,
        format=format,
        **kwargs
    )
    print(f"Image '{name}.{format}' successfully saved into '{dir}' directory")

In [None]:
def ci_bootstrap(
        data, statistic=np.mean, n_bootstrap=1000,
        confidence_level=0.95, random_state=42):
    '''
    Bootstrap confidence interval of 'statistic' (scipy.stats.bootstrap)

    Returns: dict(statistic, std, ci_min, ci_max, margin), where
        statistic - 'statistic' computed on the data itself
        std - sample standard deviation (ddof=1)
        margin - statistic minus lower bound of the interval
    '''
    result = scipy.stats.bootstrap(
        data=(data,),
        statistic=statistic,
        n_resamples=n_bootstrap,
        confidence_level=confidence_level,
        random_state=random_state
    )
    interval = result.confidence_interval
    lower, upper = interval[0], interval[1]

    if isinstance(data, pd.DataFrame):
        # column-wise statistic and std as plain arrays
        point = np.array(data.apply(statistic))
        spread = np.array(np.std(data, ddof=1))
    else:
        point = statistic(data)
        spread = np.std(data, ddof=1)

    return {
        'statistic': point,
        'std': spread,
        'ci_min': lower,
        'ci_max': upper,
        'margin': point - lower,
    }

In [None]:
def ci_t_distribution(
        data=None, mean=None, std=None, n=None, confidence_level=0.95):
    '''
    Confidence interval of the mean based on Student's t-distribution

    Either pass 'data' (mean, n and standard error are computed from it)
    or pass 'mean', 'std' and 'n' directly. If 'data' and 'std' are both
    given, the standard error is computed from 'std' and the n of data.

    Returns: dict(min, max, mean, margin, t)
    Raises: ValueError if neither 'data' nor all of 'mean', 'std', 'n'
        are provided
    '''
    se = None

    if data is not None:
        arr = np.array(data)
        n = len(arr)
        mean = np.mean(arr)
        se = scipy.stats.sem(arr)

    # explicit None checks: a mean or std equal to 0 is a valid value
    if mean is not None and std is not None and n is not None:
        se = std / np.sqrt(n)

    if se is None:
        raise ValueError(
            "Provide either 'data' or all of 'mean', 'std' and 'n'")

    t = scipy.stats.t.ppf((1+confidence_level) / 2, n-1)
    margin = t * se
    ci_min = mean - margin
    ci_max = mean + margin

    return_dct = {
        'min': ci_min,
        'max': ci_max,
        'mean': mean,
        'margin': margin,
        't': t
    }
    return return_dct

In [None]:
# normality tests

def test_normality(data, alpha=0.05):
    '''
    Run several normality tests on 'data' and collect results in DataFrame

    Tests: Kolmogorov-Smirnov, Anderson-Darling, Shapiro-Wilk,
    Jarque-Bera, D'Agostino-Pearson.

    Returns: pd.DataFrame with columns
        'Test' - test name
        'P or Statistic (s)' - p-value rounded to 4 decimals
            (for Anderson-Darling - the test statistic)
        'Condition' - 'Not normal' if p-value < alpha, else 'Normal'
            (Anderson-Darling: 'Not normal' if statistic is greater
            than the third critical value)
    '''

    def verdict(rejected):
        return 'Not normal' if rejected else 'Normal'

    rows = []

    # Kolmogorov-Smirnov
    # NOTE(review): 'norm' without parameters is the standard normal
    # N(0, 1) - looks like data is expected to be standardized; confirm
    ks_pvalue = stats.kstest(data, 'norm').pvalue
    rows.append(('Kolmogorov-Smirnov', ks_pvalue, verdict(ks_pvalue < alpha)))

    # Anderson-Darling: statistic is compared with a fixed critical value
    # NOTE(review): critical_values[2] does not depend on 'alpha' - confirm
    ad_result = stats.anderson(data, dist='norm')
    ad_statistic = ad_result.statistic
    ad_critical = ad_result.critical_values[2]
    rows.append((
        'Anderson-Darling (s)', ad_statistic,
        verdict(ad_statistic > ad_critical)))

    # Shapiro-Wilk
    sw_pvalue = stats.shapiro(data).pvalue
    rows.append(('Shapiro-Wilk', sw_pvalue, verdict(sw_pvalue < alpha)))

    # Jarque-Bera: the value labelled 'Chi^2' is used as p-value
    jb_labels = ["Jarque-Bera", "Chi^2", "Skew", "Kurtosis"]
    jb_values = dict(zip(jb_labels, sms.jarque_bera(data)))
    jb_pvalue = jb_values['Chi^2']
    rows.append(('Jarque-Bera', jb_pvalue, verdict(jb_pvalue < alpha)))

    # D'Agostino and Pearson
    dp_pvalue = stats.normaltest(data).pvalue
    rows.append(('D’Agostino-Pearson', dp_pvalue, verdict(dp_pvalue < alpha)))

    results_df = pd.DataFrame({
        'Test': [row[0] for row in rows],
        'P or Statistic (s)': [np.round(row[1], 4) for row in rows],
        'Condition': [row[2] for row in rows],
    })

    return results_df

In [None]:
def order_X_y(data, target):
    '''
    Return a copy of DataFrame with the Target column moved to the end
    (raises ValueError if there is no such column)
    '''
    ordered = data.columns.tolist()
    # take out the first occurrence of target and put it last
    ordered.remove(target)
    ordered.append(target)

    return data[ordered].copy()

In [None]:
def feature_importance_display(
        features, importance,
        top=None, imp_min_level=None, only_features=True):

    '''
    Build a table of features and their importances

    features: feature names
    importance: importance values (same length as features)
    top: if not None - keep only 'top' most important features
    imp_min_level: if not None - keep only features with importance
        greater than this level
    only_features: if True - return only the 'Feature' column (pd.Series),
        else pd.DataFrame with 'Feature' and 'Importance' columns

    The table is sorted by importance (descending) only if 'top' or
    'imp_min_level' is used.
    '''

    def ranked(frame):
        return (frame
                .sort_values('Importance', ascending=False)
                .reset_index(drop=True))

    table = pd.DataFrame({
        'Feature': features,
        'Importance': importance
    })

    if imp_min_level is not None:
        above_level = table['Importance'] > imp_min_level
        table = ranked(table.loc[above_level, :])

    if top is not None:
        # label-based slice on the fresh 0..n-1 index: rows 0..top-1
        table = ranked(table).loc[0:top-1]

    return table['Feature'] if only_features else table

In [None]:
def rgb_to_hex(x):
    '''
    Convert color x to a hex string with matplotlib.colors.to_hex
    '''
    return matplotlib.colors.to_hex(x)

In [None]:
def outliers_column_iqr(data, feature, scale=1.5):

    '''
    Return a copy of DataFrame with an extra 1/0 column '{feature}_is_out'
    that marks values of Feature lying outside
    [Q1 - scale*IQR, Q3 + scale*IQR]
    '''

    values = data[feature]

    first_quartile = values.quantile(0.25)
    third_quartile = values.quantile(0.75)
    whisker = scale*(third_quartile - first_quartile)

    too_low = values.lt(first_quartile - whisker)
    too_high = values.gt(third_quartile + whisker)

    result = data.copy()
    result[feature+'_is_out'] = (too_low | too_high).astype(int)

    return result

In [None]:
def correlation_w_target(data, target):

    '''
    Create DataFrame with correlations of all columns to Target,
    sorted by absolute value (descending)

    The Target row is removed by label, so the result is correct even
    if another column correlates with Target as strongly as Target
    itself or if Target's self-correlation is NaN (constant column).
    '''

    correlations = data.corr()[target].drop(target)
    df = (correlations
          .sort_values(ascending=False, key=abs)
          .to_frame())
    return df

In [None]:
def check_columns_match(data):

    '''
    Check if all columns in DataFrame are equal to the first column.
    If they are - print a message and return None, otherwise return
    the rows where at least one column differs
    '''

    # True for rows in which every column equals the first column
    row_matches = data.eq(data.iloc[:, 0], axis=0).all(1)

    if row_matches.astype(int).sum() == len(data):
        print('All values matched')
        return None

    # a column named 'is_equal' is never part of the result
    keep_columns = data.columns != 'is_equal'
    return data.loc[~row_matches, keep_columns].copy()

In [None]:
def fillna_na(data, features_list):

    '''
    Return a copy of DataFrame where NaNs in the columns listed in
    features_list are replaced by the string 'NA'
    '''

    filled = data.copy()
    for column in features_list:
        filled[column] = data[column].fillna('NA')

    return filled

In [None]:
def is_equal(data1, data2):
    '''
    Compare two DataFrames

    If they are equal - print 'Equal' and return None, otherwise return
    the rows of data1 that have no identical row in data2
    '''

    if data1.equals(data2):
        print('Equal')
        return None

    # rows are compared as tuples of their values
    rows1 = data1.apply(tuple, 1)
    rows2 = data2.apply(tuple, 1)
    # return the differences (a bare expression inside a function
    # is not displayed and would be lost)
    return data1[~rows1.isin(rows2)]

In [None]:
def remove_duplicated_whitespaces(x):

    '''
    Convert x to String and collapse every run of whitespace into a
    single space (leading and trailing whitespace is removed)
    '''

    words = str(x).split()
    return ' '.join(words)

In [None]:
def replace_with_dict(x, replace_dict):

    '''
    Apply x.replace(key, value) for every pair of replace_dict, one
    after another in the dict order (so later pairs see the result of
    earlier ones), and return the final value
    '''

    result = x
    for old, new in replace_dict.items():
        result = result.replace(old, new)

    return result

In [None]:
def df_cutted_rows(data, start, end):
    '''
    Return a copy of DataFrame without the first 'start' rows
    and without the last 'end' rows (all columns are kept)
    '''
    # end == 0 means "cut nothing at the end"; -0 would give an empty slice
    stop = None if end == 0 else -end
    return data.iloc[start:stop, :].copy()

In [None]:
def last_row_to_first(data):

    '''
    Return a copy of DataFrame where the last row is moved to the top
    (the other rows keep their order)
    '''

    # last row as a one-row DataFrame
    moved_row = data.iloc[-1].to_frame().T
    # put it in front of all rows, then drop its old copy at the bottom
    stacked = pd.concat([moved_row, data], axis=0)

    return stacked.iloc[:-1].copy()

In [None]:
def np_index(array, value):
    '''
    Returns index (along the first axis) of the first element of Array
    that equals Value; raises IndexError if there is no such element
    '''
    first_axis_hits = np.where(array == value)[0]
    return first_axis_hits[0]

In [None]:
def normalized_by_first(data, return_type='df'):

    '''
    Divide every value by the first value:
        first_value = first_value / first_value
        second_value = second_value / first_value
        third_value = third_value / first_value

    data: pd.Series or any sequence supporting data[0] and iteration
    return_type: 'df', 'series', 'array' or 'list'; for any other value
        a message is printed and a plain list is returned

    For pd.Series the first value is taken by position (not by label)
    and the index is kept in 'df' / 'series' results; for other inputs
    the default 0..n-1 index is used.
    '''

    is_pandas = isinstance(data, (pd.Series, pd.DataFrame))
    # Series[0] is a label lookup, so take the first element by position
    if isinstance(data, pd.Series):
        first_value = data.iloc[0]
    else:
        first_value = data[0]
    index = data.index if is_pandas else None

    data_new = [(x/first_value) for x in data]

    if return_type == 'df':
        return pd.DataFrame(data=data_new, index=index)
    if return_type == 'series':
        return pd.Series(data=data_new, index=index)
    if return_type == 'array':
        return np.array(data_new)
    if return_type == 'list':
        return list(data_new)

    print("'return_type' must be 'df', 'series', 'array', 'list'")
    return data_new

In [None]:
def normalized(data, reshape=True, return_type='df'):

    '''
    Scale data with MinMaxScaler (default settings)

    data: pd.Series, pd.DataFrame or array-like
    reshape: if True, data is converted to a single column
        (np.array(data).reshape(-1, 1)) before scaling
    return_type:
        - 'df' - pd.DataFrame; keeps the index of a pandas input,
          otherwise the default 0..n-1 index is used
        - 'array' - array returned by the scaler
        - anything else - a message is printed and None is returned
    '''

    # index is known only for pandas inputs; None means default index
    idxs = None
    if isinstance(data, (pd.Series, pd.DataFrame)):
        idxs = data.index.copy()
    if reshape:
        data = np.array(data).reshape(-1, 1)
    data_new = MinMaxScaler().fit_transform(data)
    if return_type == 'df':
        data_new = pd.DataFrame(data=data_new, index=idxs)
    elif return_type == 'array':
        pass
    else:
        print("return_type must be 'df' or 'array'")
        return None

    return data_new

In [None]:
def to_date(x, kind='%B %Y', translate=False):
    '''
    String to Date

    x: string with a date
    kind: strptime format of x
    translate: if True, Russian month names in x (lowercase or
        capitalized, nominative case) are replaced by capitalized
        English ones before parsing

    Returns: datetime.datetime
    '''
    russian_months = [
        'январь', 'февраль', 'март', 'апрель', 'май', 'июнь', 'июль',
        'август', 'сентябрь', 'октябрь', 'ноябрь', 'декабрь',
        'Январь', 'Февраль', 'Март', 'Апрель', 'Май', 'Июнь', 'Июль',
        'Август', 'Сентябрь', 'Октябрь', 'Ноябрь', 'Декабрь'
    ]
    if translate:
        # translate month words, keep every other word unchanged
        words = [
            months_translate(word, kind='rus-eng', capitalize=True)
            if word in russian_months else word
            for word in x.split()
        ]
        x = ' '.join(words)
    return dt.datetime.strptime(x, kind)

In [None]:
def months_translate(x, kind='rus-eng', add_year=None, capitalize=True):

    '''
    Translate month name between Russian and English
    (matching is case-insensitive)
    'январь' --> 'January'

    kind: 'rus-eng' or 'eng-rus'
    if add_year==2021: 'январь' --> 'January 2021'
    if capitalize==False: 'январь' --> 'january'

    Raises: ValueError if 'kind' is unknown or x is not a month name
        of the source language
    '''

    rus_eng = {
        'январь': 'january',
        'февраль': 'february',
        'март': 'march',
        'апрель': 'april',
        'май': 'may',
        'июнь': 'june',
        'июль': 'july',
        'август': 'august',
        'сентябрь': 'september',
        'октябрь': 'october',
        'ноябрь': 'november',
        'декабрь': 'december'
    }

    if kind == 'rus-eng':
        replace_dict = rus_eng
    elif kind == 'eng-rus':
        replace_dict = {eng: rus for rus, eng in rus_eng.items()}
    else:
        raise ValueError("'kind' must be 'rus-eng' or 'eng-rus'")

    # lookup is done on the lowercased name
    x_old = x.lower()
    if x_old not in replace_dict:
        raise ValueError(f"Unknown month name for '{kind}': {x}")
    x_new = replace_dict[x_old]

    if capitalize:
        x_new = x_new.capitalize()

    if add_year is not None:
        x_new = x_new + ' ' + str(add_year)

    return x_new

In [None]:
def axis_new_year(
        months=[1, 4, 7, 10],
        month_format='%b',
        year_format='%Y',
        add_year_axis=True,
        year_axis_pad=-0.105,
        language='eng',
        months_as_minor=False,
        months_pad=5,
        capitalize=True,
        ax=None):

    '''
    /// IMPORTANT: this function changes the process locale
        ('en_US' for language=='eng', 'ru_RU.UTF-8' for language=='rus').
        If language=='rus' is used, call set_location('EN') after
        plt.show() to switch the locale back.

    /// Also, the 'language' argument set for the last ax is applied
        to all axes of the plot.

    Modify date format of plots from datetime (for example, '2021-01-01') to:

    ---|-------|----- ... ---|--------|-------
      Jan     Feb           Dec      Jan
      2021                           2022

    months: months (1-12) that get a tick; the default list is only
        read, never modified
    month_format, year_format: strftime formats of month and year labels
    add_year_axis: if True, add a secondary x-axis with year labels
    year_axis_pad: position of the secondary axis (passed to
        ax.secondary_xaxis)
    months_as_minor: if True, January is the only major tick and the
        other 'months' become minor ticks
    months_pad: padding of minor tick labels (used if months_as_minor)
    capitalize: if True, month labels are capitalized
    ax: axes to modify (current axes if None)
    '''

    # set ax
    if ax is None:
        ax = plt.gca()

    # set format of months labels
    fmt_month = mdates.DateFormatter(month_format)

    # specify 1st month for major ticks and other months for minor ticks
    if months_as_minor:
        # minor months - all except 1
        months_minor = [month for month in months if month != 1]
        months_major = 1
    else:
        months_major = months

    # set month major ticks
    ax.xaxis.set_major_locator(mdates.MonthLocator(bymonth=months_major))
    ax.xaxis.set_major_formatter(fmt_month)
    # set month minor ticks if necessary
    if months_as_minor:
        ax.xaxis.set_minor_locator(mdates.MonthLocator(bymonth=months_minor))
        ax.xaxis.set_minor_formatter(fmt_month)
        ax.tick_params(axis='x', which='minor', pad=months_pad)

    # set secondary axis with major ticks as year
    if add_year_axis:
        second_xaxis = ax.secondary_xaxis(year_axis_pad)
        second_xaxis.xaxis.set_major_locator(mdates.YearLocator())
        second_xaxis.xaxis.set_major_formatter(
            mdates.DateFormatter(year_format))
        # hide the second x-axis spines and ticks; this only makes
        # sense (and the axis only exists) when the year axis is added
        second_xaxis.spines['bottom'].set_visible(False)
        second_xaxis.tick_params(bottom=False)

    # translate months if necessary
    if language=='eng':
        locale.setlocale(locale.LC_ALL,'en_US')
    elif language=='rus':
        locale.setlocale(locale.LC_ALL,'ru_RU.UTF-8')
    else:
        print("'language' have to be 'eng' or 'rus'")

    # capitalize months if necessary
    if capitalize:
        label_formatter = mdates.DateFormatter(month_format)

        def capitalized_label(x, pos):
            return label_formatter(x, pos).capitalize()

        ax.xaxis.set_major_formatter(
            matplotlib.ticker.FuncFormatter(capitalized_label))
        ax.xaxis.set_minor_formatter(
            matplotlib.ticker.FuncFormatter(capitalized_label))

In [None]:
def set_location(loc='EN'):
    '''
    Set process locale (LC_ALL): 'EN' -> 'en_US', 'RU' -> 'ru_RU.UTF-8'.
    For any other value a message is printed and locale is not changed
    '''
    if loc=='EN':
        locale_name = 'en_US'
    elif loc=='RU':
        locale_name = 'ru_RU.UTF-8'
    else:
        print("Location have to be 'EN' or 'RU'")
        return
    locale.setlocale(locale.LC_ALL, locale_name)

In [None]:
def customize_axis(
        ax=None, x_offset=10, y_offset=10, ticks_width=1, axis_width=1,
        ticks_color='0.9', label_color='0.5', axis_color=None, **kwargs):
    '''
    Customize ticks and the bottom/left spines of ax (current axes if None)

    x_offset, y_offset: outward offset of the bottom / left spine
    ticks_width, ticks_color, label_color: passed to ax.tick_params
    axis_width: line width of both spines
    axis_color: color of both spines (not changed if None)
    **kwargs: accepted but not used
    '''

    target = plt.gca() if ax is None else ax

    target.tick_params(
        width=ticks_width, colors=ticks_color, labelcolor=label_color)

    for side, offset in (('bottom', x_offset), ('left', y_offset)):
        spine = target.spines[side]
        spine.set_position(('outward', offset))
        spine.set_linewidth(axis_width)
        if axis_color is not None:
            spine.set_color(axis_color)

In [None]:
def outward_axis(ax=None, x_offset=5, y_offset=5):
    '''
    Move the bottom spine outward by x_offset and the left spine
    by y_offset on ax (current axes if None)
    '''

    target = plt.gca() if ax is None else ax

    for side, offset in (('bottom', x_offset), ('left', y_offset)):
        target.spines[side].set_position(('outward', offset))

In [None]:
def axis_rstyle(
        yticks: list | None = None,
        xticks: list | None = None,
        yslice: list | None = None,
        xslice: list | None = None,
        ylim: list | None = None,
        xlim: list | None = None,
        x_spine_lim: list | None = None,
        x_axis_hide: bool = False,
        y_spine_lim: list | None = None,
        y_axis_hide: bool = False,
        offset_left: float = 5,
        offset_bottom: float = 10,
        ticks_pad_left: float = 6,
        ticks_pad_bottom: float = 6,
        linewidth: float = 0.75,
        margin: bool = True,
        customize_colors: bool = True,
        spines_color: str ='#AAAAAA',
        ticks_color: str ='#AAAAAA',
        ticklabels_color: str ='#808080',
        grid: bool = False,
        ax=None):
    
    '''
    Restyle ax (current axes if None): only bottom and left spines are
    shown, moved outward and bounded by the first and the last tick.

    xticks: (x_min, x_max, step) or (x_min, x_max); ticks are built with
        arange(x_min, x_max, step, True); if step is omitted, the distance
        between the first two current ticks is used
    yticks: same as xticks for the y axis
    xslice, yslice: arguments of slice() applied to the ticks
    xlim, ylim: (min, max) axis limits; ticks outside the limits are
        ignored when spine bounds are calculated
    x_spine_lim, y_spine_lim: explicit spine bounds (first and last items
        are used) that override the bounds taken from ticks
    x_axis_hide, y_axis_hide: hide the bottom / left spine and its ticks
    offset_left, offset_bottom: outward offset of the left / bottom spine
    ticks_pad_left, ticks_pad_bottom: padding of y / x tick labels
    linewidth: width of spines and ticks (not changed if falsy)
    margin: True - x margin 0.01; a number - x margin of that size
        (in both cases the y margin is scaled by ax width / ax height);
        an iterable - unpacked into ax.margins(); None - margins are
        not changed
    customize_colors: if True, apply spines_color, ticks_color and
        ticklabels_color
    grid: if True, dotted grid lines are plotted at every tick except
        the first and the last one; must be Bool

    Raises:
        TypeError if grid is truthy but not Bool

    Dependencies: 
        import: collections
        functions: arange
    '''
    
    if ax is None: ax = plt.gca()

    # order of steps (important):
        # 1 - get ticks
        # 2 - set margins if necessary
        # 3 - manipulations with ticks
        # 4 - update ticks
        # 5 - spines modification
        # 6 - set limits
        # 7 - tick params
        # 8 - grid

    # get ticks
    x_ticks = ax.get_xticks()
    y_ticks = ax.get_yticks()

    if margin is not None:
        if isinstance(margin, collections.abc.Iterable):
            ax.margins(*margin)
        else:
            # True means the default x margin; other values are used as is
            margin = 0.01 if margin is True else margin
            # the y margin is derived from the x margin so that both
            # margins take the same space regardless of the ax size:
                # ax_width * margin_x = ax_height * margin_y
                # margin_y = (margin_x * ax_width) / ax_height
            ax_height, ax_width = ax.bbox.height, ax.bbox.width
            margin_y = margin * ax_width / ax_height
            ax.margins(x=margin, y=margin_y)

    # declare xticks and yticks if necessary
    if xticks is not None:
        # if step not specified
        if len(xticks) == 2:
            # define step equal to the default step
            xstep = x_ticks[1] - x_ticks[0]
            # make xticks shape (3,)
            xticks = np.append(xticks, xstep)
        # True as the last argument includes the upper bound in ticks
        x_ticks = arange(xticks[0], xticks[1], xticks[2], True)
    if yticks is not None:
        # if step not specified
        if len(yticks) == 2:
            # define step equal to the default step
            ystep = y_ticks[1] - y_ticks[0]
            # make yticks shape (3,)
            yticks = np.append(yticks, ystep)
        # True as the last argument includes the upper bound in ticks
        y_ticks = arange(yticks[0], yticks[1], yticks[2], True)

    # declare xticks and yticks with slices if necessary
    if xslice is not None:
        xslice_ = slice(*xslice)
        x_ticks = x_ticks[xslice_]
    if yslice is not None:
        yslice_ = slice(*yslice)
        y_ticks = y_ticks[yslice_]

    # update ticks
    ax.set_xticks(x_ticks)
    ax.set_yticks(y_ticks)

    # set limits if necessary; the filtered ticks are not set on the axes,
    # they are used below only for the spine bounds
    if xlim is not None:
        ax.set_xlim(xlim[0], xlim[1])
        x_ticks = [x for x in x_ticks if x <= xlim[1]]
        x_ticks = [x for x in x_ticks if x >= xlim[0]]
    if ylim is not None:
        ax.set_ylim(ylim[0], ylim[1])
        y_ticks = [y for y in y_ticks if y <= ylim[1]]
        y_ticks = [y for y in y_ticks if y >= ylim[0]]

    # customize spines: bottom/left are bounded by the first and the last
    # tick (IndexError if no ticks are left), top/right are hidden
    ax.spines['bottom'].set_visible(True)
    ax.spines['bottom'].set_bounds(x_ticks[0], x_ticks[-1])
    ax.spines['bottom'].set_position(('outward', offset_bottom))
    ax.spines['left'].set_visible(True)
    ax.spines['left'].set_bounds(y_ticks[0], y_ticks[-1])
    ax.spines['left'].set_position(('outward', offset_left))
    ax.spines['top'].set_visible(False)
    ax.spines['right'].set_visible(False)

    # explicit spine bounds override the bounds taken from ticks
    if x_spine_lim:
        ax.spines['bottom'].set_bounds(x_spine_lim[0], x_spine_lim[-1])
    if y_spine_lim:
        ax.spines['left'].set_bounds(y_spine_lim[0], y_spine_lim[-1])

    if customize_colors:
        ax.spines['bottom'].set_color(spines_color)
        ax.spines['left'].set_color(spines_color)
        ax.tick_params(which='both', color=ticks_color)
        ax.tick_params( which='both', labelcolor=ticklabels_color)

    if linewidth:
        ax.spines['bottom'].set_linewidth(linewidth)
        ax.spines['left'].set_linewidth(linewidth)
        ax.tick_params(which='both', width=linewidth)
    
    # set tick params
    ax.tick_params(
        which='both', direction='out', bottom=True, size=3, left=True)
    ax.tick_params(
        axis='x', pad=ticks_pad_bottom)
    ax.tick_params(
        axis='y', pad=ticks_pad_left)

    # hiding is done last so it wins over the settings above
    if x_axis_hide:
        ax.spines['bottom'].set_visible(False)
        ax.tick_params(bottom=False)
    if y_axis_hide:
        ax.spines['left'].set_visible(False)
        ax.tick_params(left=False)

    # grid customization (exclude grid lines at the edge of spines)
    if grid:
        if not isinstance(grid, bool):
            raise TypeError ("'grid' agrument must be Bool")
            
        # the built-in grid is switched off; grid lines are plotted as
        # ordinary lines between the first and the last tick
        ax.grid(False)
        x_ticks_ = ax.get_xticks()
        y_ticks_ = ax.get_yticks()

        for i in x_ticks_:
            # skip lines at the first and the last tick
            if (i == x_ticks_[0]) | (i == x_ticks_[-1]):
                pass
            else:
                ax.plot(
                    [i, i], [y_ticks_[0], y_ticks_[-1]],
                    lw=0.5, ls=':', color='#D9D9D9')
        for i in y_ticks_:
            # skip lines at the first and the last tick
            if (i == y_ticks_[0]) | (i == y_ticks_[-1]):
                pass
            else:
                ax.plot(
                    [x_ticks_[0], x_ticks_[-1]], [i, i],
                    lw=0.5, ls=':', color='#D9D9D9')
    else:
        ax.grid(False)

In [None]:
def to_round(x, scale=1, error='skip'):

    '''
    Round x to 'scale' decimal digits when x supports rounding.

    If round() raises TypeError, x is returned unchanged:
        error='type' - a message is printed first
        error='skip' - nothing is printed
    Any other 'error' value prints a usage hint and returns None.
    '''
    try:
        rounded = round(x, ndigits=scale)
    except TypeError:
        if error not in ('type', 'skip'):
            print("'error' must be 'type' or 'skip'")
            return
        if error == 'type':
            print(f'TypeError: {x}')
        return x
    return rounded

In [None]:
def to_float(x):
    '''
    Convert x to Float if possible

    Returns float(x). When the conversion fails - ValueError for
    unparsable text, TypeError for unsupported types such as None or
    list - the exception name and the value are printed and x is
    returned unchanged.
    '''
    try:
        return float(x)
    except (ValueError, TypeError) as exc:
        # report which kind of failure happened, then fall back to the input
        print(f'{type(exc).__name__}: {x}')
        return x

In [None]:
def to_int(x, errors=False):
    '''
    Convert x to Int if possible

    Returns int(x). When the conversion fails (ValueError, TypeError or
    OverflowError, e.g. for infinity) x is returned unchanged; with
    errors=True the exception name and the value are printed first.
    '''
    try:
        return int(x)
    except (ValueError, TypeError, OverflowError) as exc:
        # the message must be printed BEFORE returning, otherwise it is
        # unreachable
        if errors:
            print(f'{type(exc).__name__}: {x}')
        return x

In [None]:
def to_string(x):
    '''
    Return str(x); if the conversion raises ValueError or TypeError,
    print a message and return x unchanged.
    '''
    try:
        text = str(x)
    except (ValueError, TypeError):
        print(f'ValueError: {x}')
        return x
    return text

In [None]:
def skewness(df):
    '''
    Build a table with the skewness of every numeric column of df.

    Columns of the result:
        'Skewness' - value of df.skew(numeric_only=True)
        'Highly skewed' - True when |Skewness| > 0.5
    Rows are sorted by |Skewness| in descending order.
    '''
    table = pd.DataFrame(
        df.skew(numeric_only=True), columns=['Skewness'], index=None)

    magnitude = abs(table['Skewness'])
    table['Highly skewed'] = magnitude > 0.5

    # temporary 'abs' column is used only as the sorting key
    ordered = (table
               .assign(abs=magnitude)
               .sort_values(by=['abs', 'Highly skewed'], ascending=False))

    return ordered.drop('abs', axis=1)

In [None]:
def kurtosis(df):
    '''
    Build a table with the kurtosis of every numeric column of df.

    Columns of the result:
        'Kurtosis' - value of df.kurtosis(numeric_only=True)
        'Type' - 'Too Peaked' (> 1), 'Too Flat' (< -1) or 'Normal'
                 (between -1 and 1 inclusive); NaN when kurtosis is NaN
    Rows are sorted by |Kurtosis| in descending order.
    '''
    table = pd.DataFrame(df.kurtosis(numeric_only=True),
                         columns=['Kurtosis'],
                         index=None)
    values = table['Kurtosis']

    # object dtype from the start: writing strings into a float64 NaN column
    # is a deprecated implicit upcast in recent pandas
    table['Type'] = pd.Series(np.nan, index=table.index, dtype=object)

    table.loc[values > 1, 'Type'] = 'Too Peaked'
    table.loc[values < -1, 'Type'] = 'Too Flat'
    table.loc[(values <= 1) & (values >= -1), 'Type'] = 'Normal'

    # temporary 'abs' column is used only as the sorting key
    table['abs'] = abs(values)
    table = table.sort_values(by=['abs', 'Type'], ascending=False)
    table = table.drop('abs', axis=1)

    return table

In [None]:
def not_none(x):
    '''
    Return True when x is not None, otherwise False
    '''
    return x is not None

In [None]:
def axis_add_xaxis(
        labels,
        offset_first_axis=30,
        offset_others=15,
        labelsize=7,
        labelcolor='#808080',
        sepoffset=None,
        sepwidth=0.5,
        seplength=10,
        sepcolor='#AAAAAA',
        ax=None):

    '''
    Add extra rows of labels below the x axis.

    For every element of 'labels' two secondary x axes are created at the
    bottom of 'ax': one carrying the labels and one carrying only the
    separator ticks between them.

    Attributes:
        labels - iterable of label rows (presumably a list of lists of
            strings - TODO confirm against callers)
        offset_first_axis - outward offset of the first label row
        offset_others - value added to the offset after every row
        labelsize, labelcolor - tick label style (also applied to 'ax')
        sepoffset - shift of separator ticks (2.5 is used when falsy)
        sepwidth, seplength, sepcolor - separator tick style
        ax - target axes (current axes are used if None)

    dependencies:
        list_add_after_every (defined in this file)
    '''

    if ax is None: ax = plt.gca()

    ax_xticks = ax.get_xticks()
    # NOTE(review): 'ax_xticks_length' and 'ax_xticks_lim' are never used below
    ax_xticks_length = len(ax_xticks)
    ax_xticks_min, ax_xticks_max = ax_xticks[0], ax_xticks[-1]
    ax_xticks_lim = ax.get_xlim()
    ax.tick_params(axis='x', labelcolor=labelcolor, labelsize=labelsize)

    for i, l in enumerate(labels):

        # add spaces between labels
        # (fewer labels than ticks: interleave ' ' after every label and
        # prepend '', which gives 2*len(l)+1 tick positions)
        if len(l) < len(ax_xticks):
            len_corrected = (2*len(l)-1) + 2
            l = list_add_after_every(l, ' ', 1)
            l = [''] + l
        else:
            len_corrected = len(l)
        
        # evenly spaced positions between the first and the last tick of 'ax'
        t = np.linspace(ax_xticks_min, ax_xticks_max, len_corrected)

        # ax for labels (spine and tick marks hidden, only the text is shown)
        ax_labels = ax.secondary_xaxis('bottom')
        ax_labels.set_xticks(ticks=t, labels=l)
        ax_labels.spines['bottom'].set_visible(False)
        ax_labels.spines['bottom'].set_position(('outward', offset_first_axis))
        ax_labels.tick_params(
            axis='x', bottom=False, labelcolor=labelcolor, labelsize=labelsize)

        # ax for separator lines
        ax_sep = ax.secondary_xaxis('bottom')
        ax_sep.set_xticks(ticks=t)
        ax_sep.spines['bottom'].set_visible(False)
        ax_sep_offset = sepoffset or 2.5
        ax_sep.spines['bottom'].set_position(('outward', offset_first_axis+offset_others-ax_sep_offset))
        ax_sep.tick_params(
            axis='x', width=sepwidth, length=seplength,
            labelbottom=False,
            color=sepcolor, labelcolor=labelcolor)

        # hide every second tick (not count first and last)
        # NOTE(review): this inner loop rebinds 'i' of the outer loop; the
        # outer 'i' is not used, so the result is not affected
        for i in ax_sep.xaxis.get_major_ticks()[1:-1][::2]:
            i.set_visible(False)
        ax_sep.xaxis.get_major_ticks()[0].set_visible(False)
        ax_sep.xaxis.get_major_ticks()[-1].set_visible(False)

        # next row of labels is placed further outward
        offset_first_axis += offset_others

In [None]:
def put_column_after(data, column_to_move, column_insert_after):

    '''
    Return a copy of 'data' in which 'column_to_move' is placed directly
    after 'column_insert_after'. The input frame is not modified.

    Before:
     col1 | column_insert_after | col2 | col3 | col4 | column_to_move | col5
    -------------------------------------------------------------------------

    After:
     col1 | column_insert_after | column_to_move | col2 | col3 | col4 | col5
    -------------------------------------------------------------------------
    '''

    frame = data.copy()

    # take the column out first so the anchor position is computed
    # on the remaining columns
    moved = frame.pop(column_to_move)
    frame.insert(
        frame.columns.get_loc(column_insert_after) + 1,
        column_to_move,
        moved)

    return frame

In [None]:
def data_describe(data):

    '''
    Summarize every column of 'data'.

    Returns DataFrame indexed by column name with columns:
        'Type' - dtype of the column
        'Count' - number of non-null values
        'Unique' - number of unique non-null values
        'NaN' - number of null values
        'Percentages' - share of null values in percent (int64)
    Rows are sorted by 'NaN' in descending order.
    '''

    missing = data.isnull()

    results_list = [
        # variables types
        data.dtypes.rename('Type').to_frame(),
        # frequency
        data.count().rename('Count').to_frame(),
        # unique values
        data.nunique().rename('Unique').to_frame(),
        # NaNs
        missing.sum().rename('NaN').to_frame(),
        # NaNs fraction
        missing.mean().round(2).rename('Percentages').to_frame(),
    ]
    # df with results
    results = pd.concat(results_list, axis=1)
    # round before casting: 0.29 * 100 is 28.999999999999996 in floating
    # point and a bare astype('int64') would truncate it to 28
    results['Percentages'] = (
        (results['Percentages'] * 100).round().astype('int64'))
    results = results.sort_values(['NaN'], ascending=False)

    return results

In [None]:
def plot_gridplot(
        data, features, target=None, figsize=None, ncols=3, kind='reg',
        plot_shape='rectangle', markersize=15, hscale=1, pscale=1, regplot_kwargs=None,
        pointplot_kwargs=None, scatterplot_kwargs=None, histplot_kwargs=None):

    '''
    Draw one seaborn plot per feature on a grid of subplots.

    Attributes:
        data - data passed to seaborn
        features - sequence of column names, one subplot each
        target - y variable (not used by kind='hist')
        figsize - figure size; if None it is derived from 'plot_shape',
            'ncols', number of rows and 'pscale'
        ncols - number of grid columns
        kind - 'reg', 'point', 'hist' or 'scatter'; any other value
            produces an empty figure
        plot_shape - 'square' or 'rectangle' (used only if figsize is None)
        markersize - marker size passed to seaborn
        hscale - multiplier of vertical space between subplots
        pscale - multiplier of the derived figure size
        *_kwargs - extra keyword arguments for the corresponding seaborn
            function (None means no extra arguments)

    Returns the created figure (after plt.show()).
    '''

    # None instead of shared mutable {} defaults
    regplot_kwargs = regplot_kwargs or {}
    pointplot_kwargs = pointplot_kwargs or {}
    scatterplot_kwargs = scatterplot_kwargs or {}
    histplot_kwargs = histplot_kwargs or {}

    nrows = math.ceil(len(features) / ncols)

    if figsize is None:
        cell_sizes = {'square': (2, 2), 'rectangle': (4, 2.5)}
        if plot_shape not in cell_sizes:
            raise ValueError("'plot_shape' must be 'square' or 'rectangle'")
        cell_width, cell_height = cell_sizes[plot_shape]
        figsize = (
            cell_width * ncols * pscale,
            cell_height * nrows * pscale)

    fig = plt.figure(figsize=figsize)

    if kind in ('reg', 'point', 'hist', 'scatter'):
        for position, feature in enumerate(features, start=1):
            plt.subplot(nrows, ncols, position)

            if kind == 'reg':
                sns.regplot(
                    data=data,
                    x=feature,
                    y=target,
                    scatter_kws={
                        'ec': '#606060',
                        's': markersize,
                        'alpha': 0.9
                    },
                    **regplot_kwargs
                )
            elif kind == 'point':
                sns.pointplot(
                    data=data,
                    x=feature,
                    y=target,
                    markersize=markersize,
                    linestyle='none',
                    capsize=0.031,
                    err_kws={'lw': 0.81*pscale},
                    **pointplot_kwargs
                )
            elif kind == 'hist':
                sns.histplot(
                    data=data,
                    x=feature,
                    alpha=0.95,
                    **histplot_kwargs
                )
            else:
                # 'scatter'; the original referenced an undefined
                # 'scatter_kwargs' here and failed with NameError
                sns.scatterplot(
                    data=data,
                    x=feature,
                    y=target,
                    s=markersize,
                    **scatterplot_kwargs
                )

            plt.ylabel(None)
            if kind == 'point':
                plt.xticks(rotation=45)

    plt.subplots_adjust(hspace=0.4*hscale)
    plt.show()
    return fig

In [None]:
def legend_inline(
        ncols=None,
        loc='lower left',
        bbox_to_anchor=(0,1),
        frameon=False,
        ax=None):

    '''
    Build keyword arguments for a single-row legend above the axes.

    'ncols' falls back to the number of legend handles found on 'ax'
    (current axes if None), and to 6 when there are no handles.
    Returns dict with keys 'ncols', 'loc', 'bbox_to_anchor', 'frameon'.
    '''

    target_ax = plt.gca() if ax is None else ax
    handle_count = len(target_ax.get_legend_handles_labels()[0])

    if not ncols:
        ncols = handle_count if handle_count else 6

    return dict(
        ncols=ncols,
        loc=loc,
        bbox_to_anchor=bbox_to_anchor,
        frameon=frameon)

def legend_mid(
        frameon=False,
        loc='upper left',
        bbox_to_anchor=(1,1),
        markersize=1,
        labelspacing=0.5,
        alignment='left'
):

    '''
    Build keyword arguments for a legend anchored at 'bbox_to_anchor'
    (by default the upper right corner of the axes).

    'markersize' is returned under the 'markerscale' key.
    '''

    return dict(
        frameon=frameon,
        loc=loc,
        bbox_to_anchor=bbox_to_anchor,
        markerscale=markersize,
        alignment=alignment,
        labelspacing=labelspacing)

In [108]:
def ts_detrending_differences(data, add_nan=True):

    '''
    First differences of a series: data[i] - data[i-1].

    Attributes:
        data - array-like; converted with np.asarray
        add_nan - if True, NaN is put in front of the result so that it
            has the same length as 'data'

    Returns list of differences.
    '''

    arr = np.asarray(data)

    # np.nan, not np.NaN: the capitalized alias was removed in NumPy 2.0
    result = [np.nan] if add_nan else []

    # difference of neighbours along the first axis
    result.extend(arr[1:] - arr[:-1])

    return result

In [215]:
def ts_scatterplot_matrix(
        x,
        y=None,
        lags=12,
        ncols=3,
        s=10,
        figsize=(10, 5),
        figtitle=None,
        lowess=False,
        return_fig=False,
        constrained_layout=True):

    '''
    Grid of lag scatterplots.

    Every subplot shows 'x' shifted by a lag against the unshifted
    series: against 'x' itself for lags 1..lags when 'y' is None,
    against 'y' for lags 0..lags otherwise. Pearson correlation is
    annotated in every subplot.

    Attributes:
        x, y - series to compare (sliceable with [i:])
        lags - maximum lag
        ncols - number of grid columns
        s - marker size
        figsize, figtitle - figure size and title
        lowess - if True, LOWESS smooth line is added to every subplot
        return_fig - if True, the figure is returned
        constrained_layout - passed to plt.subplots

    dependencies:
        arange (defined in this file), module-level 'palette',
        libraries: scipy, statsmodels
    '''

    if y is None:
        first_lag = 1
        n_plots = len(arange(lags))
    else:
        first_lag = 0
        n_plots = len(arange(lags, True))

    nrows = math.ceil(n_plots/ncols)
    remove_axs = nrows*ncols - n_plots

    # squeeze=False: always get 2D array of axes, so ravel() also works
    # for a single row, a single column or a single subplot
    fig, ax = plt.subplots(
        nrows, ncols, figsize=figsize, constrained_layout=constrained_layout,
        squeeze=False)
    ax = ax.ravel()

    # separate name for the smoother: the 'lowess' flag is not overwritten
    if lowess:
        lowess_fn = statsmodels.nonparametric.smoothers_lowess.lowess

    for idx, i in enumerate(arange(first_lag, lags, True)):
        # first i values of the rolled series are wrapped around - drop them
        x_shift = np.roll(x, i, axis=0)[i:]
        if y is None:
            x_ = x[i:].copy()
        else:
            x_ = y[i:].copy()
        corr = scipy.stats.pearsonr(x_shift, x_)[0]
        corr = str(round(corr, 4))
        ax[idx].scatter(x_shift, x_, s=s)
        # title
        ax[idx].set_title(f'Lag: {i}', pad=10)

        # correlation coefficient
        ax[idx].annotate(
            corr, xy=(0.9, 0.9), xycoords='axes fraction', size=9,
            bbox=dict(
                facecolor='#FEFEFE', edgecolor=palette[0], lw=0.25,
                boxstyle='square'))

        # calculate smooth with LOWESS
        if lowess:
            lowess_line = lowess_fn(x_, x_shift, return_sorted=True)
            ax[idx].plot(
                lowess_line[:, 0], lowess_line[:, 1],
                lw=2, color=palette[1])

    if remove_axs != 0:
        # remove unnecessary plots
        for i in arange(remove_axs, 0, -1):
            fig.delaxes(ax[-i])

    if constrained_layout is False:
        plt.subplots_adjust(wspace=0.3, hspace=0.5)
        y_adj = 0.95
    else:
        y_adj = None

    if figtitle is not None:
        fig.suptitle(figtitle, y=y_adj)

    plt.show()

    if return_fig:
        return fig

In [None]:
def plot_acf(
        acf_w_alphas=None, data=None, lags=40, partial=False, scatter=False, s=2,
        transparency_lines=1, color_lines=None, exclude_first=True,
        transparency_significant=0.15, color_significant=None, **kwargs):

    '''
    Plot ACF / PACF values as vertical lines with the significance band.

    Attributes:
        acf_w_alphas - 2D array: column 0 holds ACF values, the remaining
            columns hold lower and upper confidence bounds (the layout
            returned by ts_acf_calculate); calculated from 'data' if None
        data, lags, partial, **kwargs - passed to ts_acf_calculate when
            'acf_w_alphas' is None
        scatter, s - draw markers of size 's' at the ACF values
        transparency_lines, color_lines - style of vertical lines
        exclude_first - set lag 0 to zero and exclude it from the band
        transparency_significant, color_significant - style of the band

    The passed 'acf_w_alphas' array is not modified.

    dependencies:
        arange, ts_acf_calculate (defined in this file)
    '''

    if acf_w_alphas is None:
        acf_w_alphas = ts_acf_calculate(data, lags=lags, partial=partial, **kwargs)

    # copies: lag 0 is zeroed below and that must not leak into the
    # caller's array (plain slices are views of it)
    acf = acf_w_alphas[:, 0].copy()
    alphas = acf_w_alphas[:, 1:].copy()

    lags = len(acf)
    xticks = arange(lags)
    color_palette = plt.rcParams['axes.prop_cycle'].by_key()['color']

    color_significant = color_significant or color_palette[2]
    color_lines = color_lines or color_palette[0]

    if exclude_first:
        acf[0] = 0
        alphas[:1] = 0

    if scatter:
        plt.scatter(
            x=xticks,
            y=acf,
            s=s
        )
    for i in arange(lags):
        plt.plot(
            [i, i],
            [0, acf[i]],
            color=color_lines,
            alpha=transparency_lines
        )

    # bounds minus ACF values: the band is drawn around zero
    if exclude_first:
        plt.fill_between(
            arange(lags)[1:],
            (alphas[:, 0] - acf)[1:],
            (alphas[:, 1] - acf)[1:],
            lw=0,
            color=color_significant,
            alpha=transparency_significant
        )
    else:
        plt.fill_between(
            arange(lags),
            alphas[:, 0] - acf,
            alphas[:, 1] - acf,
            lw=0,
            color=color_significant,
            alpha=transparency_significant
        )

    plt.plot([-1, lags], [0, 0])
    plt.gca().spines[['bottom', 'left']].set_visible(False)
    plt.grid(False)
    plt.xlim(-2, lags+1)
    plt.show()

In [None]:
def ts_acf_calculate(data, lags=36, alpha=0.05, partial=False, **kwargs):

    '''
    Calculate ACF (or PACF if partial=True) with confidence intervals
    using statsmodels.tsa.stattools.

    Returns 2D array: column 0 - (P)ACF values, remaining columns -
    confidence bounds returned by statsmodels for the given 'alpha'.
    '''

    stattools = statsmodels.tsa.stattools

    estimate = (
        stattools.pacf(
            data, nlags=lags, alpha=alpha, method='ywadjusted', **kwargs)
        if partial else
        stattools.acf(
            data, nlags=lags, alpha=alpha, missing='none', **kwargs)
    )

    coefficients, intervals = estimate[0], estimate[1]

    return np.hstack([coefficients.reshape(-1,1), intervals])

In [None]:
def ts_acf_last_significant_index(data, lags=36, partial=False):
    '''
    Return index of the last significant element of ACF or PACF that
    precedes the first insignificant one

    An element is significant when both bounds of its confidence interval
    have the same sign (the interval does not contain zero).

    Attributes:
        data - series passed to ts_acf_calculate
        lags - number of lags to calculate
        partial - use PACF instead of ACF

    Returns:
        -1 if the very first element is insignificant (or nothing was
        calculated); index of the last element if all are significant
    '''
    acf = ts_acf_calculate(data, lags=lags, partial=partial)
    ci = acf[:, 1:]

    for i, j in enumerate(ci):
        status = np.all(j > 0) if j[0] > 0 else np.all(j < 0)
        if not status:
            return i-1

    # no insignificant element found: the last one is the answer
    # (without this the function returned the index before the last)
    return len(ci)-1

In [3]:
def ax_current():
    '''
    Return the current axes (plt.gca())
    '''
    current_axes = plt.gca()
    return current_axes

In [None]:
def legend_create_handles(
        n=None,
        kind='line',
        labels=True,
        colors=None,
        alpha=1,
        markersize=3):

    '''
    Create custom legend handles (Line2D) and return them as keyword
    arguments for a legend call.

    Attributes:
        n - number of handles (number of legend handles of the current
            axes if None)
        kind - 'line', 'rect' (square marker) or 'point' (round marker)
        labels - True: take labels from the current axes; other truthy
            value: use it as labels; falsy: do not add labels
        colors - colors of handles (current color cycle if None); reused
            from the beginning when there are fewer colors than handles
        alpha - transparency of handles
        markersize - marker size for 'rect' and 'point'

    Returns dict with 'handles' and, depending on arguments,
    'handletextpad' and 'labels'.

    Raises ValueError if 'kind' is not one of the supported values.
    '''

    import itertools

    # (marker, markersize, linestyle) for every supported kind
    styles = {
        'line': (None, None, '-'),
        'rect': ('s', markersize, 'None'),
        'point': ('o', markersize, 'None'),
    }
    if kind not in styles:
        # previously only printed and then failed on an unbound variable
        raise ValueError("'kind' must be 'line', 'rect' or 'point'")
    marker, markersize, linestyle = styles[kind]

    if n is None:
        n = len(ax_current().get_legend_handles_labels()[0])
    if colors is None:
        colors = plt.rcParams['axes.prop_cycle'].by_key()['color']

    # cycle: do not silently drop handles when n > len(colors)
    handles = [
        Line2D(
            [], [], marker=marker, markersize=markersize,
            linestyle=linestyle, lw=1.5, color=c, alpha=alpha)
        for _, c in zip(arange(n), itertools.cycle(colors))
    ]

    result = dict(handles=handles)

    if kind in ('rect', 'point'):
        result['handletextpad'] = 0

    if labels:
        if labels is True:
            labels = ax_current().get_legend_handles_labels()[1]
        result['labels'] = labels

    return result

In [3]:
def axis_adjust_barplot(
        axis='x',
        line_hidden=False,
        labelsize=9,
        labelcolor='#606060',
        weight='bold', 
        pad=-5,
        ax=None,
        **kwargs):

    '''
    Adjust the category axis of a bar plot.

    The spine of the chosen axis is limited to the span of the bars
    (from the start of the first patch to the end of the last patch),
    tick labels get the given font weight and tick parameters are updated.

    Attributes:
        axis - 'x' or 'y'
        line_hidden - hide the spine and the tick marks of the axis
        labelsize, labelcolor, pad - passed to ax.tick_params
        weight - font weight of tick labels
        ax - target axes (current axes are used if None)
        **kwargs - extra arguments for ax.tick_params
    '''
    
    if ax is None: ax = plt.gca()
        
    if axis == 'x':
        # spine from the left edge of the first bar to the right edge of the last
        ax.spines['bottom'].set_bounds(
            ax.patches[0].get_x(),
            ax.patches[-1].get_x() + ax.patches[-1].get_width())
        ax.set_xticklabels(ax.get_xticklabels(), weight=weight)

        if line_hidden:
            ax.spines['bottom'].set_visible(False)
            ax.tick_params(axis='x', bottom=False)
        
    if axis == 'y':
        # spine from the lower edge of the first bar to the upper edge of the last
        ax.spines['left'].set_bounds(
            ax.patches[0].get_y(),
            ax.patches[-1].get_y() + ax.patches[-1].get_height())
        # NOTE(review): labels are rebuilt from tick positions here, while the
        # 'x' branch keeps the existing label texts - confirm this is intended
        ax.set_yticklabels(ax.get_yticks(), weight=weight)

        if line_hidden:
            ax.spines['left'].set_visible(False)
            ax.tick_params(axis='y', left=False)

    ax.tick_params(
        axis=axis, labelsize=labelsize, labelcolor=labelcolor,
        pad=pad, **kwargs) 

In [None]:
def save_session(name, directory='sessions'):
    '''
    Save the current interpreter session with dill.

    Attributes:
        name - file name of the session
        directory - 'sessions' (default) saves to 'sessions/'; any other
            value is treated as a subdirectory: 'sessions/<directory>/'

    The target directory is created if it does not exist.
    '''
    if directory != 'sessions':
        directory = f'sessions/{directory}/'
    else:
        directory = 'sessions/'
    # makedirs: a subdirectory needs the parent 'sessions' to be created
    # too, which os.mkdir cannot do
    os.makedirs(directory, exist_ok=True)
    # save session
    dill.dump_session(directory+name)


def load_session(name, directory='sessions'):
    '''
    Load an interpreter session saved with save_session.

    Attributes:
        name - file name of the session
        directory - 'sessions' (default) loads from 'sessions/'; any other
            value is treated as a subdirectory: 'sessions/<directory>/'
    '''
    if directory != 'sessions':
        # 'directory', not the builtin 'dir': the path used to contain
        # the repr of the builtin function
        directory = f'sessions/{directory}/'
    else:
        directory = 'sessions/'
    # load session
    dill.load_session(directory+name)

In [None]:
def test_LLR(m1, m2, df=1):

    '''
    Likelihood-ratio test of two fitted models.

    The statistic 2 * (m2.llf - m1.llf) is compared with the chi-square
    distribution with 'df' degrees of freedom; p-value rounded to
    3 digits is printed. Returns None.

    Dependencies:
        - from scipy.stats import chi2
    '''

    statistic = 2 * (m2.llf - m1.llf)
    p_value = chi2.sf(statistic, df).round(3)
    message = f'p-value: {p_value}'

    return print(message)

In [264]:
def generate_feature_by_minutes(data, feature, first_month):

    '''
    Create feature '<feature>_by_minutes': median of 'feature' over rows
    with the same 'month', 'day', 'hour' and 'minute' values, resampled
    to 10 minutes frequency and shifted forward by the number of rows
    selected by data.loc[first_month].

    Returns Series with unnamed index.
    '''

    keys = ['month', 'day', 'hour', 'minute']
    column = feature + '_by_minutes'

    frame = data.copy()

    # median of the feature for every combination of keys
    medians = frame.groupby(by=keys, as_index=False).agg({feature:'median'})

    # attach medians to the original rows and restore the original index
    merged = frame.reset_index().merge(
        medians, on=keys, how='left', suffixes=('', '_by_minutes'))
    merged = merged.set_index('index', drop=True)

    series = merged.asfreq('10min')[column].copy()

    shift_size = len(frame.loc[first_month])
    series = series.shift(shift_size).copy()
    series.index.name = None

    return series

In [None]:
def dt_column_to_index(data, column, format=None, **kwargs):
    '''
    Return a copy of 'data' where 'column' is converted with
    pd.to_datetime ('format' and **kwargs are passed to it) and used
    as unnamed index.
    '''
    frame = data.copy()
    parsed = pd.to_datetime(frame[column], format=format, **kwargs)
    frame[column] = parsed

    indexed = frame.set_index(column)
    indexed.index.name = None

    return indexed

In [None]:
def ts_arima_forecast(model, steps, data, exog=None, ci=(80, 95)):

    '''
    Combine history and forecast of a fitted model into one DataFrame.

    Attributes:
        model - fitted model with get_forecast(steps=, exog=) whose result
            has 'predicted_mean' and 'conf_int(alpha=)'
        steps - number of steps to forecast
        data - DataFrame with history; the first column and the index
            (with its 'freq') are used
        exog - exogenous variables for the forecast period
        ci - confidence levels in percent

    Returns DataFrame with columns:
        'data' - history followed by forecast
        'is_forecast' - 0 for history, 1 for forecast
        'lower_ci<level>', 'upper_ci<level>' - bounds for every level
            (NaN for history)
    '''

    results = model.get_forecast(steps=steps, exog=exog)
    predicted = results.predicted_mean

    forecasts = pd.DataFrame(
        index = pd.date_range(
            data.index[0], predicted.index[-1], freq=data.index.freq),
        data=pd.concat([
            data.iloc[:, 0], predicted], axis=0),
        columns=['data'])

    # compare full timestamps: comparing calendar dates marked intraday
    # history on the first day of the forecast as forecast
    forecasts['is_forecast'] = np.where(
        forecasts.index < predicted.index[0], 0, 1)

    for ci_value in ci:
        alpha = (100 - ci_value) / 100
        # one call for both bounds
        bounds = results.conf_int(alpha=alpha)
        forecasts[f'lower_ci{ci_value}'] = bounds.iloc[:, 0]
        forecasts[f'upper_ci{ci_value}'] = bounds.iloc[:, 1]

    return forecasts

In [370]:
def ts_plot_arima_forecast(
        forecasts=None, model=None, steps=100, data=None,
        exog=None, ci=(80, 95), alpha_ci1=0.2, alpha_ci2=0.1):

    '''
    Plot history, forecast and confidence bands of the forecast.

    Attributes:
        forecasts - DataFrame in the layout of ts_arima_forecast: columns
            'data', 'is_forecast', 'lower_ci<level>', 'upper_ci<level>'
        model, steps, data, exog - if 'model' is given, 'forecasts' is
            calculated with ts_arima_forecast
        ci - confidence levels in percent; bands are drawn for the first
            two levels
        alpha_ci1, alpha_ci2 - transparency of the first and the second band

    Raises ValueError if neither 'forecasts' nor 'model' is given.

    dependencies:
        ts_arima_forecast, saturate_color, legend_inline and module-level
        'palette' (defined in this file)
    '''

    if model is not None:
        # pass 'ci' through (it was ignored in favour of a hard-coded list)
        forecasts = ts_arima_forecast(
            model, steps=steps, data=data, exog=exog, ci=ci)
    if forecasts is None:
        raise ValueError("either 'forecasts' or 'model' must be given")

    history = forecasts[forecasts['is_forecast']==0]
    future = forecasts[forecasts['is_forecast']==1]

    plt.plot(history['data'])

    plt.plot(
        future['data'],
        color=saturate_color(palette[2], 1.5),
        label='Forecast')

    # one band per confidence level, columns are looked up by level
    for level, band_alpha in zip(ci, (alpha_ci1, alpha_ci2)):
        plt.fill_between(
            x=future.index,
            y1=future[f'lower_ci{level}'],
            y2=future[f'upper_ci{level}'],
            lw=0,
            color=palette[2],
            alpha=band_alpha,
            label=f'Level {level}%')

    plt.legend(**legend_inline(), labelspacing=0.75)

In [None]:
def ts_arima_fourier_create_exog(y, period1=144, period2=72, order1=9, order2=9):

    '''
    Create in-sample Fourier terms for the index of 'y' for two seasonal
    periods (statsmodels.tsa.deterministic.Fourier) and return them
    concatenated column-wise: terms of period1 first, then of period2.
    '''

    Fourier = statsmodels.tsa.deterministic.Fourier

    terms = [
        Fourier(period, order).in_sample(y.index)
        for period, order in ((period1, order1), (period2, order2))
    ]

    return pd.concat(terms, axis=1)

In [492]:
def cv_split_indexes(data, start, train_size, test_size, size_unit, n_splits, freq):

    '''
    Sliding window TS-split

    Attributes:
        data - not used (kept for compatibility with existing calls)
        start - start of the first test window (string or timestamp)
        train_size, test_size - sizes of windows in 'size_unit'
        size_unit - keyword of pd.offsets.DateOffset ('days', 'weeks', ...)
        n_splits - number of splits
        freq - frequency of generated indexes

    Every train window ends where its test window begins (right bounds
    are excluded); every next split is moved forward by 'test_size'.

    Returns (list of train indexes, list of test indexes).
    '''

    # offsets do not change between splits - build them once
    train_offset = pd.offsets.DateOffset(**{size_unit: train_size})
    test_offset = pd.offsets.DateOffset(**{size_unit: test_size})

    if isinstance(start, str):
        start = pd.to_datetime(start)
    train_start = start - train_offset

    train_indexes_list = []
    test_indexes_list = []

    # ceil keeps the number of iterations of an arange-style range
    # for non-integer n_splits
    for _ in range(math.ceil(n_splits)):

        train_end = train_start + train_offset
        train_indexes_list.append(
            pd.date_range(train_start, train_end, freq=freq)[:-1])

        # test window starts right after the train window
        test_end = train_end + test_offset
        test_indexes_list.append(
            pd.date_range(train_end, test_end, freq=freq)[:-1])

        train_start = train_start + test_offset

    return train_indexes_list, test_indexes_list

In [452]:
def cv_model_evaluation(
        data, start, train_size, test_size, size_unit, n_splits, freq,
        orders, fourier_periods, fourier_orders, exog_variables=None):

    '''
    Cross-validate SARIMAX models with Fourier terms as exogenous
    variables on sliding-window splits.

    For every split (see cv_split_indexes) and every combination of
    order x fourier periods x fourier orders a model is fitted on the
    train window and the forecast error on the test window is stored.

    Attributes:
        data - series to model (indexed with .loc by generated indexes)
        start, train_size, test_size, size_unit, n_splits, freq - passed
            to cv_split_indexes
        orders - iterable of SARIMAX orders
        fourier_periods - iterable of sequences of Fourier periods
        fourier_orders - candidate Fourier orders; every combination
            (itertools.product) of the length of the periods sequence is tried
        exog_variables - additional exogenous variables joined to the
            Fourier terms

    Returns dict:
        'models' - {'modelN': (order, fourier periods, fourier orders)}
        'splits' - {'modelN': array of errors, one value per split}

    Execution time is printed after the cell output is cleared.
    '''

    time_start = time.time()
    
    train_indexes, test_indexes = cv_split_indexes(
        data, start, train_size, test_size, size_unit, n_splits, freq)

    if len(train_indexes) != len(test_indexes):
        print('ERROR')

    # results: model name (string) -> array of errors
    results = {}
    # results_: 'modelN' -> array of errors
    results_ = {}
    # models: 'modelN' -> model parameters
    models = {}
    # models_names: model name (string) -> model parameters
    models_names = {}

    for split_number in arange(len(train_indexes)):
        
        train_data = data.loc[train_indexes[split_number]]
        test_data = data.loc[test_indexes[split_number]]
        
        if exog_variables is not None:
            exog_variables_train = exog_variables.loc[train_indexes[split_number]]
            exog_variables_test = exog_variables.loc[test_indexes[split_number]]

        for order in orders:
            for fourier_period in fourier_periods:
                for fourier_order in itertools.product(fourier_orders, repeat=len(fourier_period)):

                    # create df with fourier exogs for train dataset
                    exogs_df_train = pd.DataFrame()
                    # create df with fourier exogs for test dataset
                    exogs_df_test = pd.DataFrame()
                    # one block of Fourier terms per (period, order) pair;
                    # every new block is joined in front of the previous ones
                    for p, k in zip(fourier_period, fourier_order):

                        fourier_train = statsmodels.tsa.deterministic.Fourier(p, k)
                        exogs_train = fourier_train.in_sample(train_data.index)
                        exogs_df_train = exogs_train.join(exogs_df_train)

                        fourier_test = statsmodels.tsa.deterministic.Fourier(p, k)
                        exogs_test = fourier_test.in_sample(test_data.index)
                        exogs_df_test = exogs_test.join(exogs_df_test)

                    if exog_variables is not None:
                        exogs_df_train = exogs_df_train.join(exog_variables_train)
                        exogs_df_test = exogs_df_test.join(exog_variables_test)
                    
                    # fit model
                    model = SARIMAX(
                        train_data, exog=exogs_df_train,
                        order=order,
                        seasonal_order=(0, 0, 0, 0),
                    ).fit(maxiter=1000, disp=False)

                    # calculate steps for forecast
                    steps = len(test_data)
                    # get forecast
                    forecast = model.get_forecast(steps=steps, exog=exogs_df_test)
                    # y_pred
                    y_pred = forecast.predicted_mean
                    # y_test
                    y_test = test_data
                    # RMSE
                    # NOTE(review): the called function is named
                    # root_mean_squared_log_error - presumably the metric is
                    # RMSLE, not RMSE; confirm
                    rmse = root_mean_squared_log_error(y_pred, y_test)

                    # NOTE(review): 'k_list' is not used below
                    k_list = list(fourier_order)

                    model_name = (tuple(order), list(fourier_period), list(fourier_order))
                    model_name_str = f'{tuple(order)}, {list(fourier_period)}, {list(fourier_order)}'

                    models_names[model_name_str] = model_name
                    
                    # collect errors of the same model across splits
                    if model_name_str in results:
                        results[model_name_str] = np.append(results[model_name_str], rmse)
                    else:
                        results[model_name_str] = np.array([rmse])

    # change models in 'result' by 'modelN'-type names and create separate models list
    for (i, key), m in zip(enumerate(results.keys()), models_names.keys()):
        # models[f'model{i}'] = key
        models[f'model{i}'] = models_names[m]
        results_[f'model{i}'] = results[key]

    results_full = {}
    results_full['models'] = models
    results_full['splits'] = results_
    
    # presumably IPython.display.clear_output: removes output printed
    # during fitting - TODO confirm
    clear_output()
    
    time_finish = time.time() - time_start
    time_finish = dt.timedelta(seconds=np.round(time_finish))

    print(f'Execution time: {time_finish}')

    return results_full

In [None]:
def list_add_after_every(lst, element, add_every):
    '''
    Return a new list where 'element' is inserted after every complete
    group of 'add_every' items of 'lst' (a shorter last group is kept
    without the element).

    e.g. list_add_after_every([1, 2, 3, 4, 5], 0, 2) -> [1, 2, 0, 3, 4, 0, 5]
    '''
    lst_new = []
    for start in range(0, len(lst), add_every):
        group = lst[start:start+add_every]
        # only groups that contain all 'add_every' items get the element
        if start < len(lst) - add_every + 1:
            group = group + [element]
        lst_new.extend(group)
    return lst_new

In [700]:
def axis_remove_xaxis(ax=None):

    '''
    Hide the bottom spine, tick marks and tick labels of the x axis
    of 'ax' (current axes if None)
    '''

    target_ax = plt.gca() if ax is None else ax

    bottom_spine = target_ax.spines['bottom']
    bottom_spine.set_visible(False)
    target_ax.tick_params(bottom=False, labelbottom=False)

In [None]:
def slice_weekly_data(start):

    '''
    Return slice(start, end) covering 7 days, where 'start' is a string
    in '%Y-%m-%d' format and 'end' is the date 6 days later in the
    same format
    '''

    date_format = '%Y-%m-%d'

    first_day = dt.datetime.strptime(start, date_format)
    last_day = first_day + dt.timedelta(days=6)

    return slice(start, last_day.strftime(date_format))

In [976]:
def fit_model_with_fourier(order, fourier_period, fourier_order, train_data, exog=None):

    '''
    Fit SARIMAX (without seasonal part) on 'train_data' with Fourier
    terms as exogenous variables.

    Attributes:
        order - SARIMAX order
        fourier_period, fourier_order - periods and orders of Fourier
            terms (see ts_arima_fourier_get_exogs)
        train_data - series to fit
        exog - additional exogenous variables joined to the Fourier terms

    Returns fitted model.
    '''

    design = ts_arima_fourier_get_exogs(
        fourier_period, fourier_order, train_data)
    if exog is not None:
        design = design.join(exog)

    # model definition and fitting are separated for readability
    unfitted = SARIMAX(
        train_data,
        exog=design,
        order=order,
        seasonal_order=(0, 0, 0, 0))

    return unfitted.fit(maxiter=1000, disp=False)

In [977]:
def ts_arima_fourier_get_exogs(fourier_period, fourier_order, train_data, test_data=None):

    '''
    Create Fourier terms (statsmodels.tsa.deterministic.Fourier) for the
    index of 'train_data' and, if given, of 'test_data'.

    'fourier_period' and 'fourier_order' are iterated in pairs; terms of
    every next pair are joined in front of the previous ones.

    Returns train terms, or (train terms, test terms) if 'test_data'
    is given.
    '''

    Fourier = statsmodels.tsa.deterministic.Fourier
    with_test = test_data is not None

    train_terms = pd.DataFrame()
    test_terms = pd.DataFrame()

    for period, order in zip(fourier_period, fourier_order):
        train_block = Fourier(period, order).in_sample(train_data.index)
        train_terms = train_block.join(train_terms)

        if with_test:
            test_block = Fourier(period, order).in_sample(test_data.index)
            test_terms = test_block.join(test_terms)

    if with_test:
        return train_terms, test_terms
    return train_terms

In [1153]:
def weekly_rmse(
        data, start, exog=None, periods=4,
        order=(1, 1, 2), fourier_period=(144,), fourier_order=(4,)):

    '''
    Forecast error of consecutive weeks.

    For every period a model is fitted on two weeks of 'target1' before
    the start date and evaluated on the following week; then the start
    date is moved one week forward (the new start date is printed).

    Attributes:
        data - DataFrame with datetime index and column 'target1'
            (10 minutes step is assumed by the window bounds)
        start - first day of the first test week, '%Y-%m-%d' string
        exog - additional exogenous variables
        periods - number of weeks to evaluate
        order - SARIMAX order
        fourier_period, fourier_order - periods and orders of Fourier terms

    Returns list of errors (root_mean_squared_log_error), one per week.

    dependencies:
        arange, fit_model_with_fourier, ts_arima_fourier_get_exogs
        (defined in this file)
    '''

    rmse_lst = []
    for _ in arange(periods):

        test_start = data.loc[start].index[0]
        test_end = test_start + dt.timedelta(weeks=1) - dt.timedelta(minutes=10)
        test = data.loc[test_start:test_end, 'target1'].copy()

        train_start = test_start - dt.timedelta(weeks=2)
        train_end = train_start + dt.timedelta(days=14) - dt.timedelta(minutes=10)
        train = data.loc[train_start:train_end, 'target1'].copy()

        model = fit_model_with_fourier(
            order, fourier_period, fourier_order, train_data=train, exog=exog)

        # windows of the current iteration are used here (undefined
        # names were referenced before); train terms are built inside
        # fit_model_with_fourier, only test terms are needed
        _, test_exogs = ts_arima_fourier_get_exogs(
            fourier_period=fourier_period,
            fourier_order=fourier_order,
            train_data=train,
            test_data=test)

        if exog is not None:
            test_exogs = test_exogs.join(exog)

        forecast = model.get_forecast(steps=len(test), exog=test_exogs)
        rmse = root_mean_squared_log_error(forecast.predicted_mean, test)
        rmse_lst.append(rmse)

        # move to the next week
        start_dt = dt.datetime.strptime(start, '%Y-%m-%d') + dt.timedelta(days=7)
        start = dt.datetime.strftime(start_dt, '%Y-%m-%d')
        print(start)

    return rmse_lst

In [1209]:
def week_model_forecasts(
        order, fourier_period, fourier_order,
        data=None, exog=None, start=None, end=None, periods=None):

    '''
    Weekly rolling forecasts: for every period a model is fitted on two
    weeks of data and the following week is forecasted; then the windows
    are moved one week forward.

    Data - before two weeks since forecast start

    Attributes:
        order - SARIMAX order
        fourier_period, fourier_order - periods and orders of Fourier terms
        data - series with datetime index (10 minutes step is assumed
            by the window bounds)
        exog - additional exogenous variables
        start - first day of the first forecasted week, '%Y-%m-%d' string
        end - '%Y-%m-%d' string; parsed but not used further
        periods - number of weeks to forecast

    Returns DataFrame indexed by forecasted timestamps with columns
    'forecast', 'lower80', 'upper80', 'lower95', 'upper95'.
    '''

    start_dt = dt.datetime.strptime(start, '%Y-%m-%d')
    # NOTE(review): 'end_dt' is not used below
    end_dt = dt.datetime.strptime(end, '%Y-%m-%d')
    train_start_dt = start_dt - dt.timedelta(weeks=2)
    
    forecasts = pd.DataFrame(
        columns=['forecast', 'lower80', 'upper80', 'lower95', 'upper95'])

    for i in arange(periods):

        # window bounds are formatted as dates and used as .loc slice labels
        train_start = dt.datetime.strftime(train_start_dt, '%Y-%m-%d')
        train_end_dt = train_start_dt + dt.timedelta(weeks=2) - dt.timedelta(minutes=10)
        train_end = dt.datetime.strftime(train_end_dt, '%Y-%m-%d')
        
        test_start_dt = train_start_dt + dt.timedelta(weeks=2)
        test_end_dt = test_start_dt + dt.timedelta(weeks=1) - dt.timedelta(minutes=10)
        test_start = dt.datetime.strftime(test_start_dt, '%Y-%m-%d')
        test_end = dt.datetime.strftime(test_end_dt, '%Y-%m-%d')
        
        train_data = data.loc[train_start:train_end].copy()
        test_data = data.loc[test_start:test_end].copy()

        train_exogs, test_exogs = ts_arima_fourier_get_exogs(
            fourier_period=fourier_period,
            fourier_order=fourier_order,
            train_data=train_data,
            test_data=test_data)

        # only test exogs are extended here: train exogs are built (and
        # joined with 'exog') inside fit_model_with_fourier
        if exog is not None:
            # train_exogs = train_exogs.join(exog)
            test_exogs = test_exogs.join(exog)

        model = fit_model_with_fourier(
            order, fourier_period, fourier_order, train_data=train_data, exog=exog)
        
        forecast = model.get_forecast(steps=len(test_data), exog=test_exogs)

        y_pred = forecast.predicted_mean
        ci80 = forecast.conf_int(alpha=0.2)
        ci95 = forecast.conf_int(alpha=0.05)
        lower80 = ci80.iloc[:, 0].copy()
        upper80 = ci80.iloc[:, 1].copy()
        lower95 = ci95.iloc[:, 0].copy()
        upper95 = ci95.iloc[:, 1].copy()

        # one row per series, timestamps as columns; transposed below
        forecast_df = pd.DataFrame(
            index=['forecast', 'lower80', 'upper80', 'lower95', 'upper95'],
            columns=test_data.index,
            data = [y_pred, lower80, upper80, lower95, upper95])

        forecast_df = forecast_df.T

        # the first week replaces the empty frame, next weeks are appended
        if not len(forecasts):
            forecasts = forecast_df
        else:
            forecasts = pd.concat([forecasts, forecast_df], axis=0)

        # move all windows one week forward
        train_start_dt += dt.timedelta(weeks=1)

    return forecasts