# Criteo 1 TiB benchmark

In this experiment we will evaluate a number of machine learning tools on training data of varying size to determine how fast they learn, how much memory they consume, etc.

We will assess Vowpal Wabbit and XGBoost in local mode, and Spark.ML models in cluster mode.

We will use the terabyte click logs released by Criteo and sample the required amount of data from them.


This instance of the experiment notebook focuses on data preparation and on training VW and XGBoost locally.

Let's go!



# Table of contents

* [Configuration](#Configuration)
* [Data preparation](#Data-preparation)
  * [Criteo → LibSVM](#Criteo-→-LibSVM)
  * [LibSVM → Train and test (sampling)](#LibSVM-→-Train-and-test-(sampling%29)
  * [LibSVM train and test → VW train and test](#LibSVM-train-and-test-→-VW-train-and-test)
  * [Local data](#Local-data)
* [Local training](#Local-training)

In [1]:
%load_ext autotime
%matplotlib inline

from __future__ import print_function

time: 366 ms (started: 2022-11-16 01:53:37 +00:00)


In [2]:
import findspark

findspark.init()

time: 2.39 ms (started: 2022-11-16 01:53:37 +00:00)


## Configuration
[_(back to toc)_](#Table-of-contents)

Paths:

In [3]:
criteo_data_remote_path = 'criteo/plain'
libsvm_data_remote_path = 'criteo/libsvm'
vw_data_remote_path = 'criteo/vw'

local_data_path = 'criteo/data'
local_results_path = 'criteo/results'
local_runtime_path = 'criteo/runtime'

time: 516 µs (started: 2022-11-16 01:53:37 +00:00)


In [4]:
import os


criteo_day_template = os.path.join(criteo_data_remote_path, 'day{}')
libsvm_day_template = os.path.join(libsvm_data_remote_path, 'day{}')
vw_day_template = os.path.join(vw_data_remote_path, 'day{}')

libsvm_train_template = os.path.join(libsvm_data_remote_path, 'train', '{}')
libsvm_test_template = os.path.join(libsvm_data_remote_path, 'test', '{}')
vw_train_template = os.path.join(vw_data_remote_path, 'train', '{}')
vw_test_template = os.path.join(vw_data_remote_path, 'test', '{}')

local_libsvm_test_template = os.path.join(local_data_path, 'data.test.{}.libsvm')
local_libsvm_train_template = os.path.join(local_data_path, 'data.train.{}.libsvm')
local_vw_test_template = os.path.join(local_data_path, 'data.test.{}.vw')
local_vw_train_template = os.path.join(local_data_path, 'data.train.{}.vw')

# local_libsvm_test_template = os.path.join(local_data_path, 'datatest{}libsvm')
# local_libsvm_train_template = os.path.join(local_data_path, 'datatrain{}libsvm')
# local_vw_test_template = os.path.join(local_data_path, 'datatest{}vw')
# local_vw_train_template = os.path.join(local_data_path, 'datatrain{}vw')

time: 2.51 ms (started: 2022-11-16 01:53:37 +00:00)


In [5]:
def ensure_directory_exists(path):
    if not os.path.exists(path):
        os.makedirs(path)

time: 282 µs (started: 2022-11-16 01:53:37 +00:00)


In [6]:
file_lists = [libsvm_data_remote_path, criteo_data_remote_path, vw_data_remote_path, local_data_path, local_results_path, local_runtime_path]

for file in file_lists:
    ensure_directory_exists(file)

time: 577 µs (started: 2022-11-16 01:53:37 +00:00)


Days to work on:

In [7]:
days = list(range(0, 23 + 1))


time: 211 µs (started: 2022-11-16 01:53:37 +00:00)


Samples to take:

In [8]:
train_samples = [
    10000, 30000,  # tens of thousands
    100000, 300000,  # hundreds of thousands
    1000000, 3000000,  # millions
    10000000, 30000000,  # tens of millions
    100000000, 300000000,  # hundreds of millions
    1000000000, 3000000000,  # billions
]
test_samples = [1000000]



time: 769 µs (started: 2022-11-16 01:53:38 +00:00)


Spark configuration and initialization:

In [9]:
total_cores = 256

time: 633 µs (started: 2022-11-16 01:53:38 +00:00)


In [10]:
executor_cores = 4
executor_instances = total_cores // executor_cores  # floor division: Spark expects an integer instance count
memory_per_core = 4

time: 651 µs (started: 2022-11-16 01:53:38 +00:00)
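A quick check of the resource arithmetic implied by these values (a sketch that simply reuses the numbers configured above). In Python 3, `/` always returns a float, so `str(total_cores / executor_cores)` is `'64.0'`, while `spark.executor.instances` expects an integer; floor division (`//`) keeps the value an `int`. With 4 GB per core, each executor gets 16 GB, or 1024 GB across all executors, not counting the driver or any per-executor memory overhead.

```python
total_cores = 256
executor_cores = 4
memory_per_core = 4  # GB per core, as configured above

print(str(total_cores / executor_cores))  # 64.0 (true division yields a float)
executor_instances = total_cores // executor_cores
print(str(executor_instances))            # 64 (floor division keeps an int)

executor_memory_gb = memory_per_core * executor_cores
print(str(executor_memory_gb) + 'G')      # 16G, the value for spark.executor.memory
print(executor_instances * executor_memory_gb)  # 1024 GB across all executors
```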


In [11]:
app_name = 'Criteo experiment'

master = 'yarn'

settings = {
    'spark.network.timeout': '600',
    
    'spark.driver.cores': '16',
    'spark.driver.maxResultSize': '16G',
    'spark.driver.memory': '32G',
    
    'spark.executor.cores': str(executor_cores),
    'spark.executor.instances': str(executor_instances),
    'spark.executor.memory': str(memory_per_core * executor_cores) + 'G',
    
    'spark.speculation': 'true',
    'spark.yarn.queue': 'root.HungerGames',
}

time: 1.32 ms (started: 2022-11-16 01:53:38 +00:00)


In [12]:
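import findspark
findspark.init('/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13')
from pyspark import SparkContext

# NOTE: this creates a local SparkContext with a single worker thread
# ('local[*]' would use all cores). The SparkSession builder in the next cell
# calls getOrCreate(), which reuses this context, so `master = 'yarn'` and the
# driver/executor settings above do not take effect in this run.
sc = SparkContext('local', 'pyspark')

# These variables are read by the `pyspark` launcher script when it starts the
# driver; setting them from inside a running session does not change it.
os.environ['PYSPARK_DRIVER_PYTHON'] = '/home/fengwen/miniconda3/envs/spark/bin/jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = " --ip=0.0.0.0 --port=7777"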

22/11/16 01:53:39 WARN Utils: Your hostname, oneflow-27 resolves to a loopback address: 127.0.1.1; using 192.168.1.27 instead (on interface ens121f0)
22/11/16 01:53:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/16 01:53:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
time: 2.9 s (started: 2022-11-16 01:53:38 +00:00)


In [13]:
from pyspark.sql import SparkSession


builder = SparkSession.builder

builder.appName(app_name)
builder.master(master)
for k, v in settings.items():
    builder.config(k, v)

spark = builder.getOrCreate()
sc = spark.sparkContext

sc.setLogLevel('ERROR')


time: 101 ms (started: 2022-11-16 01:53:41 +00:00)


Logging:

In [14]:
import sys
import logging

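from importlib import reload
# Flush and close existing handlers, then reload the logging module so the root
# logger starts without handlers and re-running this cell does not duplicate output.
logging.shutdown()
reload(logging)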


handler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('[%(asctime)s] %(message)s')
handler.setFormatter(formatter)

ensure_directory_exists(local_runtime_path)
file_handler = logging.FileHandler(filename=os.path.join(local_runtime_path, 'mylog.log'), mode='a')
file_handler.setFormatter(formatter)

logger = logging.getLogger()
logger.addHandler(handler)
logger.addHandler(file_handler)
logger.setLevel(logging.INFO)

time: 5.11 ms (started: 2022-11-16 01:53:41 +00:00)
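An alternative to reloading the `logging` module, assuming Python 3.8 or newer: `logging.basicConfig(force=True)` removes and closes whatever handlers the root logger already has before installing new ones, so re-running the cell does not duplicate messages. `configure_logging` is a hypothetical helper name, not part of the original notebook.

```python
import logging
import os
import sys


def configure_logging(log_path):
    """Hypothetical helper: send INFO messages to stdout and to a log file."""
    directory = os.path.dirname(log_path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    logging.basicConfig(
        level=logging.INFO,
        format='[%(asctime)s] %(message)s',
        handlers=[
            logging.StreamHandler(stream=sys.stdout),
            logging.FileHandler(log_path, mode='a'),
        ],
        force=True,  # Python 3.8+: drop and close existing root handlers first
    )
    return logging.getLogger()
```

In this notebook it would be called as `logger = configure_logging(os.path.join(local_runtime_path, 'mylog.log'))`.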


In [15]:
logger.info('Spark version: %s.', spark.version)

[2022-11-16 01:53:41,734] Spark version: 3.3.1.
time: 3.25 ms (started: 2022-11-16 01:53:41 +00:00)


## Data preparation
[_(back to toc)_](#Table-of-contents)

Poor man's HDFS API:

In [16]:
def hdfs_exists(path):
    l = !hadoop fs -ls $path 2>/dev/null
    return len(l) != 0

def hdfs_success(path):
    return hdfs_exists(os.path.join(path, '_SUCCESS'))

def hdfs_delete(path, recurse=False):
    if recurse:
        _ = !hadoop fs -rm -r $path
    else:
        _ = !hadoop fs -rm $path

def hdfs_get(remote_path, local_path):
    remote_path_glob = os.path.join(remote_path, 'part-*')
    _ = !hadoop fs -cat $remote_path_glob >$local_path

time: 590 µs (started: 2022-11-16 01:53:41 +00:00)
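When Spark runs in `local` mode and writes to the local filesystem, the `hadoop` CLI may not be available. A minimal local-filesystem counterpart of the helpers above might look like this; the `local_*` names are hypothetical and not part of the original notebook. It relies on Spark (through Hadoop's output committer) writing an empty `_SUCCESS` marker into an output directory once the job commits.

```python
import glob
import os
import shutil


def local_exists(path):
    return os.path.exists(path)


def local_success(path):
    # A committed Spark/Hadoop output directory contains an empty _SUCCESS file
    return local_exists(os.path.join(path, '_SUCCESS'))


def local_delete(path, recurse=False):
    if recurse and os.path.isdir(path):
        shutil.rmtree(path)
    elif os.path.exists(path):
        os.remove(path)  # raises an OSError if path is a directory and recurse=False


def local_get(remote_path, local_path):
    # Concatenate the part-* files, in sorted name order, into one local file
    with open(local_path, 'wb') as out:
        for part in sorted(glob.glob(os.path.join(remote_path, 'part-*'))):
            with open(part, 'rb') as f:
                shutil.copyfileobj(f, out)
```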


Load RDDs from one location, convert each row, and save the result to another location. Chunks whose output already has a `_SUCCESS` marker are skipped, so the conversion can be resumed:

In [17]:
def convert_chunked_data(input_path_template, output_path_template, chunks, load_rdd, convert_row, transform_rdd=None):
    for chunk in chunks:
        input_path = input_path_template.format(chunk)
        output_path = output_path_template.format(chunk)
    
        if hdfs_success(output_path):
            logger.info('Chunk "%s" is already converted and saved to "%s", skipping.', chunk, output_path)
            continue

        logger.info('Reading chunk "%s" data from "%s".', chunk, input_path)
     
        rdd = load_rdd(input_path)

        if hdfs_exists(output_path):
            logger.info('Cleaning "%s".', output_path)
            hdfs_delete(output_path, recurse=True)

        # print("=d"*50)
        # print(output_path)
        
        logger.info('Processing and saving to "%s".', output_path)

        rdd = rdd.map(convert_row)
        print(type(rdd))
        print(rdd)
        
        if transform_rdd is not None:
            rdd = transform_rdd(rdd)
        
        rdd.saveAsTextFile(output_path)

        logger.info('Done with chunk "%s".', chunk)

time: 739 µs (started: 2022-11-16 01:53:41 +00:00)


### Criteo → LibSVM
[_(back to toc)_](#Table-of-contents)

The Criteo day files are tab-separated text without a header, so they are read with the DataFrame CSV reader and then converted to an RDD of `Row`s:

In [18]:
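def load_criteo_rdd(path):
    # Read every column as a string: with inferSchema, a hex column whose values
    # happen to look numeric could be parsed as a number, which would break
    # int(value, 16) in criteo_to_libsvm. Skipping inference also avoids an
    # extra pass over the data.
    return (
        spark
        .read
        .option('header', 'false')
        .option('inferSchema', 'false')
        .option('delimiter', '\t')
        .csv(path)
        .rdd
    )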

time: 1.11 ms (started: 2022-11-16 01:53:42 +00:00)


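Prefix every column except the first (the target) with its column index. The 13 integer features are kept as they are, the 26 categorical features are hex strings and are converted to integers, and missing values are dropped: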

In [19]:
def criteo_to_libsvm(row):
    return (
        str(row[0])
        + ' '
        + ' '.join(
            [
                # integer features
                str(i) + ':' + str(row[i])
                for i in range(1, 13 + 1)
                if row[i] is not None
            ] + [
                # string features converted from hex to int
                str(i) + ':' + str(int(row[i], 16))
                for i in range(14, 39 + 1)
                if row[i] is not None
            ]
        )
    )

time: 1.97 ms (started: 2022-11-16 01:53:42 +00:00)
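A tiny worked example of this conversion on a made-up row (not real Criteo data): a 40-column list holding the target, two integer features and one categorical hex value, with everything else missing. The function body is repeated from above so the snippet runs on its own; a Spark `Row` supports the same positional indexing as a list.

```python
def criteo_to_libsvm(row):
    # Same logic as above: target, then index:value for every non-missing column
    return (
        str(row[0])
        + ' '
        + ' '.join(
            [str(i) + ':' + str(row[i]) for i in range(1, 13 + 1) if row[i] is not None]
            + [str(i) + ':' + str(int(row[i], 16)) for i in range(14, 39 + 1) if row[i] is not None]
        )
    )


# Hypothetical row: target 1, integer features 1 and 3, categorical feature 14
row = ['1', '5', None, '3'] + [None] * 10 + ['68fd1e64'] + [None] * 25
assert len(row) == 40
print(criteo_to_libsvm(row))  # 1 1:5 3:3 14:1761418852
```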


Convert all days:

In [20]:
convert_chunked_data(criteo_day_template, libsvm_day_template, days, load_criteo_rdd, criteo_to_libsvm)

[2022-11-16 01:53:42,205] Reading chunk "0" data from "criteo/plain/day0".
[2022-11-16 01:53:46,370] Processing and saving to "criteo/libsvm/day0".
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[15] at RDD at PythonRDD.scala:53
[2022-11-16 01:53:47,286] Done with chunk "0".
[2022-11-16 01:53:47,299] Reading chunk "1" data from "criteo/plain/day1".
[2022-11-16 01:53:47,551] Processing and saving to "criteo/libsvm/day1".
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[34] at RDD at PythonRDD.scala:53
[2022-11-16 01:53:47,820] Done with chunk "1".
[2022-11-16 01:53:47,830] Reading chunk "2" data from "criteo/plain/day2".
[2022-11-16 01:53:48,056] Processing and saving to "criteo/libsvm/day2".
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[53] at RDD at PythonRDD.scala:53
[2022-11-16 01:53:48,271] Done with chunk "2".
[2022-11-16 01:53:48,285] Reading chunk "3" data from "criteo/plain/day3".
[2022-11-16 01:53:48,461] Processing and saving to "criteo/libsvm/day3".
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[72] at RDD at PythonRDD.scala:53
[2022-11-16 01:53:48,692] Done with chunk "3".
[2022-11-16 01:53:48,704] Reading chunk "4" data from "criteo/plain/day4".
[2022-11-16 01:53:48,901] Processing and saving to "criteo/libsvm/day4".
<class 'pyspark.rdd.PipelinedRDD'

### LibSVM → Train and test (sampling)
[_(back to toc)_](#Table-of-contents)

Name each sample using a shortened notation in which every group of three zeros (counted from the right) becomes `k`, e.g. 100000 → `100k` and 1000000 → `1kk`:

In [21]:
def sample_name(sample):
    return str(sample)[::-1].replace('000', 'k')[::-1]

time: 1.29 ms (started: 2022-11-16 01:53:56 +00:00)
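A few of the names this produces (the function is repeated so the snippet is self-contained). The notation is only meant for round numbers like the sizes in `train_samples`: zeros are grouped from the right, so a size such as 1000001 would come out as the odd-looking `100k1`.

```python
def sample_name(sample):
    # Reverse the digits, turn each '000' into 'k', reverse back: 100000 -> '100k'
    return str(sample)[::-1].replace('000', 'k')[::-1]


for sample in [10000, 30000, 100000, 1000000, 300000000, 3000000000]:
    print(sample, sample_name(sample))
```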


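Load the data, sample slightly more lines than needed (by the `oversample` factor), and then cut the sample to the exact size by zipping it with an index and keeping only the lines whose index is below the target count. If the source contains fewer lines than requested, the output is simply smaller: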

In [22]:
oversample = 1.03
sampled_partitions = 256

from functools import reduce

def sample_and_save(input_path_template, output_path_template, days, samples):
    union = None
    union_count = None
    
    for sample in samples:
        name = sample_name(sample)
        output_path = output_path_template.format(name)
        
        if hdfs_success(output_path):
            logger.info('Sample "%s" is already written to "%s", skipping.', sample, output_path)
            continue
            
        logger.info('Preparing to write sample to "%s".', output_path)
        
        if union is None:
            rdds = map(lambda day: sc.textFile(input_path_template.format(day)), days)
            union = reduce(lambda left, right: left.union(right), rdds)

            union_count = union.count()
            logger.info('Total number of lines for days "%s" is "%s".', days, union_count)
            
        ratio = float(sample) / union_count
        
        sampled_union = (
            union
            .sample(False, min(1.0, oversample * ratio))
            .zipWithIndex()
            .filter(lambda z: z[1] < sample)
            .map(lambda z: z[0])
        )
        
        if hdfs_exists(output_path):
            logger.info('Cleaning "%s".', output_path)
            hdfs_delete(output_path, recurse=True)
            
        logger.info('Writing sample "%s" to "%s".', sample, output_path)
        sampled_union.coalesce(sampled_partitions).saveAsTextFile(output_path)
        
        logger.info('Saved "%s" lines to "%s".', sc.textFile(output_path).count(), output_path)

time: 4.63 ms (started: 2022-11-16 01:53:56 +00:00)
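The oversample-then-truncate idea, sketched on an in-memory list instead of an RDD. `sample_exact` is a hypothetical helper, and Python's `random` stands in for `RDD.sample(False, fraction)`, which likewise keeps each line independently with probability `fraction`. Such Bernoulli sampling only hits the target size on average, hence the 3% margin: for a 1000-line target drawn from 10000 lines that margin is only about one standard deviation, so a shortfall is quite possible, whereas for millions of lines it is many standard deviations.

```python
import random


def sample_exact(lines, n, oversample=1.03, seed=42):
    """Hypothetical helper: draw about oversample * n lines, keep at most n.

    Mirrors sample(False, fraction).zipWithIndex().filter(index < n).map(line)
    on a plain list.
    """
    rng = random.Random(seed)
    fraction = min(1.0, oversample * float(n) / len(lines))
    # Bernoulli sampling: each line is kept independently with probability `fraction`
    sampled = [line for line in lines if rng.random() < fraction]
    # Attach an index and keep only the first n sampled lines
    return [line for index, line in enumerate(sampled) if index < n]


data = ['line %d' % i for i in range(10000)]
print(len(sample_exact(data, 1000)))   # at most 1000; may fall slightly short
print(len(sample_exact(data, 50000)))  # 10000: the source has fewer lines than requested
```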


Sample the test set from the last day (day 23) and the training sets from the remaining days (0 to 22):

In [23]:

sample_and_save(libsvm_day_template, libsvm_test_template, days[-1:], test_samples)

[2022-11-16 01:53:56,533] Preparing to write sample to "criteo/libsvm/test/1kk".
[2022-11-16 01:53:56,736] Total number of lines for days "[23]" is "2000".
[2022-11-16 01:53:56,751] Writing sample "1000000" to "criteo/libsvm/test/1kk".
[2022-11-16 01:53:56,945] Saved "2000" lines to "criteo/libsvm/test/1kk".
time: 427 ms (started: 2022-11-16 01:53:56 +00:00)


In [24]:
sample_and_save(libsvm_day_template, libsvm_train_template, days[:-1], train_samples)

[2022-11-16 01:53:57,076] Preparing to write sample to "criteo/libsvm/train/10k".
[2022-11-16 01:53:58,313] Total number of lines for days "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]" is "46000".
[2022-11-16 01:53:58,622] Writing sample "10000" to "criteo/libsvm/train/10k".
[2022-11-16 01:53:59,953] Saved "10000" lines to "criteo/libsvm/train/10k".
[2022-11-16 01:53:59,965] Preparing to write sample to "criteo/libsvm/train/30k".
[2022-11-16 01:54:00,281] Writing sample "30000" to "criteo/libsvm/train/30k".
[2022-11-16 01:54:01,902] Saved "30000" lines to "criteo/libsvm/train/30k".
[2022-11-16 01:54:01,914] Preparing to write sample to "criteo/libsvm/train/100k".
[2022-11-16 01:54:02,219] Writing sample "100000" to "criteo/libsvm/train/100k".
[2022-11-16 01:54:04,162] Saved "46000" lines to "criteo/libsvm/train/100k".
[2022-11-16 01:54:04,175] Preparing to write sample to "criteo/libsvm/train/300k".
[2022-11-16 01:54:04,465] Writing sample "300000" to "criteo/libsvm/train/300k".
[2022-11-16 01:54:06,121] Saved "46000" lines to "criteo/libsvm/train/300k".
[2022-11-16 01:54:06,133] Preparing to write sample to "criteo/libsvm/train/1kk".
[2022-11-16 01:54:06,377] Writing sample "1000000" to "criteo/libsvm/train/1kk".
[2022-11-16 01:54:08,142] Saved "46000" lines to "criteo/libsvm/train/1kk".
[2022-11-16 01:54:08,155] Preparing to write sample to "criteo/libsvm/train/3kk".
[2022-11-16 01:54:08,394] Writing sample "3000000" to "criteo/libsvm/train/3kk".
[2022-11-16 01:54:10,124] Saved "46000" lines to "criteo/libsvm/train/3kk".
[2022-11-16 01:54:10,137] Preparing to write sample to "criteo/libsvm/train/10kk".
[2022-11-16 01:54:10,354] Writing sample "10000000" to "criteo/libsvm/train/10kk".
[2022-11-16 01:54:12,003] Saved "46000" lines to "criteo/libsvm/train/10kk".
[2022-11-16 01:54:12,015] Preparing to write sample to "criteo/libsvm/train/30kk".
[2022-11-16 01:54:12,238] Writing sample "30000000" to "criteo/libsvm/train/30kk".
[2022-11-16 01:54:14,027] Saved "46000" lines to "criteo/libsvm/train/30kk".
[2022-11-16 01:54:14,038] Preparing to write sample to "criteo/libsvm/train/100kk".
[2022-11-16 01:54:14,268] Writing sample "100000000" to "criteo/libsvm/train/100kk".
[2022-11-16 01:54:16,008] Saved "46000" lines to "criteo/libsvm/train/100kk".
[2022-11-16 01:54:16,020] Preparing to write sample to "criteo/libsvm/train/300kk".
[2022-11-16 01:54:16,256] Writing sample "300000000" to "criteo/libsvm/train/300kk".
[2022-11-16 01:54:18,016] Saved "46000" lines to "criteo/libsvm/train/300kk".
[2022-11-16 01:54:18,028] Preparing to write sample to "criteo/libsvm/train/1kkk".
[2022-11-16 01:54:18,280] Writing sample "1000000000" to "criteo/libsvm/train/1kkk".
[2022-11-16 01:54:20,002] Saved "46000" lines to "criteo/libsvm/train/1kkk".
[2022-11-16 01:54:20,015] Preparing to write sample to "criteo/libsvm/train/3kkk".
[2022-11-16 01:54:20,309] Writing sample "3000000000" to "criteo/libsvm/train/3kkk".
[2022-11-16 01:54:21,904] Saved "46000" lines to "criteo/libsvm/train/3kkk".
time: 24.8 s (started: 2022-11-16 01:53:57 +00:00)


### LibSVM train and test → VW train and test
[_(back to toc)_](#Table-of-contents)

LibSVM data is plain text, so each sample is loaded with `sc.textFile`:

In [25]:
def load_libsvm_rdd(path):
    return sc.textFile(path)

time: 424 µs (started: 2022-11-16 01:54:22 +00:00)


The conversion is straightforward: map the target to {-1, 1} and turn each categorical feature as a whole (index and value) into a VW feature name such as `14_123`, while integer features keep their `index:value` form:

In [26]:
def libsvm_to_vw(line):
    parts = line.split(' ')
    parts[0] = '1 |' if parts[0] == '1' else '-1 |'
    for i in range(1, len(parts)):
        index, _, value = parts[i].partition(':')
        if int(index) >= 14:
            parts[i] = index + '_' + value
    return ' '.join(parts)

time: 721 µs (started: 2022-11-16 01:54:22 +00:00)
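For example, on made-up LibSVM lines (the function is repeated so the snippet runs on its own). In VW's text format, `1:5` is a feature named `1` with value 5, while `14_1761418852` is a feature named `14_1761418852` with the default value 1, so each distinct categorical value becomes its own indicator feature.

```python
def libsvm_to_vw(line):
    parts = line.split(' ')
    # Target 1 -> positive, anything else (0) -> negative, followed by the default namespace
    parts[0] = '1 |' if parts[0] == '1' else '-1 |'
    for i in range(1, len(parts)):
        index, _, value = parts[i].partition(':')
        if int(index) >= 14:
            # Categorical feature: fold the value into the feature name
            parts[i] = index + '_' + value
    return ' '.join(parts)


print(libsvm_to_vw('1 1:5 3:3 14:1761418852'))  # 1 | 1:5 3:3 14_1761418852
print(libsvm_to_vw('0 2:7 15:42'))              # -1 | 2:7 15_42
```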


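Vowpal Wabbit learns online, one example at a time, so its data should also be well shuffled. Here the lines are put into a pseudo-random but reproducible order by sorting them on the MD5 hash of their index: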

In [27]:
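import hashlib


def calculate_hash(something):
    # hashlib operates on bytes in Python 3, so the string must be encoded first;
    # passing a str raises "TypeError: Unicode-objects must be encoded before hashing".
    m = hashlib.md5()
    m.update(str(something).encode('utf-8'))
    return m.hexdigest()

def random_sort(rdd):
    # Deterministic pseudo-random order: sort lines by the MD5 hash of their index.
    return (
        rdd
        .zipWithIndex()
        .sortBy(lambda z: calculate_hash(z[1]))
        .map(lambda z: z[0])
    )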

time: 893 µs (started: 2022-11-16 01:54:22 +00:00)
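The same ordering, sketched on a plain list with a Python 3 compatible hash; the `.encode('utf-8')` call is what avoids `TypeError: Unicode-objects must be encoded before hashing`. `random_sort_local` is a hypothetical name. Because the sort key depends only on a line's position, the result is reproducible across runs, and every dataset of the same length is permuted in the same way.

```python
import hashlib


def calculate_hash(something):
    # hashlib needs bytes in Python 3, so encode the string first
    m = hashlib.md5()
    m.update(str(something).encode('utf-8'))
    return m.hexdigest()


def random_sort_local(items):
    # Pair each item with its position, order by the MD5 of the position, then drop it
    indexed = list(enumerate(items))
    indexed.sort(key=lambda pair: calculate_hash(pair[0]))
    return [item for _, item in indexed]


lines = ['line %d' % i for i in range(10)]
print(random_sort_local(lines))
```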


Convert all LibSVM samples:

In [28]:
convert_chunked_data(libsvm_test_template, vw_test_template, [sample_name(sample) for sample in test_samples], load_libsvm_rdd, libsvm_to_vw, transform_rdd=random_sort)

[2022-11-16 01:54:22,463] Reading chunk "1kk" data from "criteo/libsvm/test/1kk".
[2022-11-16 01:54:22,496] Processing and saving to "criteo/vw/test/1kk".
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[646] at RDD at PythonRDD.scala:53
22/11/16 01:54:22 ERROR Utils: Aborting task
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/pyspark/rdd.py", line 3472, in pipeline_func


Py4JJavaError: An error occurred while calling o1317.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:106)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1599)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1599)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1585)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1585)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:563)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.GeneratedMethodAccessor77.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 112.0 failed 1 times, most recent failure: Lost task 0.0 in stage 112.0 (TID 926) (192.168.1.27 executor driver): org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:163)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:88)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/lib/pyspark.zip/pyspark/worker.py", line 676, in process
    out_iter = func(split_index, iterator)
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/pyspark/rdd.py", line 3472, in pipeline_func
    return func(split, prev_func(split, iterator))
  [Previous line repeated 1 more time]
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/pyspark/rdd.py", line 540, in func
    return f(iterator)
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/pyspark/rdd.py", line 985, in sortPartition
    return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/lib/pyspark.zip/pyspark/shuffle.py", line 491, in sorted
    chunk = list(itertools.islice(iterator, batch))
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/data/dataset/fengwen/script/data/spark-3.3.1-bin-hadoop3-scala2.13/python/pyspark/rdd.py", line 2869, in <lambda>
    return self.map(lambda x: (f(x), x))
  File "/tmp/ipykernel_3569009/3965613699.py", line 14, in <lambda>
  File "/tmp/ipykernel_3569009/3965613699.py", line 6, in calculate_hash
TypeError: Unicode-objects must be encoded before hashing

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:576)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$executeTask$1(SparkHadoopWriter.scala:136)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:135)
	... 9 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:437)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2281)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:83)
	... 50 more


time: 1.51 s (started: 2022-11-16 01:54:22 +00:00)
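The `TypeError` in the traceback above is raised inside `calculate_hash`, which passes a `str` to a `hashlib` function. In Python 3 these functions accept only bytes-like objects. The body of `calculate_hash` is not shown in this section, so the following is only a minimal sketch of a hash-based shuffle key that encodes the line first. `stable_hash_key` and its `seed` parameter are hypothetical names, not the notebook's `random_sort`.

```python
import hashlib


def stable_hash_key(line, seed=0):
    """Deterministic pseudo-random sort key for a text line (hypothetical helper).

    hashlib only accepts bytes-like objects in Python 3, so the str is
    encoded to UTF-8 first; passing the str directly raises TypeError.
    """
    data = '{}:{}'.format(seed, line).encode('utf-8')
    return hashlib.md5(data).hexdigest()


# Sorting by the hash shuffles lines reproducibly (same seed -> same order).
lines = ['1 1:0.5 7:1', '0 2:1', '1 3:2 9:1', '0 4:1']
shuffled = sorted(lines, key=stable_hash_key)
print(shuffled)
```

In the Spark pipeline, a key like this would be passed to `RDD.sortBy`, for example `rdd.sortBy(stable_hash_key)`. The `keyBy` and `sortPartition` frames in the traceback suggest that `random_sort` sorts in a similar way.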


In [None]:

convert_chunked_data(libsvm_train_template, vw_train_template, [sample_name(sample) for sample in train_samples], load_libsvm_rdd, libsvm_to_vw, transform_rdd=random_sort)

Spark is no longer needed:

In [None]:
spark.stop()

### Local data
[_(back to toc)_](#Table-of-contents)

Download all sampled data to the local directory:

In [None]:
ensure_directory_exists(local_data_path)

In [None]:
def count_lines(path):
    """Return the number of lines in a text file (0 for an empty file)."""
    with open(path) as f:
        return sum(1 for _ in f)

def download_data(remote_template, local_template, samples):
    for sample in samples:
        name = sample_name(sample)
        remote_path = remote_template.format(name)
        local_path = local_template.format(name)
        if os.path.exists(local_path):
            count = count_lines(local_path)
            if count == sample:
                logger.info('File "%s" is already loaded, skipping.', local_path)
                continue
            else:
                logger.info('File "%s" already exists but number of lines "%s" is wrong (must be "%s"), reloading.', local_path, count, sample)
        logger.info('Loading file "%s" as local file "%s".', remote_path, local_path)
        hdfs_get(remote_path, local_path)
        count = count_lines(local_path)
        logger.info('File loaded to "%s", number of lines is "%s".', local_path, count)
        assert count == sample, 'File "{}" contains wrong number of lines "{}" (must be "{}").'.format(local_path, count, sample)
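For the multi-gigabyte samples, iterating over a file line by line in text mode can be slow. The following minimal sketch shows a faster alternative that counts newline bytes in binary chunks. The name `count_lines_fast` and the 1 MiB default chunk size are assumptions and are not part of the original notebook.

```python
import os
import tempfile


def count_lines_fast(path, chunk_size=1 << 20):
    """Count lines by counting newline bytes in fixed-size binary chunks.

    A final line without a trailing newline is still counted, so for
    '\\n'-terminated files the result matches sum(1 for _ in f).
    """
    count = 0
    last_chunk = b''
    with open(path, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            count += chunk.count(b'\n')
            last_chunk = chunk
    # Count an unterminated last line.
    if last_chunk and not last_chunk.endswith(b'\n'):
        count += 1
    return count


# Usage on a small temporary file whose last line has no trailing newline.
with tempfile.NamedTemporaryFile('wb', delete=False) as tmp:
    tmp.write(b'1 1:1\n0 2:1\n1 3:1')
demo_count = count_lines_fast(tmp.name)
os.remove(tmp.name)
print(demo_count)  # 3
```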

In [None]:
download_data(libsvm_test_template, local_libsvm_test_template, test_samples)

In [None]:
download_data(libsvm_train_template, local_libsvm_train_template, train_samples)

In [None]:
download_data(vw_test_template, local_vw_test_template, test_samples)

In [None]:
download_data(vw_train_template, local_vw_train_template, train_samples)

## Local training
[_(back to toc)_](#Table-of-contents)

Measure model quality (ROC AUC, log loss) and the engines' technical metrics (train time, peak memory, CPU load):

In [None]:
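from functools import reduce  # reduce is not a builtin in Python 3

from matplotlib import pyplot
from sklearn.metrics import (
    auc,
    log_loss,
    roc_curve,
)


def measure(engine, sample, test_file, time_file, predictions_file):

    # Helpers for parsing the report written by `time -v --output=...`.
    def get_last_in_line(s):
        return s.rstrip().split()[-1]

    def parse_elapsed_time(s):
        # "h:mm:ss" or "m:ss.ss" -> seconds
        return reduce(lambda a, b: a * 60 + b, map(float, get_last_in_line(s).split(':')))

    def parse_max_memory(s):
        # Reported in kilobytes; convert to bytes.
        return int(get_last_in_line(s)) * 1024

    def parse_cpu(s):
        # "250%" -> 2.5 (CPU cores' worth of load)
        return float(get_last_in_line(s).rstrip('%')) / 100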


    elapsed = -1
    memory = -1
    cpu = -1

    # Open in text mode: in Python 3, matching str patterns against bytes lines fails.
    with open(time_file) as f:
        for line in f:
            if 'Elapsed (wall clock) time' in line:
                elapsed = parse_elapsed_time(line)
            elif 'Maximum resident set size' in line:
                memory = parse_max_memory(line)
            elif 'Percent of CPU' in line:
                cpu = parse_cpu(line)

    # The first token of each test line is the label; "1" marks a positive (click).
    with open(test_file) as f:
        labels = [line.rstrip().split(' ')[0] == '1' for line in f]

    # The first token of each prediction line is the predicted score.
    with open(predictions_file) as f:
        scores = [float(line.rstrip().split(' ')[0]) for line in f]

    fpr, tpr, _ = roc_curve(labels, scores)
    roc_auc = auc(fpr, tpr)
    ll = log_loss(labels, scores)
    
    figure = pyplot.figure(figsize=(6, 6))
    pyplot.plot(fpr, tpr, linewidth=2.0)
    pyplot.plot([0, 1], [0, 1], 'k--')
    pyplot.xlabel('FPR')
    pyplot.ylabel('TPR')
    pyplot.title('{} {} - {:.3f} ROC AUC'.format(engine, sample_name(sample), roc_auc))
    pyplot.show()

    return {
        'Engine': engine,
        'Train size': sample,
        'ROC AUC': roc_auc,
        'Log loss': ll,
        'Train time': elapsed,
        'Maximum memory': memory,
        'CPU load': cpu,
    }
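The two quality metrics that `measure` takes from scikit-learn can also be written out directly, which makes their meaning explicit. This pure-Python sketch uses hypothetical helper names, and its quadratic pairwise loop is only meant for tiny inputs. It computes ROC AUC as the probability that a randomly chosen positive is scored above a randomly chosen negative, with ties counted as one half. That value equals the trapezoidal area produced by `roc_curve` plus `auc`. It computes log loss as the mean negative log-likelihood of the labels.

```python
import math


def pairwise_roc_auc(labels, scores):
    """P(score of a random positive > score of a random negative), ties count 1/2."""
    pos = [s for y, s in zip(labels, scores) if y]
    neg = [s for y, s in zip(labels, scores) if not y]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


def mean_log_loss(labels, scores, eps=1e-15):
    """Average negative log-likelihood of boolean labels under predicted probabilities."""
    total = 0.0
    for y, p in zip(labels, scores):
        p = min(max(p, eps), 1 - eps)  # clip so log() never sees 0
        total += -math.log(p) if y else -math.log(1 - p)
    return total / len(labels)


labels = [False, False, True, True]
scores = [0.1, 0.4, 0.35, 0.8]
roc_auc_value = pairwise_roc_auc(labels, scores)
log_loss_value = mean_log_loss(labels, scores)
print(roc_auc_value)  # 0.75
print(log_loss_value)
```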

Settings for VW and XGBoost, and how to run them. I use GNU Time (slightly patched for correctness) to measure running time, CPU load and memory consumption. The VW and XGBoost configurations were obtained via Hyperopt:

In [None]:
def get_time_command_and_file(train_file):
    time_file = train_file + '.time'
    return [
        '/usr/local/bin/time',
        '-v',
        '--output=' + time_file,
    ], time_file

def get_vw_commands_and_predictions_file(train_file, test_file):
    model_file = train_file + '.model'
    predictions_file = test_file + '.predictions'
    return [
        'vw83',
        '--link=logistic',
        '--loss_function=logistic',
        '-b', '29',
        '-l', '0.3',
        '--initial_t', '1',
        '--decay_learning_rate', '0.5',
        '--power_t', '0.5',
        '--l1', '1e-15',
        '--l2', '0',
        '-d', train_file,
        '-f', model_file,
    ], [
        'vw83',
        '--loss_function=logistic',
        '-t',
        '-i', model_file,
        '-d', test_file,
        '-p', predictions_file,
    ], predictions_file


xgboost_conf = [
    'booster = gbtree',
    'objective = binary:logistic',
    'nthread = 24',
    'eval_metric = logloss',
    'max_depth = 7',
    'num_round = 200',
    'eta = 0.2',
    'gamma = 0.4',
    'subsample = 0.8',
    'colsample_bytree = 0.8',
    'min_child_weight = 20',
    'alpha = 3',
    'lambda = 100',
]


def get_xgboost_commands_and_predictions_file(train_file, test_file, cache=False):
    config_file = os.path.join(local_runtime_path, 'xgb.conf')
    ensure_directory_exists(local_runtime_path)
    # Text mode: print() writes str, which a binary ('wb') file rejects in Python 3.
    with open(config_file, 'w') as f:
        for line in xgboost_conf:
            print(line, file=f)
    model_file = train_file + '.model'
    predictions_file = test_file + '.predictions'
    if cache:
        train_file = train_file + '#' + train_file + '.cache'
    return [
        'xgboost',
        config_file,
        'data=' + train_file,
        'model_out=' + model_file,
    ], [
        'xgboost',
        config_file,
        'task=pred',
        'test:data=' + test_file,
        'model_in=' + model_file,
        'name_pred=' + predictions_file,
    ], predictions_file

def get_xgboost_ooc_commands_and_predictions_file(train_file, test_file):
    return get_xgboost_commands_and_predictions_file(train_file, test_file, cache=True)
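The only difference between the `xgb` and `xgb.ooc` engines is the `#...cache` suffix on the training path. In XGBoost versions that support this URI syntax, `file#prefix` enables external-memory (out-of-core) training, with the cache files named after the prefix. The minimal sketch below isolates that string manipulation and the text-mode config writing. The helper names are hypothetical.

```python
import os
import tempfile


def write_xgboost_config(path, lines):
    """Write one 'key = value' setting per line (text mode, since print() emits str)."""
    with open(path, 'w') as f:
        for line in lines:
            print(line, file=f)


def xgboost_train_command(config_file, train_file, model_file, cache=False):
    """Hypothetical helper mirroring the train command built above."""
    data = train_file
    if cache:
        # '<file>#<cache prefix>' requests external-memory training, paging the
        # parsed data through cache files that start with that prefix.
        data = train_file + '#' + train_file + '.cache'
    return ['xgboost', config_file, 'data=' + data, 'model_out=' + model_file]


config_path = os.path.join(tempfile.mkdtemp(), 'xgb.conf')
write_xgboost_config(config_path, ['booster = gbtree', 'objective = binary:logistic'])
command = xgboost_train_command(config_path, 'train.libsvm', 'train.libsvm.model', cache=True)
print(command[2])  # data=train.libsvm#train.libsvm.cache
```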

In [None]:
engines = {
    'vw': (get_vw_commands_and_predictions_file, local_vw_train_template, local_vw_test_template),
    'xgb': (get_xgboost_commands_and_predictions_file, local_libsvm_train_template, local_libsvm_test_template),
    'xgb.ooc': (get_xgboost_ooc_commands_and_predictions_file, local_libsvm_train_template, local_libsvm_test_template),
}

Train & test everything:

In [None]:
import subprocess


measurements = []

for sample in train_samples:
    for engine in engines:
        logger.info('Training "%s" on "%s" lines of data.', engine, sample)
        
        get_commands_and_predictions_file, train_template, test_template = engines[engine]

        train_file = train_template.format(sample_name(sample))
        test_file = test_template.format(sample_name(test_samples[0]))
        logger.info('Will train on "%s" and test on "%s".', train_file, test_file)

        command_time, time_file = get_time_command_and_file(train_file)
        command_engine_train, command_engine_test, predictions_file = get_commands_and_predictions_file(train_file, test_file)

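        logger.info('Performing train.')
        # check_call raises CalledProcessError on a non-zero exit code, so a failed
        # run is not measured against stale model or prediction files.
        subprocess.check_call(command_time + command_engine_train)

        logger.info('Performing test.')
        subprocess.check_call(command_engine_test)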

        logger.info('Measuring results.')
        measurement = measure(engine, sample, test_file, time_file, predictions_file)
        logger.info(measurement)
        measurements.append(measurement)
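If a patched GNU Time binary is not available at `/usr/local/bin/time`, the standard `resource` module (Unix-only) offers a rough substitute. This sketch is an alternative under stated assumptions, not what the notebook uses, and `run_and_measure` is a hypothetical name. `RUSAGE_CHILDREN` accumulates CPU time over all waited-for children, so the sketch takes before/after differences. `ru_maxrss`, however, is the peak over every child the process has waited for. The memory figure therefore reflects a single run only if that run is the largest child so far.

```python
import resource
import subprocess
import sys
import time


def run_and_measure(command):
    """Run a command; report wall time, child CPU load and peak child RSS (sketch)."""
    before = resource.getrusage(resource.RUSAGE_CHILDREN)
    start = time.monotonic()
    subprocess.run(command, check=True)
    elapsed = time.monotonic() - start
    after = resource.getrusage(resource.RUSAGE_CHILDREN)
    cpu_seconds = (after.ru_utime - before.ru_utime) + (after.ru_stime - before.ru_stime)
    return {
        'Train time': elapsed,
        # Assumes Linux, where ru_maxrss is in kilobytes (macOS reports bytes).
        # It is the maximum over all waited-for children, not just this one.
        'Maximum memory': after.ru_maxrss * 1024,
        'CPU load': cpu_seconds / elapsed if elapsed > 0 else 0.0,
    }


result = run_and_measure([sys.executable, '-c', 'sum(range(10 ** 6))'])
print(result)
```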

Collect the measurements into a DataFrame:

In [None]:
import pandas


measurements_df = pandas.DataFrame(measurements).sort_values(by=['Engine', 'Train size'])
measurements_df

Plot measurements:

In [None]:
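from functools import reduce  # reduce is not a builtin in Python 3


def extract_data_for_plotting(df, what):
    # Outer-join one ('Train size', what) frame per engine, renaming the metric
    # column to the engine name: one row per train size, one column per engine.
    return reduce(
        lambda left, right: pandas.merge(left, right, how='outer', on='Train size'),
        map(
            lambda name: df[df.Engine == name][['Train size', what]].rename(columns={what: name}),
            df.Engine.unique(),
        ),
    )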

def plot_stuff(df, what, ylabel=None, **kwargs):
    data = extract_data_for_plotting(df, what).set_index('Train size')
    ax = data.plot(marker='o', figsize=(6, 6), title=what, grid=True, linewidth=2.0, **kwargs)  # xlim=(1e4, 4e9)
    if ylabel is not None:
        ax.set_ylabel(ylabel)


plot_stuff(measurements_df, 'ROC AUC', logx=True)
plot_stuff(measurements_df, 'Log loss', logx=True)
plot_stuff(measurements_df, 'Train time', loglog=True, ylabel='s')
plot_stuff(measurements_df, 'Maximum memory', loglog=True, ylabel='bytes')
plot_stuff(measurements_df, 'CPU load', logx=True)
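`extract_data_for_plotting` builds a wide table by outer-merging one frame per engine. pandas can produce the same shape in a single call with `DataFrame.pivot`, as in the minimal sketch below. The helper name `wide_by_engine` is hypothetical, and the numbers are toy values, not benchmark results. Unlike the merge chain, `pivot` raises `ValueError` if an (engine, train size) pair was measured twice.

```python
import pandas


def wide_by_engine(df, what):
    """One row per train size, one column per engine (NaN where a run is missing)."""
    return df.pivot(index='Train size', columns='Engine', values=what)


# Toy numbers for illustration only, not benchmark results.
toy = pandas.DataFrame([
    {'Engine': 'vw', 'Train size': 10000, 'ROC AUC': 0.1},
    {'Engine': 'vw', 'Train size': 100000, 'ROC AUC': 0.2},
    {'Engine': 'xgb', 'Train size': 10000, 'ROC AUC': 0.3},
])
wide = wide_by_engine(toy, 'ROC AUC')
print(wide)
```

With real data, `wide_by_engine(measurements_df, 'ROC AUC').plot(marker='o', logx=True)` would draw one line per engine, just as `plot_stuff` does.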