In [None]:
# %pip (rather than !pip) installs into the environment of the running kernel.
# Version specifiers must be quoted: unquoted, the shell parses `>=1.2` as an
# output redirection (to a file named "=1.2") and the constraint is dropped.
# pandas>=1.2 is needed for `with pd.read_csv(..., chunksize=...) as reader:`.
%pip install --upgrade pip -q
%pip install progressbar -q
%pip install memory_profiler -q
%pip install --upgrade "pandas>=1.2" -q

[?25l[K     |▏                               | 10kB 20.4MB/s eta 0:00:01[K     |▍                               | 20kB 21.7MB/s eta 0:00:01[K     |▋                               | 30kB 11.5MB/s eta 0:00:01[K     |▉                               | 40kB 9.1MB/s eta 0:00:01[K     |█                               | 51kB 7.3MB/s eta 0:00:01[K     |█▎                              | 61kB 7.2MB/s eta 0:00:01[K     |█▌                              | 71kB 8.0MB/s eta 0:00:01[K     |█▊                              | 81kB 8.3MB/s eta 0:00:01[K     |██                              | 92kB 8.0MB/s eta 0:00:01[K     |██▏                             | 102kB 8.2MB/s eta 0:00:01[K     |██▍                             | 112kB 8.2MB/s eta 0:00:01[K     |██▌                             | 122kB 8.2MB/s eta 0:00:01[K     |██▊                             | 133kB 8.2MB/s eta 0:00:01[K     |███                             | 143kB 8.2MB/s eta 0:00:01[K     |███▏                    

In [None]:
%load_ext memory_profiler

In [None]:
import urllib
import tarfile
import os
from collections import OrderedDict
import warnings
import random

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import progressbar

from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import KBinsDiscretizer
from sklearn.feature_extraction import FeatureHasher
from sklearn import linear_model
from sklearn.metrics import roc_auc_score, log_loss

## Download the Criteo Display Advertising Challenge dataset

In [None]:
# ProgressBar borrowed from https://stackoverflow.com/a/53643011/2015762
class ProgressBar():
    """``reporthook`` for ``urllib.request.urlretrieve`` that draws a progress bar.

    ``urlretrieve`` calls the hook as ``hook(block_num, block_size, total_size)``:
    once before the first block and then after every block. ``total_size`` is -1
    when the download size is unknown; no bar is drawn in that case, because
    ``progressbar`` cannot handle a negative ``maxval``.
    """

    def __init__(self):
        # Created lazily on the first call, once total_size is known.
        self.pbar = None

    def __call__(self, block_num, block_size, total_size):
        if total_size < 0:
            return

        if self.pbar is None:
            self.pbar = progressbar.ProgressBar(maxval=total_size)
            self.pbar.start()

        # The last block is usually partial, so this can overshoot total_size.
        downloaded = block_num * block_size
        if downloaded < total_size:
            self.pbar.update(downloaded)
        else:
            self.pbar.finish()


def download_dataset(dataset_url, dataset_folder_path, compressed_dataset_path):
    """Download ``dataset_url`` to ``compressed_dataset_path``, creating the folder.

    The file is first written to ``<compressed_dataset_path>.part`` and renamed
    only once the download has completed. An interrupted download therefore never
    leaves a truncated archive at the final path, which the ``os.path.exists``
    check guarding this call would mistake for a finished download.
    """
    # `import urllib` alone does not load the `urllib.request` submodule.
    import urllib.request

    os.makedirs(dataset_folder_path, exist_ok=True)
    partial_path = compressed_dataset_path + '.part'
    try:
        urllib.request.urlretrieve(dataset_url, partial_path, ProgressBar())
    except BaseException:
        # Also covers KeyboardInterrupt: don't leave a large partial file behind.
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise
    os.replace(partial_path, compressed_dataset_path)

def extract_dataset(compressed_dataset_path, dataset_folder_path, dataset_path):
    """Extract ``readme.txt`` and ``train.txt`` from the archive into the folder.

    ``train.txt`` is the labelled part of the dataset. It is then renamed to
    ``dataset_path``. Other members of the archive are not extracted.
    """
    extracted_train_path = os.path.join(dataset_folder_path, 'train.txt')
    with tarfile.open(compressed_dataset_path, "r") as archive:
        for member_name in ('readme.txt', 'train.txt'):
            archive.extract(member_name, dataset_folder_path)
        os.rename(extracted_train_path, dataset_path)

In [None]:
dataset_url = "https://criteostorage.blob.core.windows.net/criteo-research-datasets/kaggle-display-advertising-challenge-dataset.tar.gz"
dataset_folder_path = os.path.abspath('sync/data/criteo_dataset')
compressed_dataset_path = os.path.join(dataset_folder_path, "criteo_dataset.tar.gz")
dataset_path = os.path.join(dataset_folder_path, "criteo_dataset.txt")

# The archive is only an intermediate step towards dataset_path. Once the dataset
# has been extracted, there is no need to (re-)download the 4.3G archive, e.g.
# if it was deleted afterwards to save disk space.
if not os.path.exists(dataset_path):
    if not os.path.exists(compressed_dataset_path):
        download_dataset(dataset_url, dataset_folder_path, compressed_dataset_path)
    extract_dataset(compressed_dataset_path, dataset_folder_path, dataset_path)




Let's take a quick look at the files we have downloaded.

Within an IPython notebook, we can execute a shell command by prefixing the line with `!`, and insert a Python variable into it with `{}`.

In [None]:
!ls -alh {dataset_folder_path}

total 15G
drwxr-xr-x 2 root      root  4.0K Mar  5 14:54 .
drwxr-xr-x 3 root      root  4.0K Mar  5 14:44 ..
-rw-r--r-- 1 root      root  4.3G Mar  5 14:48 criteo_dataset.tar.gz
-rw-r--r-- 1 293604138 staff  11G May 12  2014 criteo_dataset.txt
-rw-r--r-- 1 293604138 staff 1.9K Aug 22  2014 readme.txt


In [None]:
!cat {dataset_folder_path}/readme.txt

        ------ Display Advertising Challenge ------

Dataset: dac-v1

This dataset contains feature values and click feedback for millions of display 
ads. Its purpose is to benchmark algorithms for clickthrough rate (CTR) prediction.
It has been used for the Display Advertising Challenge hosted by Kaggle:
https://www.kaggle.com/c/criteo-display-ad-challenge/


Full description:

This dataset contains 2 files:
  train.txt
  test.txt
corresponding to the training and test parts of the data. 


Dataset construction:

The training dataset consists of a portion of Criteo's traffic over a period
of 7 days. Each row corresponds to a display ad served by Criteo and the first
column is indicates whether this ad has been clicked or not.
The positive (clicked) and negatives (non-clicked) examples have both been
subsampled (but at different rates) in order to reduce the dataset size.

There are 13 features taking integer values (mostly count features) and 26
categorical features. The values of th

In [None]:
# Column layout described in readme.txt: the click label first, then 13 integer
# features and 26 categorical features (named I1..I13 and C1..C26 here).
label_columns = ['label']
integer_features = [f'I{i}' for i in range(1, 14)]
categorical_features = [f'C{i}' for i in range(1, 27)]
columns = label_columns + integer_features + categorical_features

SyntaxError: ignored

In [None]:
pd.read_csv(dataset_path, nrows=5, header=None, sep='\t', names=columns)

## Reading data with memory constraints

We first create a toy dataset with "only" 1 million rows (out of 45 million).

In [None]:
toy_dataset_path = os.path.join(dataset_folder_path, "criteo_toy_dataset.txt")

In [None]:
!head -n 1000000 {dataset_path} > {toy_dataset_path}

Let's say we want to perform a basic operation: estimate the proportion of positive samples in the data.

### Basic approach

In [None]:
def compute_positive_label_proportion(dataset_path, columns):
    """Return the mean of the ``label`` column of a headerless TSV file.

    Naive version: the whole file is loaded into a single DataFrame, with
    every column type inferred by pandas.
    """
    labels = pd.read_csv(dataset_path, sep="\t", header=None, names=columns)['label']
    return labels.mean()

Let's measure its memory footprint with the `%%memit` cell magic.

In [None]:
%%memit
positive_label_proportion = compute_positive_label_proportion(toy_dataset_path, columns)
print('positive_label_proportion', positive_label_proportion)

What would happen if you ran the same function on a dataset 45 times larger?

You can give it a try with `compute_positive_label_proportion(dataset_path, columns)`... at your own risk.

### Specifying column types
We can help pandas by specifying the column types to use, so that it does not need to infer them. Do so with the `dtype` parameter of `pd.read_csv`: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html.

In [None]:
# Explicit dtypes, so that pandas neither infers types nor defaults to 64-bit ones:
# - the label is 0 (not clicked) or 1 (clicked) -> int8;
# - the integer features have missing values, which numpy integer dtypes cannot
#   hold -> float32 (exact for integers up to 2**24, which is enough here);
# - the categorical features are kept as strings.
col_types = OrderedDict()
col_types.update((column, np.int8) for column in label_columns)
col_types.update((column, np.float32) for column in integer_features)
col_types.update((column, str) for column in categorical_features)

def compute_positive_label_proportion_with_dtype(dataset_path, columns, col_types):
    """Return the mean of the ``label`` column, reading columns with explicit dtypes.

    Same as ``compute_positive_label_proportion``, except that ``col_types``
    (column name -> dtype) is passed to ``pd.read_csv`` so that pandas does not
    have to infer column types and can use smaller ones.
    """
    dataset = pd.read_csv(dataset_path, sep="\t", header=None, names=columns, dtype=col_types)
    return dataset['label'].mean()

In [None]:
%%memit
positive_label_proportion = compute_positive_label_proportion_with_dtype(toy_dataset_path, columns, col_types)
print('positive_label_proportion', positive_label_proportion)

### Reading data by chunks
We can control the amount of memory we need by loading only a small chunk of the data and processing it before moving to the next chunk.

See documentation at https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html#iterating-through-files-chunk-by-chunk

```
with pd.read_csv(..., chunksize=10, nrows=100) as reader:
    for chunk in reader:
        print(chunk)
```

In [None]:
def compute_positive_label_proportion_with_dtype_and_chunksize(dataset_path, columns, col_types, chunksize):
    """Return the proportion of positive labels, reading ``chunksize`` rows at a time.

    Only one chunk is held in memory at any time, so this scales to files that
    do not fit in memory. ``col_types`` is passed to ``pd.read_csv`` as ``dtype``.
    Returns NaN for an empty dataset, like ``Series.mean``.
    """
    n_positives = 0
    n_rows = 0
    with pd.read_csv(dataset_path, sep="\t", header=None, names=columns,
                     dtype=col_types, chunksize=chunksize) as reader:
        for chunk in reader:
            # Accumulate counts instead of averaging per-chunk means: the last
            # chunk is usually smaller and would otherwise be over-weighted.
            n_positives += int(chunk['label'].sum())
            n_rows += len(chunk)
    return n_positives / n_rows if n_rows else float('nan')

In [None]:
%%memit
positive_label_proportion = compute_positive_label_proportion_with_dtype_and_chunksize(toy_dataset_path, columns, col_types, 100_000)
print('positive_label_proportion', positive_label_proportion)

This can now be applied to the full dataset with no memory issue.

In [None]:
%%memit
positive_label_proportion = compute_positive_label_proportion_with_dtype_and_chunksize(dataset_path, columns, col_types, 100_000)
print('positive_label_proportion', positive_label_proportion)

## Training and evaluation

### Split train and test datasets
Since the datasets contain one line per example, we can split them into train and test by simply iterating over the lines. For each line in the original dataset: write it to the test data set with a probability p and write it to the train dataset with a probability 1 - p.

In [None]:
def split_train_test(full_dataset_path, train_dataset_path, test_dataset_path, test_ratio, seed=302984, print_every=None):
    """Randomly split a one-example-per-line file into a train and a test file.

    Each line of ``full_dataset_path`` is copied unchanged to ``test_dataset_path``
    with probability ``test_ratio``, and to ``train_dataset_path`` otherwise. The
    input is streamed line by line and never loaded into memory.

    ``seed`` makes the split reproducible. A private ``random.Random`` is used,
    so the global ``random`` state is left untouched. If ``print_every`` is set,
    progress is printed every ``print_every`` lines.

    Both outputs are written to ``.part`` files and renamed only once the whole
    input has been processed. An interrupted run therefore never leaves behind
    truncated files that an existence check would mistake for a finished split.
    """
    rng = random.Random(seed)
    train_partial_path = train_dataset_path + '.part'
    test_partial_path = test_dataset_path + '.part'
    # Binary mode: lines are copied verbatim, without decoding/encoding cost.
    with open(full_dataset_path, 'rb') as full_file, \
            open(train_partial_path, 'wb') as train_file, \
            open(test_partial_path, 'wb') as test_file:
        for line_number, line in enumerate(full_file, start=1):
            destination = test_file if rng.random() < test_ratio else train_file
            destination.write(line)
            if print_every and line_number % print_every == 0:
                print(f'{line_number:,} lines processed')
    os.replace(train_partial_path, train_dataset_path)
    os.replace(test_partial_path, test_dataset_path)
        
train_dataset_path = os.path.join(dataset_folder_path, "criteo_train_dataset.txt")
test_dataset_path = os.path.join(dataset_folder_path, "criteo_test_dataset.txt")

In [None]:
if not os.path.exists(train_dataset_path) or not os.path.exists(test_dataset_path):
    split_train_test(dataset_path, train_dataset_path, test_dataset_path, test_ratio=0.1, print_every=10_000_000)

In [None]:
!wc -l {test_dataset_path}

### Shuffling
The convergence guarantees of SGD rely on the fact that the observations come at random. Hence, shuffling between epochs is important.

First result of "How to shuffle a file that is too big for memory" on Google: https://stackoverflow.com/a/40814865/2015762

Note that quicker pseudo-shuffling strategies exist, but this one fits our "Big data on your laptop" setting.

In [None]:
%%bash -s "{test_dataset_path}" "{os.path.join(dataset_folder_path, 'criteo_test_dataset_shuffled.txt')}"
# The input/output paths come from the Python variables (IPython expands the
# {...} on the magic line) and are available here as $1 and $2, instead of being
# hard-coded. On a `!` line, IPython would also try to expand the awk program's
# braces as Python expressions, hence %%bash.
# Prefix each line with a random 6-digit key, sort on that key, then strip it
# again (6 digits + 1 space = 7 characters, hence `cut -c8-`).
awk 'BEGIN{srand();} {printf "%06d %s\n", rand()*1000000, $0;}' "$1" | sort -n | cut -c8- > "$2"
# The train dataset can be shuffled the same way (pass its input/output paths
# as $1 and $2), but let's skip it since it is quite long.

### Training
In order to train a logistic model on chunks of data, we will use scikit-learn's `SGDClassifier` (https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.SGDClassifier.html) with the logistic loss (`loss='log_loss'`, named `'log'` before scikit-learn 1.1), and train it with its `partial_fit` method.
We can now reuse the previous data processing pipeline and add a training step to obtain a trained classifier.

In [None]:
# First version: no real preprocessing, only the continuous (integer) features,
# which are "ready to use" once their missing values are filled.
def preprocess_data(chunk, integer_features, categorical_features):
    """Return the ``integer_features`` columns of ``chunk`` with NaNs replaced by -1.

    ``categorical_features`` is accepted to keep the same interface as the
    later versions of this function, but it is not used yet.
    """
    continuous_part = chunk[integer_features]
    return continuous_part.fillna(value=-1)

In [None]:
max_training_steps = 1_000
chunk_size = 1_000
print_every = 100

# The logistic loss is called 'log_loss' since scikit-learn 1.1 and 'log' before
# (the old name was removed in 1.3): use whichever this installation knows.
known_losses = getattr(SGDClassifier, 'loss_functions', {})
logistic_loss = 'log' if 'log' in known_losses and 'log_loss' not in known_losses else 'log_loss'
classifier = SGDClassifier(loss=logistic_loss, random_state=0)  # seeded for reproducible runs

# 1. Read train data by chunks
with pd.read_csv(train_dataset_path, sep='\t', header=None, names=columns,
                 dtype=col_types, chunksize=chunk_size) as reader:
    for step, chunk in enumerate(reader, start=1):
        # 2. Apply preprocess_data to get the model inputs
        features = preprocess_data(chunk, integer_features, categorical_features)
        # 3. Train the classifier on this chunk. partial_fit must know every class
        #    from the first call, since a chunk may contain a single class.
        classifier.partial_fit(features, chunk['label'], classes=[0, 1])
        if step % print_every == 0:
            print(f'training step {step}/{max_training_steps}')
        # 4. Stop after `max_training_steps`
        if step >= max_training_steps:
            break

### Testing
Let's evaluate the performance of the trained classifier. We iterate over the test dataset and evaluate the click probabilities predicted by the classifier (`predict_proba`) with `roc_auc_score` and `log_loss`. Both metrics need scores or probabilities, not hard 0/1 labels.

In [None]:
max_testing_steps = 100
chunk_size = 1_000
print_every = 50

auc_scores = []
log_losses = []
# 1. Read test data by chunks
with pd.read_csv(test_dataset_path, sep='\t', header=None, names=columns,
                 dtype=col_types, chunksize=chunk_size) as reader:
    for step, chunk in enumerate(reader, start=1):
        # 2. Apply preprocess_data to get the model inputs
        features = preprocess_data(chunk, integer_features, categorical_features)
        labels = chunk['label']
        # 3. Predict click probabilities: AUC and log loss score probabilities,
        #    not hard labels. classes_ is sorted ([0, 1]), so column 1 is "clicked".
        predicted_probas = classifier.predict_proba(features)[:, 1]
        # 4. Compute AUC and log loss for this chunk. AUC is undefined when a chunk
        #    holds a single class, so such chunks are skipped for that metric.
        if labels.nunique() == 2:
            auc_scores.append(roc_auc_score(labels, predicted_probas))
        log_losses.append(log_loss(labels, predicted_probas, labels=[0, 1]))
        if step % print_every == 0:
            print(f'testing step {step}/{max_testing_steps}')
        # 5. Stop after `max_testing_steps`
        if step >= max_testing_steps:
            break

# 6. Average the per-chunk metrics (all chunks have chunk_size rows, except
#    possibly the last one)
{'roc_auc': np.mean(auc_scores), 'log_loss': np.mean(log_losses)}

## Data preprocessing

### Continuous features
A smart way to deal with continuous features (the integer count features are among them) is to transform them into categorical features through a quantile transformation. To do so, we will use scikit-learn's `KBinsDiscretizer`: https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.KBinsDiscretizer.html.

It can be used as follows:
```
df = pd.DataFrame({'col_1': np.random.normal(size=1000), 'col_2': np.random.poisson(lam=1, size=1000)})
bucketizer = KBinsDiscretizer(n_bins=20, encode='ordinal')
bucketizer.fit(df)
df_bucketized = pd.DataFrame(bucketizer.transform(df), columns=[f'{col}_bucketized' for col in df.columns], index=df.index)
sns.jointplot(data=pd.concat((df, df_bucketized), axis=1), x="col_1", y="col_1_bucketized")
```

1. Create a `KBinsDiscretizer` and train it on the first chunk of the dataset
1. Update `preprocess_data` to add a bucketize step to the training pipeline. What happens if you change the `encode` parameter?
1. Do not forget to deal with missing values, you do not want to carry on NaNs. You can for example replace them with -1.

In [None]:
n_bins = 20
fit_chunk_size = 100_000  # rows of the train dataset used to fit the bucketizer

# The "first chunk" of the train dataset, used to learn the bucket boundaries.
first_chunk = pd.read_csv(train_dataset_path, sep='\t', header=None, names=columns,
                          dtype=col_types, nrows=fit_chunk_size)
# 'quantile': bucket boundaries are quantiles of each feature (roughly equally
# populated buckets). 'ordinal': each value is replaced by its bucket index.
bucketizer = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy='quantile')
# Fit on the data exactly as preprocess_data presents it (NaN -> -1).
bucketizer.fit(first_chunk[integer_features].fillna(-1))

def preprocess_data(chunk, integer_features, categorical_features):
    """Return the integer features of ``chunk`` replaced by their bucket index.

    Missing values are replaced by -1 before bucketization, the same way the
    bucketizer was fitted. ``categorical_features`` is not used yet.
    """
    integer_values = chunk[integer_features].fillna(-1)
    return pd.DataFrame(bucketizer.transform(integer_values),
                        columns=[f'{col}_bucketized' for col in integer_features],
                        index=chunk.index)

# To test your bucketization
preprocess_data(first_chunk.head(), integer_features, categorical_features)

### Categorical features
For categorical features we will implement the hashing trick using scikit-learn FeatureHasher: https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.FeatureHasher.html

It can be used as following
```
df = pd.DataFrame({'col_1': np.random.choice(['a', 'b', 'c'], size=100), 'col_2': np.random.poisson(size=100)})
hasher = FeatureHasher(n_features=2**16, input_type="dict")
hasher.transform((row._asdict() for row in df.itertuples(index=False)))
```
Again, you will probably want to make sure you get rid of the NaNs. What value could you use for them?

Let's improve our previous pipeline and apply the hashing trick to the categorical features **and** to the bucketized continuous features (have a look at `pd.concat`).

In [None]:
n_hashed_features = 2**20  # larger -> fewer hash collisions, but a bigger model
hasher = FeatureHasher(n_features=n_hashed_features, input_type='dict')

def preprocess_data(chunk, integer_features, categorical_features):
    """Bucketize the integer features, then hash them together with the categorical ones.

    Returns the sparse matrix produced by ``hasher.transform``. String values are
    hashed as categories (``"column=value"``), so bucket indices are converted to
    strings: each bucket becomes its own feature instead of a numeric weight.
    Assumes ``bucketizer`` has been fitted with ``encode='ordinal'``.
    """
    bucket_indices = bucketizer.transform(chunk[integer_features].fillna(-1))
    bucketized = pd.DataFrame(bucket_indices.astype(int), columns=integer_features,
                              index=chunk.index).astype(str)
    # Missing categories become an explicit category of their own instead of NaN.
    categorical = chunk[categorical_features].fillna('missing')
    features = pd.concat((bucketized, categorical), axis=1)
    return hasher.transform(row._asdict() for row in features.itertuples(index=False))

# To test your preprocessing
sample_chunk = pd.read_csv(train_dataset_path, sep='\t', header=None, names=columns,
                           dtype=col_types, nrows=5)
preprocess_data(sample_chunk, integer_features, categorical_features)