In [1]:
!pip install tslearn

Collecting tslearn
  Downloading tslearn-0.6.3-py3-none-any.whl.metadata (14 kB)
Downloading tslearn-0.6.3-py3-none-any.whl (374 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.4/374.4 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tslearn
Successfully installed tslearn-0.6.3


### Import Data and Modules

In [46]:
import pandas as pd
import pandas as pd
import numpy as np
import os

import plotly.graph_objects as go

In [47]:
from google.colab import drive
# Mount Google Drive into the Colab runtime at /content/drive
drive.mount('/content/drive')

# Base folder for the input workbooks read below; sub-paths are appended with '+'
path = '/content/drive/MyDrive/Leipzig/Divergence_Indicator_2_0/Data_Final'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [48]:
# Open the cycle workbook once; its sheet names serve as the country list and
# the individual country sheets are read from this handle in later cells.
excel_data = pd.ExcelFile(path+'/Country_Cycle_Data/Cycle_Data.xlsx')

In [49]:
# Get the country names (one sheet per country)
sheet_names = excel_data.sheet_names

# Per-cycle containers: sheet name -> series aligned to the global quarterly grid
resampled_series_data_fc = {}
resampled_series_data_bc = {}
resampled_series_data_fisc = {}

# Find the global time range for quarterly dates.
# Every sheet is parsed exactly once and only its date column (first column) is kept,
# instead of reading the full set of sheets once for the start and again for the end.
_date_columns = [
    pd.to_datetime(pd.read_excel(excel_data, sheet_name=sheet).iloc[:, 0])
    for sheet in sheet_names
]

# The first / last row of each sheet is taken as that country's start / end date,
# i.e. rows are assumed to be in chronological order -- TODO confirm for all sheets.
global_start_date = min(dates.iloc[0] for dates in _date_columns)
global_end_date = max(dates.iloc[-1] for dates in _date_columns)

Load GDP data for weighting

In [50]:
# Quarter-start grid covering the global date range
global_dates_gdp = pd.date_range(global_start_date, global_end_date, freq='QS')

# Load GDP data for each country.
# The GDP workbook is opened once and each country sheet is parsed from that handle,
# rather than re-opening and re-parsing the file from disk on every iteration.
gdp_data = {}
with pd.ExcelFile(path + '/business_cycle_data_final_ERMcut.xlsx') as gdp_workbook:
    for sheet in sheet_names:
        data_gdp = pd.read_excel(gdp_workbook, sheet_name=sheet)
        country_dates_gdp = pd.to_datetime(data_gdp.iloc[:, 0])  # first column holds the dates
        gdp_values = data_gdp['GDP'].values

        # Convert GDP data to a pandas Series indexed by the country's dates
        gdp_series = pd.Series(data=gdp_values, index=country_dates_gdp)

        # Reindex to align with global quarterly dates; forward fill carries the most
        # recent earlier observation onto each grid date
        gdp_data[sheet] = gdp_series.reindex(global_dates_gdp, method='ffill')

### Financial Cycle

In [51]:
# Quarter-start grid spanning the common (global) date range
global_dates_fc = pd.date_range(global_start_date, global_end_date, freq='QS')

# Align each country's financial-cycle series with that grid
for sheet in sheet_names:
    data_fc = pd.read_excel(excel_data, sheet_name=sheet)

    # First column = dates, 'FCycle' = cycle values; forward-fill onto the grid
    resampled_series_data_fc[sheet] = (
        pd.Series(data=data_fc['FCycle'].values, index=pd.to_datetime(data_fc.iloc[:, 0]))
        .reindex(global_dates_fc, method='ffill')
        .values
    )

# Stack the aligned series into one matrix, one row per country in sheet order
num_countries_fc, num_quarters_fc = len(sheet_names), len(global_dates_fc)
time_series_matrix_fc = np.vstack([resampled_series_data_fc[name] for name in sheet_names])

In [52]:
from tslearn.metrics import dtw

# One row per country, one column per quarter. Each cell receives the country's
# log-GDP-weighted mean DTW distance to all other countries; untouched cells stay NaN.
pairwise_asymmetry_matrix_fc = np.full((num_countries_fc, num_quarters_fc), np.nan)

for country_idx_fc, country_row_fc in enumerate(time_series_matrix_fc):
    # Position of this country's first non-NaN observation
    country_start_idx_fc = np.where(~np.isnan(country_row_fc))[0][0]

    for t in range(country_start_idx_fc, num_quarters_fc):
        # Expanding window: from this country's first observation up to and including t
        window_fc = slice(country_start_idx_fc, t + 1)

        ts_country_fc = country_row_fc[window_fc]
        ts_country_fc = ts_country_fc[~np.isnan(ts_country_fc)]

        dtw_distances_fc = []
        weights_fc = []
        for other_country_idx_fc, other_row_fc in enumerate(time_series_matrix_fc):
            if other_country_idx_fc == country_idx_fc:
                continue

            # The partner is cut to the same window (anchored at the current
            # country's start), then its NaNs are dropped
            ts_other_country_fc = other_row_fc[window_fc]
            ts_other_country_fc = ts_other_country_fc[~np.isnan(ts_other_country_fc)]

            # DTW needs both series to be non-empty
            if len(ts_country_fc) == 0 or len(ts_other_country_fc) == 0:
                continue

            dtw_distances_fc.append(dtw(ts_country_fc, ts_other_country_fc))

            # Weight = log of the partner's GDP at quarter t.
            # NOTE(review): a NaN GDP gives a NaN weight and a GDP <= 1 gives a
            # non-positive one; either propagates into the average below -- confirm
            # GDP coverage and units.
            raw_gdp = gdp_data[sheet_names[other_country_idx_fc]].iloc[t]
            weights_fc.append(np.log(raw_gdp))

        if not weights_fc:
            continue

        # Rescale the weights to sum to 1, then store the weighted mean distance
        normalized_weights_fc = np.array(weights_fc) / np.sum(weights_fc)
        pairwise_asymmetry_matrix_fc[country_idx_fc, t] = np.average(
            dtw_distances_fc, weights=normalized_weights_fc
        )

In [53]:
# Column of the first non-NaN divergence value in each country's row
first_data_indices_fc = [
    np.where(~np.isnan(country_row_fc))[0][0]
    for country_row_fc in pairwise_asymmetry_matrix_fc
]

# Order countries by when their divergence series starts and apply that order
# to both the matrix rows and the country labels
sorted_indices_fc = np.argsort(first_data_indices_fc)
sorted_asymmetry_matrix_fc = pairwise_asymmetry_matrix_fc[sorted_indices_fc]
sorted_sheet_names_fc = [sheet_names[position] for position in sorted_indices_fc]

# Variant with NaNs turned into 0 and all zeros masked out
asymmetry_matrix_plot_fc = np.ma.masked_equal(
    np.nan_to_num(sorted_asymmetry_matrix_fc, nan=0), 0
)

# X-axis labelling: a tick every 10th quarter, labelled with that quarter's year
year_labels_fc = [quarter.year for quarter in global_dates_fc]
xaxis_ticks_fc = np.arange(0, num_quarters_fc, 10)
xaxis_labels_fc = [year_labels_fc[tick] for tick in xaxis_ticks_fc]

# Axes for the 3D surface: quarter positions on x, sorted country names on y
x_axis_fc = np.arange(num_quarters_fc)
y_axis_fc = sorted_sheet_names_fc

# Coordinate grids (quarter index x country index) for the surface
x_grid_fc, y_grid_fc = np.meshgrid(x_axis_fc, np.arange(num_countries_fc))

In [54]:
# Purple colour ramp for the surface: [position on the 0-1 scale, colour]
darker_purples = [
    [stop, colour]
    for stop, colour in zip(
        (0.0, 0.2, 0.4, 0.6, 0.8, 1.0),
        (
            'rgb(188, 189, 220)',
            'rgb(158, 154, 200)',
            'rgb(128, 125, 186)',
            'rgb(106, 81, 163)',
            'rgb(92, 53, 153)',
            'rgb(74, 20, 134)',
        ),
    )
]

# 3D surface of the sorted divergence matrix (x = quarter index, y = country index)
surface_fc = go.Surface(
    z=sorted_asymmetry_matrix_fc,
    x=x_grid_fc,
    y=y_grid_fc,
    colorscale=darker_purples,
    colorbar=dict(title='Divergence (DTW Distance)'),
)
fig = go.Figure(data=[surface_fc])

# Axis styling: years as x tick labels, country names as y tick labels
scene_fc = dict(
    xaxis_title=None,
    xaxis=dict(tickvals=xaxis_ticks_fc, ticktext=xaxis_labels_fc),
    yaxis_title=None,
    yaxis=dict(tickvals=np.arange(num_countries_fc), ticktext=y_axis_fc),
    zaxis_title='DTW Divergence',
)
fig.update_layout(title=None, scene=scene_fc, autosize=True, width=1200, height=800)

# Render the figure
fig.show()

### Business Cycle

In [55]:
# Quarter-start grid spanning the common (global) date range
global_dates_bc = pd.date_range(global_start_date, global_end_date, freq='QS')

# Align each country's business-cycle series with that grid
for sheet in sheet_names:
    data_bc = pd.read_excel(excel_data, sheet_name=sheet)

    # First column = dates, 'BCycle' = cycle values; forward-fill onto the grid
    resampled_series_data_bc[sheet] = (
        pd.Series(data=data_bc['BCycle'].values, index=pd.to_datetime(data_bc.iloc[:, 0]))
        .reindex(global_dates_bc, method='ffill')
        .values
    )

# Stack the aligned series into one matrix, one row per country in sheet order
num_countries_bc, num_quarters_bc = len(sheet_names), len(global_dates_bc)
time_series_matrix_bc = np.vstack([resampled_series_data_bc[name] for name in sheet_names])

In [56]:
from tslearn.metrics import dtw

# One row per country, one column per quarter. Each cell receives the country's
# log-GDP-weighted mean DTW distance to all other countries; untouched cells stay NaN.
pairwise_asymmetry_matrix_bc = np.full((num_countries_bc, num_quarters_bc), np.nan)

for country_idx_bc, country_row_bc in enumerate(time_series_matrix_bc):
    # Position of this country's first non-NaN observation
    country_start_idx_bc = np.where(~np.isnan(country_row_bc))[0][0]

    for t in range(country_start_idx_bc, num_quarters_bc):
        # Expanding window: from this country's first observation up to and including t
        window_bc = slice(country_start_idx_bc, t + 1)

        ts_country_bc = country_row_bc[window_bc]
        ts_country_bc = ts_country_bc[~np.isnan(ts_country_bc)]

        dtw_distances_bc = []
        weights_bc = []
        for other_country_idx_bc, other_row_bc in enumerate(time_series_matrix_bc):
            if other_country_idx_bc == country_idx_bc:
                continue

            # The partner is cut to the same window (anchored at the current
            # country's start), then its NaNs are dropped
            ts_other_country_bc = other_row_bc[window_bc]
            ts_other_country_bc = ts_other_country_bc[~np.isnan(ts_other_country_bc)]

            # DTW needs both series to be non-empty
            if len(ts_country_bc) == 0 or len(ts_other_country_bc) == 0:
                continue

            dtw_distances_bc.append(dtw(ts_country_bc, ts_other_country_bc))

            # Weight = log of the partner's GDP at quarter t.
            # NOTE(review): a NaN GDP gives a NaN weight and a GDP <= 1 gives a
            # non-positive one; either propagates into the average below -- confirm
            # GDP coverage and units.
            raw_gdp = gdp_data[sheet_names[other_country_idx_bc]].iloc[t]
            weights_bc.append(np.log(raw_gdp))

        if not weights_bc:
            continue

        # Rescale the weights to sum to 1, then store the weighted mean distance
        normalized_weights_bc = np.array(weights_bc) / np.sum(weights_bc)
        pairwise_asymmetry_matrix_bc[country_idx_bc, t] = np.average(
            dtw_distances_bc, weights=normalized_weights_bc
        )

In [57]:
# Column of the first non-NaN divergence value in each country's row
first_data_indices_bc = [
    np.where(~np.isnan(country_row_bc))[0][0]
    for country_row_bc in pairwise_asymmetry_matrix_bc
]

# Order countries by when their divergence series starts and apply that order
# to both the matrix rows and the country labels
sorted_indices_bc = np.argsort(first_data_indices_bc)
sorted_asymmetry_matrix_bc = pairwise_asymmetry_matrix_bc[sorted_indices_bc]
sorted_sheet_names_bc = [sheet_names[position] for position in sorted_indices_bc]

# Variant with NaNs turned into 0 and all zeros masked out
asymmetry_matrix_plot_bc = np.ma.masked_equal(
    np.nan_to_num(sorted_asymmetry_matrix_bc, nan=0), 0
)

# X-axis labelling: a tick every 10th quarter, labelled with that quarter's year
year_labels_bc = [quarter.year for quarter in global_dates_bc]
xaxis_ticks_bc = np.arange(0, num_quarters_bc, 10)
xaxis_labels_bc = [year_labels_bc[tick] for tick in xaxis_ticks_bc]

# Axes for the 3D surface: quarter positions on x, sorted country names on y
x_axis_bc = np.arange(num_quarters_bc)
y_axis_bc = sorted_sheet_names_bc

# Coordinate grids (quarter index x country index) for the surface
x_grid_bc, y_grid_bc = np.meshgrid(x_axis_bc, np.arange(num_countries_bc))

In [58]:
# Blue colour ramp for the surface: [position on the 0-1 scale, colour]
darker_blues = [
    [stop, colour]
    for stop, colour in zip(
        (0.0, 0.2, 0.4, 0.6, 0.8, 1.0),
        (
            'rgb(158, 202, 225)',
            'rgb(107, 174, 214)',
            'rgb(66, 146, 198)',
            'rgb(33, 113, 181)',
            'rgb(8, 69, 148)',
            'rgb(8, 48, 107)',
        ),
    )
]

# 3D surface of the sorted divergence matrix (x = quarter index, y = country index)
surface_bc = go.Surface(
    z=sorted_asymmetry_matrix_bc,
    x=x_grid_bc,
    y=y_grid_bc,
    colorscale=darker_blues,
    colorbar=dict(title='Divergence (DTW Distance)'),
)
fig = go.Figure(data=[surface_bc])

# Axis styling: years as x tick labels, country names as y tick labels
scene_bc = dict(
    xaxis_title=None,
    xaxis=dict(tickvals=xaxis_ticks_bc, ticktext=xaxis_labels_bc),
    yaxis_title=None,
    yaxis=dict(tickvals=np.arange(num_countries_bc), ticktext=y_axis_bc),
    zaxis_title='DTW Divergence',
)
fig.update_layout(title=None, scene=scene_bc, autosize=True, width=1200, height=800)

# Render the figure
fig.show()

### Fiscal Cycle

In [59]:
# Quarter-start grid spanning the common (global) date range
global_dates_fisc = pd.date_range(global_start_date, global_end_date, freq='QS')

# Align each country's fiscal-cycle series with that grid
for sheet in sheet_names:
    data_fisc = pd.read_excel(excel_data, sheet_name=sheet)

    # First column = dates, 'FiscCycle' = cycle values; forward-fill onto the grid
    resampled_series_data_fisc[sheet] = (
        pd.Series(data=data_fisc['FiscCycle'].values, index=pd.to_datetime(data_fisc.iloc[:, 0]))
        .reindex(global_dates_fisc, method='ffill')
        .values
    )

# Stack the aligned series into one matrix, one row per country in sheet order
num_countries_fisc, num_quarters_fisc = len(sheet_names), len(global_dates_fisc)
time_series_matrix_fisc = np.vstack([resampled_series_data_fisc[name] for name in sheet_names])

In [60]:
from tslearn.metrics import dtw

# One row per country, one column per quarter. Each cell receives the country's
# log-GDP-weighted mean DTW distance to all other countries; untouched cells stay NaN.
pairwise_asymmetry_matrix_fisc = np.full((num_countries_fisc, num_quarters_fisc), np.nan)

for country_idx_fisc, country_row_fisc in enumerate(time_series_matrix_fisc):
    # Position of this country's first non-NaN observation
    country_start_idx_fisc = np.where(~np.isnan(country_row_fisc))[0][0]

    for t in range(country_start_idx_fisc, num_quarters_fisc):
        # Expanding window: from this country's first observation up to and including t
        window_fisc = slice(country_start_idx_fisc, t + 1)

        ts_country_fisc = country_row_fisc[window_fisc]
        ts_country_fisc = ts_country_fisc[~np.isnan(ts_country_fisc)]

        dtw_distances_fisc = []
        weights_fisc = []
        for other_country_idx_fisc, other_row_fisc in enumerate(time_series_matrix_fisc):
            if other_country_idx_fisc == country_idx_fisc:
                continue

            # The partner is cut to the same window (anchored at the current
            # country's start), then its NaNs are dropped
            ts_other_country_fisc = other_row_fisc[window_fisc]
            ts_other_country_fisc = ts_other_country_fisc[~np.isnan(ts_other_country_fisc)]

            # DTW needs both series to be non-empty
            if len(ts_country_fisc) == 0 or len(ts_other_country_fisc) == 0:
                continue

            dtw_distances_fisc.append(dtw(ts_country_fisc, ts_other_country_fisc))

            # Weight = log of the partner's GDP at quarter t.
            # NOTE(review): a NaN GDP gives a NaN weight and a GDP <= 1 gives a
            # non-positive one; either propagates into the average below -- confirm
            # GDP coverage and units.
            raw_gdp = gdp_data[sheet_names[other_country_idx_fisc]].iloc[t]
            weights_fisc.append(np.log(raw_gdp))

        if not weights_fisc:
            continue

        # Rescale the weights to sum to 1, then store the weighted mean distance
        normalized_weights_fisc = np.array(weights_fisc) / np.sum(weights_fisc)
        pairwise_asymmetry_matrix_fisc[country_idx_fisc, t] = np.average(
            dtw_distances_fisc, weights=normalized_weights_fisc
        )

In [61]:
# Find the first non-NaN entry for each country in the fiscal asymmetry matrix
first_data_indices_fisc = [
    np.where(~np.isnan(pairwise_asymmetry_matrix_fisc[country_idx_fisc, :]))[0][0]
    for country_idx_fisc in range(num_countries_fisc)
]

# Sort countries by their first available DTW date
sorted_indices_fisc = np.argsort(first_data_indices_fisc)

# Reorder the asymmetry matrix and country names for plotting based on sorted indices
sorted_asymmetry_matrix_fisc = pairwise_asymmetry_matrix_fisc[sorted_indices_fisc]
sorted_sheet_names_fisc = [sheet_names[idx_fisc] for idx_fisc in sorted_indices_fisc]

# Variant with NaNs replaced by 0 and all zeros masked out
asymmetry_matrix_plot_fisc = np.nan_to_num(sorted_asymmetry_matrix_fisc, nan=0)
asymmetry_matrix_plot_fisc = np.ma.masked_equal(asymmetry_matrix_plot_fisc, 0)

# Convert dates to years for labeling on the x-axis
year_labels_fisc = [d.year for d in global_dates_fisc]  # Extract only the year for each quarter
# One tick every 10th quarter. The tick range is built from the fiscal grid's own
# length (num_quarters_fisc), so this cell does not depend on the business-cycle
# cells having been run and cannot index past the end of year_labels_fisc.
xaxis_ticks_fisc = np.arange(0, num_quarters_fisc, 10)
xaxis_labels_fisc = [year_labels_fisc[i] for i in xaxis_ticks_fisc]

# Set up for 3D surface plot with sorted data
x_axis_fisc = np.arange(num_quarters_fisc)  # Time on the x-axis
y_axis_fisc = sorted_sheet_names_fisc  # Use sorted country names on the y-axis

# Create the meshgrid for the 3D surface plot
x_grid_fisc, y_grid_fisc = np.meshgrid(np.arange(num_quarters_fisc), np.arange(num_countries_fisc))

In [62]:
# Teal colour ramp for the surface: [position on the 0-1 scale, colour]
teal_tones = [
    [stop, colour]
    for stop, colour in zip(
        (0.0, 0.2, 0.4, 0.6, 0.8, 1.0),
        (
            'rgb(178, 223, 232)',  # light airy teal
            'rgb(126, 203, 219)',  # bright sky teal
            'rgb(76, 179, 200)',   # balanced medium teal
            'rgb(45, 154, 176)',   # rich aquatic teal
            'rgb(20, 128, 150)',   # deep ocean teal
            'rgb(10, 96, 119)',    # dark slate teal
        ),
    )
]

# 3D surface of the sorted divergence matrix (x = quarter index, y = country index)
surface_fisc = go.Surface(
    z=sorted_asymmetry_matrix_fisc,
    x=x_grid_fisc,
    y=y_grid_fisc,
    colorscale=teal_tones,
    colorbar=dict(title='Divergence (DTW Distance)'),
)
fig = go.Figure(data=[surface_fisc])

# Axis styling: years as x tick labels, country names as y tick labels
scene_fisc = dict(
    xaxis_title=None,
    xaxis=dict(tickvals=xaxis_ticks_fisc, ticktext=xaxis_labels_fisc),
    yaxis_title=None,
    yaxis=dict(tickvals=np.arange(num_countries_fisc), ticktext=y_axis_fisc),
    zaxis_title='DTW Divergence',
)
fig.update_layout(title=None, scene=scene_fisc, autosize=True, width=1200, height=800)

# Render the figure
fig.show()

### Divergence Index

In [63]:
from scipy.stats import gmean

# Per-quarter cross-country mean of each cycle's divergence matrix, skipping NaN cells
index_fc, index_bc, index_fisc = (
    np.nanmean(cycle_matrix, axis=0)
    for cycle_matrix in (
        pairwise_asymmetry_matrix_fc,
        pairwise_asymmetry_matrix_bc,
        pairwise_asymmetry_matrix_fisc,
    )
)

# Composite index: per-quarter geometric mean of the three cycle indices
divergence_index = gmean(np.array([index_fc, index_bc, index_fisc]), axis=0)

In [65]:
# Quarterly dates shared by all four lines
time_axis = global_dates_bc

fig = go.Figure()

# The three per-cycle indices are drawn as faint dotted lines:
# (values, line colour, legend name), in drawing order
cycle_traces = [
    (index_fc, 'rgb(106, 81, 163)', 'Financial Divergence'),
    (index_bc, 'rgb(33, 113, 181)', 'Business Divergence'),
    (index_fisc, 'rgb(45, 154, 176)', 'Fiscal Divergence'),
]
for cycle_values, cycle_colour, cycle_name in cycle_traces:
    fig.add_trace(go.Scatter(
        x=time_axis,
        y=cycle_values,
        mode='lines',
        line=dict(color=cycle_colour, dash='dot'),
        opacity=0.3,
        name=cycle_name,
    ))

# The composite index is drawn on top as a solid black line
fig.add_trace(go.Scatter(
    x=time_axis,
    y=divergence_index,
    mode='lines',
    line=dict(color='rgb(0, 0, 0)'),
    name='Divergence Index',
))

# Layout: legend anchored bottom-right inside the plot, years on the x-axis
legend_position = dict(x=0.95, y=0.05, xanchor="right", yanchor="bottom")
fig.update_layout(
    legend=legend_position,
    title=None,
    xaxis_title=None,
    width=800,
    height=600,
    yaxis_title='Mean DTW Distance',
    xaxis=dict(tickformat='%Y'),
    template='plotly_white',
)

# Render the figure
fig.show()

### Cycles

### Old code

In [None]:
# Output matrix for the DTW distance of each country to the reference, NaN until filled
asymmetry_shape_bc = (num_countries_bc, num_quarters_bc)
asymmetry_matrix_bc = np.full(asymmetry_shape_bc, np.nan)

# Reference series: per-quarter mean of the business-cycle matrix across countries,
# skipping NaN entries
reference_series_bc = np.nanmean(time_series_matrix_bc, axis=0)

In [None]:

# DTW distance between each country's business-cycle series and the reference series,
# evaluated on an expanding window that starts at the country's first observation
for country_idx_bc, country_row_bc in enumerate(time_series_matrix_bc):
    # Position of this country's first non-NaN observation
    country_start_idx_bc = np.where(~np.isnan(country_row_bc))[0][0]

    for t in range(country_start_idx_bc, num_quarters_bc):
        window_bc = slice(country_start_idx_bc, t + 1)

        # Same window cut from the country and from the reference (no NaN filtering here)
        ts_country_bc = country_row_bc[window_bc]
        ts_ref_bc = reference_series_bc[window_bc]

        if len(ts_country_bc) == 0 or len(ts_ref_bc) == 0:
            continue

        dtw_distance_bc = dtw(ts_country_bc, ts_ref_bc)
        asymmetry_matrix_bc[country_idx_bc, t] = dtw_distance_bc

Code for peaks and valleys

In [None]:
import numpy as np
from scipy.signal import find_peaks
from tslearn.metrics import dtw_path

def identify_peaks_valleys(time_series):
    """Label every point of a 1-D series as peak, valley or regular point.

    Parameters
    ----------
    time_series : array-like
        One-dimensional sequence of numbers (list, tuple or NumPy array).

    Returns
    -------
    numpy.ndarray
        Float array with the same length as ``time_series``: ``1`` at the
        positions ``find_peaks`` reports as peaks, ``-1`` at the peaks of the
        negated series (valleys), ``0`` elsewhere.
    """
    # Coerce to a float array first: negating a plain Python list raises TypeError,
    # and negating an unsigned-integer array wraps around instead of flipping sign.
    series = np.asarray(time_series, dtype=float)

    peaks, _ = find_peaks(series)
    valleys, _ = find_peaks(-series)  # valleys are the peaks of the inverted series

    # Mask: peak (1), valley (-1), or regular point (0)
    peak_valley_mask = np.zeros(len(series))
    peak_valley_mask[peaks] = 1
    peak_valley_mask[valleys] = -1
    return peak_valley_mask

def custom_dtw_distance(x, y):
    """Build the point-to-point cost matrix used by the custom DTW.

    Entry ``[i, j]`` is ``|x[i] - y[j]|`` when both points carry the same
    peak/valley/regular label from ``identify_peaks_valleys``, and 1.5 times
    that value when their labels differ.

    Returns a float array of shape ``(len(x), len(y))``.
    """
    labels_x = identify_peaks_valleys(x)
    labels_y = identify_peaks_valleys(y)

    # Absolute difference for every (i, j) pair via broadcasting
    gap = np.abs(np.asarray(x)[:, None] - np.asarray(y)[None, :])

    # True where the two points have the same label (peak, valley or regular)
    same_label = labels_x[:, None] == labels_y[None, :]

    # Plain distance for matching labels, penalised distance otherwise
    return np.where(same_label, gap, gap * 1.5).astype(float)

In [None]:
def custom_dtw(x, y):
    """Return the DTW distance between ``x`` and ``y`` under the custom local cost.

    The local cost of pairing ``x[i]`` with ``y[j]`` comes from
    ``custom_dtw_distance``; the result is the minimum cumulative cost of a
    warping path from the first pair to the last pair.
    """
    local_cost = custom_dtw_distance(x, y)
    n_rows, n_cols = local_cost.shape

    # acc[i, j]: cheapest cumulative cost of aligning the first i points of x with the
    # first j points of y; the extra border row/column is inf except for the origin
    acc = np.full((n_rows + 1, n_cols + 1), np.inf)
    acc[0, 0] = 0

    for i, cost_row in enumerate(local_cost, start=1):
        for j, step_cost in enumerate(cost_row, start=1):
            # Best predecessor: step from above, from the left, or diagonally
            best_previous = min(acc[i - 1, j], acc[i, j - 1], acc[i - 1, j - 1])
            acc[i, j] = step_cost + best_previous

    # The bottom-right corner holds the cost of the full alignment
    return acc[n_rows, n_cols]

In [None]:

# One row per country, one column per quarter; each cell receives the unweighted mean
# custom-DTW distance of that country to all other countries, NaN where none was computed
pairwise_asymmetry_matrix_bc = np.full((num_countries_bc, num_quarters_bc), np.nan)

for country_idx_bc, country_row_bc in enumerate(time_series_matrix_bc):
    # Position of this country's first non-NaN observation
    country_start_idx_bc = np.where(~np.isnan(country_row_bc))[0][0]

    for t in range(country_start_idx_bc, num_quarters_bc):
        # Expanding window: from this country's first observation up to and including t
        window_bc = slice(country_start_idx_bc, t + 1)

        ts_country_bc = country_row_bc[window_bc]
        ts_country_bc = ts_country_bc[~np.isnan(ts_country_bc)]

        dtw_distances_bc = []
        for other_country_idx_bc, other_row_bc in enumerate(time_series_matrix_bc):
            if other_country_idx_bc == country_idx_bc:
                continue

            # Partner series over the same window, NaNs dropped
            ts_other_country_bc = other_row_bc[window_bc]
            ts_other_country_bc = ts_other_country_bc[~np.isnan(ts_other_country_bc)]

            # Both series must be non-empty for a distance to be defined
            if len(ts_country_bc) == 0 or len(ts_other_country_bc) == 0:
                continue

            dtw_distance_bc = custom_dtw(ts_country_bc, ts_other_country_bc)
            dtw_distances_bc.append(dtw_distance_bc)

        # Plain mean over all partners for which a distance was computed
        if dtw_distances_bc:
            pairwise_asymmetry_matrix_bc[country_idx_bc, t] = np.mean(dtw_distances_bc)