## Data load

In [1]:
import os
import shutil
import urllib.request
import zipfile

DATASET_URL = 'https://storage.yandexcloud.net/di-datasets/age-prediction-nti-sbebank-2019.zip'
DATASET_ZIP = 'age-prediction-nti-sbebank-2019.zip'

# Download and unpack the dataset only once; later runs reuse data/*.csv.
# Done in pure Python rather than `!curl` / `!unzip`: shell escapes never raise,
# so a failed download (e.g. an HTTP error page saved as the .zip) would let the
# notebook continue and break confusingly at the CSV read. Here any failure raises.
if not os.path.exists('data/transactions_train.csv'):
    os.makedirs('data', exist_ok=True)
    urllib.request.urlretrieve(DATASET_URL, DATASET_ZIP)
    with zipfile.ZipFile(DATASET_ZIP) as archive:
        for member in archive.namelist():
            # Equivalent of `unzip -j -o ... 'data/*.csv' -d data`: take only the CSV
            # files under data/, drop their directory part, overwrite existing files.
            if member.startswith('data/') and member.endswith('.csv'):
                target = os.path.join('data', os.path.basename(member))
                with archive.open(member) as src, open(target, 'wb') as dst:
                    shutil.copyfileobj(src, dst)
    # Keep the original archive next to the extracted files, as `mv ... data/` did.
    shutil.move(DATASET_ZIP, os.path.join('data', DATASET_ZIP))

## Setup

In [2]:
%load_ext autoreload
%autoreload 2
# Reload imported modules (such as ptls) before each cell runs, so edits to the
# library source are picked up without restarting the kernel.

import logging
import torch
import pytorch_lightning as pl
# Optional noise reduction: uncomment the lines below to silence Python warnings
# and lower pytorch_lightning's log level to ERROR.
# import warnings

# warnings.filterwarnings('ignore')
# logging.getLogger("pytorch_lightning").setLevel(logging.ERROR)

## PySpark Data Preprocessing

In [3]:
import os
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import types as T


data_path = 'data/'

# Runtime settings for the local Spark session.
# NOTE(review): with master local[*] the executor lives inside the driver JVM, so the
# spark.executor.* and spark.cores.max entries are probably ignored — confirm before tuning.
# spark.local.dir is relative to the notebook's working directory.
spark_settings = {
    "spark.driver.maxResultSize": "4g",
    "spark.executor.memory": "16g",
    "spark.executor.memoryOverhead": "4g",
    "spark.driver.memory": "16g",
    "spark.driver.memoryOverhead": "4g",
    "spark.cores.max": "24",
    "spark.sql.shuffle.partitions": "200",
    "spark.local.dir": "../../spark_local_dir",
}

spark_conf = pyspark.SparkConf().setMaster("local[*]").setAppName("PysparkDataPreprocessor")
spark_conf.setAll(list(spark_settings.items()))


spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
# Display the effective configuration of the running session.
spark.sparkContext.getConf().getAll()

22/06/23 11:37:26 WARN Utils: Your hostname, vm2 resolves to a loopback address: 127.0.1.1; using 192.168.0.6 instead (on interface ens192)
22/06/23 11:37:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/23 11:37:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/06/23 11:37:26 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


[('spark.driver.host', '192.168.0.6'),
 ('spark.driver.memoryOverhead', '4g'),
 ('spark.app.id', 'local-1655984247654'),
 ('spark.driver.port', '44961'),
 ('spark.sql.warehouse.dir',
  'file:/home/kireev/pycharm-deploy/pytorch-lifestream/demo/spark-warehouse'),
 ('spark.local.dir', '../../spark_local_dir'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.memory', '16g'),
 ('spark.executor.memory', '16g'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.startTime', '1655984246801'),
 ('spark.sql.shuffle.partitions', '200'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'PysparkDataPreprocessor'),
 ('spark.cores.max', '24'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.maxResultSize', '4g'),
 ('spark.executor.memoryOverhead', '4g')]

In [4]:
# The layout of transactions_train.csv is fixed (client_id, trans_date, small_group,
# amount_rur), so declare the schema instead of `inferSchema=True`, which forces Spark
# to scan the whole file an extra time just to guess the column types.
# The types match what inference produced for this file (ints + one double).
transactions_schema = T.StructType([
    T.StructField('client_id', T.IntegerType()),
    T.StructField('trans_date', T.IntegerType()),
    T.StructField('small_group', T.IntegerType()),
    T.StructField('amount_rur', T.DoubleType()),
])
source_data = (
    spark.read
    .options(header=True)
    .schema(transactions_schema)
    .csv(os.path.join(data_path, 'transactions_train.csv'))
)
source_data.show(2)

                                                                                

+---------+----------+-----------+----------+
|client_id|trans_date|small_group|amount_rur|
+---------+----------+-----------+----------+
|    33172|         6|          4|    71.463|
|    33172|         6|         35|    45.017|
+---------+----------+-----------+----------+
only showing top 2 rows



In [5]:
from ptls.preprocessing import PysparkDataPreprocessor

# Preprocessor settings. `trans_date` plays two roles: it is the event-time column
# and also one of the categorical features. The category mappings learned during
# fit are what a model trained on this data depends on at inference time.
preprocessor_params = dict(
    col_id='client_id',
    cols_event_time='trans_date',
    time_transformation='float',
    cols_category=["trans_date", "small_group"],
    cols_log_norm=["amount_rur"],
    cols_identity=[],
    print_dataset_info=False,
)
preprocessor = PysparkDataPreprocessor(**preprocessor_params)

In [6]:
%%time

# Fit the preprocessor on the raw transactions and transform them. persist() caches
# the result because several later cells reuse dataset_pysparkdf.
# count() is a Spark action: it materializes (and caches) the transformed frame, so the
# %%time measurement covers the actual preprocessing work.
dataset_pysparkdf = preprocessor.fit_transform(source_data).persist()
dataset_pysparkdf.count()

22/06/23 11:37:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/06/23 11:37:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/06/23 11:37:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/06/23 11:37:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/06/23 11:37:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/06/23 11:37:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/06/23 1

CPU times: user 296 ms, sys: 120 ms, total: 416 ms
Wall time: 24.6 s


                                                                                

30000

In [7]:
# Preview the preprocessed rows (PySpark's default of 20 rows, long values truncated).
dataset_pysparkdf.show(n=20, truncate=True)

+---------+--------------------+--------------------+--------------------+--------------------+
|client_id|          trans_date|         small_group|          amount_rur|          event_time|
+---------+--------------------+--------------------+--------------------+--------------------+
|      463|[726, 724, 725, 7...|[7, 1, 7, 3, 3, 2...|[0.32135927726085...|[1.0, 2.0, 5.0, 7...|
|      471|[730, 726, 726, 7...|[68, 5, 4, 7, 1, ...|[0.26422890225828...|[0.0, 1.0, 1.0, 1...|
|      496|[723, 723, 722, 7...|[3, 1, 1, 1, 1, 4...|[0.27119266729746...|[3.0, 3.0, 4.0, 5...|
|      833|[726, 726, 726, 7...|[17, 15, 1, 49, 3...|[0.28922101008224...|[1.0, 1.0, 1.0, 2...|
|     1238|[730, 723, 716, 7...|[3, 11, 3, 3, 1, ...|[0.13326234667635...|[0.0, 3.0, 9.0, 1...|
|     1342|[730, 722, 702, 7...|[14, 3, 3, 21, 37...|[0.11547227625749...|[0.0, 4.0, 11.0, ...|
|     1591|[724, 724, 723, 7...|[37, 20, 7, 6, 1,...|[0.35099852868535...|[2.0, 2.0, 3.0, 3...|
|     1645|[724, 723, 723, 7...|[2, 16, 

In [8]:
# (column name, Spark SQL type) pairs: after preprocessing every feature column is an array per client_id.
dataset_pysparkdf.dtypes

[('client_id', 'int'),
 ('trans_date', 'array<int>'),
 ('small_group', 'array<int>'),
 ('amount_rur', 'array<double>'),
 ('event_time', 'array<float>')]

In [9]:
# Split the preprocessed rows (one row per client_id) into train / valid / test.
# Intended proportions: 20% test, then 10% of the remainder as valid,
# i.e. 72% train / 8% valid / 20% test of the whole dataset.
# A seeded randomSplit replaces sample() + subtract():
#   * sample() had no seed, so the split changed on every run;
#   * randomSplit yields disjoint parts that together cover the (persisted) input;
#   * it avoids the two shuffle-heavy EXCEPT DISTINCT operations done by subtract().
SPLIT_SEED = 42
train_df, valid_df, test_df = dataset_pysparkdf.randomSplit([0.72, 0.08, 0.2], seed=SPLIT_SEED)

print('Size of test dataset:', test_df.count())
print('Size of train dataset', train_df.count())
print('Size of valid dataset', valid_df.count())

test_df.write.parquet('test.parquet', mode='overwrite')
train_df.write.parquet('train.parquet', mode='overwrite')
valid_df.write.parquet('valid.parquet', mode='overwrite')

Size of test dataset: 5965


                                                                                

Size of train dataset 21646


                                                                                

Size of valid dataset 2389


                                                                                ]

## Data access

In [10]:
from ptls.data_load.datasets import ParquetDataset, ParquetFiles

In [11]:
# Lazily iterate over the parquet part-files written for the training split.
train_files = ParquetFiles('train.parquet').data_files
iterable_train = ParquetDataset(train_files)

In [12]:
# Peek at the first record: a dict with client_id plus one torch tensor per feature column.
next(iter(iterable_train))

{'client_id': 6456,
 'trans_date': tensor([724, 724, 723,  ...,  49,  49, 209], dtype=torch.int32),
 'small_group': tensor([4, 1, 3,  ..., 4, 1, 2], dtype=torch.int32),
 'amount_rur': tensor([0.1194, 0.3518, 0.2086,  ..., 0.2233, 0.1576, 0.1701],
        dtype=torch.float64),
 'event_time': tensor([  2.,   2.,   3.,  ..., 720., 720., 721.])}

In [13]:
from ptls.data_load.datasets import MemoryMapDataset
from ptls.data_load.iterable_processing import SeqLenFilter, FeatureFilter

# Length filter applied while reading the training records; presumably it drops
# clients whose sequences are shorter than 25 events — confirm against ptls docs.
seq_len_filter = SeqLenFilter(min_seq_len=25)

map_processed_train = MemoryMapDataset(data=iterable_train, i_filters=[seq_len_filter])

In [14]:
# Random access by index into the length-filtered training dataset.
map_processed_train[0]

{'client_id': 6456,
 'trans_date': tensor([724, 724, 723,  ...,  49,  49, 209], dtype=torch.int32),
 'small_group': tensor([4, 1, 3,  ..., 4, 1, 2], dtype=torch.int32),
 'amount_rur': tensor([0.1194, 0.3518, 0.2086,  ..., 0.2233, 0.1576, 0.1701],
        dtype=torch.float64),
 'event_time': tensor([  2.,   2.,   3.,  ..., 720., 720., 721.])}

**Attention!**

You cannot use a pretrained `coles-emb.pt` with a preprocessor other than the one used to train the model.
This is because the preprocessor stores a specific category-to-embedding-id mapping during the fit procedure,
and the model relies on this mapping during pretraining.
Using a different mapping at inference time will corrupt the output.