In [60]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd

In [42]:
spark = SparkSession.builder \
            .appName("ILO VARNN") \
            .config("spark.sql.catalog.nessie.ref", "feature/fact-labor-market-build") \
            .getOrCreate()

### Load data from Gold layer

In [43]:
dim_time = spark.read.format("iceberg").load("nessie.gold.dim_time")
dim_demographic = spark.read.format("iceberg").load("nessie.gold.dim_demographic")
dim_indicator = spark.read.format("iceberg").load("nessie.gold.dim_indicator")
fact_df = spark.read.format("iceberg").load("nessie.gold.fact_labor_market")

In [44]:
fact_df.show()

+--------------------+--------+--------------------+--------------------+------+
|       fact_labor_id|time_key|     demographic_key|       indicator_key| value|
+--------------------+--------+--------------------+--------------------+------+
|ad39ec38-ccf6-4bb...|20230101|936f4989-0f7d-4c1...|10f106fc-2cf7-431...|72.206|
|d645e339-0fef-4da...|20230101|1d56bcf7-e2f5-4a7...|10f106fc-2cf7-431...|41.287|
|ed4fcc96-1bef-45d...|20230101|5f20bf23-15e2-489...|10f106fc-2cf7-431...|91.985|
|eb654fdf-5835-414...|20230101|b676849e-39e1-4f4...|10f106fc-2cf7-431...|71.201|
|c92cce3f-c01d-493...|20230101|f0b5e490-2a5d-47b...|10f106fc-2cf7-431...|33.094|
|168e9ee3-06e5-405...|20230101|c0b092fc-26d5-4b8...|10f106fc-2cf7-431...|76.652|
|e9b6ed14-8e0a-4cd...|20230101|2d178d29-7e1e-406...|10f106fc-2cf7-431...|42.418|
|8400dd48-77a4-46b...|20230101|f81f8fd6-1633-410...|10f106fc-2cf7-431...|95.211|
|d7dc6f7a-dd33-43c...|20230101|482e69ce-86d9-4ae...|10f106fc-2cf7-431...|78.958|
|1840d69d-7243-49e...|202301

### Prepare data for training

In [45]:
labor_market_df = fact_df \
    .join(dim_time, fact_df.time_key == dim_time.time_key, "inner")
labor_market_df = labor_market_df \
    .join(dim_demographic, fact_df.demographic_key == dim_demographic.demographic_key, "inner")
labor_market_df = labor_market_df \
    .join(dim_indicator, fact_df.indicator_key == dim_indicator.indicator_key, "inner")

labor_market_df = labor_market_df.select(
    'full_date',
    'sex',
    'age_group',
    'indicator_code',
    'value'
)

labor_market_df.show()

+----------+------+---------+--------------+--------+
| full_date|   sex|age_group|indicator_code|   value|
+----------+------+---------+--------------+--------+
|1996-01-01|Female|    Total|     AVG_HOURS|   51.62|
|1996-01-01|  Male|    Total|     AVG_HOURS|   51.98|
|1996-01-01| Total|    Total|     AVG_HOURS|   51.79|
|1997-01-01|Female|    Total|     AVG_HOURS|    44.9|
|1997-01-01|  Male|    Total|     AVG_HOURS|   45.72|
|1997-01-01| Total|    Total|     AVG_HOURS|   45.31|
|1998-01-01|Female|    Total|     AVG_HOURS|   45.93|
|1998-01-01|  Male|    Total|     AVG_HOURS|   46.45|
|1998-01-01| Total|    Total|     AVG_HOURS|   46.19|
|1999-01-01|Female|    Total|     AVG_HOURS|   45.97|
|1999-01-01|  Male|    Total|     AVG_HOURS|    46.4|
|1999-01-01| Total|    Total|     AVG_HOURS|   46.19|
|2000-01-01| Total|    Total|      MIN_WAGE|180000.0|
|2001-01-01| Total|    Total|      MIN_WAGE|180000.0|
|2002-01-01| Total|    Total|      MIN_WAGE|210000.0|
|2003-01-01| Total|    Total

In [46]:
labor_market_df = labor_market_df.filter((F.col('sex') == 'Total') & (F.col('age_group') == 'Total'))
labor_market_df.show()

+----------+-----+---------+--------------+---------+
| full_date|  sex|age_group|indicator_code|    value|
+----------+-----+---------+--------------+---------+
|1996-01-01|Total|    Total|     AVG_HOURS|    51.79|
|1997-01-01|Total|    Total|     AVG_HOURS|    45.31|
|1998-01-01|Total|    Total|     AVG_HOURS|    46.19|
|1999-01-01|Total|    Total|     AVG_HOURS|    46.19|
|2000-01-01|Total|    Total|      MIN_WAGE| 180000.0|
|2001-01-01|Total|    Total|      MIN_WAGE| 180000.0|
|2002-01-01|Total|    Total|      MIN_WAGE| 210000.0|
|2003-01-01|Total|    Total|      MIN_WAGE| 290000.0|
|2004-01-01|Total|    Total|      MIN_WAGE| 290000.0|
|2004-01-01|Total|    Total|       EMP_POP|    69.88|
|2004-01-01|Total|    Total|   YOUTH_UNEMP|     2.14|
|2004-01-01|Total|    Total|  UNEMPLOYMENT|     2.14|
|2004-01-01|Total|    Total|          LFPR|    71.41|
|2005-01-01|Total|    Total|      MIN_WAGE| 350000.0|
|2006-01-01|Total|    Total|      MIN_WAGE| 450000.0|
|2007-01-01|Total|    Total|

In [47]:
labor_market_df.select('indicator_code').distinct().show()

+--------------+
|indicator_code|
+--------------+
|    AVG_INCOME|
|  INFORMAL_EMP|
|      MIN_WAGE|
|       EMP_POP|
|     AVG_HOURS|
|  UNEMPLOYMENT|
|          LFPR|
|   YOUTH_UNEMP|
+--------------+



In [48]:
labor_market_df = labor_market_df.drop('sex', 'age_group')
labor_market_df.show()

+----------+--------------+---------+
| full_date|indicator_code|    value|
+----------+--------------+---------+
|1996-01-01|     AVG_HOURS|    51.79|
|1997-01-01|     AVG_HOURS|    45.31|
|1998-01-01|     AVG_HOURS|    46.19|
|1999-01-01|     AVG_HOURS|    46.19|
|2000-01-01|      MIN_WAGE| 180000.0|
|2001-01-01|      MIN_WAGE| 180000.0|
|2002-01-01|      MIN_WAGE| 210000.0|
|2003-01-01|      MIN_WAGE| 290000.0|
|2004-01-01|      MIN_WAGE| 290000.0|
|2004-01-01|       EMP_POP|    69.88|
|2004-01-01|   YOUTH_UNEMP|     2.14|
|2004-01-01|  UNEMPLOYMENT|     2.14|
|2004-01-01|          LFPR|    71.41|
|2005-01-01|      MIN_WAGE| 350000.0|
|2006-01-01|      MIN_WAGE| 450000.0|
|2007-01-01|      MIN_WAGE| 540000.0|
|2007-01-01|    AVG_INCOME|1433693.0|
|2007-01-01|     AVG_HOURS|    43.99|
|2007-01-01|  INFORMAL_EMP|1433693.0|
|2007-01-01|       EMP_POP|   72.841|
+----------+--------------+---------+
only showing top 20 rows



In [49]:
# pivot the table to wide format
labor_market_df = labor_market_df.groupBy('full_date').pivot('indicator_code').agg(F.first('value'))
labor_market_df.show()

+----------+---------+----------+-------+------------+------+---------+------------+-----------+
| full_date|AVG_HOURS|AVG_INCOME|EMP_POP|INFORMAL_EMP|  LFPR| MIN_WAGE|UNEMPLOYMENT|YOUTH_UNEMP|
+----------+---------+----------+-------+------------+------+---------+------------+-----------+
|2009-01-01|     42.6| 2267610.0| 74.511|   2267610.0|75.828| 650000.0|       1.737|      1.737|
|2023-01-01|    41.82| 7853534.0| 71.018|   7853534.0|72.206|4680000.0|       1.645|      1.645|
|2016-01-01|    41.26| 5456969.0| 75.141|   5456969.0|76.556|2900000.0|       1.848|      1.848|
|1998-01-01|    46.19|      NULL|   NULL|        NULL|  NULL|     NULL|        NULL|       NULL|
|2011-01-01|    42.71| 3112676.0| 75.512|   3112676.0|76.274|1125000.0|       0.999|      0.999|
|2008-01-01|     NULL|      NULL|   NULL|        NULL|  NULL| 540000.0|        NULL|       NULL|
|2017-01-01|    40.31| 5360536.0| 74.701|   5360536.0|76.127|3110000.0|       1.874|      1.874|
|2007-01-01|    43.99| 1433693

In [50]:
labor_market_df = labor_market_df.sort('full_date')
labor_market_df.show(50)

+----------+---------+----------+-------+------------+------+---------+------------+-----------+
| full_date|AVG_HOURS|AVG_INCOME|EMP_POP|INFORMAL_EMP|  LFPR| MIN_WAGE|UNEMPLOYMENT|YOUTH_UNEMP|
+----------+---------+----------+-------+------------+------+---------+------------+-----------+
|1996-01-01|    51.79|      NULL|   NULL|        NULL|  NULL|     NULL|        NULL|       NULL|
|1997-01-01|    45.31|      NULL|   NULL|        NULL|  NULL|     NULL|        NULL|       NULL|
|1998-01-01|    46.19|      NULL|   NULL|        NULL|  NULL|     NULL|        NULL|       NULL|
|1999-01-01|    46.19|      NULL|   NULL|        NULL|  NULL|     NULL|        NULL|       NULL|
|2000-01-01|     NULL|      NULL|   NULL|        NULL|  NULL| 180000.0|        NULL|       NULL|
|2001-01-01|     NULL|      NULL|   NULL|        NULL|  NULL| 180000.0|        NULL|       NULL|
|2002-01-01|     NULL|      NULL|   NULL|        NULL|  NULL| 210000.0|        NULL|       NULL|
|2003-01-01|     NULL|      NU

In [None]:
labor_market_df = labor_market_df.filter((F.col('full_date') >= '2007-01-01') & (F.col('full_date') <= '2023-12-31'))
labor_market_df.show()

+----------+---------+----------+-------+------------+------+---------+------------+-----------+
| full_date|AVG_HOURS|AVG_INCOME|EMP_POP|INFORMAL_EMP|  LFPR| MIN_WAGE|UNEMPLOYMENT|YOUTH_UNEMP|
+----------+---------+----------+-------+------------+------+---------+------------+-----------+
|2007-01-01|    43.99| 1433693.0| 72.841|   1433693.0|74.348| 540000.0|       2.026|      2.026|
|2008-01-01|     NULL|      NULL|   NULL|        NULL|  NULL| 540000.0|        NULL|       NULL|
|2009-01-01|     42.6| 2267610.0| 74.511|   2267610.0|75.828| 650000.0|       1.737|      1.737|
|2010-01-01|     45.0| 2506386.0| 75.321|   2506386.0|76.169| 845000.0|       1.114|      1.114|
|2011-01-01|    42.71| 3112676.0| 75.512|   3112676.0|76.274|1125000.0|       0.999|      0.999|
|2012-01-01|    41.77| 3739062.0| 75.404|   3739062.0|76.187|1665000.0|       1.027|      1.027|
|2013-01-01|    40.94| 4092626.0| 76.008|   4092626.0|77.022|1950000.0|       1.316|      1.316|
|2014-01-01|    40.35| 4461659

In [52]:
labor_pd = labor_market_df.toPandas()
labor_pd = labor_pd.fillna(method='ffill')
labor_pd

Unnamed: 0,full_date,AVG_HOURS,AVG_INCOME,EMP_POP,INFORMAL_EMP,LFPR,MIN_WAGE,UNEMPLOYMENT,YOUTH_UNEMP
0,2007-01-01,43.99,1433693.0,72.841,1433693.0,74.348,540000.0,2.026,2.026
1,2008-01-01,43.99,1433693.0,72.841,1433693.0,74.348,540000.0,2.026,2.026
2,2009-01-01,42.6,2267610.0,74.511,2267610.0,75.828,650000.0,1.737,1.737
3,2010-01-01,45.0,2506386.0,75.321,2506386.0,76.169,845000.0,1.114,1.114
4,2011-01-01,42.71,3112676.0,75.512,3112676.0,76.274,1125000.0,0.999,0.999
5,2012-01-01,41.77,3739062.0,75.404,3739062.0,76.187,1665000.0,1.027,1.027
6,2013-01-01,40.94,4092626.0,76.008,4092626.0,77.022,1950000.0,1.316,1.316
7,2014-01-01,40.35,4461659.0,76.076,4461659.0,77.044,2250000.0,1.256,1.256
8,2015-01-01,40.86,5101540.0,75.771,5101540.0,77.198,2575000.0,1.848,1.848
9,2016-01-01,41.26,5456969.0,75.141,5456969.0,76.556,2900000.0,1.848,1.848


In [54]:
labor_pd = labor_pd.drop(columns=['full_date']) 
labor_pd

Unnamed: 0,AVG_HOURS,AVG_INCOME,EMP_POP,INFORMAL_EMP,LFPR,MIN_WAGE,UNEMPLOYMENT,YOUTH_UNEMP
0,43.99,1433693.0,72.841,1433693.0,74.348,540000.0,2.026,2.026
1,43.99,1433693.0,72.841,1433693.0,74.348,540000.0,2.026,2.026
2,42.6,2267610.0,74.511,2267610.0,75.828,650000.0,1.737,1.737
3,45.0,2506386.0,75.321,2506386.0,76.169,845000.0,1.114,1.114
4,42.71,3112676.0,75.512,3112676.0,76.274,1125000.0,0.999,0.999
5,41.77,3739062.0,75.404,3739062.0,76.187,1665000.0,1.027,1.027
6,40.94,4092626.0,76.008,4092626.0,77.022,1950000.0,1.316,1.316
7,40.35,4461659.0,76.076,4461659.0,77.044,2250000.0,1.256,1.256
8,40.86,5101540.0,75.771,5101540.0,77.198,2575000.0,1.848,1.848
9,41.26,5456969.0,75.141,5456969.0,76.556,2900000.0,1.848,1.848


In [57]:
# Split data into train and test sets
train_size = int(len(labor_pd) * 0.8)   
train_data = labor_pd.iloc[:train_size]
test_data = labor_pd.iloc[train_size:]
print(f'Train data shape: {train_data.shape}')
print(f'Test data shape: {test_data.shape}')

Train data shape: (13, 8)
Test data shape: (4, 8)


In [58]:
train_data

Unnamed: 0,AVG_HOURS,AVG_INCOME,EMP_POP,INFORMAL_EMP,LFPR,MIN_WAGE,UNEMPLOYMENT,YOUTH_UNEMP
0,43.99,1433693.0,72.841,1433693.0,74.348,540000.0,2.026,2.026
1,43.99,1433693.0,72.841,1433693.0,74.348,540000.0,2.026,2.026
2,42.6,2267610.0,74.511,2267610.0,75.828,650000.0,1.737,1.737
3,45.0,2506386.0,75.321,2506386.0,76.169,845000.0,1.114,1.114
4,42.71,3112676.0,75.512,3112676.0,76.274,1125000.0,0.999,0.999
5,41.77,3739062.0,75.404,3739062.0,76.187,1665000.0,1.027,1.027
6,40.94,4092626.0,76.008,4092626.0,77.022,1950000.0,1.316,1.316
7,40.35,4461659.0,76.076,4461659.0,77.044,2250000.0,1.256,1.256
8,40.86,5101540.0,75.771,5101540.0,77.198,2575000.0,1.848,1.848
9,41.26,5456969.0,75.141,5456969.0,76.556,2900000.0,1.848,1.848


In [59]:
test_data

Unnamed: 0,AVG_HOURS,AVG_INCOME,EMP_POP,INFORMAL_EMP,LFPR,MIN_WAGE,UNEMPLOYMENT,YOUTH_UNEMP
13,40.77,6873960.0,71.742,6873960.0,73.283,3675000.0,2.103,2.103
14,41.38,6746802.0,71.2,6746802.0,72.938,3675000.0,2.383,2.383
15,41.62,7460916.0,72.471,7460916.0,73.627,3895500.0,1.57,1.57
16,41.82,7853534.0,71.018,7853534.0,72.206,4680000.0,1.645,1.645


### Scaling data

In [61]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import time
import warnings
warnings.filterwarnings('ignore')

import statsmodels.api as sm
from statsmodels.tsa.stattools import adfuller, grangercausalitytests
from statsmodels.tsa.api import VAR
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split, ParameterGrid
from tqdm.notebook import tqdm_notebook
from typing import Union

In [64]:
# Scaled function
def scale_data(df: pd.DataFrame):
    scaled_df = df.copy()
    for name in df.columns:
        scaler = MinMaxScaler(feature_range=(-1, 1))
        scaled_df[name] = scaler.fit_transform(df[name].values.reshape(-1, 1)).ravel()
    return scaled_df

In [65]:
scaled_train = scale_data(train_data)
scaled_test = scale_data(test_data)

In [66]:
scaled_train

Unnamed: 0,AVG_HOURS,AVG_INCOME,EMP_POP,INFORMAL_EMP,LFPR,MIN_WAGE,UNEMPLOYMENT,YOUTH_UNEMP
0,0.572034,-1.0,-1.0,-1.0,-1.0,-1.0,1.0,1.0
1,0.572034,-1.0,-1.0,-1.0,-1.0,-1.0,1.0,1.0
2,-0.016949,-0.679983,0.032457,-0.679983,0.038596,-0.92517,0.437196,0.437196
3,1.0,-0.588352,0.53323,-0.588352,0.277895,-0.792517,-0.776047,-0.776047
4,0.029661,-0.355687,0.651314,-0.355687,0.351579,-0.602041,-1.0,-1.0
5,-0.368644,-0.11531,0.584544,-0.11531,0.290526,-0.234694,-0.945472,-0.945472
6,-0.720339,0.020371,0.95796,0.020371,0.876491,-0.040816,-0.382668,-0.382668
7,-0.970339,0.161988,1.0,0.161988,0.89193,0.163265,-0.499513,-0.499513
8,-0.754237,0.407544,0.811437,0.407544,1.0,0.384354,0.653359,0.653359
9,-0.584746,0.543941,0.421947,0.543941,0.549474,0.605442,0.653359,0.653359


In [67]:
scaled_test

Unnamed: 0,AVG_HOURS,AVG_INCOME,EMP_POP,INFORMAL_EMP,LFPR,MIN_WAGE,UNEMPLOYMENT,YOUTH_UNEMP
13,-1.0,-0.77021,-0.003441,-0.77021,0.515834,-1.0,0.311193,0.311193
14,0.161905,-1.0,-0.749484,-1.0,0.03026,-1.0,1.0,1.0
15,0.619048,0.290491,1.0,0.290491,1.0,-0.561194,-1.0,-1.0
16,1.0,1.0,-1.0,1.0,-1.0,1.0,-0.815498,-0.815498


### ADF Testing

In [98]:
def adf_test(df: pd.DataFrame) -> pd.DataFrame:
    result = []
    for col in df.columns:
        stat, p_value, *_ = adfuller(df[col])
        result.append({
            'Variable': col,
            'Statistic': stat,
            'p-value': round(p_value, 5),
            'Stationary': p_value < 0.05
        })
    return pd.DataFrame(result, index=df.columns)

In [73]:
def check_stationarity(df: pd.DataFrame) -> bool:
    return df['Stationary'].all()

In [99]:
adf_test(scaled_train)

Unnamed: 0,Variable,Statistic,p-value,Stationary
AVG_HOURS,AVG_HOURS,-2.669296,0.0795,False
AVG_INCOME,AVG_INCOME,-1.133543,0.70151,False
EMP_POP,EMP_POP,-1.391222,0.58648,False
INFORMAL_EMP,INFORMAL_EMP,-1.133543,0.70151,False
LFPR,LFPR,-2.650393,0.08302,False
MIN_WAGE,MIN_WAGE,-7.6526,0.0,True
UNEMPLOYMENT,UNEMPLOYMENT,-13.94087,0.0,True
YOUTH_UNEMP,YOUTH_UNEMP,-13.94087,0.0,True


In [100]:
train_diff = scaled_train.diff().dropna()
order = 0 # Bậc sai phân

while not check_stationarity(adf_test(train_diff)):
    order += 1
    train_diff = train_diff.diff(periods=order).dropna()

print(f'Bậc sai phân: {order}')

Bậc sai phân: 1


In [101]:
adf_test(train_diff)

Unnamed: 0,Variable,Statistic,p-value,Stationary
AVG_HOURS,AVG_HOURS,-4.308828,0.00043,True
AVG_INCOME,AVG_INCOME,-7.506611,0.0,True
EMP_POP,EMP_POP,-4.834857,5e-05,True
INFORMAL_EMP,INFORMAL_EMP,-7.506611,0.0,True
LFPR,LFPR,-3.857152,0.00237,True
MIN_WAGE,MIN_WAGE,-11.476509,0.0,True
UNEMPLOYMENT,UNEMPLOYMENT,-4.891037,4e-05,True
YOUTH_UNEMP,YOUTH_UNEMP,-4.891037,4e-05,True


### Finding optimize lag

In [121]:
def best_lag(endog: Union[pd.DataFrame, list], max_lag: int = 30) -> pd.DataFrame:
    model = VAR(endog)
    results = []

    for lag in range(0, max_lag + 1):
        try:
            fitted = model.fit(maxlags=lag, ic=None)
            results.append((lag, fitted.aic))
        except Exception as e:
            print(f"Error fitting model with lag {lag}: {e}")
            continue
    
    results_df = pd.DataFrame(results, columns=['Lag', 'AIC'])
    return results_df.sort_values(by='AIC').reset_index(drop=True)

In [123]:
train_diff

Unnamed: 0,AVG_HOURS,AVG_INCOME,EMP_POP,INFORMAL_EMP,LFPR,MIN_WAGE,UNEMPLOYMENT,YOUTH_UNEMP
2,-0.588983,0.320017,1.032457,0.320017,1.038596,0.07482993,-0.562804,-0.562804
3,1.605932,-0.228387,-0.531685,-0.228387,-0.799298,0.05782313,-0.650438,-0.650438
4,-1.987288,0.141034,-0.382689,0.141034,-0.165614,0.05782313,0.989289,0.989289
5,0.572034,0.007712,-0.184853,0.007712,-0.134737,0.1768707,0.278481,0.278481
6,0.04661,-0.104696,0.440185,-0.104696,0.647018,-0.1734694,0.508277,0.508277
7,0.101695,0.005936,-0.331376,0.005936,-0.570526,0.01020408,-0.679649,-0.679649
8,0.466102,0.103938,-0.230603,0.103938,0.092632,0.0170068,1.269718,1.269718
9,-0.04661,-0.109159,-0.200927,-0.109159,-0.558596,2.220446e-16,-1.152872,-1.152872
10,-0.572034,-0.173403,0.117465,-0.173403,0.149474,-0.07823129,0.050633,0.050633
11,0.95339,0.190778,0.294281,0.190778,-0.058246,-0.006802721,-1.439143,-1.439143


In [122]:
best_lag(train_diff)

Error fitting model with lag 0: 4-th leading minor of the array is not positive definite
Error fitting model with lag 1: 4-th leading minor of the array is not positive definite
Error fitting model with lag 2: 8-th leading minor of the array is not positive definite
Error fitting model with lag 3: 4-th leading minor of the array is not positive definite
Error fitting model with lag 4: 8-th leading minor of the array is not positive definite
Error fitting model with lag 5: 4-th leading minor of the array is not positive definite
Error fitting model with lag 6: 4-th leading minor of the array is not positive definite
Error fitting model with lag 7: 4-th leading minor of the array is not positive definite
Error fitting model with lag 8: 4-th leading minor of the array is not positive definite
Error fitting model with lag 9: x contains one or more constant columns. Column(s) 61 are constant. Adding a constant with trend='c' is not allowed.
Error fitting model with lag 10: x contains one or

  self._init_dates(dates, freq)


Unnamed: 0,Lag,AIC
