In [1]:
import gzip as gz
import pandas as pd
import numpy as np

# Compact array display for the rest of the notebook: two digits of precision,
# and small values shown in fixed-point rather than scientific notation.
np.set_printoptions(suppress=True, precision=2)

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

from operator import add

In [3]:
# Reuse the active Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.getOrCreate()
# SparkContext handle, used below for the RDD-level calls (parallelize, broadcast).
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/13 21:59:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/11/13 21:59:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Setup

In [4]:
# Number of input features; this is the row count of the weight matrix W0.
d = 13
# Number of rows gathered from a partition before one batch is computed.
batch_N = 5000

In [5]:
# Total number of hidden neurons (columns of W0), including the final bias neuron.
l = 200 #5850
# Width, in neurons, of one column block of W0 / b0.
batch_l = 120

# Fixed seed so the random projection can be regenerated: any matrix computed
# from W0 and saved to disk is only meaningful together with the exact W0
# that produced it.
W0_SEED = 0
w0_rng = np.random.default_rng(W0_SEED)

# Random weights for the first l-1 neurons.
W0 = w0_rng.standard_normal((d, l-1))
# Last neuron: all-zero weights combined with a bias of 1 (all other biases
# are 0), so its output tanh(0) + 1 is the constant 1 for every input row.
W0 = np.hstack([ W0, np.zeros((W0.shape[0], 1)) ])
b0 = np.zeros((1, l))
b0[0, -1] = 1

In [6]:
# Sanity check: W0 should be (d, l) and b0 should be (1, l).
W0.shape, b0.shape

((13, 200), (1, 200))

In [7]:
# Column indices at which W0 and b0 are cut into blocks of width batch_l.
# The range stops before l, so no cut lands on the final column boundary and
# no empty trailing block appears when l is an exact multiple of batch_l.
split_idx = list(range(batch_l, l, batch_l))
l_blocks = len(split_idx) + 1

# Cut the weights and the bias at the same columns and pair the pieces up,
# giving one (weights_block, bias_block) tuple per column block.
Wb = list(zip(
    np.array_split(W0, split_idx, axis=1),
    np.array_split(b0, split_idx, axis=1),
))

In [8]:
# Number the (weights, bias) blocks and distribute them as an RDD.
spWb = sc.parallelize(enumerate(Wb))

# Every ordered pair of numbered blocks, collected to the driver and then
# broadcast so that each worker holds the complete list of pairs.
all_block_pairs = spWb.cartesian(spWb).collect()
spW2 = sc.broadcast(all_block_pairs)

## One Criteo file

In [9]:
# Local working copy of the first day of the Criteo data.
criteo_workdir = "/Users/akusok/wrkdir/criteo"
fname = criteo_workdir + "/day_0.gz"
fpart = criteo_workdir + "/part_0.gz"
fdir = criteo_workdir + "/day_0/"

# Directory on the external volume that is passed to the CSV reader below.
ffull = "/Volumes/P9230/Datasets/Criteo/"

In [10]:
# Explicit schema (originally taken from Spark's schema auto-detection):
# the label column `y` followed by the features `x1` .. `x13`, every one of
# them a nullable integer field.
schema = StructType([
    StructField(column, IntegerType(), True)
    for column in ["y", *(f"x{k}" for k in range(1, 14))]
])

In [11]:
# Read everything under `ffull` as tab-delimited text using the explicit schema.
df_raw = spark.read.option("delimiter", "\t").schema(schema).csv(ffull)
# N = df_raw.count()

```
195841983
```

In [12]:
# Label column on its own ...
df_y = df_raw.select("y")
# ... and the thirteen feature columns x1 .. x13 as the model input.
df = df_raw.select([f"x{k}" for k in range(1, 14)])

In [13]:
# df_y.groupby("y").agg(F.count("*")).show()

```
+---+---------+
|  y| count(1)|
+---+---------+
|  1|  6286525|
|  0|189555458|
+---+---------+
```

## Process batches of NumPy arrays into blocks of the HH (`H.T @ H`) matrix

## _PYSPARK_

In [14]:
def compute(buf):
    """Yield one batch's contribution to every block of the H.T @ H matrix.

    Parameters
    ----------
    buf : list
        Rows of one batch (one sequence of feature values per row). Missing
        values are replaced by -1 and everything is cast to int.

    Yields
    ------
    ((i, j), block)
        For every pair of weight blocks in the broadcast ``spW2``, in the
        order they are stored there. For ``j >= i`` the block is
        ``(tanh(x0 @ w_i) + b_i).T @ (tanh(x0 @ w_j) + b_j)``; for ``j < i``
        the scalar ``0`` is yielded as a placeholder.
    """
    x0 = pd.DataFrame(buf).fillna(-1).to_numpy().astype(int)

    # Activations of each weight block for this batch, keyed by block index.
    # A block takes part in several (i, j) pairs, so computing tanh(x0 @ w)
    # once per block instead of once per pair removes the repeated work.
    # Relies on a block index always referring to the same (w, b) pair.
    hidden = {}

    def activations(idx, w, b):
        if idx not in hidden:
            hidden[idx] = np.tanh(x0 @ w) + b
        return hidden[idx]

    for w in spW2.value:
        i, (w1, b1) = w[0]
        j, (w2, b2) = w[1]

        if j < i:
            # Lower-triangle blocks are not computed; 0 is a placeholder.
            yield ((i, j), 0)
        else:
            # Qb = (f(X0@Wb1) + bb1).T @ (f(X0@Wb2) + bb2)
            yield ((i, j), activations(i, w1, b1).T @ activations(j, w2, b2))

In [15]:
def process_partition(data):
    """Group a partition's rows into batches and stream out `compute` results.

    Rows from `data` are collected until `batch_N` of them are pending, then
    passed to `compute`, and everything it yields is re-yielded. Rows left
    over at the end of the partition form a final, smaller batch. An empty
    partition yields nothing.
    """
    pending = []

    for row in data:
        pending.append(row)
        if len(pending) < batch_N:
            continue
        # Batch is full: emit its results and start a fresh one.
        yield from compute(pending)
        pending = []

    # Flush the incomplete last batch, if any rows remain.
    if pending:
        yield from compute(pending)

In [16]:
# Turn each row into a NumPy array, batch the rows within every partition,
# and sum the per-batch results key by key (one key per (i, j) block).
spBlocks = (
    df.rdd
    .map(np.array)
    .mapPartitions(process_partition)
    .foldByKey(0, add)
)

`Repartition` vs `Coalesce`

Bring the computation to the data, not the other way around.

In [17]:
%%time

# l_blocks x l_blocks grid of placeholders; each cell is replaced by a block.
W_blocks = [[[] for _ in range(l_blocks)] for _ in range(l_blocks)]

# Running the Spark job happens here: collect the summed blocks and put each
# one at its (row, column) position in the grid.
for (row, col), block in spBlocks.collect():
    W_blocks[row][col] = block

# Blocks below the diagonal were collected as the placeholder 0, so fill
# them by transposing their counterparts above the diagonal.
for row in range(l_blocks):
    for col in range(row + 1, l_blocks):
        W_blocks[col][row] = W_blocks[row][col].T

# Assemble the grid into one dense matrix.
W3 = np.block(W_blocks)
W3.shape



CPU times: user 1.83 s, sys: 1.63 s, total: 3.47 s
Wall time: 6h 36min 13s


                                                                                

(200, 200)

In [18]:
# Top-left 5x5 corner of the assembled matrix.
W3[:5, :5]

array([[ 4.34e+09, -1.80e+09, -2.89e+09, -1.92e+09, -1.86e+09],
       [-1.80e+09,  4.35e+09,  2.33e+09,  2.82e+09,  3.16e+09],
       [-2.89e+09,  2.33e+09,  4.37e+09,  2.26e+09,  2.75e+09],
       [-1.92e+09,  2.82e+09,  2.26e+09,  4.36e+09,  3.56e+09],
       [-1.86e+09,  3.16e+09,  2.75e+09,  3.56e+09,  4.37e+09]])

In [19]:
# Bottom-right 4x4 corner; the last row/column belongs to the constant neuron
# (zero weights, bias 1).
W3[-4:, -4:]

array([[ 4.36e+09,  3.11e+09,  3.99e+09, -3.39e+09],
       [ 3.11e+09,  4.35e+09,  3.03e+09, -3.60e+09],
       [ 3.99e+09,  3.03e+09,  4.36e+09, -3.63e+09],
       [-3.39e+09, -3.60e+09, -3.63e+09,  4.37e+09]])

In [22]:
# Persist the assembled matrix in NumPy's .npy format.
np.save("/Users/akusok/wrkdir/criteo/W3.npy", W3)

In [23]:
# Last diagonal entry. The final neuron outputs tanh(0) + 1 = 1 for every row
# (zero weights, bias 1), so this entry adds 1 per processed row, i.e. it is
# the number of rows that went into the matrix.
W3[-1,-1]

4373472329.0

In [25]:
# The same entry expressed in units of 1e9.
W3[-1,-1] / 1e9

4.373472329

## Measure pure unzip time

In [4]:
%%writefile foo.py

import gzip as gz
from time import time
from multiprocessing import Pool

def foo(i):
    """Decompress day_<i>.gz line by line and return the total decompressed size in bytes."""
    with gz.open(f"/Volumes/P9230/Datasets/Criteo/day_{i}.gz", "rb") as fdata:
        # The data is only read and measured, never parsed.
        return sum(len(line) for line in fdata)

if __name__ == "__main__":
    started = time()
    # Eight worker processes, one per file day_0.gz .. day_7.gz.
    with Pool(8) as workers:
        res = workers.map(foo, range(8))
    # Wall-clock seconds for decompressing all eight files in parallel.
    print(time() - started)
    
# python foo.py  2372.30s user 100.72s system 228% cpu 18:03.20 total

Overwriting foo.py
