## Spark ML Pipeline for Wikipedia Traffic Forecasting

In this notebook, we construct the entire pipeline for learning a generalized linear model and a neural network.

In [14]:
from pyspark.sql import functions as F, types as T

In [2]:
pagecount = spark.read.parquet("../data/enwiki/pagecount_daily_v2")
pages = spark.read.parquet("../data/enwiki/pages")
pagelinks = spark.read.parquet("../data/enwiki/pagelinks")

In [3]:
pagecount.count()

87420272

In [4]:
pagecount.show(vertical=True, n=2)

-RECORD 0-------------
 page_id | 1436       
 date    | 2018-06-27 
 count   | 1154       
-RECORD 1-------------
 page_id | 1436       
 date    | 2018-11-08 
 count   | 1598       
only showing top 2 rows



### Creating and caching the main view

We create a time-series vector for each page. We cache it since it will be reused several times in this session.

In [5]:
dataset = pagecount.groupBy("page_id").pivot("date").agg(F.min("count"))
dataset.cache()
dataset.printSchema(), dataset.count()

root
 |-- page_id: long (nullable = true)
 |-- 2018-01-01: long (nullable = true)
 |-- 2018-01-02: long (nullable = true)
 |-- 2018-01-03: long (nullable = true)
 |-- 2018-01-04: long (nullable = true)
 |-- 2018-01-05: long (nullable = true)
 |-- 2018-01-06: long (nullable = true)
 |-- 2018-01-07: long (nullable = true)
 |-- 2018-01-08: long (nullable = true)
 |-- 2018-01-09: long (nullable = true)
 |-- 2018-01-10: long (nullable = true)
 |-- 2018-01-11: long (nullable = true)
 |-- 2018-01-12: long (nullable = true)
 |-- 2018-01-13: long (nullable = true)
 |-- 2018-01-14: long (nullable = true)
 |-- 2018-01-15: long (nullable = true)
 |-- 2018-01-16: long (nullable = true)
 |-- 2018-01-17: long (nullable = true)
 |-- 2018-01-18: long (nullable = true)
 |-- 2018-01-19: long (nullable = true)
 |-- 2018-01-20: long (nullable = true)
 |-- 2018-01-21: long (nullable = true)
 |-- 2018-01-22: long (nullable = true)
 |-- 2018-01-23: long (nullable = true)
 |-- 2018-01-24: long (nullable = true

(None, 882517)

In [6]:
test_dataset = dataset.sample(0.1)
test_dataset.cache()
test_dataset.count()

88424

We uses a list of datestrings to vectorize a group of columns. We define the period which we will use for rolling evaluation of the model loss.

In [7]:
from datetime import datetime, timedelta

training_period=7*12
period = 7
start_date = datetime.strptime("2019-01-01", "%Y-%m-%d")

def get_name(t): 
    return (start_date + timedelta(t)).strftime("%Y-%m-%d")

train_dates = [get_name(t) for t in range(training_period)]
validate_dates = [get_name(training_period+t) for t in range(period)]
retrain_dates = [get_name(period+t) for t in range(training_period)]
test_dates = [get_name(training_period+period+t) for t in range(period)]

In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import (
    VectorAssembler, 
    Imputer, 
    SQLTransformer, 
    StandardScaler
)
from pyspark.ml.regression import GeneralizedLinearRegression
import numpy as np


def assembler(output_name, input_dates):
    return  VectorAssembler(
        inputCols = input_dates,
        outputCol = output_name
    ).setHandleInvalid("keep")


pipeline = Pipeline(stages=[
    assembler("train", train_dates),
    assembler("validate", validate_dates),
    assembler("retrain", retrain_dates),
    assembler("test", test_dates),
])


learning_window = (
    pipeline
    .fit(test_dataset)
    .transform(test_dataset)
    .selectExpr(
        "page_id", 
        "train", 
        "validate",
        "retrain",
        "test"
    )
)
learning_window.show(n=5)
learning_window.printSchema()

+-------+--------------------+--------------------+--------------------+--------------------+
|page_id|               train|            validate|             retrain|                test|
+-------+--------------------+--------------------+--------------------+--------------------+
|   8075|[265.0,320.0,342....|[316.0,304.0,335....|[350.0,358.0,363....|[403.0,363.0,454....|
|  15846|[241.0,243.0,372....|[NaN,NaN,NaN,NaN,...|[278.0,277.0,291....|[NaN,NaN,NaN,NaN,...|
|  17043|[238.0,345.0,350....|[405.0,336.0,358....|[442.0,384.0,404....|[354.0,288.0,309....|
|  18730|[123.0,185.0,199....|[220.0,178.0,187....|[210.0,185.0,148....|[174.0,182.0,185....|
|  33862|[362.0,495.0,493....|[557.0,432.0,428....|[531.0,434.0,448....|[583.0,543.0,522....|
+-------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows

root
 |-- page_id: long (nullable = true)
 |-- train: vector (nullable = true)
 |-- validate: vector (nullable = true)
 |-- retra

In [19]:
@F.udf(VectorUDT())
def fill0(vec):
    return Vectors.dense(np.nan_to_num(vec))


@F.udf(VectorUDT())
def denoise(vec, window_size=7):    
    # Create a new matrix for every row that is matrix of size (T//window_size, window_size)
    T = vec.shape[0]
    indexer = np.arange(window_size).reshape(1, -1) + window_size * np.arange(
        T // window_size
    ).reshape(-1, 1)

    # A page is made up of many windows
    page = vec[indexer]
    imputed = np.nan_to_num(page)
    scaled = imputed - imputed.mean(axis=0)
    u, s, vh = np.linalg.svd(scaled, full_matrices=False)
    
    k = window_size-1
    return Vectors.dense(u[:,:k].dot(np.diag(s[:k])).dot(vh[:k]).reshape(-1))


learning_window.select(denoise("train")).show(n=5)

+--------------------+
|      denoise(train)|
+--------------------+
|[-73.170450752947...|
|[172.166666666666...|
|[-170.27860447096...|
|[-71.091683780098...|
|[-148.75455166942...|
+--------------------+
only showing top 5 rows



In [20]:
poisson_regression = GeneralizedLinearRegression(
    family="poisson", 
    link="log",
    maxIter=10, 
    regParam=0.3,
)
poisson_regression.fit(
    learning_window
    .select(
        denoise("train").alias("features"), 
        F.expr(f"{validate_dates[0]} as label")
    )
)

GeneralizedLinearRegression_6e3af0ae7958

In [23]:
learning_window_full = (
    pipeline
    .fit(dataset)
    .transform(dataset)
    .selectExpr(
        "page_id", 
        "train", 
        "validate",
        "retrain",
        "test"
    )
)
model_full = poisson_regression.fit(
    learning_window_full
    .select(
        denoise("train").alias("features"), 
        F.expr(f"{validate_dates[0]} as label")
    )
)
prediction = model_full.transform(learning_window_full.withColumn("features", denoise("retrain")))
prediction.select("page_id", "train", "validate", "retrain", "test", "prediction").show()

+-------+--------------------+--------------------+--------------------+--------------------+------------------+
|page_id|               train|            validate|             retrain|                test|        prediction|
+-------+--------------------+--------------------+--------------------+--------------------+------------------+
|   1677|[256.0,357.0,391....|[586.0,406.0,341....|[370.0,388.0,390....|[344.0,380.0,315....| 1989.999999997957|
|   1697|[451.0,617.0,600....|[472.0,467.0,588....|[635.0,670.0,617....|[463.0,447.0,667....| 1989.999999997966|
|   1806|[3655.0,5033.0,50...|[4491.0,4868.0,39...|[5780.0,5199.0,51...|[4122.0,4105.0,48...|1989.9999999981374|
|   1950|[1191.0,361.0,269...|[484.0,485.0,468....|[379.0,505.0,440....|[563.0,585.0,650....| 1989.999999997996|
|   2927|[502.0,524.0,529....|[498.0,538.0,583....|[504.0,461.0,451....|[480.0,481.0,435....|1989.9999999979145|
|   3764|[981.0,1564.0,169...|[1365.0,1403.0,13...|[1669.0,1627.0,16...|[1579.0,1556.0,15...|198

## Appendix

In [None]:
x = learning_window.select(normalize("train").alias("train")).first()
x

import numpy as np

x.train
imputed = np.nan_to_num(x)

# Create a new matrix for every row that is matrix of size (T//window_size, window_size)
T = x.train.shape[0]
window_size=7
indexer = np.arange(window_size).reshape(1, -1) + window_size * np.arange(
    T // window_size
).reshape(-1, 1)

# A page is made up of many windows
page = x.train[indexer]
print(page)

u, s, vh = np.linalg.svd(page, full_matrices=False)


u.shape, s.shape, vh.shape

k=7
a1 = u[:,:k].dot(np.diag(s[:k])).dot(vh[:k])
print(np.allclose(x, a1.reshape(-1)))
k=6
a2 = u[:,:k].dot(np.diag(s[:k])).dot(vh[:k])
print(np.allclose(x, a2.reshape(-1)))

(a1-a2).astype(int)

page - page.mean(axis=0).shape