In [1]:
import os
import pathlib

import pandas as pd
from numpy import ndarray, unique, nan

from helpers.columns import COL_HEAD
from helpers.predict_data import predict, is_significant

In [2]:
def get_predicted_mortality(df: pd.DataFrame):
    """Run ``predict`` for every (location, sex, age) combination and tabulate the results.

    The combinations are the full cross product of the distinct values found
    in the location, sex and age columns of ``df``. A combination with no
    matching rows is still visited, in which case ``predict`` receives an
    empty array.

    Args:
        df: Frame containing the COL_HEAD.LOCATION, COL_HEAD.SEX, COL_HEAD.AGE,
            COL_HEAD.YEAR, COL_HEAD.WEEK and COL_HEAD.MORTALITY columns.

    Returns:
        A DataFrame with one row per combination holding the location, sex
        and age values plus the four values returned by ``predict`` (expected
        sum, actual sum, total excess, standard deviation of the excess).
    """
    locations = unique(df[COL_HEAD.LOCATION])
    sex_values = unique(df[COL_HEAD.SEX])
    age_groups = unique(df[COL_HEAD.AGE])

    # Columns handed to ``predict`` (as an integer array) and the columns of the result.
    series_cols = [COL_HEAD.YEAR, COL_HEAD.WEEK, COL_HEAD.MORTALITY]
    output_cols = (
        COL_HEAD.LOCATION,
        COL_HEAD.SEX,
        COL_HEAD.AGE,
        COL_HEAD.MEAN_OR_EXPECTED_MORTALITY,
        COL_HEAD.MORTALITY,
        COL_HEAD.EXCESS_MORTALITY_BASE,
        COL_HEAD.STANDARD_DEVIATION,
    )
    collected = {col: [] for col in output_cols}

    for location in locations:
        in_location = df[COL_HEAD.LOCATION] == location
        for sex_value in sex_values:
            in_location_sex = in_location & (df[COL_HEAD.SEX] == sex_value)
            for age_group in age_groups:
                selected = in_location_sex & (df[COL_HEAD.AGE] == age_group)
                observations = df.loc[selected, series_cols].values.astype(int)

                excess, expected, actual, excess_std = predict(observations)

                # Same order as ``output_cols``.
                row = (location, sex_value, age_group, expected, actual, excess, excess_std)
                for col, value in zip(output_cols, row):
                    collected[col].append(value)

    return pd.DataFrame(collected)

In [3]:
def get_final_year():
    """Prompt on stdin until one of the supported final years is entered.

    The entry is stripped of surrounding whitespace before validation, so a
    value such as ``"2021 "`` is accepted instead of triggering a silent
    re-prompt.

    Returns:
        The accepted entry, equal to one of COL_HEAD.YEAR_2020,
        COL_HEAD.YEAR_2021 or COL_HEAD.YEAR_2022.
    """
    allowed_years = [COL_HEAD.YEAR_2020, COL_HEAD.YEAR_2021, COL_HEAD.YEAR_2022]
    msg = f'Please provide final year to analize: [allowed are: {allowed_years}] '

    while True:
        get_year = input(msg).strip()
        if get_year in allowed_years:
            break

    print(f'Will analyze data for {get_year}')
    return get_year

In [4]:
def get_start_week():
    """Prompt on stdin until a start week between 1 and 52 is entered.

    The entry is stripped of surrounding whitespace before validation, so a
    value such as ``" 7 "`` is accepted instead of triggering a silent
    re-prompt. Only the plain decimal spellings ``"1"`` .. ``"52"`` are valid.

    Returns:
        The chosen week as an ``int`` in the range 1..52.
    """
    allowed_weeks = [str(week) for week in range(1, 53)]
    msg = 'Please provide start week [allowed between 1 and 52]: '

    while True:
        get_week = input(msg).strip()
        if get_week in allowed_weeks:
            break

    print(f'Will analyze data from week: {get_week}')
    return int(get_week)

In [5]:
def concat_column_vals(df: pd.DataFrame, main_col, additional_col, brackets):
    """Build a text column of the form ``<main><open><additional><close>``.

    Args:
        df: Frame holding both columns.
        main_col: Label of the column whose values come first.
        additional_col: Label of the column whose values go between the brackets.
        brackets: Indexable pair; element 0 is placed before and element 1
            after the additional value.

    Returns:
        A Series of strings, one per row of ``df``.
    """
    # Values are converted with str() element-wise before concatenation.
    leading = df[main_col].map(str) + brackets[0]
    enclosed = df[additional_col].map(str)
    return leading + enclosed + brackets[1]

In [6]:
# Read the cleaned Eurostat mortality CSV; the location is resolved relative
# to the current working directory (three levels up, then cleaned_data/).
cleaned_mortality_file = pathlib.Path(
    f'{os.getcwd()}/../../../cleaned_data/Eurostat_mortality_2015_2022(Bulgaria).csv'
)
df = pd.read_csv(cleaned_mortality_file)

In [7]:
# Columns kept for the analysis: the sex/age/location keys, the week and the
# COL_HEAD.YEAR_2015 .. COL_HEAD.YEAR_2019 columns, plus the final year that
# the user picks interactively.
analize_cols = [COL_HEAD.SEX, COL_HEAD.AGE, COL_HEAD.LOCATION, COL_HEAD.WEEK,
                COL_HEAD.YEAR_2015, COL_HEAD.YEAR_2016, COL_HEAD.YEAR_2017, COL_HEAD.YEAR_2018, COL_HEAD.YEAR_2019]
final_year = get_final_year()
analize_cols.append(final_year)
df = df.loc[:, analize_cols]
# Discard rows whose age value is 'Total' (presumably an aggregate over the
# individual age groups — TODO confirm). The column is addressed through
# COL_HEAD.AGE, like everywhere else in the notebook, instead of a literal.
df = df[df[COL_HEAD.AGE] != 'Total']

Will analyze data for 2021


In [8]:
start_week = get_start_week()
# Keep only rows from the chosen start week onwards (inclusive). The column is
# addressed through COL_HEAD.WEEK for consistency with the rest of the notebook.
df = df[df[COL_HEAD.WEEK].ge(start_week)]

Will analyze data from week: 1


In [9]:
# Among the rows that have no value for the final year, look at each
# (age, sex, location) group: if its smallest missing week is below 53, all of
# that group's missing rows are removed. Groups whose only missing week is 53
# keep that row (it is filled in afterwards).
group_cols = [COL_HEAD.AGE, COL_HEAD.SEX, COL_HEAD.LOCATION]
missing_final_year = df[df[final_year].isnull()]
indices = (
    missing_final_year
    .groupby(group_cols)
    .filter(lambda group: group[COL_HEAD.WEEK].min() < 53)
    .index
)
# Rebind instead of dropping in place: `df` is itself a filtered slice, and an
# in-place drop on it can raise SettingWithCopyWarning.
df = df.drop(index=indices)
del missing_final_year

In [10]:
# Forward-fill the remaining gaps: every NaN takes the value of the row above
# it in the same column. `ffill()` is the supported spelling of
# `fillna(method='pad')`, which is deprecated since pandas 2.1.
# NOTE(review): the fill follows row order and is not grouped, so it relies on
# the preceding row belonging to the same series — confirm the input ordering.
df = df.ffill()

In [11]:
# Reshape from one column per year to one row per (location, week, sex, age,
# year): the former column labels land in COL_HEAD.YEAR (converted to int)
# and their cell values in COL_HEAD.MORTALITY.
df = (
    df.melt(
        id_vars=[COL_HEAD.LOCATION, COL_HEAD.WEEK, COL_HEAD.SEX, COL_HEAD.AGE],
        var_name=COL_HEAD.YEAR,
        value_name=COL_HEAD.MORTALITY,
    )
    .astype({COL_HEAD.YEAR: int})
)

In [12]:
# Run the prediction once per (location, sex, age) combination found in `df`
# and collect expected/actual/excess mortality and its standard deviation.
predicted_mort = get_predicted_mortality(df)

In [None]:
def _zero_to_nan(values: pd.Series) -> pd.Series:
    """Return ``values`` with every 0 replaced by NaN, so dividing by it gives NaN rather than ±inf."""
    return values.mask(values == 0)


# All derived columns below are computed column-wise; a row-wise
# DataFrame.apply(axis=1) is slower and returns a DataFrame instead of a
# Series when the frame is empty.

# Z-score: excess mortality over its standard deviation (NaN when the deviation is 0).
predicted_mort[COL_HEAD.Z_SCORE] = (
    predicted_mort[COL_HEAD.EXCESS_MORTALITY_BASE]
    / _zero_to_nan(predicted_mort[COL_HEAD.STANDARD_DEVIATION])
)

# Evaluated on the unrounded z-score; the frame-wide rounding happens further down.
predicted_mort[COL_HEAD.IS_SIGNIFICANT] = predicted_mort[COL_HEAD.Z_SCORE].map(is_significant)

# Half-width of the interval around the expected mortality, rounded to one
# decimal before the bounds are derived from it.
# NOTE(review): 1.96 is the usual two-sided 95 % normal multiplier — confirm
# that this is the intended confidence level.
predicted_mort[COL_HEAD.CONFIDENCE_INTERVAL] = (
    1.96 * predicted_mort[COL_HEAD.STANDARD_DEVIATION]
).round(1)

predicted_mort[COL_HEAD.LB_MEAN_MORTALITY] = predicted_mort[COL_HEAD.MEAN_OR_EXPECTED_MORTALITY] - predicted_mort[COL_HEAD.CONFIDENCE_INTERVAL]
predicted_mort[COL_HEAD.UB_MEAN_MORTALITY] = predicted_mort[COL_HEAD.MEAN_OR_EXPECTED_MORTALITY] + predicted_mort[COL_HEAD.CONFIDENCE_INTERVAL]

# P-score: (actual - expected) / expected * 100, defined as 0 when expected is 0.
predicted_mort[COL_HEAD.P_SCORE] = (
    (
        (predicted_mort[COL_HEAD.MORTALITY] - predicted_mort[COL_HEAD.MEAN_OR_EXPECTED_MORTALITY])
        / _zero_to_nan(predicted_mort[COL_HEAD.MEAN_OR_EXPECTED_MORTALITY])
        * 100
    )
    .where(predicted_mort[COL_HEAD.MEAN_OR_EXPECTED_MORTALITY] != 0, 0)
    .round(1)
)

# Fluctuation: the (already rounded) P-score minus the P-score obtained when
# the upper bound is used as the expected value; NaN when the upper bound is 0.
predicted_mort[COL_HEAD.P_SCORE_FLUCTUATION] = (
    predicted_mort[COL_HEAD.P_SCORE]
    - (
        (predicted_mort[COL_HEAD.MORTALITY] - predicted_mort[COL_HEAD.UB_MEAN_MORTALITY])
        / _zero_to_nan(predicted_mort[COL_HEAD.UB_MEAN_MORTALITY])
        * 100
    )
).round(1)

# Round every numeric column to one decimal before building the text columns.
predicted_mort = predicted_mort.round(1)

# Human-readable "value (±interval)" columns.
predicted_mort[COL_HEAD.MEAN_MORTALITY_DECORATED] = concat_column_vals(predicted_mort,
                                                                    COL_HEAD.MEAN_OR_EXPECTED_MORTALITY,
                                                                    COL_HEAD.CONFIDENCE_INTERVAL,
                                                                    [' (±', ')'])

predicted_mort[COL_HEAD.EXCESS_MORTALITY_DECORATED] = concat_column_vals(predicted_mort,
                                                                    COL_HEAD.EXCESS_MORTALITY_BASE,
                                                                    COL_HEAD.CONFIDENCE_INTERVAL,
                                                                    [' (±', ')'])

predicted_mort[COL_HEAD.P_SCORE_DECORATED] = concat_column_vals(predicted_mort,
                                                            COL_HEAD.P_SCORE,
                                                            COL_HEAD.P_SCORE_FLUCTUATION,
                                                            ['% (±', '%)'])

In [None]:
# Write the result table as CSV under output_data/Excess_mortality, resolved
# relative to the current working directory.
# NOTE(review): the ':' in the file name is not a valid file-name character on
# Windows; it is kept unchanged here because other code may look for this
# exact name.
file_name = f'Predicted_excess_mortality_by_location_sex_age_year:{final_year}.csv'
path = pathlib.Path(f'{os.getcwd()}/../../../output_data/Excess_mortality/{file_name}')
# Create the output folder if needed; to_csv fails when it does not exist.
path.parent.mkdir(parents=True, exist_ok=True)
predicted_mort.to_csv(path, index=False)