import requests
import pandas as pd
from io import StringIO
import matplotlib.pyplot as plt
import datetime

# Download DATA

In [4]:
#Define variables and login
# ID_dict maps each indicator id requested from the CNB ARAD API to the short
# column name used for that series in the rest of the notebook.
# NOTE(review): the sample output pasted below lists MEXRHCZEUDNAJEXRVALNOMD,
# which is not a key here - presumably it comes from an earlier run; confirm.
ID_dict = {"SHDPZDRQ20152B1GMIRSA": 'GDP_index', 'SHDPVYDQ20152B1GNMLSA': 'GDP_vol', 'SHDPVYDQ20151B1GNISNA': 'GDP_yoy', "SPRUMMIR20151SA":"Industry_index",
           'SCCSUM2005IR00': 'CPI_index_2016',"SCCSUM2015IR00":"CPI_index_2017" ,'SCCSUM2005IS00': 'CPI_yoy_2016','SCCSUM2015IS00': 'CPI_yoy_2017',
          'MIRFMSPR3XRATPECD': 'PRIBOR_3M', 'MIRFMSPR12RATPECD': 'PRIBOR_12M', 'MEXRHCZEUQNAJEXRVALNOMQ': 'EUR_CZK'}

# The API takes the requested ids as one comma-separated string.
my_IDs = ','.join(ID_dict.keys())
# NOTE(review): the API key is hard-coded in the source; consider loading it
# from an environment variable or a file that is not committed.
my_api_key = "202326111959266710376710375FTR2ZOFR7GADZLV"
# Earlier, longer id list kept for reference:
#my_IDs= "SHDPZDRQ20152B1GMIRSA,SHDPVYDQ20152B1GNMLSA,SHDPVYDQ20151B1GNISNA,SPRUMMIR20151SA,SPRUMMIS20151NA,SCCSUM2005IR00, SCCSUM2015IR00,SCCSUM2005IS00,SCCSUM2015IS00,SCCSUM2015IK00,MIRFMSPR3XRATPECD,MIRFMSPR12RATPECD,MEXRHCZEUDNAJEXRVALNOMD"


In [7]:
def download_data_from_api(api_key, indicator_ids, timeout=30):
    """Download time series from the CNB ARAD API as a DataFrame.

    Args:
        api_key: API key sent as the ``api_key`` query parameter.
        indicator_ids: Comma-separated indicator ids sent as the
            ``indicator_id_list`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely.

    Returns:
        The semicolon-delimited response parsed into a DataFrame, with
        ``period`` converted to datetime and the ``snapshot_id`` column
        removed, or ``None`` when the request fails or the server does not
        answer with HTTP 200.
    """
    url = f'https://www.cnb.cz/aradb/api/v1/data?indicator_id_list={indicator_ids}&api_key={api_key}'
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Only the exception type is reported: the exception text contains
        # the request URL and therefore the API key.
        print(f"Failed to fetch data from the API ({type(exc).__name__})")
        return None

    if response.status_code != 200:
        print("Failed to fetch data from the API")
        return None

    # The response uses ';' as field separator and ',' as decimal mark.
    df = pd.read_csv(StringIO(response.text), delimiter=';', decimal=',', thousands=None)
    df['period'] = pd.to_datetime(df['period'], format='%Y%m%d')
    return df.drop(columns="snapshot_id")

In [8]:
# Fetch every requested series in one API call and show the raw result;
# nothing is printed when the download failed (the function returned None).
my_df = download_data_from_api(api_key=my_api_key, indicator_ids=my_IDs)
if my_df is not None:
    print(my_df)

                  indicator_id     period       value
0        SHDPZDRQ20152B1GMIRSA 2023-09-30   99.280690
1        SHDPZDRQ20152B1GMIRSA 2023-06-30   99.580160
2        SHDPZDRQ20152B1GMIRSA 2023-03-31   99.622176
3        SHDPZDRQ20152B1GMIRSA 2022-12-31  100.120022
4        SHDPZDRQ20152B1GMIRSA 2022-09-30  101.350071
...                        ...        ...         ...
12984  MEXRHCZEUDNAJEXRVALNOMD 2015-01-08   27.865000
12985  MEXRHCZEUDNAJEXRVALNOMD 2015-01-07   27.785000
12986  MEXRHCZEUDNAJEXRVALNOMD 2015-01-06   27.695000
12987  MEXRHCZEUDNAJEXRVALNOMD 2015-01-05   27.675000
12988  MEXRHCZEUDNAJEXRVALNOMD 2015-01-02   27.700000

[12989 rows x 3 columns]


# Verify that every requested indicator id is present in the download.
# A failed download (my_df is None) is reported as missing series instead of
# raising a TypeError on the subscript.
if my_df is None:
    print("Some requested series are missing")
else:
    missing_ids = sorted(set(ID_dict) - set(my_df["indicator_id"]))
    if missing_ids:
        print("Some requested series are missing")
        print("Missing indicator ids:", ", ".join(missing_ids))
    else:
        print("All requested series downloaded successfully")

# Processing the data

In [None]:
def process_data(my_df, ID_dict):
    """Turn the long-format download into a wide table of monthly means.

    Args:
        my_df: Long-format frame with the columns ``indicator_id``,
            ``period`` (datetime) and ``value``.
        ID_dict: Mapping from indicator id to the column name to use in the
            result; ids that are not in the mapping keep their id as name.

    Returns:
        A new DataFrame indexed by ``year_month`` (monthly periods) with one
        column per indicator, holding the mean of ``value`` within each
        month. ``my_df`` itself is left unchanged.
    """
    # assign() works on a copy, so the helper column never reaches the caller.
    with_month = my_df.assign(year_month=my_df['period'].dt.to_period('M'))

    # Average only the value column; other columns are irrelevant here.
    monthly_means = (
        with_month.groupby(['indicator_id', 'year_month'])['value']
        .mean()
        .reset_index()
    )

    monthly_df = monthly_means.pivot(index="year_month", columns="indicator_id", values="value")

    return monthly_df.rename(columns=ID_dict)


In [None]:
def data_to_current_date(df):
    """Cut *df* off after the current calendar month.

    The frame is sliced by label from its first row up to the "YYYY-MM"
    string of the local current date, so rows dated later are dropped.
    """
    this_month = f"{datetime.datetime.now():%Y-%m}"
    return df[:this_month]

In [None]:
def recalculate_series_to_differente_base(df,column_recalculate, column_contains_base_year, base_year):
    """Rescale an index series so that its base-year mean equals 100.

    Args:
        df: DataFrame whose index exposes ``.year`` (a date-like index).
        column_recalculate: Column to rescale.
        column_contains_base_year: Column whose values in ``base_year`` define
            the base level. Pass the same name as ``column_recalculate`` to
            rebase a single series on itself.
        base_year: Year whose mean becomes 100.

    Returns:
        A copy of ``df`` with ``column_recalculate`` multiplied by
        ``100 / mean(column_contains_base_year in base_year)``. The input
        frame is not modified and no helper column is added.

    Raises:
        ValueError: If ``column_contains_base_year`` has no usable data in
            ``base_year`` or its mean there is zero, since the rescaled
            series would be all NaN or infinite.
    """
    in_base_year = df.index.year == base_year
    base_mean = df.loc[in_base_year, column_contains_base_year].mean()
    if pd.isna(base_mean) or base_mean == 0:
        raise ValueError(
            f"Column {column_contains_base_year!r} has no usable data for base year {base_year}"
        )

    rebased = df.copy()
    rebased[column_recalculate] = rebased[column_recalculate] * 100 / base_mean
    return rebased

In [None]:
def combine_consecutive_series(df,column1,column2,partition_year, name_new_column):
    """Splice two columns holding consecutive parts of one series.

    Args:
        df: DataFrame whose index exposes ``.year`` (a date-like index).
        column1: Column used for rows with year <= ``partition_year``.
        column2: Column used for rows with year > ``partition_year``.
        partition_year: Last year for which ``column1`` is used.
        name_new_column: Name of the combined column.

    Returns:
        A new DataFrame without ``column1`` and ``column2`` and with
        ``name_new_column`` appended. The input frame is not modified and no
        helper column is added.
    """
    use_first = df.index.year <= partition_year
    # Vectorised selection: keep column1 where the mask holds, else column2.
    combined = df[column1].where(use_first, df[column2])

    result = df.drop([column1, column2], axis=1)
    result[name_new_column] = combined
    return result

In [None]:
def growth_from_yoy_index(df,yoy_index_column,new_growth_column):
    """Convert a year-on-year index (previous year = 100) to a growth rate.

    Args:
        df: DataFrame containing ``yoy_index_column``.
        yoy_index_column: Column holding the year-on-year index.
        new_growth_column: Name of the column receiving ``index - 100``.

    Returns:
        A copy of ``df`` with ``new_growth_column`` added. The source values
        are read from ``df`` itself, not from a module-level frame.
    """
    result = df.copy()
    result[new_growth_column] = df[yoy_index_column] - 100
    return result

In [None]:
def growth_from_index(df,index_column,new_growth_column):
    """Compute the period-on-period percentage change of an index column.

    Args:
        df: DataFrame containing ``index_column``.
        index_column: Column holding the index level.
        new_growth_column: Name of the column receiving
            ``pct_change() * 100``; its first row is NaN.

    Returns:
        A copy of ``df`` with ``new_growth_column`` added. The source values
        are read from ``df`` itself, not from a module-level frame.
    """
    result = df.copy()
    result[new_growth_column] = df[index_column].pct_change() * 100
    return result

In [None]:
def backward_fill_quarterly_data(df, start_date, columns):
    """Backward-fill gaps in selected columns from ``start_date`` onwards.

    Typical use is spreading quarterly observations over the months of a
    monthly frame.

    Args:
        df: DataFrame with a date-like index that supports label slicing.
        start_date: Label from which filling starts; earlier rows are left
            untouched.
        columns: A column name or a list of column names to fill.

    Returns:
        A new DataFrame in which missing values of ``columns`` at or after
        ``start_date`` are replaced by the next available observation. All
        other cells keep their original values.
    """
    # Copy the slice before assigning so the write never targets a view of df.
    tail = df.loc[start_date:].copy()
    # .bfill() replaces the deprecated fillna(method='bfill').
    tail[columns] = tail[columns].bfill()

    # df takes precedence; only its missing cells are taken from the tail.
    return df.combine_first(tail)

In [None]:
def subset_df(df, columns):
    """Return an independent frame holding only the requested columns.

    Columns are returned in the order given in ``columns``; names that are
    not present in ``df`` are skipped silently.
    """
    return df.copy().filter(items=columns)

In [None]:
#Process data
# Each step consumes the frame returned by the previous one, so the order of
# the calls matters.
# Long-format download -> wide table of monthly means, columns renamed via ID_dict.
df_combed = process_data(my_df, ID_dict)
# Rescale CPI_index_2016 so that its own 2015 mean equals 100.
df_combed =recalculate_series_to_differente_base(df_combed,"CPI_index_2016", "CPI_index_2016", 2015)
# Use the *_2016 columns up to and including 2016 and the *_2017 columns afterwards.
df_combed = combine_consecutive_series(df_combed,"CPI_index_2016","CPI_index_2017",2016, "CPI_index")
df_combed = combine_consecutive_series(df_combed,"CPI_yoy_2016","CPI_yoy_2017",2016, "CPI_yoy")
# Year-on-year indices minus 100 give growth rates.
df_combed = growth_from_yoy_index(df_combed,"CPI_yoy","Inflation_yoy")
df_combed = growth_from_yoy_index(df_combed,"GDP_yoy","GDP_growth")
# Period-on-period percentage change of the industry index.
df_combed = growth_from_index(df_combed,"Industry_index","Industry_growth")
# Drop rows dated after the current month.
df_combed = data_to_current_date(df_combed)
# Bare expression: displays the frame when run as a notebook cell.
df_combed

In [None]:
#Backward Fill in the quarterly data
# Columns listed here are filled backwards so that each gap takes the value of
# the next available observation.
quarterly_columns = ["GDP_yoy", "GDP_vol", "GDP_index", "GDP_growth"]
# Filling starts at the given label; earlier rows are left as they are.
df_combed = backward_fill_quarterly_data(df_combed, '1996-03', quarterly_columns)
# EUR_CZK is filled the same way, starting from a later label.
df_combed = backward_fill_quarterly_data(df_combed, '1998-03', "EUR_CZK")
# Bare expression: displays the frame when run as a notebook cell.
df_combed

In [None]:
#Divide the data set into time series in percentages and time series as an index with 2015=100
# NOTE(review): EUR_CZK appears in both lists, so it ends up in both subsets -
# confirm this is intended.
list_index=["CPI_index","GDP_index","Industry_index","EUR_CZK"]
df_index=subset_df(df_combed, list_index)

list_growth=["EUR_CZK","PRIBOR_12M","PRIBOR_3M","Inflation_yoy","GDP_growth","Industry_growth"]
df_growth=subset_df(df_combed, list_growth)

# Visualisation

In [None]:
#!pip install dash
import pandas as pd
import plotly.express as px

In [None]:
def visualize_ts(df,index_column,graph_name):
    """Build an interactive line chart with one trace per column of *df*.

    Args:
        df: DataFrame whose index holds the time axis; every column other
            than ``index_column`` is plotted as a separate trace.
        index_column: Name of the helper column that receives the index
            rendered as strings and is used for the x axis.
        graph_name: Chart title.

    Returns:
        The Plotly figure. All traces start as 'legendonly', and two buttons
        switch every trace to visible or back to 'legendonly'. ``df`` is not
        modified.
    """
    # Plot only the data columns: the x-axis helper column must not be part
    # of y, otherwise a string column is mixed in with the numeric series.
    value_columns = [col for col in df.columns if col != index_column]

    # Work on a copy so the caller's frame does not gain the helper column.
    plot_df = df.copy()
    plot_df[index_column] = plot_df.index.astype(str)

    fig = px.line(plot_df, x=index_column, y=value_columns, title=graph_name,
                  labels={index_column: 'Year-Month'})

    # Start with every series hidden but still listed in the legend.
    fig.update_traces(visible='legendonly')

    fig.update_layout(
        updatemenus=[
            dict(
                active=0,
                buttons=[
                    dict(label='Show All',
                         method='update',
                         args=[{'visible': True},
                               {'title': graph_name}]),
                    dict(label='Hide All',
                         method='update',
                         # One entry per trace actually present in the figure.
                         args=[{'visible': ['legendonly'] * len(fig.data)},
                               {'title': graph_name}]),
                ],
            )
        ]
    )
    return fig

In [None]:
# Interactive chart of the index-type series.
fig_index = visualize_ts(
    df=df_index,
    index_column='year_month',
    graph_name='Index Time Series Visualization',
)
fig_index.show()

In [None]:
# Interactive chart of the growth-rate series.
fig_growth = visualize_ts(
    df=df_growth,
    index_column='year_month',
    graph_name='Growth Time Series Visualization',
)
fig_growth.show()

# Forecasting

In [None]:
import pandas as pd
import numpy as np
from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import adfuller
import matplotlib.pyplot as plt
%matplotlib inline
# Newer Matplotlib releases ship this style as 'seaborn-v0_8-darkgrid' and no
# longer accept the old name; fall back to the old name on older releases.
_darkgrid_style = 'seaborn-v0_8-darkgrid'
plt.style.use(_darkgrid_style if _darkgrid_style in plt.style.available else 'seaborn-darkgrid')
import warnings
# NOTE(review): this silences every warning, including deprecation notices
# from pandas and statsmodels.
warnings.filterwarnings('ignore')

In [None]:
def VAR_forecast(df,info_criteria, maxlags=8, steps=5, freq='Q'):
    """Fit a VAR model on *df* and forecast the following periods.

    Args:
        df: DataFrame with one column per endogenous variable; its last
            index label marks the final observed period.
        info_criteria: Lag-selection criterion passed to ``VAR.fit`` as
            ``ic``: one of 'aic', 'fpe', 'hqic', 'bic' or None.
        maxlags: Largest lag order considered when fitting.
        steps: Number of periods to forecast.
        freq: Period frequency used to label the forecast rows.

    Returns:
        DataFrame with ``steps`` rows indexed by the periods following the
        last row of ``df``; each column is named ``<original>_VAR_forecast``.
    """
    model = VAR(df)
    results = model.fit(maxlags=maxlags, ic=info_criteria)
    lag_order = results.k_ar
    # The forecast is seeded with the last `lag_order` observations.
    # NOTE(review): if the criterion selects lag order 0, this slice is the
    # whole array rather than an empty one - confirm how statsmodels treats it.
    forecast_values = results.forecast(df.values[-lag_order:], steps)

    # Build steps + 1 periods starting at the last observed one and drop the
    # first, so the index starts at the period right after the data ends.
    forecast_index = pd.period_range(start=df.index[-1], periods=steps + 1, freq=freq)[1:]

    VAR_forecast_df = pd.DataFrame(forecast_values, columns=df.columns)
    VAR_forecast_df.index = forecast_index
    VAR_forecast_df.columns = VAR_forecast_df.columns.map(lambda x: f"{x}_VAR_forecast")

    return VAR_forecast_df

In [None]:
def data_to_quarterly(df, data_freq):
    """Resample *df* to ``data_freq`` by averaging and drop incomplete rows.

    Every resampled bucket is the mean of its observations; buckets in which
    any column is missing are then removed. The input frame is left untouched.
    """
    averaged = df.copy().resample(data_freq).mean()
    return averaged.dropna()

In [None]:
# Average the growth series within each quarter; quarters with a missing
# value in any column are dropped by data_to_quarterly.
df_quarterly = data_to_quarterly(df=df_growth, data_freq="Q")
df_quarterly

In [None]:
var_list = ["GDP_growth","Inflation_yoy","PRIBOR_3M","EUR_CZK"]

# One row per series: the ADF p-value and the implied order of integration.
adf_table = pd.DataFrame(index=var_list, columns=['pvalue','Integration_order'])

# Run the ADF test (constant only, lag length chosen by AIC) on each series
# and store the result in that series' own row.
for ts in var_list:
    p_value = adfuller(df_quarterly[ts], autolag='aic', regression='c')[1]
    adf_table.loc[ts, 'pvalue'] = p_value
    # p < 0.05 rejects the unit root: treated as stationary (order 0);
    # otherwise the series is marked as integrated of order 1.
    adf_table.loc[ts, 'Integration_order'] = 0 if p_value < 0.05 else 1

# Warn once if at least one series is non-stationary.
if (adf_table['Integration_order'] == 1).any():
    print("Some of the time series are not stationary. Please difference them.")

adf_table

In [None]:
# Restrict the quarterly frame to the VAR variables and forecast with the
# lag order chosen by AIC.
var_data = subset_df(df=df_quarterly, columns=var_list)
VAR_forecast_df = VAR_forecast(df=var_data, info_criteria="aic")
VAR_forecast_df

In [None]:
# Download CNB forecasts
# Maps each CNB forecast indicator id to the column name used for it.
ID_dict_forecast = {
    'MGDPGDPXXADJYOYPECCOPQ': 'GDP_growth_CNB_forecast',
    'MCPIHINXXNAJYOYPECQ': 'Inflation_yoy_CNB_forecast',
    "MPIRSIRXXXRATPECNOMQ": "PRIBOR_3M_CNB_forecast",
    'MPERXMEXXXEXRVALNOMQ': 'EUR_CZK_CNB_forecast',
}
# Iterating a dict yields its keys, so this is the comma-separated id list.
IDs_forecast = ','.join(ID_dict_forecast)
CNB_forecast = download_data_from_api(api_key=my_api_key, indicator_ids=IDs_forecast)
if CNB_forecast is not None:
    print(CNB_forecast)