# Toy analysis on simulated data

In [1]:
# setup
import numpy as np
import scipy as sp
import pandas as pd
from scipy.linalg import solve_triangular

from sklearn.datasets import make_regression
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split

n = 10000
p = 5

# create simulated data
np.random.seed(42)
X = np.random.random_sample((n, p))
betas_real = np.random.normal(0, 3, p+1)
y = betas_real[0] + np.dot(X, betas_real[1:]) + np.random.normal(0, 0.5, n)

# save as pandas dataframe
xy = pd.DataFrame(X, columns = ["x" + str(i+1) for i in range(p)])
xy["y"] = y

# obtain linear model coefficients
lm = LinearRegression()
lm.fit(X, y)
print(lm.intercept_, lm.coef_)

-0.0349682218527132 [ 3.5977656   1.89014171 -2.45067194  5.37054442 -4.6054133 ]


In [2]:
# spark setup
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

# spark session
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName('This Spark session') \
    .config('spark.sql.repl.eagerEval.enabled', True) \
    .config('spark.sql.execution.arrow.pyspark.enabled', False) \
    .getOrCreate()

spark

In [3]:
# read data
df = spark.createDataFrame(xy)
n = df.count()
print('Full dataset has', n, 'rows.')
print('Columns:', df.columns)

Full dataset has 10000 rows.
Columns: ['x1', 'x2', 'x3', 'x4', 'x5', 'y']


In [4]:
# utils
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, pandas_udf, lit

# preprocessing
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql import Window

df = df.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id()))-1)
df = df.withColumn('index', df['index'].cast('double'))

features = [c for c in df.columns if c not in ['index', 'y']]
assembler = VectorAssembler(inputCols = features, outputCol = 'X')

df = assembler.transform(df).select("index", "X", "y")
df.printSchema()

root
 |-- index: double (nullable = false)
 |-- X: vector (nullable = true)
 |-- y: double (nullable = true)



In [5]:
# obtain two equal splits
s1 = df.limit(int(n/2))
ns = s1.count()
s2 = df.subtract(s1).limit(ns)

print('Split 1 has', ns, 'rows.')
print('Split 2 has', s2.count(), 'rows.')

Split 1 has 5000 rows.
Split 2 has 5000 rows.


### Linear model

In [6]:
# functions for liner model estimation and prediction

def map_lm(row):
    """Calculate x*t(x) and x*y for each row"""
    x = [1]
    x.extend(row['X'].toArray())
    x = np.array(x)
    y = row["y"]
    out = [np.outer(x, x), x*y]    
    return out

def get_prediction_lm(row, beta):
    """Calculate predictions for a row from estimated linear model"""
    x = row["X"].toArray()
    ypred = beta[1] + np.dot(x, beta[1:])
    return (row + (float(ypred),))

def cholesky_normal_equations(xtx, xty):
    """Calculate the solution to the normal equation via Cholesky scomposition"""
    
    L = np.linalg.cholesky(xtx) 
    z = solve_triangular(L, xty, lower = True) 
    beta_hat = solve_triangular(L, z, lower = True, trans = "T")
    return beta_hat    

In [7]:
%%time
# calculate t(X)X and t(X)y for the two splits
xtx_lm_s1, xty_lm_s1 = s1.rdd.map(lambda row: map_lm(row)).reduce(lambda a,b: (a[0]+b[0], a[1]+b[1]))
xtx_lm_s2, xty_lm_s2 = s2.rdd.map(lambda row: map_lm(row)).reduce(lambda a,b: (a[0]+b[0], a[1]+b[1]))

CPU times: total: 15.6 ms
Wall time: 16.2 s


In [8]:
# solve normal equations
beta_hat_lm_s1 = cholesky_normal_equations(xtx_lm_s1, xty_lm_s1)
beta_hat_lm_s2 = cholesky_normal_equations(xtx_lm_s2, xty_lm_s2)

In [9]:
%%time
# calculate predictions from linear model
o1 = s1.rdd.map(lambda row:get_prediction_lm(row, beta_hat_lm_s2)).toDF(['index', "X", "y", "yhat"])
o2 = s2.rdd.map(lambda row:get_prediction_lm(row, beta_hat_lm_s1)).toDF(['index', "X", "y", "yhat"])

CPU times: total: 15.6 ms
Wall time: 3.88 s


In [10]:
# functions for conformal prediction

@pandas_udf("double")
def get_abs_error(yreal: pd.Series, ypred: pd.Series) -> pd.Series:
    """Calculate absolute prediction error"""
    return np.abs(yreal-ypred)

@pandas_udf("double")
def find_quantile(err: pd.Series) -> pd.Series:
    """given ordered absolute errors, calculate ROO empirical quantile"""
    level = 0.95
    m = int(np.ceil(len(err)*level))
    out = [err.drop([i]).values[m-1] for i in range(len(err))]
    return pd.Series(out)

def ROOsplitconformal(df):
    """
    given a Spark dataframe with columns y and yhat,
    calculate and sort absolute errors, 
    find ROO empirical quantiles,
    calculate lower and upper bounds for ROO split conformal prediction intervals    
    """
    out = df \
            .withColumn("abs_error", get_abs_error(col("y"), col("yhat"))).sort(col("abs_error")) \
            .withColumn("ROO_eq", find_quantile(col("abs_error"))) \
            .withColumn("lower", col("yhat")-col("ROO_eq")).withColumn("upper", col("yhat")+col("ROO_eq"))
    return out

In [11]:
# ROO split conformal predictive intervals
o1 = o1.transform(ROOsplitconformal)
o2 = o2.transform(ROOsplitconformal)

# merge the splits
res_lm = o1.union(o2)

In [12]:
# functions for results evaluation

@pandas_udf("double")
def rmse(yreal: pd.Series, ypred: pd.Series) -> float:
    return np.sqrt(np.mean((yreal-ypred)**2))

@pandas_udf("double")
def rmse_orig(yreal: pd.Series, ypred: pd.Series) -> float:
    return np.sqrt(np.mean((np.exp(yreal)-np.exp(ypred))**2))

@pandas_udf("double")
def coverage(yreal: pd.Series, lower: pd.Series, upper: pd.Series) -> float:
    return np.mean(((yreal>lower) & (yreal<upper))*1.0)

@pandas_udf("double")
def avg_length(lower: pd.Series, upper: pd.Series) -> float:
    return np.mean(upper - lower)

In [13]:
# evaluate results
metrics_lm = res_lm.select(lit('linear').alias("model"),
                           rmse(col('y'), col('yhat')).alias('rmse'),
                           coverage(col('y'), col('lower'), col('upper')).alias('coverage'), 
                           avg_length(col('lower'), col('upper')).alias('avg_length'))

In [14]:
%%time
metrics_lm.show()

+------+-----------------+--------+-----------------+
| model|             rmse|coverage|       avg_length|
+------+-----------------+--------+-----------------+
|linear|3.665820561412509|    0.95|8.922321162283405|
+------+-----------------+--------+-----------------+

CPU times: total: 0 ns
Wall time: 20.5 s


### Additive model

In [15]:
# functions for additive model estimation and prediction

knots = np.linspace(0.05, 0.95, 10)

def ncs_elem(x, xk):
    """given a feature and knots, calulates basis functtion"""
    out = (((xk-1/2)**2-1/12)*((x-1/2)**2)-1/12)/4 - ((np.abs(x-xk)-1/2)**4-1/2*(np.abs(x-xk)-1/2)**2 + 7/240)/24
    return out

def splS(knots):
    """define S penalty matrix for a single term"""
    q = len(knots)+2
    S = np.zeros([q,q])
    k = np.array(knots)
    S[2:, 2:] = ncs_elem(k[:,None], k)
    return S
    
def get_bigS(knots, lam = 0.01):
    """Obtain global penalty matrix"""
    Slist = []
    for i in range(p):
        q = len(knots)+1
        S = np.zeros([p*q, p*q])
        S2 = np.zeros([len(S)+1, len(S)+1])
        S[i*q:(i+1)*q, i*q:(i+1)*q] = splS(knots)[1:,1:]
        S2[1:,1:] = S
        Slist.append(S2)
    bigS = sum(lam*elem for elem in Slist)
    return bigS

def map_gam(row):
    """for each row, compute natural cubic splines basis expansion for x then calculate x*t(x) and x*y"""
    nx = [1]
    x = row["X"].toArray()
    y = row["y"]
    bss = []
    for xij in x:
        nx.append(xij)
        nx.extend([ncs_elem(xij, knot) for knot in knots])
    x = np.array(nx)
    out = [np.outer(x, x), x*y]
    return out

def get_prediction_gam(row, beta):
    """Calculate predictions for estimated additive model"""
    nx = []
    x = row["X"].toArray()
    bss = []
    for xij in x:
        nx.append(xij)
        nx.extend([ncs_elem(xij, knot) for knot in knots])
    x = np.array(nx)
    ypred = beta[1] + np.dot(x, beta[1:])
    return (row + (float(ypred),))

In [16]:
%%time
# calculate t(X)X and t(X)y for the two splits
xtx_gam_s1, xty_gam_s1 = s1.rdd.map(lambda row: map_gam(row)).reduce(lambda a,b: (a[0]+b[0], a[1]+b[1]))
xtx_gam_s2, xty_gam_s2 = s2.rdd.map(lambda row: map_gam(row)).reduce(lambda a,b: (a[0]+b[0], a[1]+b[1]))

CPU times: total: 15.6 ms
Wall time: 7.04 s


In [17]:
%%time
bigS = get_bigS(knots, lam=0.0026366508987303583)
xtx_gam2_s1 = xtx_gam_s1 + bigS
xtx_gam2_s2 = xtx_gam_s2 + bigS

CPU times: total: 0 ns
Wall time: 5.48 ms


In [18]:
[np.all(xtx_gam_s1.T == xtx_gam_s1), np.all(xtx_gam2_s1.T == xtx_gam2_s1)]

[True, False]

In [19]:
# solve normal equations using LU decomposition since penalized matrix is not symmetric
beta_hat_gam_s1 = np.linalg.solve(xtx_gam2_s1, xty_gam_s1)
beta_hat_gam_s2 = np.linalg.solve(xtx_gam2_s2, xty_gam_s2)

In [20]:
%%time
# calculate predictions from additive model
o1 = s1.rdd.map(lambda row:get_prediction_gam(row, beta_hat_gam_s2)).toDF(['index', "X", "y", "yhat"])
o2 = s2.rdd.map(lambda row:get_prediction_gam(row, beta_hat_gam_s1)).toDF(['index', "X", "y", "yhat"])

CPU times: total: 31.2 ms
Wall time: 3.73 s


In [21]:
# ROO split conformal predictive intervals
o1 = o1.transform(ROOsplitconformal)
o2 = o2.transform(ROOsplitconformal)

# merge the splits
res_gam = o1.union(o2)

In [22]:
# evaluate results
metrics_gam = res_gam.select(lit('additive').alias('model'),
                             rmse(col('y'), col('yhat')).alias('rmse'),
                             coverage(col('y'), col('lower'), col('upper')).alias('coverage'), 
                             avg_length(col('lower'), col('upper')).alias('avg_length'))

metrics = metrics_lm.union(metrics_gam)

In [23]:
%%time
metrics.show()

+--------+------------------+--------+-----------------+
|   model|              rmse|coverage|       avg_length|
+--------+------------------+--------+-----------------+
|  linear| 3.665820561412509|    0.95|8.922321162283405|
|additive|3.6049689704612264|    0.95|8.787756129260648|
+--------+------------------+--------+-----------------+

CPU times: total: 0 ns
Wall time: 38.7 s


## 