This notebook is used for: <br>
1. Getting the .parquet files for large datasets
2. Matching the target to rtk (the user id in the clickstream dataset) via the user id from the transactions dataset

In [1]:
import pandas as pd
import findspark
# Point findspark at the local Spark installation before the pyspark imports below.
# Raw string: the backslash in the Windows path is kept literally instead of being
# parsed as the (invalid) escape sequence "\S".
findspark.init(r'C:\Spark')
from pyspark.sql import SparkSession
import os
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import types as T

# Configuring PySpark

In [2]:
# Folder that holds the input CSV files; change it to the path of your data folder.
data_path = './main_data/'

# Spark settings kept in one table so they can be reviewed and tuned at a glance.
spark_settings = [
    ("spark.driver.maxResultSize", "4g"),
    ("spark.executor.memory", "16g"),
    ("spark.executor.memoryOverhead", "4g"),
    ("spark.driver.memory", "16g"),
    ("spark.driver.memoryOverhead", "4g"),
    ("spark.cores.max", "24"),
    ("spark.sql.shuffle.partitions", "200"),
    ("spark.local.dir", "../../spark_local_dir"),
]

# Local master using all available cores, with the application name used by this notebook.
spark_conf = pyspark.SparkConf().setMaster("local[*]").setAppName("PysparkDataPreprocessor")
for setting_key, setting_value in spark_settings:
    spark_conf.set(setting_key, setting_value)

# Build the session from the configuration above.
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()

# Show the configuration reported by the running Spark context.
spark.sparkContext.getConf().getAll()

[('spark.driver.extraJavaOptions',
  '-XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED'),
 ('spark.driver.memoryOverhead', '4g'),
 ('spark.local.dir', '../../spark_local_dir'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.memory', '16g'),
 ('spark.app.id', 'local-1679642713502'),
 ('spark.executor.memory', '16g'),
 (

# Preprocessing

In [3]:
# Read the clickstream CSV; the first row is treated as the header and column types are inferred.
clickstream_path = os.path.join(data_path, 'clickstream.csv')
source_data = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(clickstream_path)
)

In [4]:
# Drop the 'new_uid' column and preview the first two rows of what is left.
source_data = source_data.drop('new_uid')
source_data.show(2)

+--------------------+------+-------------------+
|             user_id|cat_id|          timestamp|
+--------------------+------+-------------------+
|000143baebad4467a...|   165|2021-01-30 20:08:12|
|000143baebad4467a...|   165|2021-01-31 20:06:29|
+--------------------+------+-------------------+
only showing top 2 rows



In [5]:
# Total number of rows in the clickstream DataFrame.
source_data.count()

126752515

In [6]:
# Preprocessing with the ptls library
from ptls.preprocessing import PysparkDataPreprocessor

# Column roles handed to the preprocessor: 'user_id' is the id column,
# 'timestamp' is the event time (transformation 'dt_to_timestamp') and
# 'cat_id' is listed as a categorical column.
preprocessor_params = dict(
    col_id='user_id',
    col_event_time='timestamp',
    event_time_transformation='dt_to_timestamp',
    cols_category=['cat_id'],
)
preprocessor = PysparkDataPreprocessor(**preprocessor_params)

In [7]:
%%time

# Fit the preprocessor on the clickstream and transform it in one step.
# persist() marks the result for caching so later cells can reuse it, and
# count() is the action that actually triggers the computation (timed by %%time).
dataset_pysparkdf = preprocessor.fit_transform(source_data).persist()
dataset_pysparkdf.count()

CPU times: total: 156 ms
Wall time: 3min 39s


19623

In [8]:
# Preview the preprocessed DataFrame.
dataset_pysparkdf.show()

+--------------------+--------------------+--------------------+
|             user_id|              cat_id|          event_time|
+--------------------+--------------------+--------------------+
|018d951f1ccb4f288...|[29, 1, 1, 60, 60...|[1611903458, 1612...|
|01ef0e619d0f4a6eb...|[4, 2, 2, 3, 23, ...|[1610973443, 1612...|
|0570e0db5cbb47b1a...|[12, 8, 11, 36, 3...|[1612968600, 1613...|
|0677be173662457d8...|[12, 12, 12, 12, ...|[1622604120, 1622...|
|0914be36f20d4c9fb...|[1, 1, 9, 9, 9, 3...|[1612744599, 1612...|
|0d8486356bb14e509...|[1, 1, 3, 3, 3, 1...|[1613341920, 1613...|
|0ee126d04de04082b...|[6, 5, 5, 6, 49, ...|[1612281145, 1612...|
|1327d47e0b9f43c6b...|[7, 10, 1, 22, 3,...|[1612679673, 1612...|
|13ccb15a7ba046f4a...|[2, 23, 23, 2, 23...|[1611412404, 1611...|
|1b0cf53c7ef342159...|[3, 3, 1, 1, 1, 1...|[1613761740, 1613...|
|1e42703fbc9945949...|[10, 12, 1, 10, 1...|[1611207638, 1611...|
|201b130b924f4e31b...|[134, 4, 8, 8, 12...|[1611232125, 1611...|
|20ffea50cd5e4ee1b...|[3,

In [9]:
# Collect the whole preprocessed Spark DataFrame into a pandas DataFrame on the driver.
# NOTE(review): `dataset` is not referenced by any later cell visible here — confirm it is still needed.
dataset = dataset_pysparkdf.toPandas()

In [9]:
# Dictionary size reported by the preprocessor for each categorical column.
preprocessor.get_category_dictionary_sizes()

{'cat_id': 404}

In [10]:
import pickle

# Serialize the preprocessor object to 'preprocessor.p' in the working directory.
with open('preprocessor.p', 'wb') as pickle_file:
    pickle.dump(preprocessor, pickle_file)

In [11]:
# Locations of the two CSV inputs inside the data folder.
target_path = os.path.join(data_path, 'train.csv')
matching_path = os.path.join(data_path, 'train_matching.csv')

# Dataset containing the targets.
target = spark.read.options(header=True, inferSchema=True).csv(target_path)
# Dataset containing the matched rtk and transactions ids.
matching = spark.read.options(header=True, inferSchema=True).csv(matching_path)

In [12]:
# Preview the matching table (bank id and rtk id columns).
matching.show()

+--------------------+--------------------+
|                bank|                 rtk|
+--------------------+--------------------+
|178b387813ac4a63a...|e19c0f141e9442c5b...|
|47cffa46e6b04389b...|7df3371aabd349e4a...|
|f73b767cfd72472aa...|b23d62c7e41145a7a...|
|48da649603734185b...|63ad789541c54463a...|
|37304ef19de542ee8...|c0e96de5dd594d948...|
|3c26cb845a4941ca9...|647e736a6a064cb5b...|
|a4840524c1b64416a...|b29febc9938749b59...|
|9a5e1c28552f4b82b...|41812d47e3614cc18...|
|d9968143901d4914a...|ec1adc1e08c7403fa...|
|22bef074cccb4a2a9...|9bf3775d68644f718...|
|31ef5cb8b89b4a16a...|6de89fa7fd054d9a9...|
|611fad24d9a44be1b...|a66eca3f820149d29...|
|09910580d99e44c28...|121e22e062b84acea...|
|b8e64a76a78a4036a...|fe433f40eb194c89b...|
|33af0b2b21ec4227a...|a24756560ecc44c2a...|
|5047955ee2654efaa...|aa79c504266a4d1a8...|
|fdc09565cf544c30b...|bf962314ef3040d5a...|
|0ca3e196bc3942e89...|238029b5fcd242359...|
|3d08d56ee08944799...|0f08048f4ff84e37b...|
|1d9acce9245d489db...|661166af46

In [13]:
# Preview the target table (bank id and higher_education columns).
target.show()

+--------------------+----------------+
|                bank|higher_education|
+--------------------+----------------+
|3755b59782464456b...|             0.0|
|604a550439d644718...|             0.0|
|542d4776ebe5454fb...|             1.0|
|ee37fecea44d475ca...|             0.0|
|18617a1100f44a99b...|             0.0|
|079f07153c0149d19...|             0.0|
|6dee55b3d7284d18b...|             1.0|
|13fdbc4dbd394c7fb...|             1.0|
|f94284392a064a93b...|             1.0|
|8653cbc7d48148a6a...|             1.0|
|b38863ecab6340c2a...|             1.0|
|fe19bba195414500b...|             1.0|
|0a0ae064b7d8423a8...|             0.0|
|546e6e2e490f42d68...|             1.0|
|499c20f1e53b4429b...|             1.0|
|9a35b2d9f02f49e09...|             1.0|
|5f3b2dbc151f4067b...|             0.0|
|aaef9bc7a58e40bba...|             1.0|
|91836eaf9d4e4975b...|             1.0|
|c883d8fdb75a49daa...|             1.0|
+--------------------+----------------+
only showing top 20 rows



In [14]:
# Order the target table by bank id (ascending) and preview the result.
target = target.sort('bank', ascending=True)
target.show()

+--------------------+----------------+
|                bank|higher_education|
+--------------------+----------------+
|000932580e404dafb...|             1.0|
|0020afcd52f54e9fa...|             1.0|
|0034020d25da4951b...|             1.0|
|0046da5a3d934f2db...|             1.0|
|004b3ef36faa40f08...|             1.0|
|0054a0388a8647d99...|             0.0|
|0059cb4a0de44cff9...|             1.0|
|005e2282c9ea4ddfb...|             1.0|
|0082e5d4d8074f05b...|             0.0|
|008607c1098d4e689...|             0.0|
|0087c80c55924740b...|             1.0|
|008a98f3d27e40b58...|             1.0|
|008ca092454a4ecd9...|             1.0|
|009fb6e432894d3a9...|             1.0|
|00a000619ec24ee39...|             0.0|
|00ad819ef6184f8b9...|             1.0|
|00b64a4131744fe78...|             1.0|
|00be95f5e4a8478e8...|             1.0|
|00c834f3d1064f28a...|             0.0|
|00d4a8c57ff14d0cb...|             1.0|
+--------------------+----------------+
only showing top 20 rows



In [15]:
# Order the matching table by bank id (ascending) and preview the result.
matching = matching.sort('bank', ascending=True)
matching.show()

+--------------------+--------------------+
|                bank|                 rtk|
+--------------------+--------------------+
|000932580e404dafb...|97d2092878ea42678...|
|0009e36b42cb4caeb...|beed41e945754ac5a...|
|000b29acb6bd44f99...|bb1230b232af439e9...|
|000c5327d99941fe9...|7d7b83b85f3f4584b...|
|000e0d54d7c945ebb...|78e9b8a98fff4f019...|
|0012e60b16f14da4b...|f4a70c0d6b8e4878b...|
|001879c9110d46ed9...|a58617b4b3424468b...|
|001c99d8cd6f409f8...|d2a0951ee0d445039...|
|0020536c52ee4257b...|e0208c1d86824a09b...|
|0020afcd52f54e9fa...|ae1a28a8428740e7a...|
|00260161e7fd40369...|091c259cd60844afa...|
|00262245fd4344b0b...|1262ca30efea49bc9...|
|002d5bbe9a80403b8...|cdf0f9eb0bdd4f07a...|
|0033ef60398646ff8...|                   0|
|003812b529ad4e579...|                   0|
|003d93fb918846ada...|cb4d0db7a2a5490cb...|
|0041da6ae2ab46108...|                   0|
|004231ff0d034d1e9...|                   0|
|0046da5a3d934f2db...|03de39a7ee8d482b8...|
|0047dbb5ef764871a...|a68ffabf07

In [16]:
# Full outer join of the matching and target tables on the shared 'bank' id;
# ids present in only one of the tables get nulls in the other table's column.
train_dataset = matching.join(target, on='bank', how='outer')

In [17]:
# Preview the joined table (bank, rtk and higher_education columns).
train_dataset.show()

+--------------------+--------------------+----------------+
|                bank|                 rtk|higher_education|
+--------------------+--------------------+----------------+
|000932580e404dafb...|97d2092878ea42678...|             1.0|
|0009e36b42cb4caeb...|beed41e945754ac5a...|            null|
|000b29acb6bd44f99...|bb1230b232af439e9...|            null|
|000c5327d99941fe9...|7d7b83b85f3f4584b...|            null|
|000e0d54d7c945ebb...|78e9b8a98fff4f019...|            null|
|0012e60b16f14da4b...|f4a70c0d6b8e4878b...|            null|
|001879c9110d46ed9...|a58617b4b3424468b...|            null|
|001c99d8cd6f409f8...|d2a0951ee0d445039...|            null|
|0020536c52ee4257b...|e0208c1d86824a09b...|            null|
|0020afcd52f54e9fa...|ae1a28a8428740e7a...|             1.0|
|00260161e7fd40369...|091c259cd60844afa...|            null|
|00262245fd4344b0b...|1262ca30efea49bc9...|            null|
|002d5bbe9a80403b8...|cdf0f9eb0bdd4f07a...|            null|
|0033ef60398646ff8...|  

In [18]:
# dropping not used bank ids column
train_dataset = train_dataset.drop('bank') 
# dropping null values (caused by lack of matched rtk and bank ids in initial data)
train_dataset = train_dataset.dropna() 
train_dataset.show()

+--------------------+----------------+
|                 rtk|higher_education|
+--------------------+----------------+
|97d2092878ea42678...|             1.0|
|ae1a28a8428740e7a...|             1.0|
|03de39a7ee8d482b8...|             1.0|
|9c4627b2935041099...|             1.0|
|5a9867b5c5b54819a...|             0.0|
|d1119df4cce24574a...|             1.0|
|51c52794be054932b...|             1.0|
|cfdefb5f0db9496d9...|             0.0|
|0aac66f864f149d3a...|             1.0|
|bbefdcf97f73473f8...|             1.0|
|cf57eefa27824b22b...|             1.0|
|add3026385c0416ba...|             0.0|
|e15004fd2b014295b...|             1.0|
|182ad88293bb4ffa9...|             1.0|
|274d03fc64f84f67a...|             1.0|
|b1f376bb6b744f3ca...|             0.0|
|ddfda8e42a434e28a...|             1.0|
|04548acddc584bf39...|             1.0|
|ff54a4a9cf6a429cb...|             1.0|
|aa5e786ae5fe4618b...|             1.0|
+--------------------+----------------+
only showing top 20 rows



In [19]:
# Checking whether the rtk column still contains the value '0', which shows up
# in the matching table for some bank ids instead of a real rtk id.
from pyspark.sql.functions import count,when,col

# rtk holds string ids, so compare against the string '0'. Comparing with the
# integer 0 would make Spark cast every rtk value to a number first: the
# hexadecimal ids would turn into nulls, and the cast raises an error when
# ANSI mode is enabled.
check = train_dataset.select(count(when(col('rtk') == '0', 'rtk')).alias('rtk'))
check.show()

+---+
|rtk|
+---+
|  0|
+---+



In [20]:
# Reproducible train/test split (roughly 80% / 20%).
# A fixed seed gives the same split on every run, whereas sampling without a
# seed produced a different split each time. randomSplit also returns disjoint
# parts directly, so no full-row subtract() (an extra shuffle that would also
# drop duplicate rows) is needed to derive the training part.
SPLIT_SEED = 42
train_df, test_df = dataset_pysparkdf.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

print('Size of test dataset:', test_df.count())
print('Size of train dataset', train_df.count())


Size of test dataset: 3908
Size of train dataset 15715


In [22]:
# Saving .parquet files for model training
test_df.write.parquet('test.parquet', mode='overwrite')
train_df.write.parquet('train.parquet', mode='overwrite')


In [42]:
# Saving matched rtk and target for model training
train_dataset.toPandas().to_csv('target_dataset_matched.csv',index=False)