# Replication of paper "Econometrics at Scale - Spark Up Big Data in Economics", Chapter 4.3 (B. Bluhm and J. Cutura)
This notebook implements a distributed panel data regression exercise in PySpark as described in more detail in the paper "Econometrics at Scale - Spark Up Big Data in Economics" by Benjamin Bluhm and Jannic Cutura.

# Load packages
Note: if you want to execute this notebook on your local PC you need to create a Spark session (not required on AWS EMR notebook instance).  

In [None]:
import datetime
import math
import numpy as np
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as f
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.feature import VectorAssembler

# Functions to compute:
- bread-and-meat matrix for panel robust standard errors
- degrees of freedom adjusted standard errors

In [None]:
def compute_bread_meat(iterator):
    """

    :param iterator:
    :return:
    """
    # Cast data to list
    data = list(iterator)
    
    # Construct arrays of features and residuals
    X = np.array([i[1][2:] for i in data])
    X_key = np.array([i[0] for i in data])
    X_time = np.array([i[1][0] for i in data])
    uhat = np.array([i[1][1] for i in data])

    # Set paramaters
    k = X.shape[1]
    n = len(np.unique(X_key))

    # Generate key and time lists
    tind = [[] for i in range(n)]
    tlab = [[] for i in range(n)]

    for index, i in np.ndenumerate(np.unique(X_key)):
        tind[index[0]] = np.where(X_key == i)

    # Initialze bread and meat arrays
    xtx = np.zeros((k, k, n))
    xuux = np.zeros((k, k, n))

    # Compute array cross-products
    for i in range(n):
        x = X[tind[i]]
        xt = np.transpose(x)
        u = uhat[tind[i]]
        ut = np.transpose(u)
        xtx[:, :, i] = xt.dot(x)
        xuux[:, :, i] = np.matmul(np.array([xt.dot(u)]).T, np.array([ut.dot(x)]))

    XtX = np.sum(xtx, axis=2)
    XuuX = np.sum(xuux, axis=2)

    yield [XtX, XuuX]

    
# Compute degrees of freedom adjusted standard errors
def compute_se_df_adjusted(n, t, k, summary):
    """
    :param n:
    :param t:
    :param k:
    :param summary:
    :return:
    """
    adjustment_factor = 1 / (n * (t - 1) - k) * (n * t - k)
    se = [math.sqrt(adjustment_factor) * i for i in summary.coefficientStandardErrors]
    return se

# Config

In [1]:
# path to parquet with simulated panel data
path = 's3://my-bucket/data/panel'

# specify independent variables
independent_vars = ["x1", "x2", "x3", "x4", "x5", "x6", "x7"]

# specify columns for panel ID, target variables and time dimension
id_var = "id"
dep_var = "target"
time_var = "time"

# define number of partitions
num_partitions = 1000

# Load data

In [None]:
df = spark.read.parquet(path)

# Within-group transformation via Spark SQL

In [None]:
# select columns
df.createOrReplaceTempView("df")
df = spark.sql(""" SELECT id, time, target, x1, x2, x3, x4, x5, x6, x7 FROM df""")

# create dataframe for standard ols regression
df.createOrReplaceTempView("df")
df_train = spark.sql("""SELECT id, time, target, x1, x2, x3, x4, x5, x6, x7 FROM df""")
df_train.createOrReplaceTempView("df_train")

# perform within-group transformation
df_train_between = spark.sql("""SELECT id, 
                             avg(target) as target, 
                             avg(x1) as x1, 
                             avg(x2) as x2, 
                             avg(x3) as x3, 
                             avg(x4) as x4, 
                             avg(x5) as x5, 
                             avg(x6) as x6, 
                             avg(x7) as x7 
                             FROM df
                             GROUP BY id""")
df_train_between.createOrReplaceTempView("df_train_between")

df_train_within = spark.sql("""SELECT id, time,
                                 target + (select avg(target) from df) as target, 
                                 x1 + (select avg(x1) from df) as x1, 
                                 x2 + (select avg(x2) from df) as x2, 
                                 x3 + (select avg(x3) from df) as x3, 
                                 x4 + (select avg(x4) from df) as x4, 
                                 x5 + (select avg(x5) from df) as x5, 
                                 x6 + (select avg(x6) from df) as x6, 
                                 x7 + (select avg(x7) from df) as x7
                                 FROM df""")
df_train_within.createOrReplaceTempView("df_train_within")

df_train_within = spark.sql("""SELECT a.id, a.time,
                           (a.target - b.target) as target,
                           (a.x1 - b.x1) as x1,
                           (a.x2 - b.x2) as x2,
                           (a.x3 - b.x3) as x3,
                           (a.x4 - b.x4) as x4,
                           (a.x5 - b.x5) as x5,
                           (a.x6 - b.x6) as x6,
                           (a.x7 - b.x7) as x7
                           FROM df_train_within as a
                           JOIN df_train_between as b ON a.id = b.id""") 

# apply VectorAssembler to prepare input data in required format 
vector_assembler = VectorAssembler(inputCols = independent_vars, outputCol = 'features')
df_train = vector_assembler.transform(df_train)
df_train_within = vector_assembler.transform(df_train_within)

# Fit linear model 
- without data transformation

In [None]:
start = datetime.datetime.now()
# Standard OLS
model = GeneralizedLinearRegression(labelCol="target", featuresCol="features").fit(df_train)
summary_ols = model.summary
print(summary_ols)
end = datetime.datetime.now()
print(end - start)

# Fit linear model with 
- transformed data & degrees of freedom adjusted standard errors 

In [None]:
start = datetime.datetime.now()
# Fixed effects model
model = GeneralizedLinearRegression(labelCol="target", featuresCol="features").fit(df_train_within)
summary_panel = model.summary
print(compute_se_df_adjusted(1000000000, 10, 8, summary_panel))
end = datetime.datetime.now()
print(end - start)

# Compute panel robust standard errors 

In [None]:
start = datetime.datetime.now()

# Predict, compute residuals and add intercept
df_train_within = model.transform(df_train_within)
df_train_within.createOrReplaceTempView("df")
df_train_within = spark.sql("""SELECT df.*, (target - prediction) as u, 1 as intercept FROM df""")

# Select relevant columns for computing sandwich VCE
df = df_train_within.select(["id", "time", "u", "intercept"] + independent_vars)

# Create hash partitioner assuring that data for each id is in one partition
def key_partitioner(id):
    return hash(id)

# Create RDD with 1,000 partitions
key_value_rdd = df.rdd.map(lambda x: (x[0], x[1:11])).partitionBy(1000, key_partitioner)

# Compute array cross-products for sandwich VCE and collect results to master node
arr_bread_meat = key_value_rdd.mapPartitions(compute_bread_meat).collect()

# Construct bread and meat arrays and sandwich VCE
bread = np.linalg.inv(sum([item[0] for item in arr_bread_meat]))
meat = sum([item[1] for item in arr_bread_meat])
vcov = bread.dot(meat).dot(bread)

# Compute robust standard errors (excluding intercept)
se = list(np.sqrt(vcov).diagonal())[1:]
print(se)
end = datetime.datetime.now()
print(end - start)