# Spark modelling - optimized

In [None]:
# 1. Libraries and Spark setup

import os
import sys
import json
import datetime
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array
from pyspark.ml.feature import Imputer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [None]:
def access_data(file_path):
    """Load and return the JSON document stored at ``file_path``.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The parsed JSON content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    # Read as UTF-8 explicitly so the result does not depend on the
    # platform's default locale encoding.
    with open(file_path, encoding='utf-8') as file:
        # Returned directly: the previous local variable was also called
        # `access_data` and shadowed this function inside its own body.
        return json.load(file)

access_s3_data = access_data('.access_jhub_data')

In [None]:
def uiWebUrl(self):
    """Return the JupyterHub proxy path of this context's Spark jobs page.

    The UI address is read from the underlying JVM SparkContext; only its
    port is kept and appended as ``proxy/<port>/jobs/`` to the value of the
    ``JUPYTERHUB_SERVICE_PREFIX`` environment variable.

    Raises:
        KeyError: If ``JUPYTERHUB_SERVICE_PREFIX`` is not set.
    """
    from urllib.parse import urlparse

    spark_ui_address = self._jsc.sc().uiWebUrl().get()
    ui_port = urlparse(spark_ui_address).port
    hub_prefix = os.environ['JUPYTERHUB_SERVICE_PREFIX']
    return f'{hub_prefix}proxy/{ui_port}/jobs/'


# Make `sc.uiWebUrl` return the JupyterHub-proxied jobs page built by the
# `uiWebUrl` function defined above.
SparkContext.uiWebUrl = property(uiWebUrl)

# Driver-side Spark settings (SparkConf setters support chaining).
conf = (
    SparkConf()
    .set('spark.master', 'local[5]')
    .set('spark.driver.memory', '32G')
    .set('spark.driver.maxResultSize', '8G')
)
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# S3A connector options, applied in this order to the Hadoop configuration.
# Credentials come from the JSON file loaded into `access_s3_data`.
# NOTE(review): the endpoint uses plain `http` — confirm whether `https`
# should be used instead.
_s3a_options = {
    'fs.s3a.access.key': access_s3_data['aws_access_key_id'],
    'fs.s3a.secret.key': access_s3_data['aws_secret_access_key'],
    'fs.s3a.impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
    'fs.s3a.multipart.size': '104857600',
    'fs.s3a.block.size': '33554432',
    'fs.s3a.threads.max': '256',
    'fs.s3a.endpoint': 'http://storage.yandexcloud.net',
    'fs.s3a.aws.credentials.provider':
        'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider',
}
for _option, _value in _s3a_options.items():
    spark._jsc.hadoopConfiguration().set(_option, _value)

# Last expression of the cell: displays the session summary in the notebook.
spark

In [None]:
## 2. Dataset

In [None]:
"""This section defines variables for data processing and file paths, then performs several data processing stages:
Variables:

* `VER`: Version identifier.
* `PROC_DS`, `PROC_LAGS`, `PROC_VECS`: Flags to control data processing stages.
* `FRAC_0`: Fraction of negative examples to sample when processing lags.
* `BUCKET`: S3 bucket name.
* `files_path`, `files_mask`: Local paths and masks for raw data files.
* `file_path_ds`, `file_path_lags`, `file_path_trn`, `file_path_tst`: S3 paths for different stages of processed data.


• `VER = 'v2'`: This defines the version of your data processing pipeline. You might increment this version number when you make significant changes to the processing steps. V2 is just an example, you will choose your own version. 
• `BUCKET` = 'pvc-84ea79a0-dc20-4a2d-86ab-f83c1f8d4a7b': This specifies the name of the bucket where your data is stored (likely on a cloud storage platform like AWS S3 or Yandex Object Storage).

**Flags:**

• `PROC_DS` (False): This flag controls whether to process the raw dataset. If True, the code will read the raw CSV files, extract relevant columns, filter by date, and create the data_raw.parquet file.

* Change to True: When you have new raw data or need to reprocess the existing raw data due to changes in the extraction logic.
* Keep as False: When you already have a processed data_raw.parquet file and don't need to re-process it.

• `PROC_LAGS` (False): This flag controls whether to process and create lag features. If True, the code will calculate lag features based on event history for different time windows and store them in the data_lags.parquet file.

* Change to True: When you need to recalculate lag features, such as when you've changed the time window definitions or added new events.
* Keep as False: When you already have the desired lag features in data_lags.parquet and don't need to recompute them.

• `FRAC_0` (.001): This variable sets the sampling fraction for events with payment_event_flag = 0 when processing lags. This is used to reduce the size of the dataset for faster processing while maintaining a representative sample.

* Adjust Value: You might change this value depending on the size of your dataset and the desired balance between processing time and data representativeness.

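• `PROC_VECS` (True): This flag controls whether to vectorize the data using Word2Vec. If True, the code will fit one Word2Vec model per lag feature on the training split, transform the lag features of both splits into numerical vectors, and store them in data_vec_train.parquet and data_vec_test.parquet files.

* Change to False: When data_vec_train.parquet and data_vec_test.parquet already exist and you only want to reload them. Note that the fitted Word2Vec models (`vectorizers`) are only created when this flag is True, and the "Future look" section needs them.
* Keep as True: When the vectors have to be computed or recomputed, for example after the lag features have changed.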

**File Paths:**

• `files_path`, `files_mask`: These define the location and pattern of the raw data files.
• `file_path_ds`, `file_path_lags`, `file_path_trn`, `file_path_tst`: These specify the storage locations for the processed datasets at different stages of the pipeline.

By understanding these flags and variables, you can control which parts of the data processing pipeline are executed, allowing for efficient experimentation and iteration."""

In [None]:
# Pipeline version; it is a path component of every processed dataset below.
VER = 'vPP2'

# Stage switches: a stage guarded by a True flag is recomputed and written,
# otherwise only its stored parquet output is read back.
PROC_DS = True
PROC_LAGS = True
PROC_VECS = True
FRAC_0 = .002  # used only if `PROC_LAGS = True`

# Storage locations.
BUCKET = 'pvc-84ea79a0-dc20-4a2d-86ab-f83c1f8d4a7b'
PRJ_PATH = '/home/jovyan/__RAYPFP'

# Raw event files (a glob relative to the bucket root).
files_path = 'data/events'
files_mask = f'{files_path}/data_202*-*-*.csv'

# Processed datasets, one parquet location per stage.
_work_prefix = f's3a://{BUCKET}/work/{VER}'
file_path_ds = f'{_work_prefix}/data_raw.parquet'
file_path_lags = f'{_work_prefix}/data_lags.parquet'
file_path_trn = f'{_work_prefix}/data_vec_train.parquet'
file_path_tst = f'{_work_prefix}/data_vec_test.parquet'

In [None]:
def clean_parquet(path):
    """Delete the local copy of a dataset so that it can be written again.

    ``path`` is an ``s3a://{BUCKET}/...`` URI. The bucket prefix is swapped
    for ``PRJ_PATH`` and whatever exists at the resulting local path is
    removed (presumably the bucket is mounted under ``PRJ_PATH`` — verify).
    A missing target is not an error, matching the previous ``rm -rf``.

    The deletion is done with the standard library instead of a shell
    command: the earlier version built the command with ``str.replace`` and
    ran it through the IPython-only ``!`` syntax, so a path without the
    bucket prefix would have been executed verbatim as a shell command.

    Args:
        path: Dataset URI; must start with ``s3a://{BUCKET}``.

    Returns:
        The string ``'command to run: rm -rf <local path>'``, kept in the
        same form as before for compatibility.

    Raises:
        ValueError: If ``path`` does not start with the bucket prefix.
    """
    import os
    import shutil

    bucket_prefix = f's3a://{BUCKET}'
    if not path.startswith(bucket_prefix):
        raise ValueError(
            f'expected a path starting with {bucket_prefix!r}, got {path!r}'
        )
    local_path = f'{PRJ_PATH}{path[len(bucket_prefix):]}'

    if os.path.isdir(local_path) and not os.path.islink(local_path):
        # Spark writes a parquet dataset as a directory of part files.
        # Best-effort like `rm -rf`: failures do not stop the notebook.
        shutil.rmtree(local_path, ignore_errors=True)
    elif os.path.lexists(local_path):
        try:
            os.remove(local_path)
        except OSError:
            # Best-effort, same as above; the subsequent write will fail
            # loudly if the target is still present.
            pass
    return f'command to run: rm -rf {local_path}'

In [None]:
### 2.1. Load or preprocess data - `raw` stage

In [None]:
%%time

# Only payment events inside this window are labelled as positives.
flag_min_datetime = datetime.datetime(2024, 4, 30, 0, 0, 0)
flag_max_datetime = datetime.datetime(2024, 5, 21, 23, 59, 59)
print(flag_min_datetime, flag_max_datetime)

if PROC_DS:
    # Rebuild the raw dataset from the CSV event files in the bucket.
    sdf = spark.read.option('escape','"').csv(f's3a://{BUCKET}/{files_mask}', header=True)
    sdf = sdf.withColumn('event_datetime', F.to_timestamp("event_datetime"))
    # Label = 1 when the event name contains one of the two payment event
    # names (roughly "My fines / Payment / Payment completed" and
    # "My fines / Payment / Payment accepted") AND the event falls inside
    # the flag window; otherwise 0.
    sdf = sdf.withColumn(
        'payment_event_flag', 
        (
            (F.col('event_name').like('%Мои штрафы/Оплата/Завершили оплату%') | 
            F.col('event_name').like('%Мои штрафы/Оплата/Платёж принят%')) &
            F.col('event_datetime').between(flag_min_datetime, flag_max_datetime)
        ).cast("int")
    )
    sdf = sdf.select(
        'profile_id',
        'event_datetime',
        'payment_event_flag',
        'event_name'
    )
    # The unused `cmd` string (an `ls -la` command that was built here but
    # never executed) has been removed.
    # Remove the previous output first: the write below uses the default
    # save mode and is not asked to overwrite.
    clean_parquet(file_path_ds)
    sdf.repartition(1).write.parquet(file_path_ds)
    sdf.unpersist()
# Whether or not it was just rebuilt, continue from the stored version.
sdf = spark.read.parquet(file_path_ds)
sdf.limit(5).toPandas()

In [None]:
# Count the rows whose profile_id is missing; a result of 0 means every event
# is attributed to a profile.

sdf.filter(F.col('profile_id').isNull()).count()

In [None]:
### 2.2. Load or preprocess data - `lags` stage

In [None]:
def dataset_lags(sdf, shift=0):
    """Attach 26 weekly event-history columns to every event row.

    For each row and each ``k`` in 1..26, the column
    ``lag_<k>week(s)_to_<k+1>weeks`` holds the list of ``event_name`` values
    of the same ``profile_id`` whose ``event_datetime`` (as epoch seconds)
    lies between ``k + 1`` weeks and ``k`` weeks before the row's own
    timestamp. Both bounds are moved forward by ``shift`` seconds.

    ``rangeBetween`` bounds are inclusive on both ends, so an event lying
    exactly on a week boundary appears in two adjacent lag columns (this
    matches the previous hand-written windows).

    The former 10-minutes-to-1-week window stays excluded; it was already
    commented out of the result, so its window spec is no longer built.

    Args:
        sdf: DataFrame with ``profile_id``, ``event_datetime``,
            ``payment_event_flag`` and ``event_name`` columns.
        shift: Seconds added to both bounds of every window (default 0).

    Returns:
        A DataFrame with the four input columns followed by the 26 lag
        columns, ordered by ``event_datetime`` descending.
    """
    week = 7 * 24 * 60 * 60

    # All windows share partitioning and ordering; only the frame differs.
    event_seconds = F.col('event_datetime').cast('timestamp').cast('long')
    per_profile = Window.partitionBy(F.col('profile_id')).orderBy(event_seconds)

    lag_columns = []
    for k in range(1, 27):
        # Only the first name uses the singular: 'lag_1week_to_2weeks'.
        start_label = f'{k}week' if k == 1 else f'{k}weeks'
        frame = per_profile.rangeBetween(
            -(k + 1) * week + shift,
            -k * week + shift,
        )
        lag_columns.append(
            F.collect_list('event_name')
            .over(frame)
            .alias(f'lag_{start_label}_to_{k + 1}weeks')
        )

    # One select instead of 26 chained withColumn calls: same columns, but a
    # much smaller query plan.
    return (
        sdf
        .select(
            'profile_id',
            'event_datetime',
            'payment_event_flag',
            'event_name',
            *lag_columns,
        )
        .orderBy(F.col('event_datetime'), ascending=False)
    )

In [None]:
"""This section defines a list of lag features to be used.

The `datasets_vecorized` function (defined in the vectorize stage below) performs `Word2Vec` vectorization on the lag features for both training and test datasets.

For every lag feature it fits a `Word2Vec` model on the training data, which converts each list of event names into a fixed-size numerical feature vector, and applies the fitted model to both datasets.

The function also returns the list of fitted models, one per lag feature.""" 

In [None]:
%%time
if PROC_LAGS:
    # Keep every positive event but only a FRAC_0 share of the negatives,
    # then build the weekly history columns on the sampled events.
    # NOTE(review): sampling runs before the windows are evaluated, so the
    # lag lists only contain events that survived sampling — confirm this is
    # intended.
    sdf = dataset_lags(
        sdf.sampleBy(
            'payment_event_flag',
            fractions={0: FRAC_0, 1: 1},
            seed=2023
        )
    )

    # Keep only events inside the flag window.
    dates = (flag_min_datetime, flag_max_datetime)
    sdf = sdf.filter(sdf.event_datetime.between(*dates))

    # Keep rows where at least one of the 26 lag lists is non-empty
    # (the 10min-to-1week lag is not part of the dataset).
    _lag_names = [
        f"lag_{_k}week{'s' if _k > 1 else ''}_to_{_k + 1}weeks"
        for _k in range(1, 27)
    ]
    _has_history = F.size(_lag_names[0]) > 0
    for _lag_name in _lag_names[1:]:
        _has_history = _has_history | (F.size(_lag_name) > 0)
    sdf = sdf.filter(_has_history)

    # Drop the previous output, then store the new one in 8 partitions.
    clean_parquet(file_path_lags)
    sdf.repartition(8).write.parquet(file_path_lags)
    sdf.unpersist()
# Continue from the stored lags dataset and show the class balance.
sdf = spark.read.parquet(file_path_lags)
sdf.groupBy('payment_event_flag').count().toPandas()

In [None]:
def stratified_split(sdf, frac, label, seed=2023):
    """Split ``sdf`` into train and test parts, class by class.

    Rows with ``label == 1`` and rows with ``label == 0`` are each split
    with ``randomSplit`` using weights ``[1 - frac, frac]`` and the same
    seed. The per-class pieces are then unioned, positives first. Rows whose
    label is neither 0 nor 1 end up in neither part.

    Args:
        sdf: DataFrame to split.
        frac: Weight of the test part.
        label: Name of the 0/1 label column.
        seed: Seed passed to both ``randomSplit`` calls.

    Returns:
        A ``(train, test)`` tuple of DataFrames.
    """
    weights = [1 - frac, frac]
    negatives = sdf.filter(sdf[label] == 0)
    positives = sdf.filter(sdf[label] == 1)
    negatives_train, negatives_test = negatives.randomSplit(weights, seed=seed)
    positives_train, positives_test = positives.randomSplit(weights, seed=seed)
    return (
        positives_train.union(negatives_train),
        positives_test.union(negatives_test),
    )

In [None]:
sdf_train, sdf_test = stratified_split(
    sdf,
    frac=.2,
    label='payment_event_flag',
    seed=2023
)

In [None]:
sdf_train.groupBy('payment_event_flag').count().toPandas()

In [None]:
sdf_test.groupBy('payment_event_flag').count().toPandas()

In [None]:
### 2.3. Load or preprocess data - `vectorize` stage

In [None]:
# Names of the 26 weekly history columns, from 'lag_1week_to_2weeks' up to
# 'lag_26weeks_to_27weeks'. 'lag_10min_to_1week' is intentionally left out.
lags = [
    f"lag_{week}week{'s' if week > 1 else ''}_to_{week + 1}weeks"
    for week in range(1, 27)
]

In [None]:
def datasets_vecorized(sdf_train, sdf_test, lags, vec_size=10):
    """Turn every lag column into a Word2Vec vector column.

    For each name in ``lags`` a Word2Vec model is fitted on the training
    DataFrame only and then applied to both DataFrames, adding a
    ``<lag>_vec`` column to each. Progress is shown with ``tqdm``.

    Args:
        sdf_train: Training DataFrame containing the lag columns.
        sdf_test: Test DataFrame containing the same lag columns.
        lags: Names of the lag columns to vectorize.
        vec_size: Size of each produced vector.

    Returns:
        ``(sdf_train, sdf_test, vectorizers)`` — both DataFrames with the
        added vector columns and the fitted models in the order of ``lags``.
    """
    fitted_models = []
    for column in tqdm(lags):
        model = Word2Vec(
            vectorSize=vec_size,
            minCount=0,
            inputCol=column,
            outputCol=f'{column}_vec',
        ).fit(sdf_train)
        fitted_models.append(model)
        sdf_train = model.transform(sdf_train)
        sdf_test = model.transform(sdf_test)
    return sdf_train, sdf_test, fitted_models

In [None]:
"""## Breakdown of if PROC_VECS code: 


**Conditional Execution:**

• `if PROC_VECS:`: The code within this block is executed only if the PROC_VECS flag is set to True. This flag controls whether Word2Vec vectorization is performed on the data.

**Word2Vec Vectorization:**

`sdf_train, sdf_test, vectorizers = datasets_vecorized(...)`: This line calls the datasets_vecorized function, which performs Word2Vec vectorization on the lag features present in the sdf_train and sdf_test DataFrames.
* The lags argument provides the list of lag feature column names to be vectorized.
* The vec_size=10 argument specifies the desired dimensionality (number of features) of the resulting Word2Vec vectors.

**The function returns three values:**

* `sdf_train`: The training DataFrame with the added Word2Vec vector columns.
* `sdf_test`: The test DataFrame with the added Word2Vec vector columns.
* `vectorizers`: A list of fitted Word2Vec models (one for each lag feature).

**Cleaning and Saving Parquet Files:**

`clean_parquet(file_path_trn)`: This line calls the `clean_parquet` function (defined earlier in this notebook) to remove any existing Parquet files at the specified path (file_path_trn) before saving the new data.

`sdf_train.repartition(8).write.parquet(file_path_trn)`: The training DataFrame (sdf_train) is repartitioned into 8 partitions for optimized writing.

* The write.parquet method saves the DataFrame as a Parquet file at the specified path (file_path_trn).
* The same process is repeated for the test DataFrame (sdf_test) using file_path_tst.

**Unpersisting DataFrames:**

* `sdf_train.unpersist(), sdf_test.unpersist()`: These lines remove the DataFrames from Spark's memory. Since the data has been saved to disk, it can be reloaded later if needed, freeing up memory for subsequent processing.

**Reloading DataFrames (if necessary):**

* `sdf_train = spark.read.parquet(file_path_trn)`: This line reads the training data back from the saved Parquet file. It runs whether or not PROC_VECS is True, so the rest of the notebook always works with the stored version.

The same is done for the test data using file_path_tst.""" 

In [None]:
if PROC_VECS:
    # Fit one Word2Vec model per lag column and add the vector columns.
    # NOTE: `vectorizers` only exists when this branch runs; the prediction
    # cells further down use it.
    sdf_train, sdf_test, vectorizers = datasets_vecorized(
        sdf_train, sdf_test, lags, vec_size=10
    )
    # Replace the stored train and test datasets (8 partitions each).
    for _frame, _target in (
        (sdf_train, file_path_trn),
        (sdf_test, file_path_tst),
    ):
        clean_parquet(_target)
        _frame.repartition(8).write.parquet(_target)
    for _frame in (sdf_train, sdf_test):
        _frame.unpersist()
# Continue from the stored versions in both cases.
sdf_train, sdf_test = (
    spark.read.parquet(file_path_trn),
    spark.read.parquet(file_path_tst),
)

In [None]:
sdf_train.printSchema()

## 3. Model

### 3.1. Features assembling

In [None]:
def features_assembled(sdf, feats):
    """Assemble the ``<feat>_vec`` columns into a single ``features`` vector.

    Prints the list of columns that go into the model (vector columns plus
    the label), then concatenates the vector columns with a
    ``VectorAssembler``.

    Args:
        sdf: DataFrame holding a ``<feat>_vec`` column for every entry of
            ``feats`` and the ``payment_event_flag`` column.
        feats: Lag feature names without the ``_vec`` suffix.

    Returns:
        A DataFrame with exactly two columns: ``payment_event_flag`` and
        ``features``.
    """
    label = 'payment_event_flag'
    vector_cols = [f'{name}_vec' for name in feats]
    cols_to_model = vector_cols + [label]
    print('columns to model:', cols_to_model)

    assembler = VectorAssembler(inputCols=vector_cols, outputCol='features')
    assembled = assembler.transform(sdf.select(cols_to_model))
    return assembled.select(label, 'features')


def upsampled(sdf, label, upsample='max'):
    """Return ``sdf`` with the positive rows repeated to reduce imbalance.

    The result always contains the rows with ``label == 0`` followed by the
    rows with ``label == 1``; the positive rows are then appended again
    until they are present ``copies`` times in total.

    Args:
        sdf: DataFrame to upsample.
        label: Name of the 0/1 label column.
        upsample: ``'max'`` to use ``count(zeros) // count(ones)`` copies,
            an integer to use that many copies, or ``None`` for no
            upsampling.

    Returns:
        The upsampled DataFrame. With ``upsample=None``, with fewer than two
        copies, or when there are no positive rows, this is just the union
        of negatives and positives.

    Raises:
        TypeError: If ``upsample`` is neither ``'max'``, an integer nor
            ``None``.
    """
    zeros = sdf.filter(sdf[label] == 0)
    ones = sdf.filter(sdf[label] == 1)
    res = zeros.union(ones)

    if upsample is None:
        # The documented "off" value: previously this failed on `None - 1`.
        return res

    if upsample == 'max':
        ones_count = ones.count()
        if ones_count == 0:
            # Nothing to replicate; previously a ZeroDivisionError.
            return res
        # Integer division: same result as int(a / b) without the float.
        copies = zeros.count() // ones_count
    elif isinstance(upsample, int):
        copies = upsample
    else:
        raise TypeError(
            f"upsample must be 'max', an int or None, got {upsample!r}"
        )

    # One copy is already part of `res`.
    for _ in range(copies - 1):
        res = res.union(ones)
    return res

In [None]:
UPSAMPLE = None  # can be None or 'max'

In [None]:
# Lag features used by the model: all 26 weekly history columns.
# 'lag_10min_to_1week' is intentionally left out.
feats = [
    f"lag_{week}week{'s' if week > 1 else ''}_to_{week + 1}weeks"
    for week in range(1, 27)
]

# Label + assembled feature vector for each split.
features_train, features_test = (
    features_assembled(sdf_train, feats=feats),
    features_assembled(sdf_test, feats=feats),
)

if UPSAMPLE:
    features_train = upsampled(
        features_train, label='payment_event_flag', upsample=UPSAMPLE
    )
    # Use to upsample test set
    features_test = upsampled(
        features_test, label='payment_event_flag', upsample=UPSAMPLE
    )

In [None]:
features_train.groupBy('payment_event_flag').count().toPandas()

In [None]:
features_test.groupBy('payment_event_flag').count().toPandas()

In [None]:
### 3.2. Training and evaluating

In [None]:
# Random forest classifier predicting 'payment_event_flag' from the assembled
# 'features' vector: 100 trees with a maximum depth of 16.
rf = RandomForestClassifier(
    labelCol='payment_event_flag',
    featuresCol='features',
    numTrees=100,
    maxDepth=16
)

In [None]:
%%time
model = rf.fit(features_train)

In [None]:
predictions = model.transform(features_test)

# BinaryClassificationMetrics expects (score, label) pairs. Use the predicted
# probability of the positive class as the score: feeding the hard 0/1
# `prediction` (as before) leaves the ROC and PR curves with a single
# operating point, so the reported areas did not measure ranking quality.
payment_event_flag_preds = predictions.select(
    vector_to_array('probability')[1].alias('score'),
    F.col('payment_event_flag').cast('double').alias('label'),
)
metrics = BinaryClassificationMetrics(
    payment_event_flag_preds.rdd.map(
        lambda row: (float(row[0]), float(row[1]))
    )
)
print('ROC AUC:', metrics.areaUnderROC)
print('Area under PR-curve:', metrics.areaUnderPR)

In [None]:
### 3.3. Future look

In [None]:
sdf_pred = spark.read.parquet(file_path_ds)
sdf_pred.limit(5).toPandas()

In [None]:
%%time

SHIFT = 7 * 24 * 60 * 60  # 7 days ahead

# Build the history columns for a small sample of events, with every window
# moved SHIFT seconds forward.
sdf_pred = sdf_pred.sample(fraction=.0001)
sdf_pred = dataset_lags(sdf_pred, shift=SHIFT)

# Keep only prediction rows with at least one non-empty lag list.
# Fix: the filter used to be applied to (and assigned to) `sdf`, the training
# dataset, so `sdf_pred` was never filtered.
_has_history = F.size(lags[0]) > 0
for _lag_name in lags[1:]:
    _has_history = _has_history | (F.size(_lag_name) > 0)
sdf_pred = sdf_pred.filter(_has_history)
sdf_pred.count()

In [None]:
print(lags)

In [None]:
# Apply the Word2Vec models fitted on the training data, in the order of
# `lags`, so the prediction set gets the same `<lag>_vec` columns.
# NOTE: `vectorizers` is only defined when the PROC_VECS branch has run.
for i, lag in enumerate(lags):
    sdf_pred = vectorizers[i].transform(sdf_pred)
    print(lag, '-> done')

In [None]:
features_pred = features_assembled(sdf_pred, feats=feats)

In [None]:
predictions_future = model.transform(features_pred)

In [None]:
%%time

df_pred = sdf_pred.select(sdf_pred.profile_id).toPandas()
df_pred.head()

In [None]:
%%time

df_predictions_future = predictions_future.withColumn(
    'tmp',
    vector_to_array('probability')
).select(
    F.col('tmp')[1].alias('prob_next7days')
).toPandas()
df_predictions_future.head()

In [None]:
import datetime

# The date stamped on the predictions file is the end of the labelled window
# (`flag_max_datetime`) plus the prediction horizon. The horizon is taken
# from SHIFT (seconds) so that it always matches the shift used to build the
# prediction lags; the previous hard-coded `days=6` disagreed with
# SHIFT = 7 days.
#
# Example: flag_max_datetime = datetime.datetime(2024, 5, 21, 23, 59, 59)
# with SHIFT = 7 days gives predictions for the 28th May 2024.

# Starting point: the end of the labelled window (not the current time).
pred_max_datetime = flag_max_datetime

# Add the prediction horizon.
future_datetime = pred_max_datetime + datetime.timedelta(seconds=SHIFT)

# Format the future datetime as "YYYYMMDD_HHMMSS"
future_datetime_str = future_datetime.strftime("%Y%m%d_%H%M%S")

In [None]:
# Write profile_id and predicted probability side by side to a CSV file
# under PRJ_PATH.
# NOTE(review): the two pandas frames come from separate Spark actions and
# are joined purely on their positional index — confirm that both actions
# return rows in the same order, otherwise probabilities are paired with the
# wrong profile_id.
file_path_preds = f'{PRJ_PATH}/work/{VER}/preds_{future_datetime_str}.csv'
df_pred.join(df_predictions_future).to_csv(file_path_preds, header=True)