In [None]:
!pip install xlsxwriter
!pip install dataframe-image
!apt install chromium-chromedriver

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting xlsxwriter
  Downloading XlsxWriter-3.0.7-py3-none-any.whl (152 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m152.8/152.8 KB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: xlsxwriter
Successfully installed xlsxwriter-3.0.7
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting dataframe-image
  Downloading dataframe_image-0.1.3-py3-none-any.whl (6.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.6/6.6 MB[0m [31m42.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: dataframe-image
Successfully installed dataframe-image-0.1.3
Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremov

# **Configuration**
Importing modules, setting up codespace and enabling codespace to use files from google drive.

In [None]:
# modules
import math
from sklearn.metrics import cohen_kappa_score

import copy

import numpy as np

import pandas as pd
import dataframe_image as dfi
from tabulate import tabulate
import xlsxwriter

from datetime import datetime

import matplotlib.pyplot as plt

plt.close("all")

In [None]:
# set up drive
from google.colab import files
from google.colab import drive
drive.mount('/content/drive/', force_remount=True)

Mounted at /content/drive/


In [None]:
# import the excel data from the sheet 'Germany'
data = pd.read_excel('/content/drive/MyDrive/Colab_Notebooks/TPA/data.xlsx', 'Germany')

# build the working dataframe in one chain:
#   1. drop the unnecessary variables (selected by column position)
#   2. rename the remaining variables for easier use
df = (
    pd.DataFrame(data)
    .drop(columns=data.columns[[1, 2, 5, 7, 9, 10]])
    .rename(columns={
        'GDP real': 'GDP_real',
        'Credit-to-GDP (%)': 'Credit_to_GDP',  # still in %!
        'Credit real': 'Credit_real',
        'House prices real': 'House_prices_real',
    })
)

# **Definition of turning points**

In general, a **potential peak** is a point $x$ for which

$f(x) > f(x-2),\ f(x-1),\ f(x+1),\ f(x+2)$.

A **potential trough** is defined analogously, with $<$ in place of $>$.

In this notebook, I will apply the following criteria in the displayed order to extract the real turning points from the potential ones:

0. Peaks or troughs at the first or last two quarters are excluded. 
1.   Peaks and troughs must alternate. If there is a violation, e.g., two troughs occur in sequence, the higher of the two is eliminated; if there are two peaks in sequence, the lower peak is erased.
2.   A peak must be higher than the previous trough and a trough must be lower than the previous peak.
3. Two quarters is the minimum length of a phase, where a phase runs from peak to trough or from trough to peak. If there is a violation, both turning points are erased.
4. Five quarters is the minimum length of a cycle, e.g., peak to peak.
5. Peaks at the beginning or the end are excluded if they are lower than the initial or last values. Similarly, troughs at the beginning or end are excluded if they are higher than the initial or last values.

based on: 

P. Hiebert et al. / Contrasting financial and business cycles: Stylized facts and candidate explanations / Journal of Financial Stability 38 (2018) 72–80

# **Turningpoints: Subfunctions**

## Functions for finding potential turningpoints

Functions calculating potential peaks and troughs (troughs are labelled `through` in the code). These functions scan the whole series for potential turning points. The functions do not care if 2 or more peaks (troughs) are next to each other. Peaks (troughs) in the first and last 2 quarters are, by definition of the functions, automatically excluded. Peaks (troughs) at the end that are lower (higher) than the last 2 values are, by definition of the functions, also automatically excluded.

In [None]:
# function for finding potential peaks in an array arr

def pot_peaks(arr, p):
    """Return the indices of all potential peaks in ``arr``.

    An index ``i`` is a potential peak when ``arr[i]`` is not smaller than any
    of its ``p`` neighbours on the left and its ``p`` neighbours on the right
    (ties count as peaks).

    Parameters
    ----------
    arr : sequence of numbers supporting integer indexing
    p   : int, number of neighbours compared on each side

    Returns
    -------
    numpy array (dtype int) of peak indices in ascending order; empty when
    ``p < 1`` or when the series is too short.
    """
    # without neighbours to compare against, nothing qualifies
    if p < 1:
        return np.array([], dtype='int')

    # The first and last two observations are always skipped. For p > 2 the
    # margin grows to p so that arr[i - p] cannot wrap around to the end of the
    # series and arr[i + p] cannot run past it.
    margin = max(p, 2)
    offsets = [k for k in range(-p, p + 1) if k != 0]

    peaks = [
        i for i in range(margin, len(arr) - margin)
        if all(not (arr[i] < arr[i + k]) for k in offsets)
    ]

    return np.array(peaks, dtype='int')

In [None]:
# function for finding potential troughs in an array arr

def pot_throughs(arr, p):
    """Return the indices of all potential troughs in ``arr``.

    An index ``i`` is a potential trough when ``arr[i]`` is not larger than any
    of its ``p`` neighbours on the left and its ``p`` neighbours on the right
    (ties count as troughs).

    Parameters
    ----------
    arr : sequence of numbers supporting integer indexing
    p   : int, number of neighbours compared on each side

    Returns
    -------
    numpy array (dtype int) of trough indices in ascending order; empty when
    ``p < 1`` or when the series is too short.
    """
    # without neighbours to compare against, nothing qualifies
    if p < 1:
        return np.array([], dtype='int')

    # The first and last two observations are always skipped. For p > 2 the
    # margin grows to p so that arr[i - p] cannot wrap around to the end of the
    # series and arr[i + p] cannot run past it.
    margin = max(p, 2)
    offsets = [k for k in range(-p, p + 1) if k != 0]

    through = [
        i for i in range(margin, len(arr) - margin)
        if all(not (arr[i] > arr[i + k]) for k in offsets)
    ]

    return np.array(through, dtype='int')

In [None]:
# function combining both peaks and throughs in a shared list with according labels

def combine(arr1, arr2):
    """Merge peak and trough indices into one labelled list sorted by index.

    Parameters
    ----------
    arr1 : iterable of peak indices
    arr2 : iterable of trough indices

    Returns
    -------
    list of ``[label, index]`` pairs with label ``"peak"`` or ``"through"``.
    """
    # Label every index by the input it came from. An index present in both
    # inputs (a flat segment qualifies as peak and trough) keeps both labels
    # instead of being reported as "peak" twice.
    labelled = [["peak", i] for i in arr1] + [["through", i] for i in arr2]

    # sorted() is stable: for equal indices the peak entry stays first
    return sorted(labelled, key=lambda entry: entry[1])

In [None]:
# all-in-one function

def pot_turningpoints(arr, p):
    """Find, label and report every potential turning point of ``arr``.

    Runs the peak and the trough search with window ``p``, merges both results
    with ``combine`` and prints the merged list together with its size.
    Returns the merged list.
    """
    peak_idx = pot_peaks(arr, p)
    through_idx = pot_throughs(arr, p)
    tp = combine(peak_idx, through_idx)

    n_found = len(tp)
    print(f"set of potential turningpoints: {tp}")
    print(f"total number of potential turningpoints: {n_found}")

    return tp

## Function for checking if turningpoints alternate

Function checking for alternation between peaks and throughs. If 2 or more turningpoints of the same kind are neighbors, the lowest peak (highest through) is dropped until peaks and throughs alternate.

In [None]:
def check_alternate(tp, arr):
    """Enforce that peaks and troughs alternate.

    Whenever two neighbouring turning points have the same type, one of them is
    dropped: of two peaks the lower one, of two troughs the higher one. On a
    tie the earlier point is dropped. The surviving point is compared with its
    new neighbour right away, so longer runs of the same type are fully
    resolved in one call.

    Parameters
    ----------
    tp  : list of ``[label, index]`` pairs sorted by index
    arr : series the indices refer to

    Returns
    -------
    (tp, c) : the cleaned list (``tp`` itself is not modified) and the code of
              the next step: 2 if nothing was removed, otherwise 1 (run this
              check again).
    """
    tpc = copy.deepcopy(tp)
    i = 0

    while i < len(tpc) - 1:
        kind = tpc[i][0]

        # different types (or an unknown label): nothing to do, go to next one
        if kind != tpc[i + 1][0] or kind not in ("peak", "through"):
            i += 1
            continue

        first_val = arr[tpc[i][1]]
        second_val = arr[tpc[i + 1][1]]

        if kind == "peak":
            # keep the higher peak
            drop = i + 1 if first_val > second_val else i
        else:
            # keep the lower trough, i.e. remove the higher one
            drop = i + 1 if first_val < second_val else i

        print(f"check_alternate removed: {tpc[drop]}")
        del tpc[drop]
        # i is deliberately not advanced: the point now at position i still has
        # to be compared with its new right-hand neighbour

    # if after iteration, tp and tpc are the same, change condition for next step
    c = 2 if tp == tpc else 1

    return tpc, c

## Function for checking for required y-axis differences

Function for checking if peak is higher than previous trough and if trough is lower than previous peak. If this requirement is not met, the turningpoint is removed.

In [None]:
def check_high_low(tp, arr):
    """Check that each peak lies above and each trough below its predecessor.

    Walks through the turning points from the second one on and compares the
    series value of each point with the value of the point directly before it.
    The first point that violates the rule is removed and the scan stops.

    Returns the (possibly shortened) copy of ``tp`` and the code of the next
    step: 3 if nothing was removed, otherwise 1.
    """
    remaining = copy.deepcopy(tp)

    for pos in range(1, len(remaining)):
        current_val = arr[remaining[pos][1]]
        previous_val = arr[remaining[pos - 1][1]]

        if remaining[pos][0] == "peak":
            # a peak must not be lower than the point before it
            violated = current_val < previous_val
        else:
            # a through must not be higher than the point before it
            violated = current_val > previous_val

        if violated:
            print(f"check_high_low removed: {remaining[pos]}")
            remaining.remove(remaining[pos])
            break

    # unchanged list -> continue with step 3, otherwise restart at step 1
    next_step = 3 if tp == remaining else 1

    return remaining, next_step

## Function for checking for required phase length

Function for checking whether the minimum phase length, either peak-to-trough or trough-to-peak, is kept. If this is not the case, both turning points in violation are erased.

In [None]:
def check_phases(tp, p):
    """Remove pairs of neighbouring turning points that are too close together.

    Two consecutive turning points whose indices differ by less than ``p`` are
    both deleted; the scan then continues at the same position.

    Returns the cleaned copy of ``tp`` and the code of the next step: 4 if
    nothing was removed, otherwise 1.
    """
    kept = copy.deepcopy(tp)
    pos = 0

    while pos + 1 < len(kept):
        gap = kept[pos + 1][1] - kept[pos][1]

        # phase is long enough: move on to the next pair
        if not (gap < p):
            pos += 1
            continue

        # phase is too short: drop both of its end points
        print(f"check_phases removed: {kept[pos]} and {kept[pos+1]}")
        kept[pos:pos + 2] = []

    # unchanged list -> continue with step 4, otherwise restart at step 1
    next_step = 4 if tp == kept else 1

    return kept, next_step

## Function for checking for required cycle length

Function for checking if minimum length of cycle q for 2 turningpoints of the same kind in quarters is given. If either the number of quarters to the next left or right turningpoint of the same kind is not the minimum required quantity, the y-axis values of the 2 turningpoints in question are compared. For a peak (through), the one with the lower (higher) y-axis value is erased.

In [None]:
def check_length(tp, arr, q):
    """Enforce the minimum cycle length ``q`` between same-type turning points.

    A cycle spans from a turning point to the next-but-one point in the list
    (peak to peak or trough to trough, assuming the list alternates). For the
    first cycle shorter than ``q`` one of its two end points is removed: the
    lower of two peaks or the higher of two troughs (on a tie the later point).
    The scan stops after the first removal.

    Parameters
    ----------
    tp  : list of ``[label, index]`` pairs sorted by index
    arr : series the indices refer to
    q   : minimum number of observations between the two ends of a cycle

    Returns
    -------
    (tp, c) : the cleaned list (``tp`` itself is not modified) and the code of
              the next step: 5 if nothing was removed, otherwise 1.
    """
    tpc = copy.deepcopy(tp)

    # i runs up to and including the third-to-last element, so the cycle that
    # ends at the very last turning point is checked as well
    for i in range(len(tpc) - 2):

        # if required cycle length is kept, go to next
        if tpc[i + 2][1] - tpc[i][1] >= q:
            continue

        kind = tpc[i][0]
        first_val = arr[tpc[i][1]]
        second_val = arr[tpc[i + 2][1]]

        if kind == "peak":
            # remove whichever peak is lower
            drop = i if first_val < second_val else i + 2
        elif kind == "through":
            # remove whichever through is higher
            drop = i if first_val > second_val else i + 2
        else:
            # unknown label: nothing sensible to compare, skip it
            continue

        print(f"check_length removed: {tpc[drop]}")
        del tpc[drop]
        break

    # if after iteration, tp and tpc are the same, change condition for next step
    c = 5 if tp == tpc else 1

    return tpc, c

## Function for checking for first and last turningpoints

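If the first peak (trough) is lower (higher) than the initial value of the series, it is erased. Likewise, if the last peak (trough) is lower (higher) than the final value of the series, it is erased.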

In [None]:
def check_first_last(tp, arr):
    """Drop a first/last turning point that is dominated by the series ends.

    The first turning point is removed if it is a peak below (or a trough
    above) the first value of ``arr``. Afterwards the last remaining turning
    point is removed if it is a peak below (or a trough above) the last value
    of ``arr``. An empty list is handled without error.

    Parameters
    ----------
    tp  : list of ``[label, index]`` pairs sorted by index
    arr : series the indices refer to

    Returns
    -------
    (tp, c) : the cleaned list (``tp`` itself is not modified) and the code of
              the next step: 6 if nothing was removed, otherwise 5 (run this
              check again).
    """
    tpc = copy.deepcopy(tp)

    # first turning point vs. initial value
    if tpc:
        kind, value = tpc[0][0], arr[tpc[0][1]]
        if (kind == "peak" and value < arr[0]) or \
           (kind == "through" and value > arr[0]):
            del tpc[0]

    # last turning point vs. final value; the list may have become empty above
    if tpc:
        kind, value = tpc[-1][0], arr[tpc[-1][1]]
        final_value = arr[len(arr) - 1]
        if (kind == "peak" and value < final_value) or \
           (kind == "through" and value > final_value):
            del tpc[-1]

    print(f"check_first_last removed {[el for el in tp if el not in tpc]}")

    # if after iteration, tp and tpc are the same, change condition for next step
    c = 6 if tp == tpc else 5

    return tpc, c

## Full Program

In [None]:
# variable: timeseries in which turningpoints shall be found
# phase:    minimum phase length, default is set to 2
# cycle:    minimum cycle length, default is set to 5

def find_tp(variable, phase=2, cycle=5):
    """Determine the final set of turning points of a time series.

    The potential turning points are filtered by a small state machine. Every
    check returns the cleaned list together with the code of the check to run
    next; any code without a registered check ends the loop.

    Returns the final list of ``[label, index]`` pairs and the series as a
    numpy array.
    """
    arr = np.array(list(variable))
    tp = pot_turningpoints(arr, phase)

    # step code -> check to execute for that code
    checks = {
        1: lambda points: check_alternate(points, arr),
        2: lambda points: check_high_low(points, arr),
        3: lambda points: check_phases(points, phase),
        4: lambda points: check_length(points, arr, cycle),
        5: lambda points: check_first_last(points, arr),
    }

    c = 1
    while c in checks:
        tp, c = checks[c](tp)

    print(f"final set of turningpoints: {tp}")
    print(f"total number of turningpoints: {len(tp)}")
    return tp, arr

# **Phases**

## Function for defining set of phases

In [None]:
# works

def phase_set(tp):
    """Build the phases between consecutive turning points.

    Each pair of neighbouring turning points forms one phase
    ``[label, (start_index, end_index)]``; the label is ``"PT"`` when the phase
    starts at a peak and ``"TP"`` otherwise. Both sub-lists are printed.

    Returns ``(all phases, PT phases, TP phases)``.
    """
    every_phase = []
    for start, end in zip(tp, tp[1:]):
        label = "PT" if start[0] == "peak" else "TP"
        every_phase.append([label, (start[1], end[1])])

    # phases running from a peak down to a through
    PT = [phase for phase in every_phase if phase[0] == "PT"]
    print(f'PT phases: {PT}')

    # phases running from a through up to a peak
    TP = [phase for phase in every_phase if phase[0] == "TP"]
    print(f'TP phases: {TP}')

    return every_phase, PT, TP

## Function for calculating PT-, TP- & overall phase length

In [None]:
# for everything

def calc_len(liste):
    """Compute the length of every phase/cycle and the average length.

    ``liste`` holds entries of the form ``[label, (start_index, end_index)]``.
    Returns an integer numpy array with ``end - start`` per entry and the mean
    of that array.
    """
    spans = (entry[1] for entry in liste)
    len_liste = np.array([span[1] - span[0] for span in spans], dtype='int')

    # average length = total length divided by number of entries
    avg_len = len_liste.sum() / len_liste.size

    return len_liste, avg_len

## Function for calculating amplitude of phases

In [None]:
# for everything

def calc_amp(liste, arr):
    """Compute the amplitude of every phase/cycle and the average amplitude.

    The amplitude of an entry ``[label, (start_index, end_index)]`` is the
    absolute difference between the values of ``arr`` at its two indices.
    Returns a float numpy array with one amplitude per entry and their mean.
    """
    amplitudes = []
    for entry in liste:
        start_idx, end_idx = entry[1][0], entry[1][1]
        amplitudes.append(abs(arr[end_idx] - arr[start_idx]))

    amp_liste = np.array(amplitudes, dtype='float')

    # average amplitude = total amplitude divided by number of entries
    avg_amp = amp_liste.sum() / amp_liste.size

    return amp_liste, avg_amp

## Functions for creating information sets and tables

### Everything non-average 

In [None]:
def inf(liste, len_liste, amp_liste, date):
    """Assemble one table row per phase/cycle.

    Each row is ``[type, begin date, end date, duration, amplitude]``; the
    duration is the length divided by 4 and the amplitude is rounded to three
    decimals. ``date`` is indexed with the start and end positions stored in
    ``liste``.
    """
    info = []

    for k, entry in enumerate(liste):
        begin_idx, end_idx = entry[1][0], entry[1][1]
        duration = float(len_liste[k]/4)
        amplitude = float(round(amp_liste[k], 3))
        info.append([entry[0], date[begin_idx], date[end_idx], duration, amplitude])

    return info

In [None]:
def tab(info, col_names=None):
    """Turn the phase/cycle rows into a named DataFrame and print it.

    Parameters
    ----------
    info      : list of rows as produced by ``inf``
    col_names : optional column headings; defaults to
                ``["Type", "Beginning", "End", "Duration", "Amplitude"]``

    The user is asked interactively for a name. The name is stored on the
    returned DataFrame as attribute ``name``. An empty answer falls back to
    ``"Table"``.
    """
    # `is None` also works when the caller passes an array of headings
    if col_names is None:
        col_names = ["Type", "Beginning", "End", "Duration", "Amplitude"]

    col_names = np.array(col_names, dtype='str')

    table = pd.DataFrame(data=info, columns=col_names)

    print(tabulate(table, headers=col_names, tablefmt='fancy_grid'), '\n')

    title = input("name to save: ")

    # input() returns an empty string (never None) when nothing is entered
    if not title.strip():
        title = "Table"

    print('\n')

    table.name = title

    return table

### Average

In [None]:
def inf_avg(avg_PT_l, avg_amp_PT, avg_TP_l, avg_amp_TP, avg_all_l, avg_amp_all, date):
    """Assemble the summary rows for PT phases, TP phases and all phases.

    Each row is ``[label, average length / 4 rounded to two decimals,
    average amplitude]``. ``date`` is accepted but not used.
    """
    summary = (
        ("PT", avg_PT_l, avg_amp_PT),
        ("TP", avg_TP_l, avg_amp_TP),
        ("Overall", avg_all_l, avg_amp_all),
    )

    info_avg = [[label, round(float(length)/4, 2), amplitude]
                for label, length, amplitude in summary]

    return info_avg

In [None]:
def tab_avg(info_avg, col_names_avg=None):
    """Turn the summary rows into a named DataFrame and print it.

    Parameters
    ----------
    info_avg      : list of rows as produced by ``inf_avg`` / ``inf_avg_cyc``
    col_names_avg : optional column headings; defaults to
                    ``["Type", "Average Length", "Average Amplitude"]``

    The user is asked interactively for a name. The name is stored on the
    returned DataFrame as attribute ``name``. An empty answer falls back to
    ``"Summary Table"``.
    """
    # `is None` also works when the caller passes an array of headings
    if col_names_avg is None:
        col_names_avg = ["Type", "Average Length", "Average Amplitude"]

    col_names_avg = np.array(col_names_avg, dtype='str')

    table_avg = pd.DataFrame(data=info_avg, columns=col_names_avg)

    print(tabulate(table_avg, headers=col_names_avg, tablefmt='fancy_grid'), '\n')

    title = input("name to save: ")

    # input() returns an empty string (never None) when nothing is entered
    if not title.strip():
        title = "Summary Table"

    print('\n')

    table_avg.name = title

    return table_avg

### Export as .xlsx & .png

In [None]:
def export(table_list):
    """Write the given tables to one Excel workbook and one PNG per table.

    Asks interactively for the workbook name and the target folder. Every
    table becomes a sheet named after its ``name`` attribute and is also
    rendered to ``<folder>/<name>.png`` via dataframe_image.
    """
    title = input("Name of file to save: ")

    folder = input("Path to save tables to (don't add '/' at the end): \n")

    workbook_path = f'{folder}/{title}.xlsx'

    with pd.ExcelWriter(workbook_path, engine='xlsxwriter') as writer:

        for table in table_list:
            # excel sheet inside the shared workbook
            table.to_excel(writer, sheet_name=f'{table.name}', float_format='%.2f', index=False)

            # stand-alone picture of the same table
            dfi.export(table, f"{folder}/{table.name}.png")

## Full function: phases

In [None]:
# works

def phases(tp, arr, date):
    """Tabulate and export all phases implied by the turning points ``tp``.

    Builds the phase sets (all, PT, TP), computes length and amplitude for
    each, prints and names the resulting tables interactively, adds a summary
    table of the averages and exports everything.

    Returns ``(tab_all, tab_PT, tab_TP, table_avg)``.
    """
    # for a better looking date
    date = date.dt.strftime('%d.%m.%Y')

    # phase sets in the order: all, PT, TP
    phase_groups = phase_set(tp)

    # (length list, average length) and (amplitude list, average amplitude) per set
    lengths = [calc_len(group) for group in phase_groups]
    amplitudes = [calc_amp(group, arr) for group in phase_groups]

    (_, avg_len_all), (_, avg_len_PT), (_, avg_len_TP) = lengths
    (_, avg_amp_all), (_, avg_amp_PT), (_, avg_amp_TP) = amplitudes

    # row information per set
    infos = [inf(group, length[0], amplitude[0], date)
             for group, length, amplitude in zip(phase_groups, lengths, amplitudes)]

    # tabulating (asks for a name per table, in the order all, PT, TP)
    headings = ["Type", "Beginning", "End", "Duration in Years", "Amplitude in billion $US"]
    tab_all, tab_PT, tab_TP = [tab(rows, headings) for rows in infos]

    # average information and tabulation
    heading = ["Phase Type", "Average Duration in Years", "Average Amplitude in billion $US"]
    info_avg = inf_avg(avg_len_PT, avg_amp_PT, avg_len_TP, avg_amp_TP,
                       avg_len_all, avg_amp_all, date)
    table_avg = tab_avg(info_avg, heading)

    # export
    export([tab_all, tab_PT, tab_TP, table_avg])

    return tab_all, tab_PT, tab_TP, table_avg

# **Cycles**

## Function for defining set of cycles

In [None]:
def cycle_set(tp):
    """Build the cycles between each turning point and the next-but-one.

    Each cycle is ``[label, (start_index, end_index)]``; the label is ``"PP"``
    when the cycle starts at a peak and ``"TT"`` otherwise. Both sub-lists are
    printed.

    Returns ``(all cycles, PP cycles, TT cycles)``.
    """
    every_cycle = []
    for start, end in zip(tp, tp[2:]):
        label = "PP" if start[0] == "peak" else "TT"
        every_cycle.append([label, (start[1], end[1])])

    # cycles starting at a peak
    PP = [cycle for cycle in every_cycle if cycle[0] == "PP"]
    print(f'PP cycles: {PP}')

    # cycles starting at a through
    TT = [cycle for cycle in every_cycle if cycle[0] == "TT"]
    print(f'TT cycles: {TT}')

    return every_cycle, PP, TT

## Function for calculating PP-, TT- & overall cycle length

Same as for phases

## Function for calculating amplitude of cycles

Same as for phases

## Function for creating cycle information set

In [None]:
def inf_avg_cyc(avg_PP_l, avg_amp_PP, avg_TT_l, avg_amp_TT, avg_all_l, avg_amp_all, date):
    """Assemble the summary rows for PP cycles, TT cycles and all cycles.

    Each row is ``[label, average length / 4 rounded to two decimals,
    average amplitude]``. ``date`` is accepted but not used.
    """
    summary = (
        ("PP", avg_PP_l, avg_amp_PP),
        ("TT", avg_TT_l, avg_amp_TT),
        ("Overall", avg_all_l, avg_amp_all),
    )

    info_avg = [[label, round(float(length)/4, 2), amplitude]
                for label, length, amplitude in summary]

    return info_avg

## Function for tabulation

Same as phases

## Full function: cycles

In [None]:
# works

def cycles(tp, arr, date):
    """Tabulate and export all cycles implied by the turning points ``tp``.

    Builds the cycle sets (all, PP, TT), computes length and amplitude for
    each, prints and names the PP and TT tables interactively, adds a summary
    table of the averages (including the overall average) and exports the
    three tables.

    Returns ``(tab_PP, tab_TT, table_avg)``.
    """
    # for a better looking date
    date = date.dt.strftime('%d.%m.%Y')

    # cycle sets in the order: all, PP, TT
    cycle_groups = cycle_set(tp)

    # (length list, average length) and (amplitude list, average amplitude) per set
    lengths = [calc_len(group) for group in cycle_groups]
    amplitudes = [calc_amp(group, arr) for group in cycle_groups]

    (_, avg_len_all), (len_PP, avg_len_PP), (len_TT, avg_len_TT) = lengths
    (_, avg_amp_all), (amp_PP, avg_amp_PP), (amp_TT, avg_amp_TT) = amplitudes

    # row information; only PP and TT get their own table
    inf_PP = inf(cycle_groups[1], len_PP, amp_PP, date)
    inf_TT = inf(cycle_groups[2], len_TT, amp_TT, date)

    # tabulating (asks for a name per table, in the order PP, TT)
    headings = ["Type", "Beginning", "End", "Duration in Years", "Amplitude in billion $US"]
    tab_PP = tab(inf_PP, headings)
    tab_TT = tab(inf_TT, headings)

    # average information and tabulation
    heading = ["Type", "Average Length in Years", "Average Amplitude in billion $US"]
    info_avg = inf_avg_cyc(avg_len_PP, avg_amp_PP, avg_len_TT, avg_amp_TT,
                           avg_len_all, avg_amp_all, date)
    table_avg = tab_avg(info_avg, heading)

    # export
    export([tab_PP, tab_TT, table_avg])

    return tab_PP, tab_TT, table_avg

# **Full Program: Turning Point Algorithm**
Finding turningpoints, tabulating phases and cycles

In [None]:
def tpa(variable, date, phase=2, cycle=5):
    """Run the complete turning point algorithm for one series.

    Finds the turning points of ``variable``, then tabulates and exports the
    phases and the cycles.

    Returns the four phase tables, the three cycle tables, the list of
    turning points and the series as array, in that order.
    """
    tp, arr = find_tp(variable, phase, cycle)

    phase_tables = phases(tp, arr, date)    # tab_all, tab_PT, tab_TP, table_avg_phases
    cycle_tables = cycles(tp, arr, date)    # tab_PP, tab_TT, table_avg_cycles

    tab_all, tab_PT, tab_TP, table_avg_phases = phase_tables
    tab_PP, tab_TT, table_avg_cycles = cycle_tables

    return (tab_all, tab_PT, tab_TP, table_avg_phases,
            tab_PP, tab_TT, table_avg_cycles, tp, arr)

# **Plots**

## Boxplots

In [None]:
def boxplot(dataframes):
    """Save one box-plot figure per dataframe as PNG.

    Asks interactively for the target folder. Each dataframe is plotted with
    one subplot per column, titled with the dataframe's ``name`` attribute and
    saved as ``<folder>/<name>.png``. All figures are closed at the end.
    """
    folder = input("Path to save boxplots to (don't add '/' at the end): \n")

    for frame in dataframes:
        name = frame.name

        frame.plot(kind='box', subplots=True, title=name, figsize=(7,5))
        plt.suptitle(name, fontsize=14)

        plt.savefig(f"{folder}/{name}.png", transparent=False)

    plt.close('all')

## Line Graphs

In [None]:
def lineplot(date, variable, title, tp, arr, x_name, y_name):
    """Plot a series with its turning points marked on the x-axis and save it.

    Asks interactively for the target folder. The series is drawn against its
    position index; x-ticks (with vertical grid lines) are placed at the
    positions of the turning points in ``tp`` and labelled with the formatted
    dates at those positions. The figure is saved as ``<folder>/<title>.png``.
    ``arr`` is accepted but not used.
    """
    folder = input("Path to save lineplot to (don't add '/' at the end): \n")

    date = date.dt.strftime('%d.%m.%Y')

    # positions of the turning points and the dates belonging to them
    tp_positions = [point[1] for point in tp]
    x_tix = np.array(tp_positions)
    x_labels = np.array([date[position] for position in tp_positions])

    f = plt.figure()
    f.set_figwidth(15)
    f.set_figheight(10)

    ax = plt.axes()

    plt.plot(range(len(date)), variable, marker=None)

    plt.title(title, fontsize=15)

    plt.xlabel(x_name)
    plt.xticks(x_tix, fontsize=8, rotation=90)
    ax.set_xticklabels(x_labels)

    plt.ylabel(y_name)
    plt.yticks(fontsize=8)

    plt.grid(visible=1, which='major', axis='x')

    plt.savefig(f"{folder}/{title}.png", transparent=False, dpi=100)

    plt.close('all')

In [None]:
"""plt.rcParams['figure.dpi'] = 300
title = "Turning Points of real German GDP from 1950 to 2020"
x_name = "Quarters"
y_name = "real GDP in billion $US"
lineplot(df.Date, df.GDP_real, title, tp, gdp_real, x_name, y_name)"""

'plt.rcParams[\'figure.dpi\'] = 300\ntitle = "Turning Points of real German GDP from 1950 to 2020"\nx_name = "Quarters"\ny_name = "real GDP in billion $US"\nlineplot(df.Date, df.GDP_real, title, tp, gdp_real, x_name, y_name)'

# **Business Cycle**

Here we use our predefined functions to analyze business cycles. For analysis we use **real GDP** as a variable. The below program extracts all the needed information and exports it for us as excel files.

In [None]:
if __name__ == '__main__':
    # Run the full turning-point pipeline on real GDP. tpa() prompts
    # interactively for table names and for the export file name/folder.
    tab_all_phases, tab_PT, tab_TP, tab_avg_phases, tab_PP, tab_TT, tab_avg_cycles, tp_gdp_real, gdp_real = tpa(df.GDP_real, df.Date)

    # box plots of the phase (PT, TP) and cycle (PP, TT) tables; prompts for a folder
    boxplot([tab_PT, tab_TP, tab_PP, tab_TT])

    # line plot of the series with the turning points as x-ticks; prompts for a folder
    title = "Turning Points of real German GDP from 1950 to 2022"
    x_name = "Date"
    y_name = "real GDP in billion $US"
    lineplot(df.Date, df.GDP_real, title, tp_gdp_real, gdp_real, x_name, y_name)

set of potential turningpoints: [['through', 4], ['peak', 16], ['through', 21], ['peak', 40], ['through', 43], ['peak', 48], ['through', 50], ['through', 57], ['peak', 67], ['through', 68], ['peak', 84], ['through', 86], ['peak', 88], ['through', 93], ['through', 104], ['peak', 121], ['through', 123], ['peak', 124], ['through', 128], ['peak', 130], ['through', 132], ['peak', 137], ['through', 139], ['peak', 152], ['through', 156], ['peak', 166], ['peak', 170], ['through', 172], ['through', 194], ['peak', 196], ['peak', 199], ['through', 201], ['through', 204]]
total number of potential turningpoints: 33
check_alternate removed: ['through', 50]
check_alternate removed: ['through', 93]
check_alternate removed: ['peak', 166]
check_alternate removed: ['through', 172]
check_alternate removed: ['peak', 196]
check_alternate removed: ['through', 201]
check_high_low removed: ['through', 57]
check_alternate removed: ['peak', 48]
check_high_low removed: ['through', 104]
check_alternate removed: [

# **Financial Cycle**
For analysis we use **credit to GDP** in % as well as **real credit** and **real house prices**

## **Credit to GDP**

In [None]:
if __name__ == '__main__':
    # Run the full turning-point pipeline on credit-to-GDP. The table variables
    # (tab_PT, tab_TP, ...) are reassigned here, replacing the values from the
    # previous run.
    tab_all_phases, tab_PT, tab_TP, tab_avg_phases, tab_PP, tab_TT, tab_avg_cycles, tp_credit_to_gdp, credit_to_gdp = tpa(df.Credit_to_GDP, df.Date)

    # box plots of the phase (PT, TP) and cycle (PP, TT) tables; prompts for a folder
    boxplot([tab_PT, tab_TP, tab_PP, tab_TT])

    # line plot of the series with the turning points as x-ticks; prompts for a folder
    title = "Turning Points of German Credit to GDP from 1950 to 2022"
    x_name = "Date"
    y_name = "Credit to GDP in %"
    lineplot(df.Date, df.Credit_to_GDP, title, tp_credit_to_gdp, credit_to_gdp, x_name, y_name)

## **Real Credit**

In [None]:
if __name__ == '__main__':
    # Run the full turning-point pipeline on real credit. The table variables
    # (tab_PT, tab_TP, ...) are reassigned here, replacing the values from the
    # previous run.
    tab_all_phases, tab_PT, tab_TP, tab_avg_phases, tab_PP, tab_TT, tab_avg_cycles, tp_credit_real, credit_real = tpa(df.Credit_real, df.Date)

    # box plots of the phase (PT, TP) and cycle (PP, TT) tables; prompts for a folder
    boxplot([tab_PT, tab_TP, tab_PP, tab_TT])

    # line plot of the series with the turning points as x-ticks; prompts for a folder
    title = "Turning Points of real German Credit from 1950 to 2022"
    x_name = "Date"
    y_name = "Real Credit in billion $US"
    lineplot(df.Date, df.Credit_real, title, tp_credit_real, credit_real, x_name, y_name)

## **Real house prices**

In [None]:
if __name__ == '__main__':
    # Run the full turning-point pipeline on real house prices. The table
    # variables (tab_PT, tab_TP, ...) are reassigned here, replacing the values
    # from the previous run.
    tab_all_phases, tab_PT, tab_TP, tab_avg_phases, tab_PP, tab_TT, tab_avg_cycles, tp_house_prices_real, house_prices_real = tpa(df.House_prices_real, df.Date)

    # box plots of the phase (PT, TP) and cycle (PP, TT) tables; prompts for a folder
    boxplot([tab_PT, tab_TP, tab_PP, tab_TT])

    # line plot of the series with the turning points as x-ticks; prompts for a folder
    # NOTE(review): the y-label states "billion $US" for house prices - confirm the unit
    title = "Turning Points of real German House Prices from 1950 to 2022"
    x_name = "Date"
    y_name = "Real House Prices in billion $US"
    lineplot(df.Date, df.House_prices_real, title, tp_house_prices_real, house_prices_real, x_name, y_name)

# **Measurement of Concordance**

In [None]:
def calc_conc_index(business, financial):
    """Share of business-cycle quarters found in the matching financial phase.

    Both arguments are lists of cycles, each cycle being a pair
    ``(quarters of first phase, quarters of second phase)``. Cycles are paired
    by position up to the length of the shorter list. For every quarter of a
    business phase it is checked whether the quarter also occurs in the
    financial phase at the same position.

    Returns matched quarters divided by all business quarters considered.
    """
    i_max = min(len(business), len(financial))

    q_in_same_phase = 0
    counter = 0

    for k in range(i_max):
        # 0 -> first phase of the cycle, 1 -> second phase of the cycle
        for side in (0, 1):
            financial_quarters = financial[k][side]

            for quarter in business[k][side]:
                counter += 1
                if quarter in financial_quarters:
                    q_in_same_phase += 1

    return q_in_same_phase / counter

## GDP real

In [None]:
# gdp real
PT_phases_gdp = [['PT', (16, 21)], ['PT', (40, 43)], ['PT', (84, 86)], ['PT', (130, 132)], ['PT', (137, 139)], ['PT', (152, 156)], ['PT', (199, 204)]]
TP_phases_gdp = [['TP', (21, 40)], ['TP', (43, 84)], ['TP', (86, 130)], ['TP', (132, 137)], ['TP', (139, 152)], ['TP', (156, 199)]]

In [None]:
PP_cycles_gdp = [['PP', (16, 40)], ['PP', (40, 84)], ['PP', (84, 130)], ['PP', (130, 137)], ['PP', (137, 152)], ['PP', (152, 199)]]
TT_cycles_gdp = [['TT', (21, 43)], ['TT', (43, 86)], ['TT', (86, 132)], ['TT', (132, 139)], ['TT', (139, 156)]]

In [None]:
# Each cycle is given by its boundaries (start, turning point, end) and expanded
# into the pair (quarters of first phase, quarters of second phase).
PP_set_gdp = [
    (list(range(start, turn)), list(range(turn, end)))
    for start, turn, end in [(16, 21, 40), (40, 43, 84), (84, 86, 130),
                             (130, 132, 137), (137, 139, 152), (152, 156, 199)]
]
TT_set_gdp = [
    (list(range(start, turn)), list(range(turn, end)))
    for start, turn, end in [(21, 40, 43), (43, 84, 86), (86, 130, 132),
                             (132, 137, 139), (139, 152, 156), (156, 199, 204)]
]

## Credit real

In [None]:
# credit to gdp
PT_phases_credit = [['PT', (23, 28)], ['PT', (59, 69)], ['PT', (126, 128)], ['PT', (133, 140)], ['PT', (145, 153)], ['PT', (159, 165)], ['PT', (174, 179)], ['PT', (188, 191)], ['PT', (197, 199)]]
TP_phases_credit = [['TP', (28, 59)], ['TP', (69, 126)], ['TP', (128, 133)], ['TP', (140, 145)], ['TP', (153, 159)], ['TP', (165, 174)], ['TP', (179, 188)], ['TP', (191, 197)], ['TP', (199, 204)]]

In [None]:
PP_cycles_credit = [['PP', (23, 59)], ['PP', (59, 126)], ['PP', (126, 133)], ['PP', (133, 145)], ['PP', (145, 159)], ['PP', (159, 174)], ['PP', (174, 188)], ['PP', (188, 197)]]
TT_cycles_credit = [['TT', (28, 69)], ['TT', (69, 128)], ['TT', (128, 140)], ['TT', (140, 153)], ['TT', (153, 165)], ['TT', (165, 179)], ['TT', (179, 191)], ['TT', (191, 199)]]

In [None]:
# Each cycle is given by its boundaries (start, turning point, end) and expanded
# into the pair (quarters of first phase, quarters of second phase).
PP_set_credit = [
    (list(range(start, turn)), list(range(turn, end)))
    for start, turn, end in [(23, 28, 59), (59, 69, 126), (126, 128, 133),
                             (133, 140, 145), (145, 153, 159), (159, 165, 174),
                             (174, 179, 188), (188, 191, 197), (197, 199, 204)]
]
TT_set_credit = [
    (list(range(start, turn)), list(range(turn, end)))
    for start, turn, end in [(28, 59, 69), (69, 126, 128), (128, 133, 140),
                             (140, 145, 153), (153, 159, 165), (165, 174, 179),
                             (179, 188, 191), (191, 197, 199)]
]

In [None]:
# NOTE(review): this line is not code; it looks like pasted program output and
# raised a SyntaxError when the cell was run. Kept as a comment for reference.
# PT phases: [['PT', (19, 22)], ['PT', (133, 143)], ['PT', (146, 160)], ['PT', (162, 172)], ['PT', (177, 179)]]

SyntaxError: ignored

In [None]:
conc_ind_credit = calc_conc_index(PP_set_gdp, PP_set_credit)
print(conc_ind_credit)

0.20765027322404372


## real credit

In [None]:
# real credit
PT_phases = [['PT', (19, 22)], ['PT', (133, 143)], ['PT', (146, 160)], ['PT', (162, 172)], ['PT', (177, 179)]]
TP_phases = [['TP', (22, 133)], ['TP', (143, 146)], ['TP', (160, 162)], ['TP', (172, 177)], ['TP', (179, 203)]]

In [None]:
# Each cycle is given by its boundaries (start, turning point, end) and expanded
# into the pair (quarters of first phase, quarters of second phase).
PP_set_gdp_to_credit = [
    (list(range(start, turn)), list(range(turn, end)))
    for start, turn, end in [(19, 22, 133), (133, 143, 146), (146, 160, 162),
                             (162, 172, 177), (177, 179, 203)]
]

In [None]:
conc_ind_credit_to_gdp = calc_conc_index(PP_set_gdp, PP_set_gdp_to_credit)
print(conc_ind_credit_to_gdp)

0.14705882352941177


## Real house prices

In [None]:
PT_phases = [['PT', (8, 15)], ['PT', (17, 25)], ['PT', (45, 61)], ['PT', (65, 70)], ['PT', (75, 77)], ['PT', (85, 87)], ['PT', (98, 114)], ['PT', (116, 144)], ['PT', (146, 160)], ['PT', (163, 165)]]
TP_phases = [['TP', (15, 17)], ['TP', (25, 45)], ['TP', (61, 65)], ['TP', (70, 75)], ['TP', (77, 85)], ['TP', (87, 98)], ['TP', (114, 116)], ['TP', (144, 146)], ['TP', (160, 163)]]

In [None]:
# Each cycle is given by its boundaries (start, turning point, end) and expanded
# into the pair (quarters of first phase, quarters of second phase).
PP_set_house = [
    (list(range(start, turn)), list(range(turn, end)))
    for start, turn, end in [(8, 15, 17), (17, 25, 45), (45, 61, 65),
                             (65, 70, 75), (75, 77, 85), (85, 87, 98),
                             (98, 114, 116), (116, 144, 146), (146, 160, 163)]
]

In [None]:
conc_ind_house = calc_conc_index(PP_set_gdp, PP_set_house)
print(conc_ind_house)

0.01092896174863388


In [None]:
# concordance_index PP
# Uses calc_conc_index instead of repeating its body inline; the function
# performs exactly the computation that was copied into this cell.
conc_ind = calc_conc_index(PP_set_gdp, PP_set_credit)
print(conc_ind)


0.20765027322404372


In [None]:
# concordance_index TT
# Uses calc_conc_index instead of repeating its body inline. The cells defining
# TT_set_gdp and TT_set_credit must be run first; the recorded output of this
# cell was a NameError.
conc_ind = calc_conc_index(TT_set_gdp, TT_set_credit)
print(conc_ind)

NameError: ignored