# Pipeline + Model

<a id="0"></a> <br>
 ## Notebook Plan  
1. [Libraries import](#libraries_import)     
1. [Function Declaration](#functions)
1. [Data Loading and Transformation](#data)
1. [Feature Engineering](#feature)     
1. [ML Pipeline](#pipeline)   
1. [Statistics on days](#stat)   
1. [Splitting](#split)   




## Importing necessary libraries <a class="anchor" id="libraries_import"></a>

In [None]:
# !pip install pyspark py4j pyarrow
# !pip install implicit
# !pip install sparktorch


import random
import os
import sys
import math as m
import time
from tqdm.notebook import tqdm
from google.colab import drive
import pandas as pd
import numpy as np
import datetime
import plotly
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import plotly.figure_factory as ff
import plotly.express as px
import matplotlib.pyplot as plt

#pyspark
from pyspark.streaming import StreamingContext
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, pandas_udf,regexp_replace,col
from pyspark.sql.types import IntegerType, FloatType, BooleanType, DateType, StringType
from pyspark.sql.window import Window

#pyspark ml
from pyspark.ml.functions import vector_to_array
from pyspark.ml.recommendation import ALS
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder,StringIndexer,QuantileDiscretizer, OneHotEncoder, StandardScaler, StringIndexer, VectorAssembler

#implicit
from implicit.als import AlternatingLeastSquares
from implicit.nearest_neighbours import tfidf_weight

#torch
from sparktorch import serialize_torch_obj, SparkTorch, PysparkPipelineWrapper
import torch

## Declaring functions <a class="anchor" id="functions"></a>

In [None]:
def OneHotEncoding(dataframe: pd.DataFrame, input_col: pd.Series) -> pd.DataFrame:
  """Expand a categorical column into one indicator column per category.

  NOTE(review): despite the pandas annotations, ``dataframe`` is used as a
  Spark DataFrame and ``input_col`` as a column *name* (it is handed to
  ``StringIndexer(inputCol=...)``).

  Args:
    dataframe: Frame that contains ``input_col``.
    input_col: Name of the categorical column to encode.

  Returns:
    The input frame plus one column per label found by the indexer, each
    named after its label. The temporary ``output_1``/``output_2``/
    ``output_3`` helper columns are removed.
  """
  indexer_fitted = StringIndexer(inputCol=input_col, outputCol='output_1').fit(dataframe)
  df_indexed = indexer_fitted.transform(dataframe)

  # dropLast=False keeps one vector slot per label, so slot i <-> labels[i]
  encoder = OneHotEncoder(inputCols=['output_1'], outputCols=['output_2'], dropLast=False)
  df_onehot = encoder.fit(df_indexed).transform(df_indexed)
  df_onehot = df_onehot.select('*', vector_to_array('output_2').alias('output_3'))

  # The fitted labels already give the number of categories; reading the
  # first row would cost an extra Spark job and fail on an empty frame.
  cols_expanded = [
      F.col('output_3')[position].alias(f'{label}')
      for position, label in enumerate(indexer_fitted.labels)
  ]

  # drop() matches plain column names, so labels containing dots do not get
  # misread as struct-field paths the way a select-by-name would.
  return df_onehot.select('*', *cols_expanded).drop('output_1', 'output_2', 'output_3')

def delete_brackets(dataframe: pd.DataFrame, column: pd.Series) -> pd.Series:
    """Strip every ``[`` and ``]`` character from a string column.

    NOTE(review): despite the pandas annotations, ``dataframe`` is used as a
    Spark DataFrame, ``column`` as a column name, and a DataFrame is returned.

    Args:
        dataframe: Frame that contains ``column``.
        column: Name of the string column to clean in place.

    Returns:
        The frame with ``column`` replaced by its bracket-free version.
    """
    # An empty replacement deletes both characters; the previous ' '
    # replacement turned '[' into a leading space instead of removing it.
    return dataframe.withColumn(column, F.translate(column, '[]', ''))

def change_datatype_of_ohe(dataframe: pd.DataFrame) -> pd.DataFrame:
  """Cast every column whose dtype is ``double`` to boolean.

  All other columns are left as they are. The frame is used through the
  Spark DataFrame API (``dtypes``, ``withColumn``).
  """
  double_columns = [name for name, dtype in dataframe.dtypes if dtype == 'double']
  for name in double_columns:
    as_boolean = F.col(name).cast(BooleanType())
    dataframe = dataframe.withColumn(name, as_boolean)
  return dataframe

def avg_session_duration(dataframe:pd.DataFrame):
  """Attach the per-user mean session duration as ``avg_session_duration``.

  The mean is taken over the *distinct* ``session_duration`` values of each
  user, and is formatted as an ``HH:mm:ss`` string.

  The aggregate now has exactly one row per user. Previously it kept one row
  per (user, duration) pair, so the left join multiplied the input rows. It
  also relied on the auto-generated name of a ``countDistinct`` column.

  NOTE(review): durations are converted to seconds via
  ``to_timestamp(...).cast('long')``, exactly as before; this presumes
  ``session_duration`` holds ``HH:mm:ss`` strings - confirm against the
  producer of that column.

  Args:
    dataframe: Frame with ``user_id`` and ``session_duration`` columns
      (used through the Spark DataFrame API).

  Returns:
    ``dataframe`` left-joined with the new ``avg_session_duration`` column.
  """
  duration_seconds = F.to_timestamp(F.col('session_duration')).cast('long')

  per_user = (
      dataframe.select('user_id', 'session_duration')
      .distinct()  # each distinct duration counts once per user
      .groupBy('user_id')
      .agg(F.avg(duration_seconds).alias('avg_seconds'))
      .select(
          'user_id',
          F.from_unixtime(F.col('avg_seconds').cast('long'), 'HH:mm:ss').alias('avg_session_duration'),
      )
  )

  return dataframe.join(per_user, on=['user_id'], how='left')

def day_time(dataframe:pd.DataFrame):
  """Add a ``day_time`` label derived from the hour of ``utc_event_time``.

  Hours 6-8 -> 'early morning', 9-11 -> 'morning', 12-16 -> 'afternoon',
  17-23 -> 'evening'; everything else is labelled 'night'.
  """
  event_hour = F.hour(F.to_timestamp(F.col('utc_event_time')))
  label = (
      F.when(event_hour.between(6, 8), 'early morning')
      .when(event_hour.between(9, 11), 'morning')
      .when(event_hour.between(12, 16), 'afternoon')
      .when(event_hour.between(17, 23), 'evening')
      .otherwise('night')
  )
  return dataframe.withColumn('day_time', label)

def day_of_week(dataframe:pd.DataFrame):
  """Add ``day_of_week``: Spark's ``dayofweek`` of the ``utc_event_date`` column."""
  weekday = F.dayofweek(F.col('utc_event_date'))
  return dataframe.withColumn('day_of_week', weekday)


def mean_product_price_per_person(dataframe:pd.DataFrame):
  """Attach ``mean_price``: the mean of ``price`` over all rows of each user."""
  per_user_mean = dataframe.groupBy("user_id").agg(F.mean('price').alias("mean_price"))
  enriched = dataframe.join(per_user_mean, on=["user_id"], how='left')
  # NOTE(review): the aggregate is never cached in this function, so this
  # call presumably has no effect.
  per_user_mean.unpersist()
  return enriched


def interacted_categories_per_person(dataframe:pd.DataFrame):
  """Attach ``inter_categ``: distinct ``main_category`` count per user."""
  distinct_categories = F.countDistinct('main_category').alias("inter_categ")
  per_user = dataframe.groupBy("user_id").agg(distinct_categories)
  enriched = dataframe.join(per_user, on=["user_id"], how='left')
  # NOTE(review): the aggregate is never cached in this function, so this
  # call presumably has no effect.
  per_user.unpersist()
  return enriched

def interacted_goods_per_person(dataframe:pd.DataFrame):
  """Attach ``inter_goods``: distinct ``product_id`` count per user."""
  distinct_goods = F.countDistinct('product_id').alias("inter_goods")
  per_user = dataframe.groupBy("user_id").agg(distinct_goods)
  enriched = dataframe.join(per_user, on=["user_id"], how='left')
  # NOTE(review): the aggregate is never cached in this function, so this
  # call presumably has no effect.
  per_user.unpersist()
  return enriched

def avg_goods_per_person(dataframe:pd.DataFrame):
  """Attach ``avg_goods_per_session``: mean distinct products per session, per user.

  For every (user, session) pair the distinct ``product_id`` values are
  counted, and those counts are averaged per user. The earlier version
  divided a row count by the same row count, so the result was always 1.0.

  Args:
    dataframe: Frame with ``user_id``, ``session_id`` and ``product_id``
      columns (used through the Spark DataFrame API).

  Returns:
    ``dataframe`` left-joined with the new ``avg_goods_per_session`` column.
  """
  goods_per_session = (
      dataframe.groupBy("user_id", 'session_id')
      .agg(F.countDistinct('product_id').alias("num_of_goods"))
  )
  per_user = (
      goods_per_session.groupBy("user_id")
      .agg(F.avg('num_of_goods').alias('avg_goods_per_session'))
  )
  return dataframe.join(per_user, on=["user_id"], how='left')

def change_dtype(dataframe:pd.DataFrame, int_cols:list, string_cols:list, date_cols:list):
  """Cast the listed columns to integer, string or date type.

  Fixes two defects of the earlier version: the target of ``withColumn`` was
  the unrelated global name ``col`` instead of the loop variable, and string
  columns were pushed through ``to_timestamp`` before the string cast.

  Args:
    dataframe: Frame to convert (used through the Spark DataFrame API).
    int_cols: Names of columns to cast to ``IntegerType``.
    string_cols: Names of columns to cast to ``StringType``.
    date_cols: Names of columns to parse with ``to_timestamp`` and then cast
      to ``DateType``.

  Returns:
    The frame with the listed columns converted; columns that appear in none
    of the lists are left untouched.
  """
  for column in dataframe.columns:
    if column in int_cols:
      dataframe = dataframe.withColumn(column, F.col(column).cast(IntegerType()))
    elif column in string_cols:
      dataframe = dataframe.withColumn(column, F.col(column).cast(StringType()))
    elif column in date_cols:
      dataframe = dataframe.withColumn(column, F.to_timestamp(column).cast(DateType()))

  return dataframe

def split_on_sessions(dataframe:pd.DataFrame, session_gap_minutes:int=30):
  """Split each user's event stream into sessions and describe them.

  An event starts a new session when it is the user's first event or when it
  comes more than ``session_gap_minutes`` after the user's previous event.

  Changes compared with the earlier implementation:
    * the gap is compared numerically in seconds; the old ``HH:mm:ss`` string
      comparison wrapped around for gaps of 24 hours or more;
    * session ids are forward-filled inside a per-user window instead of a
      single unpartitioned window, and without a self-join on the generated
      id;
    * the stray ``df.unpersist()`` on a *global* frame and the global sort
      (whose order the following join did not keep) are gone.

  Args:
    dataframe: Frame with ``user_id`` and ``utc_event_time`` columns (used
      through the Spark DataFrame API).
    session_gap_minutes: Inactivity threshold that separates two sessions.

  Returns:
    The frame with three extra columns: ``session_id`` (id of the session's
    first event, placed first), ``is_first_event_in_session`` and
    ``session_duration`` (an ``HH:mm:ss`` string).

  NOTE(review): ``session_duration`` is still rendered with
  ``from_unixtime(..., "HH:mm:ss")`` for compatibility with downstream code,
  so a session of 24 hours or more would wrap - confirm whether that can
  occur.
  """
  event_ts = F.to_timestamp(F.col("utc_event_time"))
  per_user = Window.partitionBy("user_id").orderBy("utc_event_time")

  df_new = dataframe.withColumn("id", monotonically_increasing_id())

  # time difference with the previous event of the same user, in seconds
  df_new = df_new.withColumn("prev_value", F.lag(event_ts).over(per_user))
  df_new = df_new.withColumn(
      "gap_seconds", event_ts.cast("long") - F.col("prev_value").cast("long")
  )

  # a null gap means there is no previous event, i.e. the first session starts here
  is_session_start = F.col("gap_seconds").isNull() | (
      F.col("gap_seconds") > session_gap_minutes * 60
  )
  df_new = df_new.withColumn("is_first_event_in_session", is_session_start)

  # a session is identified by the id of its first event; carry it forward
  start_id = F.when(is_session_start, F.col("id"))
  df_new = df_new.withColumn("session_id", F.last(start_id, True).over(per_user))

  # session duration = last event time - first event time within the session
  per_session = Window.partitionBy("session_id")
  first_event = F.to_timestamp(F.min(F.col("utc_event_time")).over(per_session)).cast("long")
  last_event = F.to_timestamp(F.max(F.col("utc_event_time")).over(per_session)).cast("long")
  df_new = df_new.withColumn(
      "session_duration", F.from_unixtime(last_event - first_event, "HH:mm:ss")
  )

  # keep the previous column layout: session_id first, helper columns removed
  helper_cols = ("id", "gap_seconds", "prev_value", "session_id")
  kept = [name for name in df_new.columns if name not in helper_cols]
  return df_new.select("session_id", *kept)

def product_popularity(dataframe: pd.DataFrame):
  """Attach ``product_popularity``: share of all users who touched each product.

  The share is the number of distinct ``user_id`` values seen for a
  ``product_id`` divided by the number of distinct users in the whole frame.
  The column is now explicitly named; before, it carried an auto-generated
  name that embedded the user count.

  Args:
    dataframe: Frame with ``user_id`` and ``product_id`` columns (used
      through the Spark DataFrame API).

  Returns:
    ``dataframe`` left-joined with the new ``product_popularity`` column.
  """
  # this count is a Spark action and is evaluated eagerly
  num_of_users = dataframe.select('user_id').distinct().count()

  # countDistinct matches the spelling used by the other helpers in this file
  share = (F.countDistinct(F.col('user_id')) / F.lit(num_of_users)).alias('product_popularity')
  per_product = dataframe.groupBy('product_id').agg(share)

  return dataframe.join(per_product, on=['product_id'], how='left')

## Loading & transforming DataFrame <a class="anchor" id="data"></a>

Mounting a csv-file

In [None]:
# Mount Google Drive under /content/drive/; force_remount=True is passed so an existing mount is redone.
drive.mount('/content/drive/', force_remount=True)

Mounted at /content/drive/


In [None]:
# Extract both data archives from Drive into the current working directory.
!unzip '/content/drive/MyDrive/Colab Notebooks/data_v2_new.zip'
!unzip '/content/drive/MyDrive/Colab Notebooks/data_v2_old.zip'

Archive:  /content/drive/MyDrive/Colab Notebooks/data_v2_new.zip
  inflating: ab_data_new.csv         
Archive:  /content/drive/MyDrive/Colab Notebooks/data_v2_old.zip
  inflating: ab_data_old.csv         


Creating a PySpark session and getting a DataFrame

In [None]:
# Run driver and workers with the interpreter that runs this notebook.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable


spark = (
    SparkSession.builder
    .appName('Model')
    .master('local[*]')
    .getOrCreate()
)

spark.sparkContext.setCheckpointDir("/content/drive/MyDrive/Colab Notebooks/spark_checkpoints")
# Arrow speeds up toPandas()/pandas UDFs. Spark 3 renamed the switch; the old
# "spark.sql.execution.arrow.enabled" key is deprecated.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# -1 disables automatic broadcast joins.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

spark

In [None]:
# Small toy frame (key, value1, value2) used by the row-wise experiment below.
df = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2),("c", 10, -2),("d", 10, -2),("e", 10, -2),("f", 10, -2),("g", 10, -2)],
    ("key", "value1", "value2")
)

In [None]:
# Print the toy frame.
df.show()

+---+------+------+
|key|value1|value2|
+---+------+------+
|  a|     1|     0|
|  a|    -1|    42|
|  b|     3|    -1|
|  b|    10|    -2|
|  c|    10|    -2|
|  d|    10|    -2|
|  e|    10|    -2|
|  f|    10|    -2|
|  g|    10|    -2|
+---+------+------+



In [None]:
def rowwise_function(df):
  """Return the minimum of column ``value1``.

  Runs a Spark aggregation and collects its single result row.
  """
  min_row = df.select(F.min(F.col('value1'))).collect()[0]
  return min_row[0]

In [None]:
df1 = df
# A DataFrame cannot be used inside code that runs on the workers (SPARK-5063),
# so the aggregate is computed once on the driver and only the plain Python
# value is captured by the lambda.
min_value1 = rowwise_function(df1)
# apply our function to RDD (rows are 1-tuples so a schema can be attached)
cases_rdd_new = df.rdd.map(lambda row: (min_value1,))
# Convert RDD Back to DataFrame
casesNewDf = spark.createDataFrame(cases_rdd_new, ["min_value1"])
casesNewDf.show()

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pyspark/serializers.py", line 459, in dumps
    return cloudpickle.dumps(obj, pickle_protocol)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
  File "/usr/local/lib/python3.10/dist-packages/pyspark/context.py", line 462, in __getnewargs__
    raise RuntimeError(
RuntimeError: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.


PicklingError: ignored

In [None]:
path_old = '/content/ab_data_old.csv'
path_new = '/content/ab_data_new.csv'

# Read both exports with a header row, then stack them by column name;
# columns missing on one side are allowed.
df_old, df_new = (
    spark.read.csv(csv_path, header=True) for csv_path in (path_old, path_new)
)
df = df_old.unionByName(df_new, allowMissingColumns=True)
# df = df.repartitionByRange(200, "utc_event_date")
# print('Rows: ', df.count())
preview = df.limit(10)
preview.show()

+--------+--------------------+--------------+--------------------+--------------+-------------+--------+--------------------+--------------------+--------------------+
|platform|      utc_event_time|utc_event_date|             user_id|    event_type|ecom.price100|ecom.qty|             ecom.nm|       main_category|        sub_category|
+--------+--------------------+--------------+--------------------+--------------+-------------+--------+--------------------+--------------------+--------------------+
|    Site|2023-07-31 20:52:...|    2023-07-31|61896930866132383...|ec.add_to_cart|      [27700]|     [1]|[2081100339580357...|[6968191755455670...|[1664831343325037...|
|    Site|2023-07-31 20:54:...|    2023-07-31|61896930866132383...|ec.add_to_cart|      [20100]|     [1]|[2150579222891727...|[6968191755455670...|[1664831343325037...|
|    Site|2023-07-31 20:58:...|    2023-07-31|61896930866132383...|ec.add_to_cart|      [27700]|     [1]|[2081100339580357...|[6968191755455670...|[1664831

Changing column types & names

In [None]:
start_time = time.time()
print("\x1b[31m\"DataFrame before\"\x1b[0m")
df.printSchema()

# changing names (positional: the list must follow the current column order)
new_names = ['platform', 'utc_event_time','utc_event_date','user_id','event_type','price','quantity','product_id','main_category','sub_category']
df = df.toDF(*new_names)

# deleting '[' and ']' from str data
# (the loop variable is not called `col`, so the imported pyspark `col` stays usable)
for column_name in ['user_id',"price","quantity","product_id",'main_category','sub_category']:
  df = delete_brackets(df, column_name)

# strip only the literal "ec." prefix; the previous pattern 'ec.' matched
# "ec" followed by any character anywhere in the value
df = df.withColumn('event_type', F.regexp_replace('event_type', r'^ec\.', ''))

print("\x1b[31m\"DataFrame after\"\x1b[0m")
df.printSchema()
print("--- %s seconds ---" % (time.time() - start_time))

[31m"DataFrame before"[0m
root
 |-- platform: string (nullable = true)
 |-- utc_event_time: string (nullable = true)
 |-- utc_event_date: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- ecom.price100: string (nullable = true)
 |-- ecom.qty: string (nullable = true)
 |-- ecom.nm: string (nullable = true)
 |-- main_category: string (nullable = true)
 |-- sub_category: string (nullable = true)

[31m"DataFrame after"[0m
root
 |-- platform: string (nullable = true)
 |-- utc_event_time: string (nullable = true)
 |-- utc_event_date: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- main_category: string (nullable = true)
 |-- sub_category: string (nullable = true)

--- 0.29206395149230957 seconds ---


Counting Null/NaN values

In [None]:
# start_time = time.time()
# df1 = df.select('utc_event_time','user_id','price','quantity', 'product_id', 'main_category','sub_category')
# df2 = df.select([c for c in df.columns if c not in ['utc_event_time','user_id','price','quantity', 'product_id']])

# df1.select([(F.count(F.when(F.isnan(c) | F.col(c).isNull(), c))/F.count(F.lit(1))).alias(c) for c in df1.columns]).show()
# df2.select([(F.count(F.when(F.col(c).isNull(), c))/F.count(F.lit(1))).alias(c) for c in df2.columns]).show()
# print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Drop every row that has a null in at least one column (how='any').
df = df.dropna(how='any')

Checking 'quantity' for reasonable values

In [None]:
#df.select('quantity').distinct().collect()

In [None]:
# Replace a quantity of 0 with 1; every other value is kept as is.
# NOTE(review): "quantity" is still a string column at this point (see the
# schema printed above), so the comparison with 0 relies on Spark's implicit cast.
df = df.withColumn("quantity", F.when(F.col("quantity") == 0, 1).otherwise(F.col("quantity")))
# df.select('quantity').distinct().collect()

Checking 'price' for reasonable values

In [None]:
# df.select('price').describe().show()
# q25 = df.approxQuantile(["price"], [0.25], 0.1)[0][0]
# q75 = df.approxQuantile(["price"], [0.75], 0.1)[0][0]
# q90 = df.approxQuantile(["price"], [0.90], 0.1)[0][0]
# q95 = df.approxQuantile(["price"], [0.95], 0.1)[0][0]
# print('25% значение - ', q25)
# print('75% значение - ', q75)
# print('90% значение - ', q90)
# print('95% значение - ', q95)

In [None]:
# limit upper boundary to 0.9 percentile
# df = df.withColumn("price", F.col("price").cast(IntegerType()))
# q90 = df.approxQuantile(["price"], [0.90], 0.2)[0][0]
# df = df.withColumn("price", F.when(F.col("price") >= q90, q90).otherwise(F.col("price")))
# df = df.withColumn("price", F.col("price").cast(IntegerType()))

In [None]:
# Show the first 10 rows and report how long the preview took.
start_time = time.time()
df.limit(10).show()
print("--- %s seconds for printing ---" % (time.time() - start_time))

+--------+--------------------+--------------+--------------------+-----------+------+--------+--------------------+--------------------+--------------------+
|platform|      utc_event_time|utc_event_date|             user_id| event_type| price|quantity|          product_id|       main_category|        sub_category|
+--------+--------------------+--------------+--------------------+-----------+------+--------+--------------------+--------------------+--------------------+
|    Site|2023-07-31 20:52:...|    2023-07-31|61896930866132383...|add_to_cart| 27700|       1| 2081100339580357...| 6968191755455670...| 1664831343325037...|
|    Site|2023-07-31 20:54:...|    2023-07-31|61896930866132383...|add_to_cart| 20100|       1| 2150579222891727...| 6968191755455670...| 1664831343325037...|
|    Site|2023-07-31 20:58:...|    2023-07-31|61896930866132383...|add_to_cart| 27700|       1| 2081100339580357...| 6968191755455670...| 1664831343325037...|
|    Site|2023-07-31 21:00:...|    2023-07-31|

## Adding new features <a class="anchor" id="feature"></a>



- <b>session duration </b>

  We define a session as a run of continuous interactions with the marketplace in which consecutive events are no more than 30 minutes apart. If an event occurs more than 30 minutes after the previous one, it starts a new session.

  For each session we count its duration.
- <b>is_first_event_in_session</b>

  Marking each event as a starting (true) or not (false)
- <b>session_id</b>

  Giving each session unique index


- <b>avg_session_duration</b>

  Average session duration per user (sum of unique durations / number of unique durations)

- <b>day_time </b>

  Bucketing utc_event_time into the following parts of the day:

  - Early Morning: (06.00.00 - 09.00.00)

  - Morning: (09.00.00 - 12.00.00)

  - Afternoon: (12.00.00 - 17.00.00)

  - Evening: (17.00.00 - 24.00.00)

  - Night (00.00.00 - 06.00.00)

- <b>day_of_week</b>

  Derive the day of the week from the date, encoded as a number

- <b>mean_product_price</b>

  Average price of the products the user has interacted with

- <b>inter_categ</b>

  Number of categories the user paid attention to

- <b>inter_goods</b>

  Number of products the user paid attention to

- <b>avg_goods_per_session </b>

  Average number of products the user paid attention to per session

In [None]:
# Feature engineering. Order matters: split_on_sessions adds session_id and
# session_duration, which avg_session_duration and avg_goods_per_person read.
df = split_on_sessions(df)
df = avg_session_duration(df)
df = day_time(df)
df = day_of_week(df)
df = mean_product_price_per_person(df)
df = interacted_categories_per_person(df)
df = interacted_goods_per_person(df)
df = avg_goods_per_person(df)
# df= product_popularity(df)


# Target types for change_dtype.
# NOTE(review): session_duration and avg_session_duration are produced as
# "HH:mm:ss" strings by from_unixtime; listing them as date columns sends them
# through to_timestamp(...).cast(DateType()) - confirm this is intended.
int_cols = ['price','quantity','day_of_week']
date_col = ['utc_event_time','utc_event_date','session_duration','avg_session_duration']
string_cols = ['platform','user_id','product_id','main_category','sub_category','event_type']

df = change_dtype(df,int_cols=int_cols, string_cols=string_cols, date_cols=date_col)

# Write the result as a parquet table and read it back.
df.write.mode("overwrite").saveAsTable('df_tab', format="parquet")
df = spark.read.table('df_tab')

Py4JJavaError: ignored

## ML pipeline <a class="anchor" id="pipeline"></a>

In [None]:
# Columns that get string-indexed and one-hot encoded in the pipeline.
cat_columns = ['platform','event_type','day_time','day_of_week']
# Columns treated as numeric features in the pipeline.
num_cols = ['price','quantity','session_duration','avg_session_duration','mean_price','inter_categ','inter_goods','avg_goods_per_session']

In [None]:
stages = []

# OHE of cat_cols: index each categorical column, then one-hot encode the index
for cat_column in cat_columns:
  stringIndexer = StringIndexer(inputCol = cat_column, outputCol = cat_column +'_idx')
  encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],outputCols=[cat_column +'_categ'])

  stages += [stringIndexer, encoder]

# indexing
# NOTE(review): this index and the price bucket below are computed but, as
# before, not fed into the final feature vector - confirm whether they should be.
stages += [StringIndexer(inputCol = 'is_first_event_in_session', outputCol = 'is_first_event_in_session' +'_idx')]
# qcut of price in 5 buckets
price_buckets = QuantileDiscretizer(numBuckets=5, inputCol='price', outputCol='price' + "bucket")
stages += [price_buckets]

# scaling num_cols
# StandardScaler works on a vector column, so the numeric columns are packed
# into one vector first and scaled together (the earlier per-column scalers
# received plain scalar columns and their outputs never reached the assembler).
# NOTE(review): VectorAssembler needs numeric inputs - confirm the dtypes of
# the duration columns in num_cols.
num_assembler = VectorAssembler(inputCols = num_cols, outputCol = 'num_features')
scaler = StandardScaler(inputCol='num_features', outputCol='num_features_scal',
                        withStd=True, withMean=False)
stages += [num_assembler, scaler]

# assembling: one-hot vectors + the scaled numeric vector
assemble_inputs = [c + '_categ' for c in cat_columns] + [scaler.getOutputCol()]
assembler = VectorAssembler(inputCols = assemble_inputs, outputCol = 'features')
stages += [assembler]

In [None]:
# Chain all stages collected above into a single (unfitted) pipeline.
pipeline = Pipeline(stages=stages)

## Statistics on days <a class="anchor" id="stat"></a>

In [None]:
# Per day: number of distinct users and number of events, pulled to pandas.
# The aggregates are not aliased, so pdf_time carries Spark's auto-generated column names.
expr = [F.countDistinct(F.col('user_id')), F.count(F.col('event_type'))]
df_time = df.groupBy(F.col("utc_event_date")).agg(*expr)
pdf_time = df_time.toPandas()

In [None]:
# Day-to-day user overlap: for every date, the share of that day's users who
# were also seen on the previous / the next date.
my_window = Window.orderBy("utc_event_date")

# set of user ids per day, plus the neighbouring days' sets and the intersections
df1 = (
    df.groupBy('utc_event_date')
    .agg(F.collect_set(F.col('user_id')).alias('unique_users_id'))
    .withColumn("prev_users", F.lag((F.col("unique_users_id"))).over(my_window))
    .withColumn("future_users", F.lead((F.col("unique_users_id"))).over(my_window))
    .withColumn("prev_inters", F.array_intersect(F.col('prev_users'), F.col('unique_users_id')))
    .withColumn("future_inters", F.array_intersect(F.col('future_users'), F.col('unique_users_id')))
)

# replace each array by its size
for array_column in ['unique_users_id','prev_inters','future_inters']:
  df1 = df1.withColumn(array_column, F.size(F.col(array_column)))

# overlap ratios, rounded to 2 decimals
df1 = (
    df1
    .withColumn('previous_overlap', F.round(F.col('prev_inters') / F.col('unique_users_id'),2))
    .withColumn('future_overlap', F.round(F.col('future_inters') / F.col('unique_users_id'),2))
)
pdf1 = df1.select("utc_event_date",'previous_overlap','future_overlap').toPandas()

In [None]:
# Report the covered date range, then show overlap and counts side by side (one column per date).
print(f"Min date: {pdf_time['utc_event_date'].min()}\nMax date: {pdf_time['utc_event_date'].max()}")
pdf1.merge(pdf_time, how='left', on ='utc_event_date').set_index('utc_event_date').T

Min date: 2023-07-31
Max date: 2023-08-28


utc_event_date,2023-07-31,2023-08-01,2023-08-02,2023-08-03,2023-08-04,2023-08-05,2023-08-06,2023-08-07,2023-08-08,2023-08-09,...,2023-08-19,2023-08-20,2023-08-21,2023-08-22,2023-08-23,2023-08-24,2023-08-25,2023-08-26,2023-08-27,2023-08-28
previous_overlap,0.0,0.56,0.56,0.56,0.57,0.56,0.53,0.53,0.56,0.57,...,0.56,0.54,0.54,0.56,0.57,0.57,0.57,0.56,0.55,0.54
future_overlap,0.55,0.55,0.55,0.55,0.52,0.57,0.57,0.57,0.57,0.57,...,0.57,0.57,0.57,0.57,0.56,0.56,0.54,0.57,0.57,0.0
count(user_id),116709.0,114509.0,113913.0,113740.0,108470.0,101025.0,106932.0,113279.0,115504.0,115474.0,...,109438.0,115354.0,121534.0,123532.0,122131.0,118955.0,117778.0,113898.0,118328.0,123822.0
count(event_type),3451387.0,3393743.0,3379766.0,3405500.0,3143646.0,3031366.0,3383943.0,3389026.0,3576467.0,3539820.0,...,3416461.0,3751232.0,3594258.0,3772552.0,3715644.0,3545791.0,3407302.0,3430817.0,3609939.0,3562750.0


__________

* Всего имеем данные за 3 недели (21 дней).
* Распределение пользователей и их активноcтей одинаково в каждом дне.
* Пересечение пользователей по дням одинаково.
* 80% (17 дней) отправляем на обучение, на остальных 20% (4 днях) будем тестировать модели.
* 17 дней разделим ещё на 2 части: по 13 и 4 дней для обучения и валидации модели 1 и 2 уровня

__________

## Splitting <a class="anchor" id="split"></a>

In [None]:
#global train-test
# Split the feature-enriched frame `df`. At module level `df_new` is the raw
# "new" CSV export, which has neither the renamed nor the engineered columns
# the pipeline expects.
# Everything after (max date - 4 days) goes to the test set.
split_date = df.select(F.to_timestamp(F.max(F.col("utc_event_date")))).collect()[0][0] - datetime.timedelta(days=4)
global_train = df.where(F.col("utc_event_date") <= split_date)
global_test = df.where(F.col("utc_event_date") > split_date)
global_train.persist()

## Creating Model <a class="anchor" id="model"></a>

In [None]:
# Preview the first 15 rows of the training split.
global_train.show(15)

In [None]:
# Fit the preprocessing pipeline on the training split only, then apply the
# fitted model to both splits.
pipeline_model = pipeline.fit(global_train)
train_prep = pipeline_model.transform(global_train)
test_prep = pipeline_model.transform(global_test)