In [1]:
import pandas as pd
import numpy as np
from statsmodels.tsa.api import VAR
from statsmodels.tsa.vector_ar.svar_model import SVAR
import matplotlib.pyplot as plt

# Step 1: Load the data
data = pd.read_excel('DATA.xlsx')


# Step 2: Define variables based on nchoice
nchoice = 2  # 1 for employment, 2 for hours
if nchoice == 1:
    nx = data['LHEM']
    labor = 'employment'
elif nchoice == 2:
    nx = data['LPMHU']
    labor = 'hours'

# Step 3: Compute variables
yx = data['GDPQ']
xx = yx / nx

# Create index numbers starting at 100
y = 100 + 100 * np.log(yx / yx.iloc[0])
n = 100 + 100 * np.log(nx / nx.iloc[0])
x = 100 + 100 * np.log(xx / xx.iloc[0])

# Compute first differences
dy = y.diff()
dn = n.diff()
dx = x.diff()

# Step 4: Handle the transformation of 'n' based on nint and difn
nint = 1  # 1 if 'n' is I(1), 0 if 'n' is I(0)
difn = 'yes'  # 'yes' to use first differences, 'no' to use detrended series

if nint == 0:
    # Detrend 'n' by regressing on a constant and trend
    trend = np.arange(len(n))
    n_trend = pd.DataFrame({'n': n, 'trend': trend})
    coeffs = np.polyfit(trend[~n.isna()], n[~n.isna()], 1)
    n_trend_line = np.polyval(coeffs, trend)
    n_detrended = n - n_trend_line
    dnz = n_detrended
else:
    # Use first differences
    dnz = dn

# If difn is 'no' and nint is 0, use the detrended 'n' for 'dn'
if nint == 0 and difn == 'no':
    dn = n_detrended



In [2]:

# Ensure that all series are aligned and drop missing values
data_var = pd.DataFrame({'dx': dx, 'dnz': dnz}).dropna()

# Step 5: Estimate the reduced-form VAR
model = VAR(data_var)
results = model.fit(4)  # Using 4 lags as in the RATS code

# Step 6: Identify the structural VAR using long-run restrictions
# The long-run impact of the second shock on 'dx' is restricted to zero
# That is, only technology shocks have permanent effects on productivity

# The long-run restriction matrix (A) should have the form:
# [[a11, 0],
#  [a21, a22]]
# We set the (1,2) element to zero to impose the long-run restriction

from statsmodels.tsa.vector_ar.irf import IRAnalysis

# Compute the long-run effects matrix from the reduced-form VAR
# First, get the companion form
companion_matrix = results.coefs_companion
identity = np.eye(companion_matrix.shape[0])

# Compute the long-run matrix
long_run_matrix = np.linalg.inv(identity - companion_matrix)[:2, :2]

# Impose the long-run restriction that the second shock does not affect 'dx' in the long run
# This restriction implies that the (1,2) element of the cumulative impulse response is zero

# Now, we set up the matrices needed for identification
B = np.array([[np.nan, 0],
              [np.nan, np.nan]])

# We can estimate the SVAR with the long-run restriction using statsmodels
svar_model = SVAR(data_var, svar_type='longrun', A=B)
svar_results = svar_model.fit(4)

# Step 7: Obtain the structural MA representation (Impulse Responses)
nsteps = 40  # Number of periods for impulse responses
irf = svar_results.irf(nsteps)

# Get the impulse response functions
irfs = irf.irfs  # Shape: (nsteps+1, nvars, nvars)

# Step 8: Compute the MA coefficients (Cumulative Impulse Responses)
# Since we need the cumulative sum for levels
cum_irfs = np.cumsum(irfs, axis=0)

# Step 9: Compute conditional variances and covariances using MA coefficients
# For productivity ('dx') and hours ('dnz')

# Extract MA coefficients for 'dx' and 'dnz' responses to each shock
# Let's denote:
# - C_11: response of 'dx' to technology shock
# - C_12: response of 'dx' to non-technology shock
# - C_21: response of 'dnz' to technology shock
# - C_22: response of 'dnz' to non-technology shock

C_11 = irfs[:, 0, 0]  # 'dx' response to tech shock
C_12 = irfs[:, 0, 1]  # 'dx' response to non-tech shock
C_21 = irfs[:, 1, 0]  # 'dnz' response to tech shock
C_22 = irfs[:, 1, 1]  # 'dnz' response to non-tech shock

# Compute variances of 'dx' and 'dnz' for each component
# Variance due to technology shock
var_dx_tech = np.sum(C_11 ** 2)
var_dnz_tech = np.sum(C_21 ** 2)
cov_dx_dnz_tech = np.sum(C_11 * C_21)

# Variance due to non-technology shock
var_dx_nontech = np.sum(C_12 ** 2)
var_dnz_nontech = np.sum(C_22 ** 2)
cov_dx_dnz_nontech = np.sum(C_12 * C_22)

# Compute conditional correlations
corr_tech = cov_dx_dnz_tech / np.sqrt(var_dx_tech * var_dnz_tech)
corr_nontech = cov_dx_dnz_nontech / np.sqrt(var_dx_nontech * var_dnz_nontech)

print("Conditional Correlation (Technology Shock):", corr_tech)
print("Conditional Correlation (Non-Technology Shock):", corr_nontech)

# Step 10: Generate Figure 1 - Scatter plots of innovations
# Obtain structural shocks (innovations)
epsilon = svar_results.resid

# Note: Since the shocks are already estimated, we can use them directly



  self._init_dates(dates, freq)


AttributeError: 'VARResults' object has no attribute 'coefs_companion'

In [None]:
# Generate scatter plots of the structural shocks
plt.figure(figsize=(12, 8))

# Technology shock component
plt.subplot(2, 1, 1)
plt.scatter(epsilon.iloc[:, 0], epsilon.iloc[:, 1], alpha=0.5)
plt.title('Structural Innovations')
plt.xlabel('Technology Shock')
plt.ylabel('Non-Technology Shock')

# Scatter plot of contributions to 'dx' and 'dnz' due to technology shock
# Compute contributions at each point in time using MA coefficients
dx_tech = svar_results.fittedvalues['dx']
dnz_tech = svar_results.fittedvalues['dnz']

# Since we cannot directly get the contributions from statsmodels, we can approximate
# For illustration purposes, we can plot the innovations
plt.subplot(2, 1, 2)
plt.scatter(epsilon.iloc[:, 0], epsilon.iloc[:, 1], alpha=0.5)
plt.title('Contributions of Shocks to Variables')
plt.xlabel('Tech Shock Contribution to dx')
plt.ylabel('Tech Shock Contribution to dnz')

plt.tight_layout()
plt.show()

# Step 11: Generate Figure 2 - Impulse Responses
# Plot the impulse responses using the MA coefficients

# Impulse Response of 'dx' to Technology and Non-Technology Shocks
plt.figure(figsize=(12, 6))
plt.plot(irfs[:, 0, 0], label='dx response to Technology Shock')
plt.plot(irfs[:, 0, 1], label='dx response to Non-Technology Shock')
plt.title('Impulse Responses of dx')
plt.xlabel('Periods')
plt.ylabel('Response')
plt.legend()
plt.show()

# Impulse Response of 'dnz' to Technology and Non-Technology Shocks
plt.figure(figsize=(12, 6))
plt.plot(irfs[:, 1, 0], label='dnz response to Technology Shock')
plt.plot(irfs[:, 1, 1], label='dnz response to Non-Technology Shock')
plt.title('Impulse Responses of dnz')
plt.xlabel('Periods')
plt.ylabel('Response')
plt.legend()
plt.show()

# Step 12: Generate Figure 3 - Cumulative Impulse Responses
# Plot cumulative impulse responses to see long-run effects

# Cumulative Impulse Response of 'dx'
plt.figure(figsize=(12, 6))
plt.plot(cum_irfs[:, 0, 0], label='Cumulative dx response to Technology Shock')
plt.plot(cum_irfs[:, 0, 1], label='Cumulative dx response to Non-Technology Shock')
plt.title('Cumulative Impulse Responses of dx')
plt.xlabel('Periods')
plt.ylabel('Cumulative Response')
plt.legend()
plt.show()

# Cumulative Impulse Response of 'dnz'
plt.figure(figsize=(12, 6))
plt.plot(cum_irfs[:, 1, 0], label='Cumulative dnz response to Technology Shock')
plt.plot(cum_irfs[:, 1, 1], label='Cumulative dnz response to Non-Technology Shock')
plt.title('Cumulative Impulse Responses of dnz')
plt.xlabel('Periods')
plt.ylabel('Cumulative Response')
plt.legend()
plt.show()

In [None]:
def historical_decomposition(svar_results, nsteps):
    # Number of observations and variables
    nobs = svar_results.nobs
    nvars = svar_results.neqs

    # Initialize arrays to store contributions
    contributions = np.zeros((nobs + nsteps, nvars, nvars))  # [time, variable, shock]

    # Get structural shocks
    shocks = svar_results.resid.values

    # Get impulse responses
    irfs = svar_results.irf(nsteps).irfs  # Shape: (nsteps+1, nvars, nvars)

    # Loop over time
    for t in range(nobs):
        for i in range(nsteps + 1):
            if t + i < nobs + nsteps:
                contributions[t + i, :, :] += np.outer(irfs[i], shocks[t])

    return contributions

# Use the function to compute contributions
contributions = historical_decomposition(svar_results, nsteps=40)

# Sum contributions over shocks to get the total
total_contributions = contributions.sum(axis=2)

# Plot the contributions of technology shock to 'dx'
dx_tech_contribution = contributions[:, 0, 0]  # Variable 'dx', shock 'technology'

plt.figure(figsize=(12, 6))
plt.plot(data_var.index, dx[4:], label='Observed dx')
plt.plot(data_var.index, dx_tech_contribution[4:nobs + 4], label='Technology Shock Contribution to dx')
plt.title('Historical Decomposition of dx')
plt.xlabel('Time')
plt.ylabel('dx')
plt.legend()
plt.show()