In [3]:
!pip install --upgrade pip --quiet
!pip install --upgrade awscli python-dotenv --quiet
%load_ext dotenv
%dotenv env





[0m

In [4]:
!pip install pqdm --quiet

[0m

In [5]:
# Destructive: recursively deletes everything under rl_ready_data_conv/, which is
# the `conv_path` that get_dataset writes its outputs to.
!rm -r rl_ready_data_conv/*

In [6]:
# !aws s3 sync rl_ready_data_conv s3://dissertation-data-dmiller/rl_ready_data_conv --delete
# !aws s3 sync rl_ready_data s3://dissertation-data-dmiller/rl_ready_data --delete

In [7]:
# ! rm *.csv *.json

In [8]:
# %load rl_constant
# Label column as stored in the prediction parquet (get_dataset renames it to 'label').
LABEL = ["continue_work_session_30_minutes"]

# Identifier / raw bookkeeping columns.
METADATA = [
    "user_id",
    "session_30_raw",
    "cum_platform_event_raw",
    "cum_platform_time_raw",
    "cum_session_time_raw",
    "cum_session_event_raw",
    "global_events_user",
    "global_session_time",
    "date_time",
]

# Feature columns, grouped as in the original listing.
OUT_FEATURE_COLUMNS = [
    # calendar / location
    "country_count",
    "date_hour_sin",
    "date_hour_cos",
    "date_minute_sin",
    "date_minute_cos",
    # session counters
    "session_30_count",
    "session_5_count",
    "cum_session_event",
    "cum_session_time",
    "expanding_click_average",
    # platform counters
    "cum_platform_time",
    "cum_platform_event",
    "cum_projects",
    "average_event_time",
    "delta_last_event",
    # rolling / previous-session statistics
    "rolling_session_time",
    "rolling_session_events",
    "rolling_session_gap",
    "previous_session_time",
    "previous_session_events",
]

# Columns handed to the MinMaxScaler in partition_and_scale_data; get_dataset
# keeps an unscaled `<col>_raw` copy of each of them.
RESCALER_COLS = [
    "session_30_count",
    "session_5_count",
    "cum_session_event",
    "cum_session_time",
    "cum_platform_time",
    "cum_platform_event",
]

# Model output and ground truth, as named after the rename in get_dataset.
PREDICTION_COLS = ["pred", "label"]

GROUPBY_COLS = ["user_id"]

RL_STAT_COLS = [
    "session_size",
    "sim_size",
    "session_minutes",
    "sim_minutes",
    "reward",
    "cum_session_time_raw",
]

# Columns get_dataset reads from predicted_data.parquet (plus one prediction
# column it appends): identifiers, features, then the label and raw session id.
TORCH_LOAD_COLS = [
    "user_id",
    "date_time",
    "country_count",
    "date_hour_sin",
    "date_hour_cos",
    "date_minute_sin",
    "date_minute_cos",
    "expanding_click_average",
    "cum_projects",
    "average_event_time",
    "rolling_session_time",
    "rolling_session_events",
    "rolling_session_gap",
    "previous_session_time",
    "previous_session_events",
    "delta_last_event",
    "continue_work_session_30_minutes",
    "session_30_raw",
]

In [9]:
import logging
import os
import cudf
import pandas as pd
import numpy as np
from pprint import pprint, pformat
import cudf as gpu_pd

# Timestamped INFO-level logging for the whole notebook.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Identifier columns, then every output feature, then the prediction/label pair.
COLS_FOR_INFLECTION = ['user_id', 'date_time', *OUT_FEATURE_COLUMNS, *PREDICTION_COLS]

class SessionCalculate:
    """Label an event log with session boundaries and per-user session counters.

    An event is a session "inflection" (the last event of a session) when the
    same user's next event is at least 5 / 30 minutes away, or when there is no
    next event. ``calculate_inflections`` adds, among others, the columns
    ``session_5_count``, ``session_30_count``, ``session_end_time`` and
    ``session_terminates_30_minutes``.

    Parameters
    ----------
    df : DataFrame with at least ``user_id`` and ``date_time`` columns. A cudf
        frame when ``use_gpu`` is true, a pandas frame otherwise. The frame is
        modified in place (a ``next_date_time`` column is added to it).
    use_gpu : when true, the frame is moved between cudf and pandas for the
        steps that are performed with pandas, and a cudf frame is returned.
    """

    logger = logging.getLogger('rl_results_eval')

    def __init__(self, df, use_gpu) -> None:
        self.df = df
        self.use_gpu = use_gpu

    def _session_end_counter(self, flag_col, count_col):
        """Return the rows where ``flag_col`` is False, numbered 1..n per user.

        The running number is stored under ``count_col``; the result keeps only
        ``user_id``, ``date_time``, ``row_count`` and ``count_col`` and is
        sorted by ``row_count`` as required by ``pd.merge_asof``.
        """
        ends = self.df[self.df[flag_col] == False].sort_values(by=['date_time'])
        ends[flag_col] = ends.groupby('user_id').cumcount() + 1
        ends = ends.rename(columns={flag_col: count_col})
        return ends[['user_id', 'date_time', 'row_count', count_col]].sort_values(by=['row_count', 'user_id'])

    def calculate_inflections(self):
        """Compute session flags, counters and end times; return the labelled frame."""
        self.logger.info('Calculating subsequent date time')
        # The shift relies on the incoming row order: the "next" event of a user
        # is the next row of that user in the frame as passed in.
        self.df['next_date_time'] = self.df.groupby('user_id')['date_time'].shift(-1)
        # NOTE(review): reset_index() without drop=True keeps the previous index
        # as a column (here and after the sort below); kept as-is so that the
        # returned columns do not change.
        self.df = self.df.drop_duplicates(subset=['user_id', 'date_time'], keep='last').reset_index()
        if self.use_gpu:
            self.logger.info('Bringing to CPU for second calculation')
            self.df = self.df.to_pandas()

        # Vectorised timedelta -> seconds; a missing next event yields NaN.
        self.df['diff_seconds'] = (self.df['next_date_time'] - self.df['date_time']).dt.total_seconds()

        self.logger.info('Diff seconds calculated')
        if self.use_gpu:
            self.logger.info('Bringing back to GPU for final calculations')
            self.df = cudf.from_pandas(self.df)

        self.df['diff_minutes'] = self.df['diff_seconds'] / 60
        self.df['session_5'] = self.df['diff_minutes'] < 5
        self.df['session_30'] = self.df['diff_minutes'] < 30

        # A missing gap (no next event) counts as "session not continued".
        self.df['session_30'] = self.df['session_30'].fillna(False)
        self.df['session_5'] = self.df['session_5'].fillna(False)
        self.logger.info('Labels calculated')

        self.logger.info(f'Number of rows following drop: {self.df.shape[0]}')
        self.logger.info(f'Sorting rows by date time and applying row count')
        self.df = self.df.sort_values(['date_time']).reset_index()
        # row_count is the global chronological position; it is the merge key below.
        self.df['row_count'] = self.df.index.values
        self.logger.info(f'Sorted rows and applied row count on updated index')
        self.logger.info('Calculating inflection points')
        self.df['user_id'] = self.df['user_id'].astype('int32')

        self.logger.info('Calculating session 5 inflections')
        inflections_5_merge = self._session_end_counter('session_5', 'session_5_count')

        self.logger.info('Calculating session 30 inflections')
        inflections_30_merge = self._session_end_counter('session_30', 'session_30_count')

        inflections_5_merge = inflections_5_merge.drop(columns=['date_time'])
        # The timestamp of a 30-minute inflection is the end time of its session.
        inflections_30_merge = inflections_30_merge.rename(columns={'date_time': 'session_end_time'})

        if self.use_gpu:
            # merge_asof is done with pandas, so everything moves to the host first.
            self.logger.info('Bringing to CPU for labelling')
            self.df = self.df.to_pandas()
            inflections_5_merge = inflections_5_merge.to_pandas()
            inflections_30_merge = inflections_30_merge.to_pandas()
        else:
            self.logger.info('Labelling on CPU')

        # Each event receives the counter (and end time) of the next inflection
        # of the same user. Every inflection table is merged exactly once, so the
        # CPU and GPU paths produce the same column names.
        self.df = self.df.sort_values(by=['row_count', 'user_id'])
        for inflections in (inflections_5_merge, inflections_30_merge):
            self.df = pd.merge_asof(self.df, inflections, on='row_count', by='user_id', direction='forward')

        minutes_to_session_end = (self.df['session_end_time'] - self.df['date_time']).dt.total_seconds() / 60
        self.df['session_terminates_30_minutes'] = minutes_to_session_end < 30

        if self.use_gpu:
            self.df = cudf.from_pandas(self.df)

        self.logger.info('Inflections calculated')

        total_rows = self.df.shape[0]
        session_end_30_minutes = self.df[self.df['session_terminates_30_minutes'] == False].shape[0]
        # Guard against an empty frame instead of raising ZeroDivisionError.
        share = session_end_30_minutes / total_rows if total_rows else float('nan')
        self.logger.info(f'Percent sessions end in 30 minutes: {share}')
        self.logger.info(f'Columns for df')
        self.logger.info(pformat(self.df.columns))

        return self.df
        

In [10]:
import os
import pandas as pd
import logging

from tqdm import tqdm
import numpy as np
# NOTE: a `global` statement at module (cell) level has no effect; `logger`
# assigned below is already a module-level name.
global logger
logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# Same logger name as SessionCalculate.logger, so both log under 'rl_results_eval'.
logger = logging.getLogger('rl_results_eval')
from functools import reduce
from pprint import pformat
import cudf as gpu_pd
import cupy as gpu_np
# Display settings: show all columns and up to 200 rows when a frame is rendered.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 200)
# Makes pandas treat +/-inf as missing values.
# NOTE(review): this option is deprecated in recent pandas releases — confirm
# that the installed version still accepts it.
pd.set_option('mode.use_inf_as_na', True)
from pqdm.processes import pqdm
from cuml.preprocessing import MinMaxScaler

def _rolling_mean_delta_last_event(df, window):
    """Time-windowed mean of ``delta_last_event`` within each (user, session).

    Returns a Series indexed like ``df`` so that it can be assigned back
    without shuffling values between rows. ``df`` must have a unique index.
    """
    group_keys = ['user_id', 'session_30_raw']
    # Sorting by the group keys first makes the row order of `ordered` identical
    # to the order in which groupby().rolling() emits its result (groups in key
    # order, rows in their existing order inside a group).
    ordered = df.sort_values(by=group_keys + ['cum_session_event_raw'])
    rolled = (
        ordered.set_index('date_time')
        .groupby(by=group_keys, group_keys=False)
        .rolling(f'{window}min', min_periods=1)['delta_last_event']
        .mean()
    )
    if len(rolled) != len(ordered):
        # Happens when rows were excluded from grouping (e.g. missing keys);
        # a positional re-alignment would then be wrong.
        raise ValueError(
            f'Rolling result has {len(rolled)} rows but the input has {len(ordered)}; '
            'user_id / session_30_raw must not contain missing values'
        )
    return pd.Series(rolled.to_numpy(), index=ordered.index)


def convolve_delta_events(df, window, write_path):
    """Smooth ``delta_last_event``, resample to one row per user-minute and re-sessionize.

    Parameters
    ----------
    df : cudf DataFrame containing at least ``user_id``, ``date_time``,
        ``session_30_raw``, ``cum_session_event_raw`` and ``delta_last_event``.
    window : width, in minutes, of the rolling window applied inside each
        (user_id, session_30_raw) group.
    write_path : directory that receives ``convolve.parquet`` (the resampled
        events) and ``session.parquet`` (the re-sessionized events).

    Returns
    -------
    The cudf DataFrame returned by ``SessionCalculate.calculate_inflections``.
    """
    # The original index is discarded further down anyway; resetting it here
    # guarantees unique labels for the index-aligned assignment below.
    df = df.to_pandas().reset_index(drop=True)

    df = df.replace([np.inf, -np.inf], np.nan)
    before_resample = df.shape
    logger.info(f'Convolutional shape before resample: {before_resample}')
    logger.info(f'Convolution over delta last event')

    # Assigned by index label, so every row receives the rolling mean computed
    # for that very row (the result is computed in group order, not row order).
    df['delta_last_event'] = _rolling_mean_delta_last_event(df, window)

    df = df.drop(columns=['session_30_raw'])
    # remove duplicate columns
    df = df.loc[:, ~df.columns.duplicated()].reset_index(drop=True)

    df = gpu_pd.from_pandas(df)

    # Calendar parts of the timestamp; year..minute form the resampling key.
    df['year'] = df['date_time'].dt.year
    df['month'] = df['date_time'].dt.month
    df['day'] = df['date_time'].dt.day
    df['hour'] = df['date_time'].dt.hour
    df['minute'] = df['date_time'].dt.minute
    df['second'] = df['date_time'].dt.second

    df['user_id'] = df['user_id'].astype('int32')

    # Keep the last event of every (user, calendar minute).
    resampled_df = df.sort_values(by='date_time') \
        .drop_duplicates(subset=['user_id', 'year', 'month', 'day', 'hour', 'minute'], keep='last') \
        .sort_values(by=['date_time']) \
        .reset_index()

    convolve_path = os.path.join(write_path, 'convolve.parquet')
    session_path = os.path.join(write_path, 'session.parquet')

    logger.info(f'Convolution complete: {before_resample} -> {resampled_df.shape}')
    logger.info(f'Writing intermediate results to {convolve_path}')

    resampled_df.to_parquet(convolve_path)

    logger.info(f'Recalculating inflections')
    resample_events = SessionCalculate(resampled_df, use_gpu=True)
    resampled_event_out = resample_events.calculate_inflections()

    logger.info(f'Events sessionized: writing to {session_path}')

    resampled_event_out.to_parquet(session_path)
    logger.info(f'Events resampled')

    return resampled_event_out
     
     

def generate_metadata_session(dataset):
    """Attach per-session summary columns to every row of ``dataset``.

    For each (user_id, session_30_count) group the following columns are added:
    ``session_size`` (max ``cum_session_event``), ``session_minutes`` (max
    ``cum_session_time``), ``time_cutoff`` and ``size_cutoff`` (the 0.5
    quantile, nearest observed value, of ``cum_session_time`` and
    ``cum_session_event``). Returns the merged pandas DataFrame.
    """
    session_keys = ['user_id', 'session_30_count']
    per_session = dataset.groupby(session_keys)

    logger.info('Calculating session size and minutes')
    size_per_session = per_session['cum_session_event'].max().reset_index(name='session_size')
    minutes_per_session = per_session['cum_session_time'].max().reset_index(name='session_minutes')

    logger.info('Calculating sim size and minutes')
    time_cutoff = (
        per_session['cum_session_time']
        .quantile(.5, interpolation='nearest')
        .reset_index(name='time_cutoff')
    )
    size_cutoff = (
        per_session['cum_session_event']
        .quantile(.5, interpolation='nearest')
        .reset_index(name='size_cutoff')
    )

    logger.info('Merging metadata')
    summary = size_per_session
    for extra in (minutes_per_session, time_cutoff, size_cutoff):
        summary = pd.merge(summary, extra, on=session_keys)

    logger.info('Merging metadata complete')
    return pd.merge(dataset, summary, on=session_keys)



def setup_data_at_window(df, window, write_path):
    """Number the events of every raw session and run the windowed convolution.

    Adds ``cum_session_event_raw`` (1-based position of an event inside its
    (user_id, session_30_raw) group, in ``date_time`` order) and passes the
    chronologically ordered frame to ``convolve_delta_events``, whose result
    is returned.
    """
    logger.info(f'Convolution over {window} minute window')

    chronological = df.sort_values(by='date_time')
    position_in_session = chronological.groupby(['user_id', 'session_30_raw'])['date_time'].cumcount()
    chronological['cum_session_event_raw'] = position_in_session + 1
    chronological = chronological.sort_values(by='date_time').reset_index(drop=True)

    convolved = convolve_delta_events(chronological, window, write_path)

    logger.info(f'Convolving over {window} minute window complete: generating metadata')
    logger.info('Generating metadata complete')
    return convolved

def _as_host_array(scaled):
    """Return scaler output as a NumPy array, whatever container it came in."""
    if isinstance(scaled, np.ndarray):
        return scaled
    if hasattr(scaled, 'to_numpy'):
        # DataFrame-like output
        return scaled.to_numpy()
    if hasattr(scaled, 'get'):
        # device array exposing .get() to copy to host (e.g. cupy)
        return scaled.get()
    return np.asarray(scaled)


def partition_and_scale_data(df, train_fraction=.7):
    """Split ``df`` by row position and min-max scale ``RESCALER_COLS`` to [-1, 1].

    The scaler is fitted on the training rows only and then applied to the
    evaluation rows, so evaluation values may fall outside [-1, 1].

    Parameters
    ----------
    df : pandas DataFrame containing every column in ``RESCALER_COLS``.
    train_fraction : share of leading rows that form the training split.

    Returns
    -------
    (train_split, eval_split) : independent copies of the two row ranges with
        the ``RESCALER_COLS`` columns replaced by their scaled values. ``df``
        itself is left untouched.
    """
    cutoff = int(df.shape[0] * train_fraction)
    # Copies, so the scaled values can be written back without touching `df`.
    train_split, eval_split = df[:cutoff].copy(), df[cutoff:].copy()

    scaler = MinMaxScaler(feature_range=(-1, 1))
    # The transformed values must be stored: the scaler does not modify its input.
    train_split[RESCALER_COLS] = _as_host_array(scaler.fit_transform(train_split[RESCALER_COLS]))
    if eval_split.shape[0] > 0:
        eval_split[RESCALER_COLS] = _as_host_array(scaler.transform(eval_split[RESCALER_COLS]))
    return train_split, eval_split


def _parralel_partition_users(unique_sessions, df, index, vec_df_path):
    """Write the rows of ``df`` that belong to ``unique_sessions`` as one batch file.

    Rows are selected with an inner merge on (user_id, session_30_count_raw)
    and written to ``<vec_df_path>/batch_<index>.parquet``.
    """
    batch_file = f'{vec_df_path}/batch_{index}.parquet'
    session_rows = df.merge(
        unique_sessions,
        on=['user_id', 'session_30_count_raw'],
        how='inner',
    )
    session_rows.reset_index(drop=True).to_parquet(batch_file)
    

def batch_environments_for_vectorization(df, n_envs, vec_df_path, random_state=None):
    """Shuffle the sessions of ``df`` into ``n_envs`` batches and write each one.

    Parameters
    ----------
    df : pandas DataFrame with ``user_id`` and ``session_30_count_raw`` columns.
        It is not modified; the key columns are cast to int on a copy.
    n_envs : number of batches (one parquet file per batch is written by
        ``_parralel_partition_users``). Batches may be empty when there are
        fewer sessions than ``n_envs``.
    vec_df_path : directory receiving the ``batch_<i>.parquet`` files.
    random_state : optional seed for the session shuffle; ``None`` keeps the
        previous unseeded behaviour.

    Returns
    -------
    The list returned by ``pqdm`` (one entry per batch).
    """
    session_keys = ['user_id', 'session_30_count_raw']
    # astype returns a new frame, so the caller's data (often a slice of a
    # larger frame) is not written to.
    df = df.astype({key: int for key in session_keys})

    unique_sessions = (
        df[session_keys]
        .drop_duplicates()
        .sample(frac=1, random_state=random_state)
        .reset_index(drop=True)
    )
    logger.info(f'Unique sessions shape: {unique_sessions.shape}. Splitting into {n_envs} environments')

    # Split row positions rather than the DataFrame itself: np.array_split on a
    # DataFrame depends on DataFrame.swapaxes. Chunk sizes are unchanged.
    position_chunks = np.array_split(np.arange(unique_sessions.shape[0]), n_envs)
    unique_session_split = [unique_sessions.iloc[positions] for positions in position_chunks]

    unique_session_args = [{
        'unique_sessions': sess,
        'df': df,
        'index': i,
        'vec_df_path': vec_df_path,
    } for i, sess in enumerate(unique_session_split)]

    # os.cpu_count() may return None when the count cannot be determined.
    n_jobs = (os.cpu_count() or 1) * 2

    logger.info(f'Environments split: running parralel partitioning')
    result = pqdm(unique_session_args, _parralel_partition_users, n_jobs=n_jobs, argument_type='kwargs')
    logger.info(f'Environments split: finished parralel partitioning')
    return result


def reset_intra_session(df):
    """Recompute per-session and per-user cumulative counters after resampling.

    Parameters
    ----------
    df : cudf DataFrame with ``user_id``, ``session_30_count`` and
        ``date_time``; rows are expected in chronological order. A
        ``cum_session_event`` column is added to it in place.

    Returns
    -------
    A new cudf DataFrame with these columns (re)computed:
    ``cum_session_event`` (1-based event number inside the session),
    ``reward`` (minutes since the previous event of the same session, 0 for
    the first event), ``cum_session_time`` (running sum of ``reward`` inside
    the session), ``cum_platform_time`` (running sum of ``reward`` per user)
    and ``cum_platform_event`` (0-based event number per user).
    """
    session_keys = ['user_id', 'session_30_count']

    logger.info(f'Resetting cum_session_event_count')
    df['cum_session_event'] = df.groupby(session_keys)['date_time'].cumcount() + 1

    logger.info(f'Resetting cum_session_time and setting reward')
    df = df.to_pandas()
    gap_minutes = df.groupby(session_keys)['date_time'].diff().dt.total_seconds().fillna(0) / 60
    # Vectorised form of "reward is 0 for the first event of a session";
    # avoids a Python-level call per row.
    df['reward'] = gap_minutes.where(df['cum_session_event'] > 1, 0)
    df['cum_session_time'] = df.groupby(session_keys)['reward'].cumsum()

    logger.info(f'Resetting cum_platform_time and cum_platform_events')
    df['cum_platform_time'] = df.groupby(['user_id'])['reward'].cumsum()
    # NOTE(review): this counter starts at 0 whereas cum_session_event starts
    # at 1 — confirm the difference is intended.
    df['cum_platform_event'] = df.groupby(['user_id'])['cum_session_event'].cumcount()

    df = gpu_pd.from_pandas(df)

    return df

def _ensure_directory(path):
    """Create ``path`` (including parents) when it is missing, logging the creation."""
    if not os.path.exists(path):
        logger.info(f'Creating directory {path}')
        os.makedirs(path, exist_ok=True)


def _log_monotonic_violations(frame, group_cols, value_col, subject, all_clear_message):
    """Log the groups of ``frame`` in which ``value_col`` is not monotonic increasing."""
    flags = (
        frame.groupby(group_cols)[value_col]
        .is_monotonic_increasing
        .reset_index(name='is_monotic_increasing')
    )
    offenders = flags[flags['is_monotic_increasing'] == False]
    if offenders.shape[0] > 0:
        logger.info(f'Non monotonic increasing {subject} found: perc {offenders.shape[0] / flags.shape[0]}')
        logger.info(offenders)
    else:
        logger.info(all_clear_message)


def get_dataset(read_path, conv_path, n_files, window, n_envs):
    """Build the windowed dataset and write train/eval splits and batches.

    Steps: read ``predicted_data.parquet``, drop rows with missing values,
    convolve and re-sessionize (``setup_data_at_window``), recompute
    cumulative counters (``reset_intra_session``), attach per-session
    metadata (``generate_metadata_session``), log monotonicity checks, split
    and scale (``partition_and_scale_data``) and finally write per-environment
    batches (``batch_environments_for_vectorization``).

    Parameters
    ----------
    read_path : root directory containing
        ``files_used_<n_files>/predicted_data.parquet``.
    conv_path : root directory for all outputs; everything is written below
        ``files_used_<n_files>/window_<window>``.
    n_files : selects the input folder and the prediction column
        (``seq_20`` when 2, otherwise ``seq_40``).
    window : rolling-window width in minutes.
    n_envs : number of batches written per split.

    Returns
    -------
    None. All results are written to disk.
    """
    conv_path, read_path = (
        os.path.join(conv_path, f'files_used_{n_files}'),
        os.path.join(read_path, f'files_used_{n_files}', 'predicted_data.parquet')
    )
    _ensure_directory(conv_path)

    # No check for an existing dataset is made: it is always rebuilt.
    logger.info(f'Building convolutional dataset at {conv_path}')
    logger.info(f'Getting dataset from {read_path}')
    pred_col = 'seq_20' if n_files == 2 else 'seq_40'
    df = gpu_pd.read_parquet(read_path, columns=TORCH_LOAD_COLS + [pred_col])

    df['date_time'] = gpu_pd.to_datetime(df['date_time'])

    df = df.rename(columns={
        'continue_work_session_30_minutes': 'label',
        pred_col: 'pred'
    })

    logger.info(f'Non nan values: {df.count().min()}: dropping na')

    df = df.sort_values(by='date_time')

    logger.info(f'Initial shape: {df.shape}: dropping na and inf')
    df = df.dropna()
    logger.info(f'NA values dropped: {df.count().min()}')
    logger.info(f'Final shape: {df.shape}: dropping na and inf')

    base_conv_path = os.path.join(conv_path, f'window_{window}')
    intermediate_conv_path = os.path.join(base_conv_path, 'intermediate')
    _ensure_directory(base_conv_path)
    _ensure_directory(intermediate_conv_path)

    df = df.sort_values(by='date_time')
    logger.info(f'Subset setup complete: {df.shape}')
    df = setup_data_at_window(df, window, intermediate_conv_path)
    logger.info(f'Subset convolution complete: {df.shape}, resetting stats')
    df = df.sort_values(by='date_time')
    df = reset_intra_session(df)
    logger.info(f'Intra session stats calculated: {df.shape}, saving intermediate')
    df.to_parquet(os.path.join(intermediate_conv_path, 'intra_session.parquet'))
    logger.info(f'Intra session reset complete: {df.shape}')
    logger.info(f'Stats reset complete, resetting metadata')
    df = df.to_pandas()
    df = generate_metadata_session(df)
    logger.info(f'Metadata reset complete: {df.shape}')

    # Sanity checks (logged only). Only the columns involved are rounded, which
    # gives the same verdict as rounding the whole frame.
    session_keys = ['user_id', 'session_30_count']
    _log_monotonic_violations(
        df[session_keys + ['cum_session_time']].round(3),
        session_keys,
        'cum_session_time',
        'reward',
        'All rewards are monotonic increasing and no errors',
    )
    _log_monotonic_violations(
        df,
        ['user_id'],
        'date_time',
        'date time',
        'All date times are monotonic increasing and no errors',
    )

    logger.info(f'Rescaling feature cols: {RESCALER_COLS}')
    # Keep an unscaled copy of every column that is about to be rescaled.
    for col in RESCALER_COLS:
        df[f'{col}_raw'] = df[col]
    train_split, eval_split = partition_and_scale_data(df)

    train_path, eval_path = (os.path.join(base_conv_path, 'train.parquet'), os.path.join(base_conv_path, 'eval.parquet'))
    logger.info(f'Saving train splits to {train_path}')
    train_split.to_parquet(train_path)
    logger.info(f'Saving eval splits to {eval_path}')
    eval_split.to_parquet(eval_path)
    logger.info(f'Saving train and eval splits to complete: setting batches {n_envs}')
    batched_train_path, batched_eval_path = (os.path.join(base_conv_path, 'batched_train'), os.path.join(base_conv_path, 'batched_eval'))

    _ensure_directory(batched_train_path)
    _ensure_directory(batched_eval_path)

    logger.info(f'Writing training batches to {batched_train_path}')
    batch_environments_for_vectorization(train_split, n_envs, batched_train_path)
    logger.info(f'Writing eval batches to {batched_eval_path}')
    batch_environments_for_vectorization(eval_split, n_envs, batched_eval_path)
    
    
    
    
    





In [15]:
class Arguments:
    """Settings passed to ``get_dataset`` for this run."""

    # Input and output root directories.
    read_path = "rl_ready_data"
    conv_path = "rl_ready_data_conv"

    # Selects the `files_used_<n_files>` folder.
    n_files = 30
    # Rolling-window width in minutes.
    window = 1
    # Number of batches written per split.
    n_envs = 100

In [16]:
# Run the full pipeline with the settings from `Arguments`.
get_dataset(
    read_path=Arguments.read_path,
    conv_path=Arguments.conv_path,
    n_files=Arguments.n_files,
    window=Arguments.window,
    n_envs=Arguments.n_envs,
)

2023-06-05 08:29:52,494 - rl_results_eval - INFO - Creating directory rl_ready_data_conv/files_used_30
2023-06-05 08:29:52,495 - rl_results_eval - INFO - Convolutional dataset not found at rl_ready_data_conv/files_used_30: creating
2023-06-05 08:29:52,496 - rl_results_eval - INFO - Getting dataset from rl_ready_data/files_used_30/predicted_data.parquet
2023-06-05 08:29:55,733 - rl_results_eval - INFO - Non nan values: 36241442: 3ropping na
2023-06-05 08:29:55,734 - rl_results_eval - INFO - NA values dropped: 36241442
2023-06-05 08:29:55,863 - rl_results_eval - INFO - Initial shape: (36241442, 19): dropping na and inf
2023-06-05 08:29:55,954 - rl_results_eval - INFO - Final shape: (36241442, 19): dropping na and inf
2023-06-05 08:29:55,955 - rl_results_eval - INFO - Creating directory rl_ready_data_conv/files_used_30/window_1
2023-06-05 08:29:55,956 - rl_results_eval - INFO - Creating directory rl_ready_data_conv/files_used_30/window_1/intermediate
2023-06-05 08:29:56,085 - rl_results_e

KeyboardInterrupt: 

In [13]:
# Load a few columns of the training split for manual inspection. The path is
# the train.parquet that get_dataset writes for n_files=30 and window=1, so
# that run must have completed before this cell is executed.
df = pd.read_parquet('rl_ready_data_conv/files_used_30/window_1/train.parquet', columns=[
    'user_id',
    'session_30_count_raw',
    'date_time',
    'pred',
    'label',
    'cum_session_event_raw',
    'cum_session_time_raw',
    'session_minutes',
    'time_cutoff'
])

In [14]:
# Show the first 100 rows ordered by user, session and time. The `by=` list
# was missing its closing bracket, which made this cell a SyntaxError.
df.sort_values(by=['user_id', 'session_30_count_raw', 'date_time', 'session_minutes', 'time_cutoff']).head(100)

Unnamed: 0,user_id,session_30_count_raw,date_time,pred,label,cum_session_event_raw,cum_session_time_raw,session_minutes
25523,0,1,2021-10-20 06:18:56,0.213766,0.0,1,0.0,9.6
25524,0,1,2021-10-20 06:19:57,0.214752,0.0,2,1.016667,9.6
25525,0,1,2021-10-20 06:20:58,0.218108,0.0,3,2.033333,9.6
25526,0,1,2021-10-20 06:21:55,0.221232,0.0,4,2.983333,9.6
25527,0,1,2021-10-20 06:22:55,0.219664,0.0,5,3.983333,9.6
25528,0,1,2021-10-20 06:23:47,0.221552,0.0,6,4.85,9.6
25529,0,1,2021-10-20 06:24:59,0.215602,0.0,7,6.05,9.6
25530,0,1,2021-10-20 06:25:59,0.215242,0.0,8,7.05,9.6
25531,0,1,2021-10-20 06:26:58,0.215682,0.0,9,8.033333,9.6
25532,0,1,2021-10-20 06:27:13,0.224885,0.0,10,8.283333,9.6
