# Get DataFrame dictionary and set other variables

In [None]:
import os
import ast

# Indicator dictionary loaded once when this cell runs; get_indicator_name
# reads this module-level frame ("Indicator Code", "Indicator Code Snake Case",
# "Indicator Name" columns).
df_dictionary = pd.read_csv("../data/utils/DF_DICTIONARY.csv")

# (start_year, end_year) pairs, one per period of military dictatorship,
# listed in chronological order.
years_of_military_dictatorship = [
    (1930, 1932),
    (1943, 1946),
    (1955, 1958),
    (1962, 1963),
    (1966, 1973),
    (1976, 1983),
]

def get_df_categorical_values():
    """Load the categorical color table, creating an empty one on first use.

    Returns:
        DataFrame with the columns "Indicator Name" and "Indicator Colors".
        When the CSV already exists, every "Indicator Colors" cell is parsed
        from its stored text back into a Python literal.
    """
    csv_path = "../data/utils/DF_CATEGORICAL_COLORS.csv"

    if not os.path.exists(csv_path):
        # First use: persist a header-only table so later reads find the file.
        empty_table = pd.DataFrame(columns=["Indicator Name", "Indicator Colors"])
        empty_table.to_csv(csv_path, index=False, quoting=1)  # 1 == csv.QUOTE_ALL
        return empty_table

    color_table = pd.read_csv(csv_path)
    # Colors are stored as the text of a Python literal; rebuild the objects.
    parsed_colors = color_table["Indicator Colors"].apply(ast.literal_eval)
    color_table["Indicator Colors"] = parsed_colors
    return color_table

# Module-level copy of the color table; get_indicator_color reads this global.
df_categorical_values = get_df_categorical_values()
    
# Named base colors as RGB triples, each channel divided by 255.
base_colors_category = {
    color_name: tuple(channel / 255 for channel in rgb)
    for color_name, rgb in (
        ("grey", (178, 198, 213)),
        ("green", (48, 152, 152)),
        ("yellow", (255, 159, 0)),
        ("orange", (244, 99, 30)),
        ("red", (203, 4, 4)),
    )
}

Setting initial plot styles

In [None]:
# seaborn's set_style only applies rc keys that belong to its style
# definition, so "figure.figsize" passed through `rc` was silently ignored.
# Apply the style first, then set the default figure size on matplotlib.
import matplotlib.pyplot as plt

sns.set_style(style="whitegrid")
plt.rcParams["figure.figsize"] = (8, 6)

# DataFrame Dictionary Functions

In [None]:
def set_new_dictionary_entry(indicator_name, indicator_code, indicator_code_snake_case, indicator_name_spanish=""):
    """Register an indicator in the dictionary CSV unless its code already exists.

    The CSV is re-read on every call and written back before returning. The
    module-level ``df_dictionary`` is not modified; assign the return value to
    it if the lookup helpers should see the new entry.

    Args:
        indicator_name: Value for the "Indicator Name" column.
        indicator_code: Value for the "Indicator Code" column; also the key
            used to detect an existing entry.
        indicator_code_snake_case: Value for "Indicator Code Snake Case".
        indicator_name_spanish: Value for "Indicator Name in Spanish".

    Returns:
        The dictionary DataFrame as written to disk.
    """
    dictionary_path = "../data/utils/DF_DICTIONARY.csv"
    df_dictionary = pd.read_csv(dictionary_path)

    already_registered = (df_dictionary["Indicator Code"] == indicator_code).any()
    if not already_registered:
        new_entry = pd.DataFrame({
            "Indicator Name": [indicator_name],
            "Indicator Code": [indicator_code],
            "Indicator Code Snake Case": [indicator_code_snake_case],
            "Indicator Name in Spanish": [indicator_name_spanish],
        })
        # ignore_index keeps row labels unique; without it the appended row
        # reuses label 0 and label-based lookups on the result become ambiguous.
        df_dictionary = pd.concat([df_dictionary, new_entry], ignore_index=True)

    df_dictionary.to_csv(dictionary_path, index=False)
    return df_dictionary
    
def get_indicator_name(indicator_code, indicator_code_in_snake_case=False):
    """Look up an indicator's name in the module-level ``df_dictionary``.

    Args:
        indicator_code: Code to search for.
        indicator_code_in_snake_case: When equal to False the code is matched
            against "Indicator Code"; otherwise against
            "Indicator Code Snake Case".

    Returns:
        The first matching "Indicator Name", or '' when nothing matches.
    """
    if indicator_code_in_snake_case == False:  # noqa: E712 - keeps the original equality test
        code_column = 'Indicator Code'
    else:
        code_column = 'Indicator Code Snake Case'

    matches = df_dictionary.loc[df_dictionary[code_column] == indicator_code, 'Indicator Name']
    return '' if matches.empty else matches.iloc[0]
        
def transform_indicator_code(indicator_code, to_snake_case=False):
    """Convert an indicator code between dotted upper case and snake case.

    Args:
        indicator_code: Code string to convert.
        to_snake_case: If true, replace "." with "_" and lower-case the
            result; otherwise replace "_" with "." and upper-case it.

    Returns:
        The converted code string.
    """
    if not to_snake_case:
        return indicator_code.replace("_", ".").upper()
    return indicator_code.replace(".", "_").lower()
    
def set_new_categorical_colors(indicator_name, colors=None, indicator_values_order=None):
    """Register the colors of a categorical indicator unless already stored.

    The color table is re-read through ``get_df_categorical_values`` and
    written back to the CSV before returning. The module-level
    ``df_categorical_values`` is not modified; assign the return value to it
    if ``get_indicator_color`` should see the new entry.

    Args:
        indicator_name: Value for the "Indicator Name" column; also the key
            used to detect an existing entry.
        colors: Colors to store. ``plot_categorical`` calls ``.keys()`` on the
            stored value, so a mapping from category value to color is
            expected. When None, a palette is generated.
        indicator_values_order: Category values, in display order, used to
            generate an "hls" palette when ``colors`` is None.

    Returns:
        The color table DataFrame as written to disk.

    Raises:
        ValueError: If ``colors`` is None and no category values are
            available to build a palette from.
    """
    color_table = get_df_categorical_values()

    if not (color_table["Indicator Name"] == indicator_name).any():
        if colors is None:
            if indicator_values_order is None:
                # The original code read a global of this name; keep honoring
                # it when the calling notebook defines one.
                indicator_values_order = globals().get("indicator_values_order")
            if indicator_values_order is None:
                raise ValueError(
                    "Either 'colors' or 'indicator_values_order' must be provided"
                )
            categories = list(indicator_values_order)
            palette = sns.color_palette("hls", len(categories))
            # Plain float tuples survive the CSV round trip through
            # ast.literal_eval; the mapping matches what plot_categorical reads.
            colors = {
                category: tuple(float(channel) for channel in rgb)
                for category, rgb in zip(categories, palette)
            }

        new_entry = pd.DataFrame({
            "Indicator Name": [indicator_name],
            "Indicator Colors": [colors],
        })
        # ignore_index keeps row labels unique in the returned frame.
        color_table = pd.concat([color_table, new_entry], ignore_index=True)

    color_table.to_csv("../data/utils/DF_CATEGORICAL_COLORS.csv", index=False)
    return color_table

def get_indicator_color(indicator_name):
    """Return the colors stored for an indicator in ``df_categorical_values``.

    Args:
        indicator_name: Value matched against the "Indicator Name" column.

    Returns:
        The first matching "Indicator Colors" value, or '' when the indicator
        has no entry.
    """
    name_matches = df_categorical_values['Indicator Name'] == indicator_name
    stored_colors = df_categorical_values.loc[name_matches, "Indicator Colors"]
    if not stored_colors.empty:
        return stored_colors.iloc[0]
    return ''

# Get columns

In [1]:
def get_columns_with_missing_values(data):
    """List the columns of ``data`` that contain at least one missing value.

    Returns:
        Column labels ordered from most to fewest missing values.
    """
    missing_counts = data.isna().sum().sort_values(ascending=False)
    has_missing = missing_counts.gt(0).to_numpy()
    return missing_counts.index[has_missing].to_list()

In [None]:
def get_columns(data):
    """Summarize the columns of ``data``.

    Returns:
        A 4-tuple of:
        - "<column>, <indicator name>" strings, one per column (the name comes
          from the snake-case dictionary lookup);
        - the labels of the object-dtype columns;
        - the labels of the numeric columns;
        - the columns that contain missing values.
    """
    labelled_columns = [
        ", ".join((column_code, get_indicator_name(column_code, True)))
        for column_code in data.columns
    ]
    object_columns = data.select_dtypes(include=['object']).columns
    number_columns = data.select_dtypes(include=['number']).columns
    incomplete_columns = get_columns_with_missing_values(data)
    return labelled_columns, object_columns, number_columns, incomplete_columns

# Filter by quantity of missing values

In [None]:
def filter_by_cols_first(df, percentage):
    """Drop sparse columns first, then sparse rows.

    Args:
        df: DataFrame to filter.
        percentage: Fraction of non-null values required to keep a column or
            row; the threshold is ``int(size * percentage)``.

    Returns:
        A new DataFrame with the sparse columns and rows removed.
    """
    # thresh is the number of non-null entries required to keep a column/row.
    min_filled_rows = int(df.shape[0] * percentage)
    dense_columns = df.dropna(axis=1, thresh=min_filled_rows)

    # The row threshold is based on the columns that survived the first pass.
    min_filled_columns = int(dense_columns.shape[1] * percentage)
    return dense_columns.dropna(axis=0, thresh=min_filled_columns)

In [None]:
def filter_by_rows_first(df, percentage):
    """Drop sparse rows first, then sparse columns.

    Args:
        df: DataFrame to filter.
        percentage: Fraction of non-null values required to keep a row or
            column; the threshold is ``int(size * percentage)``.

    Returns:
        A new DataFrame with the sparse rows and columns removed.
    """
    # thresh is the number of non-null entries required to keep a row/column.
    min_filled_columns = int(df.shape[1] * percentage)
    dense_rows = df.dropna(axis=0, thresh=min_filled_columns)

    # The column threshold is based on the rows that survived the first pass.
    min_filled_rows = int(dense_rows.shape[0] * percentage)
    return dense_rows.dropna(axis=1, thresh=min_filled_rows)


In [None]:
def add_indicators_of_year(df, year):
    """Append the rows of the global ``arg_di_df`` for ``year`` to ``df``.

    The original version built the combined frame but never returned it, so
    callers always received None; it also computed a missing-case summary
    whose result was discarded. The combined frame is now returned.

    Args:
        df: DataFrame to extend; it is not modified.
        year: Value matched against the "year" column of ``arg_di_df``.

    Returns:
        A new DataFrame holding the rows of ``df`` plus the matching rows of
        ``arg_di_df``, sorted by index.
    """
    rows_of_year = arg_di_df[arg_di_df["year"] == year]
    return pd.concat([df, rows_of_year]).sort_index()

# Plot functions

In [None]:
def plot_2_missing_variables(data, variable_1, variable_2, type_of_plot="scatterplot", is_save_figure=False):
    """Scatter two variables, coloring the points where either one is missing.

    Args:
        data: DataFrame holding the value columns and the columns named by
            ``variable_1`` / ``variable_2``, whose cells are compared against
            the string "Missing".
        variable_1: Name of the first nullity column; the text before "_NA"
            is the value column plotted on x.
        variable_2: Same as ``variable_1`` for the y axis.
        type_of_plot: Only "scatterplot" draws anything.
        is_save_figure: If true, save the figure under "eda/scatterplots".
    """
    x_column = variable_1.split("_NA")[0]
    y_column = variable_2.split("_NA")[0]

    if type_of_plot == "scatterplot":
        def _fill_with_minimum(column):
            # Missing values are placed at the column minimum so they still
            # appear on the plot.
            if column.name in [x_column, y_column]:
                return column.fillna(column.min())
            return column

        filled = data.apply(_fill_with_minimum, axis="rows")
        either_missing = (filled[variable_1] == "Missing") | (filled[variable_2] == "Missing")
        filled = filled.assign(nullity=either_missing)

        plt.xlabel(get_indicator_name(x_column, True))
        plt.ylabel(get_indicator_name(y_column, True))
        sns.scatterplot(data=filled, x=x_column, y=y_column, hue="nullity")

    if is_save_figure:
        x_name = get_indicator_name(x_column, True)
        y_name = get_indicator_name(y_column, True)
        save_figure(f"Scatterplot of {x_name} and {y_name}", "eda/scatterplots")

    plt.show()

In [None]:
def scatter(data, variable_1, variable_2, hue=None):
    """Scatter two columns of ``data``, drawing missing values at the minimum.

    Axis labels come from the indicator dictionary. The original looked the
    column names up only as dotted indicator codes, while the other plot
    helpers in this file pass snake-case column names; a snake-case lookup is
    now tried when the dotted-code lookup finds nothing.

    Args:
        data: DataFrame containing both columns.
        variable_1: Column plotted on the x axis.
        variable_2: Column plotted on the y axis.
        hue: Optional column used to color the points.
    """
    def _axis_label(column_code):
        # get_indicator_name returns '' when there is no match.
        return get_indicator_name(column_code) or get_indicator_name(column_code, True)

    def _fill_with_minimum(column):
        if column.name in [variable_1, variable_2]:
            return column.fillna(column.min())
        return column

    data_with_na_filled = data.apply(_fill_with_minimum, axis="rows")

    plt.xlabel(_axis_label(variable_1))
    plt.ylabel(_axis_label(variable_2))
    sns.scatterplot(data=data_with_na_filled, x=variable_1, y=variable_2, hue=hue)
    plt.show()

In [None]:
def plot_hist_missing_and_filled_data(df, columns):
    """Plot, per variable, the distribution of values after mean imputation.

    Each selected column gets its own facet; the hue separates the values
    that were originally missing from the observed ones.

    Args:
        df: DataFrame exposing ``select_columns``, ``pivot_longer`` and the
            ``missing`` accessor.
        columns: Names of the columns to impute and plot.

    Returns:
        The object returned by ``sns.displot``.
    """
    return (
        df
            .select_columns(*columns)
            .missing.bind_shadow_matrix(true_string=True, false_string=False)
            .apply(
                # axis=0 hands each *column* to the function. The previous
                # axis=1 passed rows, whose name is the index label rather
                # than a column name, so the "_NA" test could not work.
                axis=0,
                func=lambda column: column.fillna(column.mean())
                if "_NA" not in column.name
                else column,
            )
            .pivot_longer(index="*_NA")
            .pivot_longer(
                index=["variable", "value"],
                names_to="variable_NA",
                values_to="value_NA"
            )
            .assign(
                # Keep only the rows pairing a variable with its own shadow
                # column (e.g. "x" with "x_NA").
                valid=lambda df: df.apply(
                    axis=1, func=lambda column: column.variable in column.variable_NA
                )
            )
            .query("valid")
            .pipe(
                lambda df: (
                    sns.displot(
                        data=df,
                        x="value",
                        hue="value_NA",
                        col="variable",
                        common_bins=False,
                        facet_kws={"sharex": False, "sharey": False}
                    )
                )
            )
    )

In [None]:
def time_series(data, x, y=[], figsize=(12,5), plot_labels=None, background_colors="dictatorship", title="", is_save_figure=False):
    """Draw one line per column in ``y`` against ``data[x]`` with shaded spans.

    Args:
        data: DataFrame with the plotted columns and a "year" column.
        x: Column used for the x axis; every 5th value becomes a tick.
        y: Columns to draw as lines.
        figsize: Figure size passed to ``plt.figure``.
        plot_labels: Optional legend labels, parallel to ``y``; when None the
            indicator names from the dictionary are used.
        background_colors: "dictatorship" shades the periods in
            ``years_of_military_dictatorship[4:]``; "gdp_growth" shades runs
            of equal values of the "ny_gdp_mktp_kd_zg_cat" column.
        title: Figure title, also used as the file name when saving.
        is_save_figure: If true, save the figure under "eda/time_series".
    """
    plt.figure(figsize=figsize)
    plt.xticks(ticks=data[x][::5])
    plt.grid(True)
    plt.title(title)

    line_colors = sns.color_palette("hls", len(y))
    for position, column in enumerate(y):
        label = get_indicator_name(column, True) if plot_labels is None else plot_labels[position]
        sns.lineplot(
            x=data[x],
            y=data[column],
            label=label,
            color=line_colors[position % len(line_colors)],
        )

    plt.ylabel("")
    plt.xlabel("year")

    if background_colors == "dictatorship":
        first_year = data["year"].min()
        for period_start, period_end in years_of_military_dictatorship[4:]:
            # Clamp the start so a span never begins before the data does.
            plt.axvspan(
                max(period_start, first_year),
                period_end,
                color="green",
                alpha=0.3,
                label="Periods of Military Dictatorship",
            )
    elif background_colors == "gdp_growth":
        shade_by_growth = {
            "Negative growth": "black",
            "Low growth": "grey",
            "Moderate growth": "lightgrey",
            "High growth": "white",
        }
        span_start = data["year"].iloc[0]
        span_growth = data["ny_gdp_mktp_kd_zg_cat"].iloc[0]
        last_position = len(data) - 1
        for position in range(1, len(data)):
            growth_here = data["ny_gdp_mktp_kd_zg_cat"].iloc[position]
            # Close the current span when the category changes or the data ends.
            if growth_here != span_growth or position == last_position:
                span_end = data['year'].iloc[position]
                plt.axvspan(
                    span_start,
                    span_end,
                    color=shade_by_growth[span_growth],
                    alpha=0.3,
                    label="GDP " + span_growth,
                )
                span_start = span_end
                span_growth = growth_here

    # Repeated labels collapse to one legend entry (the last handle wins).
    legend_handles, legend_labels = plt.gca().get_legend_handles_labels()
    unique_entries = dict(zip(legend_labels, legend_handles))
    plt.legend(unique_entries.values(), unique_entries.keys())

    if is_save_figure:
        save_figure(title, "eda/time_series")

In [2]:
def plot_count_of_years_of_military_dictatorship(df, is_save_figure=False):
    """Draw a stacked horizontal bar counting years with/without dictatorship.

    Args:
        df: DataFrame with a "year_of_dictatorship" column; every column is
            cast to bool before counting.
        is_save_figure: If true, save the figure with the default subfolder.
    """
    title = "Quantity of years of dictatorship"

    is_dictatorship_entry = df_dictionary["Indicator Code"] == "year.of.dictatorship"
    indicator_name = df_dictionary.loc[is_dictatorship_entry, ["Indicator Name"]].iloc[0, 0]

    # Rename first so the legend shows the indicator name, not the column code.
    flags = df.rename(columns={"year_of_dictatorship": indicator_name}).astype(bool)
    counts = flags.value_counts(indicator_name, ascending=True)

    # One row ("Count") with one column per flag value, so the bars stack.
    counts_row = pd.DataFrame(counts).rename(columns={"count": "Count"}).T
    counts_row.plot(kind="barh", stacked=True, color=["green", "grey"], title=title)

    if is_save_figure:
        save_figure(title)

In [None]:
def plot_categorical(data, column, hue):
    """Draw a count plot of ``column`` split by ``hue`` using stored colors.

    The colors returned by ``get_indicator_color`` are used as the palette,
    and their keys set the order of the hue levels and of the x categories.

    Args:
        data: DataFrame containing both columns.
        column: Column plotted on the x axis.
        hue: Column used to split and color the bars.
    """
    hue_colors = get_indicator_color(hue)
    hue_levels = list(hue_colors.keys())
    column_levels = list(get_indicator_color(column).keys())

    sns.countplot(
        data=data,
        x=column,
        hue=hue,
        palette=hue_colors,
        hue_order=hue_levels,
        order=column_levels,
    )
    plt.xlabel(get_indicator_name(column, True))
    plt.xticks(rotation=45)
    # The legend is anchored outside the axes, to the right.
    plt.legend(title=get_indicator_name(hue, True), bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.show()

### Imputation Plots

In [1]:
def plot_imputation_by_statistic(data, variable, statistic = "mean"):
    """Plot the distribution of ``variable`` after a simple imputation.

    The imputation is applied to a copy. The original version overwrote
    ``data[variable]`` in place, so a second call with another statistic had
    no missing values left to impute and the caller's frame was altered by a
    plotting helper.

    Args:
        data: DataFrame containing ``variable`` and the column
            ``variable + "_NA"``, which is used as the hue.
        variable: Name of the column to impute and plot.
        statistic: "mean", "median" or "mode" (case-insensitive).

    Raises:
        ValueError: If ``statistic`` is not one of the supported names.
    """
    statistic = statistic.lower()
    fill_value_for = {
        "mean": lambda column: column.mean(),
        "mode": lambda column: column.mode().iloc[0],
        "median": lambda column: column.median(),
    }
    if statistic not in fill_value_for:
        raise ValueError("Statistic must be one of: 'mean', 'median', or 'mode'")

    imputed = data.copy()
    imputed[variable] = data[variable].fillna(fill_value_for[statistic](data[variable]))

    sns.displot(
        data=imputed,
        x=variable,
        hue=variable + "_NA"
    )

    plt.title(f"Distribution of '{get_indicator_name(variable, True)}' after {statistic} imputation")
    plt.xlabel(variable)
    plt.ylabel("Count")

# Save figures

In [None]:
def save_figure(fig_name, subfolder="eda", add_timestamp=False, tight=True, dpi=300):
    """Save the current matplotlib figure as a PNG under ../reports/figures.

    Args:
        fig_name: File name without extension.
        subfolder: Folder below "../reports/figures"; created when missing.
        add_timestamp: If true, append "_YYYYmmdd-HHMMSS" to the file name.
        tight: If true, save with ``bbox_inches='tight'``.
        dpi: Resolution passed to ``plt.savefig``.
    """
    folder_path = f"../reports/figures/{subfolder}"
    os.makedirs(folder_path, exist_ok=True)

    if add_timestamp:
        # Imported here so the helper does not depend on the calling notebook
        # having imported the datetime class under this exact name.
        from datetime import datetime

        timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
        fig_name = f"{fig_name}_{timestamp}"

    file_path = os.path.join(folder_path, f"{fig_name}.png")

    plt.savefig(file_path, dpi=dpi, bbox_inches='tight' if tight else None)
    print(f"Figure saved to: {file_path}")

# Detect outliers

In [4]:
def detect_outliers(data, columns=None, treatment=False):
    """Report, and optionally clip, outliers using the 1.5 * IQR rule.

    Args:
        data: DataFrame to inspect; it must contain a "year" column, which is
            printed next to each outlier.
        columns: Optional subset of columns to check; by default every numeric
            column is checked. "year_of_dictatorship" is always skipped.
        treatment: If true, clip each checked column to its IQR bounds
            in place on ``data``.
    """
    candidates = data if columns is None else data[columns]
    numerical_cols = candidates.select_dtypes(include=['number']).columns

    print("Outliers detection:")

    for col in numerical_cols:
        if col == "year_of_dictatorship":
            continue

        first_quartile = data[col].quantile(0.25)
        third_quartile = data[col].quantile(0.75)
        spread = third_quartile - first_quartile
        lower_bound = first_quartile - 1.5 * spread
        upper_bound = third_quartile + 1.5 * spread

        is_outlier = (data[col] < lower_bound) | (data[col] > upper_bound)
        if not is_outlier.any():
            print(f"There are no outliers for {get_indicator_name(col,True)}")
        else:
            print(f"Outliers of {get_indicator_name(col,True)}")
            outlier_labels = is_outlier[is_outlier].index
            print(data.loc[outlier_labels][['year', col]])

        if treatment:
            data[col] = np.clip(data[col], lower_bound, upper_bound)


## Rename columns

In [None]:
def remove_prefix(column_name):
    """Strip one known transformer prefix from a column name.

    Only the first matching prefix among "ordinalencoder__",
    "onehotencoder__" and "remainder__" is removed; names without any of them
    are returned unchanged.
    """
    known_prefixes = ("ordinalencoder__", "onehotencoder__", "remainder__")
    return next(
        (
            column_name[len(candidate):]
            for candidate in known_prefixes
            if column_name.startswith(candidate)
        ),
        column_name,
    )

# Old functions

## Get the percentage of variables with a minimum percentage of missing values

In [None]:
def percentage_of_variables_with_pct_missing(df: pd.DataFrame, percentage: float, greater_than_pct = True):
    """Return the share of variables on one side of a missing-value percentage.

    Args:
        df: DataFrame exposing the ``missing`` accessor.
        percentage: Threshold compared against the summary's "pct_missing".
        greater_than_pct: If true, count variables with
            ``pct_missing >= percentage``; otherwise ``pct_missing <= percentage``.

    Returns:
        The fraction of variables meeting the condition, rounded to 2 decimals.
    """
    summary = df.missing.missing_variable_summary()
    pct_missing = summary['pct_missing']
    selected = pct_missing >= percentage if greater_than_pct else pct_missing <= percentage

    matching_variables = summary[selected]['variable'].count()
    return round(matching_variables / summary.shape[0], 2)

## Get the column names with a minimum (or maximum) percentage of missing values

In [None]:
def columns_with_pct_missing(df: pd.DataFrame, percentage: float, greater_than_pct = True):
    """List the variables on one side of a missing-value percentage.

    Args:
        df: DataFrame exposing the ``missing`` accessor.
        percentage: Threshold compared against the summary's "pct_missing".
        greater_than_pct: If true, select variables with
            ``pct_missing >= percentage``; otherwise ``pct_missing <= percentage``.

    Returns:
        A list with the "variable" values of the selected summary rows.
    """
    summary = df.missing.missing_variable_summary()
    pct_missing = summary['pct_missing']
    selected = pct_missing >= percentage if greater_than_pct else pct_missing <= percentage
    return list(summary[selected]['variable'])

## Get the rows with more missing values

In [None]:
def cases_with_pct_missing(df: pd.DataFrame, percentage: float, greater_than_pct = True):
    """Return the rows of ``df`` on one side of a missing-value percentage.

    The original selected the rows from the global ``arg_di_df`` even though
    the summary was computed on ``df``; the rows are now taken from ``df``,
    which gives the same result whenever ``df`` is that global frame.

    Args:
        df: DataFrame exposing the ``missing`` accessor.
        percentage: Threshold compared against the summary's "pct_missing".
        greater_than_pct: If true, select cases with
            ``pct_missing > percentage``; otherwise ``pct_missing < percentage``.

    Returns:
        The selected rows of ``df``, picked by position from the summary's
        "case" values.
    """
    case_summary = df.missing.missing_case_summary()
    pct_missing = case_summary['pct_missing']
    selected = pct_missing > percentage if greater_than_pct else pct_missing < percentage

    # NOTE(review): "case" is used as a positional index - confirm it holds
    # row positions rather than index labels.
    indexes_of_cases = list(case_summary[selected]['case'])
    return df.iloc[indexes_of_cases]

## Plot functions

In [None]:
def plot_imputation_by_mean(data, columns):
    """Plot, per variable, the value distribution after mean imputation.

    Only the first 20 rows of the imputed data are plotted. Each column gets
    its own facet, with the hue taken from its shadow ("_NA") column.

    Args:
        data: DataFrame exposing the ``missing`` accessor.
        columns: List of column names to impute and plot.
    """
    def _fill_with_mean(column):
        # Shadow columns only flag missingness and are left untouched.
        if "_NA" not in column.name:
            return column.fillna(column.mean())
        return column

    with_shadow = data[columns].missing.bind_shadow_matrix(true_string=True, false_string=False)
    imputed_head = with_shadow.apply(_fill_with_mean, axis=0).iloc[0:20]

    # First melt: one row per (variable, value), keeping the shadow columns.
    long_values = pd.melt(imputed_head, id_vars=[x + "_NA" for x in columns])
    # Second melt: pair every value with every shadow column.
    long_pairs = pd.melt(
        long_values,
        id_vars=["variable", "value"],
        var_name="variable_NA",
        value_name="value_NA",
    )

    # Keep only the pairs where the shadow column belongs to the variable.
    belongs_together = long_pairs.apply(
        axis=1, func=lambda row: row.variable in row.variable_NA
    )
    matched = long_pairs.assign(valid=belongs_together).query("valid")

    sns.displot(
        data=matched,
        x="value",
        hue="value_NA",
        col="variable",
        common_bins=False,
        facet_kws={"sharex": False, "sharey": False}
    )

In [1]:
def plot_missing_variables_grouped_by_categories(data, categorical_variable_1, categorical_variable_2, variable_with_missing_values):
    """Draw stacked bars of missing vs. present counts per category pair.

    The helper column used for grouping is added to a copy. The original
    wrote a "variable_with_missing_values_na" column into the caller's
    DataFrame as a side effect of plotting.

    Args:
        data: DataFrame containing the three columns; it is not modified.
        categorical_variable_1: First grouping column.
        categorical_variable_2: Second grouping column.
        variable_with_missing_values: Column whose missingness is counted.
    """
    nullity_column = "variable_with_missing_values_na"
    nullity_labels = data[variable_with_missing_values].isna().map({True: "NA", False: "!NA"})
    labelled = data.assign(**{nullity_column: nullity_labels})

    group_keys = [categorical_variable_1, categorical_variable_2, nullity_column]
    size_series = (
        labelled[group_keys]
            .groupby(group_keys, dropna=False, as_index=True)
            .size()
    )

    group_sizes_df = size_series.reset_index(name="size")

    # One row per category pair, one column per nullity label.
    stacked_data = group_sizes_df.pivot_table(
        index=[categorical_variable_1, categorical_variable_2],
        columns=nullity_column,
        values="size",
        fill_value=0  # Combinations that never occur count as 0
    )

    stacked_data.plot(
        kind="bar",
        stacked=True,
        color={"!NA": "grey", "NA": "red"},
        edgecolor="black",
    )