In [None]:
import pandas as pd
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn import preprocessing
import pickle
import re
from scipy.stats import lognorm, skew, kurtosis, entropy

In [None]:
MS_PER_S = 1000
PATH_TRAIN_LOGS = "./data/external/train_logs.csv"
PATH_TRAIN_OUTCOMES = "./data/external/train_scores.csv"

In [None]:
ACTIVITY_CATEGORIES = ['Nonproduction', 'Input', 'Remove/Cut', 'Replace', 'Paste', 'Move']

In [None]:
def extract(path):

    X = pd.read_csv(path)
    X = X.sort_values(["id", "event_id"], ascending=[True, True])
    
    return X

In [None]:
def enrich_activity(X, is_training_run):

    # 'Move From' activity recorded with low-level cursor loc details
    # extract bigger-picture 'Move From'
    # QUESTION: what's the difference between Move From, and a cut+paste?
    X['activity_detailed'] = X['activity']
    X.loc[X['activity'].str.contains('Move From'), 'activity'] = 'Move'

    if is_training_run:

        pipeline = ColumnTransformer(
            transformers=[(
                'onehot_encode', 
                preprocessing.OneHotEncoder(
                    categories=[ACTIVITY_CATEGORIES], 
                    sparse=False, 
                    handle_unknown='infrequent_if_exist'
                    ),
                ["activity"]
            )],
            remainder='passthrough',
            verbose_feature_names_out=False
            )
        
        pipeline.fit(X)

        with open("pipeline_activity_onehot.pkl", "wb") as f:
            pickle.dump(pipeline, f)

    else:
        with open("pipeline_activity_onehot.pkl", "rb") as f:
            pipeline = pickle.load(f)

    original_categorical = X[['activity']]

    X_dtypes = X.dtypes.to_dict()
    X = pipeline.transform(X)
    X = pd.DataFrame(X, columns=pipeline.get_feature_names_out())
    X = pd.concat([X, original_categorical], axis=1)
    X = X.astype(X_dtypes)

    return X

In [None]:
def scrub_text_change(X):
    """
    Problems with initial text data:

    - Some hex expressions (\\xHH) not decoded. Instead, written literally.
        - Examples: emdash (\\x96), slanted quotations & ticks.
        
    - Some foreign characters (accent a, overring a) not anonymized with generic q.
    Problem confirmed via Kaggle data viewer, for id-event_id cases like 
    0916cdad-39 or 9f328eb3-19. Solutions:
        - An Input event cannot include multiple characters: 
        foreign character & something else. 
        Then, 
            - If Input event contains any emdash, overwrite as strictly emdash
            - If Input event contains no emdash & foreign character, overwrite with single q
            - If Move event, replace any foreign character with single q
    """

    X['text_change_original'] = X['text_change']

    # expect this transforms all \xHH literals
    X['text_change'] = (
        X
        ['text_change_original']
        # arrived at utf-8 encode, windows-1252 decode after several iterations.
        # tested latin-1, but not all \xHH instances caught.
        # tested utf-16, just rose errors.
        .apply(lambda x: x.encode(encoding='utf-8').decode("windows-1252"))
    )


    is_text_change_decode_english = (
        X['text_change'].apply(lambda x: x.isascii())
    )

    is_input_event_foreign_any_emdash = (
        (~ is_text_change_decode_english)
        & (X['activity'] == "Input") 
        & (X['text_change'].str.contains("—"))
    )
    X.loc[is_input_event_foreign_any_emdash, 'text_change'] = "—"

    is_input_event_foreign_no_overwrite = (
        (~ is_text_change_decode_english)
        & (X['activity'] == "Input")
        & (~ X['text_change'].str.contains("—"))
    )
    X.loc[is_input_event_foreign_no_overwrite, 'text_change'] = 'q'


    # given block text change, proceed one character at a time,
    # replacing foreign ones 
    def anonymize_non_ascii(x):
        value = ""
        for x_i in x:
            if not x_i.isascii():
                value += "q"
            else:
                value += x_i
        return value

    X['text_change'] = np.where(
        X['activity'].str.contains('Move|Remove|Paste|Replace', regex=True),
        X['text_change'].apply(lambda x: anonymize_non_ascii(x)),
        X['text_change']
    )

    X.drop(columns='text_change_original', inplace=True)

    return X

In [None]:
PAUSE_THRESHOLD_MS = 1000
N_ACTIVITIES_UNTIL_START_WINDOW_CLOSES = 100

def enrich_pauses(X):
    """
    Must infer pauses, as no explicit record indicates.
    'Latency' implies, any time delta between keystrokes.
    'Pause' implies, a 'significant' time delta, not just physical-mechanical
    requirement of typing.
    """

    X['up_time_lag1'] = X.groupby(['id'])['up_time'].shift(1)
    X['latency_time'] = X['down_time'] - X['up_time_lag1']

    X['preceding_pause_time'] = X['latency_time']
    # first record lacks preceding_pause_time (time before first key press)
    X.loc[X['event_id'] == 1, 'preceding_pause_time'] = X['down_time']
    # expect some negative pause times -- interpret as, no real pause
    has_no_real_pause = X['preceding_pause_time'] <= PAUSE_THRESHOLD_MS
    X.loc[has_no_real_pause, 'preceding_pause_time'] = None

    # not obvious how to tag "initial planning pause" 
    # tried "first 5 minutes", but when that pause is 10 minutes, that fails.
    # first XX minutes is fragile
    # first XX events may help -- what's your extent of pause before *action*?
    X['preceding_pause_time_start_window'] = X['preceding_pause_time']
    X.loc[
        X['event_id'] <= N_ACTIVITIES_UNTIL_START_WINDOW_CLOSES, 
        'preceding_pause_time_start_window'
        ] = None

    X['total_pause_time'] = (
        X
        .groupby(['id'])
        ['preceding_pause_time']
        .transform('sum')
        )
    X['rolling_pause_time'] = (
        X
        .groupby(['id'])
        ['preceding_pause_time']
        .cumsum()
        )
    X['rolling_pause_time_fraction'] = (
        X['rolling_pause_time'] / X['total_pause_time']
        )

    return X

In [None]:
SECONDS_PER_BURST = 2

def enrich_time_bursts(X, is_training_run):
    """
    If pause exceeds threshold duration, a "burst" has ended. 
    A burst is characterized by one dominant activity.
    """

    X['is_new_burst_start'] = (
        X['preceding_pause_time'] > MS_PER_S * SECONDS_PER_BURST
        ).astype(int)
    X.loc[X['event_id'] == 1, 'is_new_burst_start'] = 1
    X['burst_id'] = (
        X
        .groupby(['id'])
        ['is_new_burst_start']
        .cumsum()
        )
    X['burst_time_start'] = (
        X
        .groupby(['id', 'burst_id'])
        ['down_time']
        .transform('min')
        )
    X['burst_time_end'] = (
        X
        .groupby(['id', 'burst_id'])
        ['up_time']
        .transform('max')
        )
    X['burst_time_duration'] = X['burst_time_end'] - X['burst_time_start']
    

    for activity in ACTIVITY_CATEGORIES:

        X['burst_events_' + activity] = (
            X
            .groupby(['id', 'burst_id'])
            ['activity_' + activity]
            .transform('sum')
            ).astype(float)
        
    X['burst_type'] = (
        X
        [['burst_events_' + activity for activity in ACTIVITY_CATEGORIES]]
        .idxmax(axis=1)
        )
    X['burst_type'] = X['burst_type'].str.replace(
        "burst_events_", "", regex=True
        )


    if is_training_run:
        
        pipeline = ColumnTransformer(
            transformers=[(
                'onehot_encode', 
                preprocessing.OneHotEncoder(
                    categories=[ACTIVITY_CATEGORIES], 
                    sparse=False, 
                    handle_unknown='infrequent_if_exist'
                    ),
                ["burst_type"]
            )],
            remainder='passthrough',
            verbose_feature_names_out=False
            )
        
        pipeline.fit(X)
        
        with open("pipeline_burst_type_onehot.pkl", "wb") as f:
            pickle.dump(pipeline, f)

    else:
        with open("pipeline_burst_type_onehot.pkl", "rb") as f:
            pipeline = pickle.load(f)

    original_categorical = X['burst_type']
    X_dtypes = X.dtypes.to_dict()
    X = pipeline.transform(X)
    X = pd.DataFrame(X, columns=pipeline.get_feature_names_out())
    X = pd.concat([X, original_categorical], axis=1)
    X = X.astype(X_dtypes)

    for activity in ACTIVITY_CATEGORIES:
        X['is_new_burst_start_' + activity] = (
            X['is_new_burst_start'] * 
            X['burst_type_' + activity]
            )
    
    return X

In [None]:
def enrich_activity_streaks(X):
    """
    Consecutive activity (independent of time) suggests productive writing flow 
    """

    X['activity_lag1'] = X.groupby(['id'])['activity'].shift(1)

    X['is_new_activity_streak_start'] = (
        X['activity'] != X['activity_lag1']
        ).astype(int)
    X.loc[X['event_id'] == 1, 'is_new_activity_streak_start'] = 1

    X['is_activity_streak_end'] = (
        X
        .groupby(['id'])
        ['is_new_activity_streak_start']
        .shift(-1)
        )
    X['is_activity_streak_end'] = X['is_activity_streak_end'].fillna(1) 

    X['activity_streak_id'] = (
        X
        .groupby(['id'])
        ['is_new_activity_streak_start']
        .cumsum()
    )

    X['activity_streak_length_thin'] = (
        X
        .groupby(['id', 'activity_streak_id'])
        .transform('size')
    )
    X.loc[
        X['is_activity_streak_end'] == 0, 
        'activity_streak_length_thin'
        ] = None

    for activity in ACTIVITY_CATEGORIES:
        X['is_new_activity_streak_start_' + activity] = (
            X["activity_" + activity] * X['is_new_activity_streak_start']
            )

    return X

In [None]:
def enrich_word_count(X):
    """
    Word count is a primary productivity measure. 
    Expect score to increase with word count.
    """

    X['word_count_lag1'] = X.groupby(['id'])['word_count'].shift(1)
    X['word_count_delta_event'] = X['word_count'] - X['word_count_lag1']

    X['word_count_delta_burst'] = (
        X
        .groupby(['id', 'burst_id'])
        ['word_count_delta_event']
        .transform('sum')
        )
    # de-duplication allows easier downstream aggregation
    X['word_count_delta_burst_thin'] = X['word_count_delta_burst']
    X.loc[X['is_new_burst_start'] == 0, 'word_count_delta_burst_thin'] = None

    X['word_count_delta_activity_streak'] = (
        X
        .groupby(['id', 'streak_id'])
        ['word_count_delta_event']
        .transform('sum')
        )
    # de-duplicate to one value per burst -- easier for downstream aggregation
    X['word_count_delta_activity_streak_thin'] = X['word_count_delta_activity_streak']
    X.loc[
        X['is_new_activity_streak_start'] == 0, 
        'word_count_delta_activity_streak_thin'
        ] = None


    return X

In [None]:
def enrich_cursor_position(X):

    # one-way cursor movement might be most productive
    # jumping around is choppy
    X['cursor_position_lag1'] = (
        X
        .groupby(['id'])
        ['cursor_position']
        .shift(1)
        ).fillna(0)
    X['cursor_position_delta'] = (
        X['cursor_position'] - X['cursor_position_lag1'] 
    )

    # if cursor position increases due to copy+paste (perhaps of essay prompt),
    # that doesn't reflect grade-driving output
    X['cursor_position_input'] = np.where(
        X['activity'] == "Input", 
        X["cursor_position"], 
        np.nan
        )
    X['cursor_position_cummax'] = (
        X
        .groupby(['id'])
        ['cursor_position_input']
        .cummax()
        )
    # for some reason, unable to chain below statements with above
    X['cursor_position_cummax'] = (
        X
        .groupby(['id'])
        ['cursor_position_cummax']
        .ffill()
        .fillna(0)
    )

    X['cursor_position_vs_max'] = (
        X['cursor_position'] - X['cursor_position_cummax']
        )

    X['cursor_position_last_space'] = np.where(
        (X['activity'] == "Input") & (X["text_change"] == ' '),
        X['cursor_position'],
        np.nan
    ) 
    X['cursor_position_last_space'] = (
        X
        .groupby(['id'])
        ['cursor_position_last_space']
        .ffill()
        # likely not beginning essay with a space
        .fillna(0)
    )

    X = X.drop(columns='cursor_position_input')

    return X

In [None]:
def enrich_word_length(X):
        
    # word length offers a content quality measure.
    # hard to track entire words sequence in rolling fashion.
        # every word's length, in a list of one element per word?  
    # more tractable to track very latest string

    is_edit_to_latest_string = (
        X['cursor_position'] > X['cursor_position_last_space']
    )

    X['is_latest_space'] = (
        (X['cursor_position_vs_max'] == 0)
        & (X['activity'] == "Input")
        & (X["text_change"] == ' ')
        )

    X['is_latest_string_end'] = (
        X
        .groupby(['id'])
        ['is_latest_space']
        .shift(-1)
        # last process records
        .fillna(True)
        )

    X['n_alphanum_char_added_to_latest_string'] = 0
    is_alphanumeric_addition = (
        (X['activity'] == "Input")
        & (X["text_change"] == 'q')
        )
    X.loc[
        (is_alphanumeric_addition & is_edit_to_latest_string), 
        'n_alphanum_char_added_to_latest_string'
        ] = 1
    is_alphanumeric_subtraction = (
        (X['activity'] == "Remove/Cut")
        & (X['up_event'] == 'Backspace')
        & (X["text_change"] == 'q')
        )
    X.loc[
        (is_alphanumeric_subtraction & is_edit_to_latest_string), 
        'n_alphanum_char_added_to_latest_string'
        ] = -1

    # example: 2nd string, 2 characters in.
    # considering cumsum for each character in 2nd string, 
    # subtract those characters from 1st
    X['rolling_length_strings'] = (
        X
        .groupby(['id'])
        ['n_alphanum_char_added_to_latest_string']
        .cumsum() 
        ) 

    X['rolling_length_completed_strings'] = None
    X.loc[
        X['is_latest_space'], 'rolling_length_completed_strings'
        ] = X['rolling_length_strings']
    X['rolling_length_completed_strings'] = (
        X
        .groupby(['id'])
        ['rolling_length_completed_strings']
        .ffill()
        .fillna(0)
    )

    X['rolling_length_latest_string'] = (
        X['rolling_length_strings'] 
        - X['rolling_length_completed_strings']
    )

    X['length_latest_string'] = None
    X.loc[
        X['is_latest_string_end'], 'length_latest_string'
        ] = X['rolling_length_latest_string']
    
    return X

In [None]:
def enrich_punctuation(X):
        
    # if thoughts aren't separated by punctuation, writing won't score well
    X['is_thought_delimiting_punctuation'] = (
        (X['text_change'] == ".")
        | (X['text_change'] == ". ")
        | (X['text_change'] == ",")
        | (X['text_change'] == "-")
        | (X['text_change'] == "!")
        | (X['text_change'] == ";")
        | (X['text_change'] == "?")
        | (X['text_change'] == ":")
        ).astype(int)

    X['is_special_punctuation'] = (
        (X['text_change'] == "=")
        | (X['text_change'] == "/")
        | (X['text_change'] == "\\")
        | (X['text_change'] == "(")
        | (X['text_change'] == ")")
        | (X['text_change'] == "\n")
        | (X['text_change'] == "[")
        | (X['text_change'] == "]")
        | (X['text_change'] == ">")
        | (X['text_change'] == "<")
        | (X['text_change'] == "$")
        | (X['text_change'] == "*")
        | (X['text_change'] == "&")
    )

    return X

In [None]:
TOTAL_MIN_MAX_EXPECTED = 30
TOTAL_MIN_PLUS_BUFFER = 150 # id 21bbc3f6 case extended to 140 min ... odd
SECONDS_PER_MIN = 60
SECONDS_PER_WINDOW = 30

def enrich_time_windows(X):

    # windows allow for time-sequence features
    # expect that some essays extend beyond 30 min described in 'Data Collection'
    # downstream, **do not tabulate over a writer's unused time windows**!!

    X['window_30s'] = pd.cut(
        X['down_time'],
        bins=np.arange(
            0, 
            TOTAL_MIN_PLUS_BUFFER * SECONDS_PER_MIN * MS_PER_S, 
            SECONDS_PER_WINDOW * MS_PER_S
            )
        )

    X['is_time_beyond_expected_max'] = (
        X['up_time'] > TOTAL_MIN_MAX_EXPECTED * SECONDS_PER_MIN * MS_PER_S
    ).astype(int)

    return X

In [None]:
def subset_features(X):

    return X[[
        "id",
        "event_id",
        "is_time_beyond_expected_max",
        "window_30s",
        "burst_id",
        "burst_type",
        "burst_type_Nonproduction",
        "burst_type_Input",
        "burst_type_Remove/Cut",
        "burst_type_Replace",
        "burst_type_Paste",
        "burst_type_Move",
        "is_new_burst_start",
        "is_new_burst_start_Nonproduction",
        "is_new_burst_start_Input",
        "is_new_burst_start_Remove/Cut",
        "is_new_burst_start_Replace",
        "is_new_burst_start_Paste",
        "is_new_burst_start_Move",
        "burst_time_start",
        "burst_time_end",
        "burst_time_duration",
        "burst_events_Nonproduction",
        "burst_events_Input",
        "burst_events_Remove/Cut",
        "burst_events_Replace",
        "burst_events_Paste",
        "burst_events_Move",
        "word_count_delta_burst",
        "word_count_delta_burst_thin",
        "activity_streak_id",
        "is_new_activity_streak_start",
        "is_new_activity_streak_start_Nonproduction",
        "is_new_activity_streak_start_Input",
        "is_new_activity_streak_start_Remove/Cut",
        "is_new_activity_streak_start_Replace",
        "is_new_activity_streak_start_Paste",
        "is_new_activity_streak_start_Move",
        "is_activity_streak_end",
        "activity_streak_length_thin",
        "word_count_delta_activity_streak",
        "word_count_delta_activity_streak_thin",

        "down_time",
        "up_time",	
        "action_time",	
        "activity_detailed",
        "activity",	
        "activity_Nonproduction",
        "activity_Input",
        "activity_Remove/Cut",
        "activity_Replace",
        "activity_Paste",
        "activity_Move",
        "down_event",	
        "up_event",	
        "text_change",
        "is_thought_delimiting_punctuation",
        "cursor_position",	
        "word_count",

        "cursor_position_delta",
        "cursor_position_vs_max",
        "cursor_position_cummax",
        "cursor_position_last_space",

        "word_count_lag1",
        "word_count_delta_event",

        "up_time_lag1",
        "latency_time",
        "preceding_pause_time",
        "preceding_pause_time_start_window",
        "rolling_pause_time",
        "rolling_pause_time_fraction",
        "total_pause_time"
        ]]  

In [None]:
def concatenate_essay_from_logs(df):
    """
    Concatenate essay text from disparate logged input events.
    Expect df to be *one* author's log.
    Adapted from sources: 
        https://www.kaggle.com/code/hiarsl/feature-engineering-sentence-paragraph-features,
        https://www.kaggle.com/code/kawaiicoderuwu/essay-contructor.
    """

    input_events = df.loc[
        (df.activity != 'Nonproduction'), 
        ['activity_detailed', 'cursor_position', 'text_change']
        ].rename(columns={'activity_detailed': 'activity'})

    essay_text = ""
    for input_event in input_events.values:

        activity = input_event[0]
        cursor_position_after_event = input_event[1]
        text_change_log = input_event[2]

        if activity == 'Replace':

            replace_from_to = text_change_log.split(' => ')
            text_add = replace_from_to[1]
            text_remove = replace_from_to[0]
            cursor_position_start_text_change = (
                cursor_position_after_event - len(text_add)
                )
            cursor_position_after_skip_replace = (
                cursor_position_start_text_change + len(text_remove)
            )

            # essayText start: "the blue cat"
            # replace "blue" with "red"
            # "the redblue cat", skip blue
            essay_text = (
                essay_text[:cursor_position_start_text_change] # "the "
                + text_add # "red"
                # essayText value: "the blue cat" 
                # want remaining " cat", NOT "blue cat"
                + essay_text[cursor_position_after_skip_replace:] 
                )

            continue

        if activity == 'Paste':

            cursor_position_start_text_change = (
                cursor_position_after_event - len(text_change_log)
                )

            # essayText start: "the cat"
            # paste "blue " between
            essay_text = (
                essay_text[:cursor_position_start_text_change] # "the " 
                + text_change_log # "blue "
                # essayText value: "the cat"
                + essay_text[cursor_position_start_text_change:]
            )

            continue

        if activity == 'Remove/Cut':
            # similar process to "Replace" action

            text_remove = text_change_log
            cursor_position_after_skip_remove = (
                cursor_position_after_event + len(text_remove)
            )

            essay_text = (
                essay_text[:cursor_position_after_event] 
                + essay_text[cursor_position_after_skip_remove:]
                )

            continue
        
        if "Move" in activity:

            cursor_intervals_raw_str = (
                activity[10:]
                .replace("[", "")
                .replace("]", "")
                )
            cursor_intervals_separate = cursor_intervals_raw_str.split(' To ')
            cursor_intervals_vectors = [
                x.split(', ') 
                for x in cursor_intervals_separate
                ]
            cursor_interval_from = [
                int(x) for x in cursor_intervals_vectors[0]
                ]
            cursor_interval_to = [
                int(x) for x in cursor_intervals_vectors[1]
                ]

            # "the blue cat ran", move "blue" to
            # "the cat blue ran"
            # note: no change in total text length

            if cursor_interval_from[0] != cursor_interval_to[0]:

                if cursor_interval_from[0] < cursor_interval_to[0]:
                    
                    essay_text = (
                        # all text preceding move-impacted window
                        essay_text[:cursor_interval_from[0]] +
                        # skip where moved block _was_,
                        # proceed to end of move-impacted window
                        essay_text[cursor_interval_from[1]:cursor_interval_to[1]] +
                        # add moved block
                        essay_text[cursor_interval_from[0]:cursor_interval_from[1]] + 
                        # all text proceeding move-impacted window
                        essay_text[cursor_interval_to[1]:]
                    )

                # "the cat ran fast", move "ran" to 
                # "ran the cat fast"
                else:

                    essay_text = (
                        # all text preceding move-impacted window
                        essay_text[:cursor_interval_to[0]] + 
                        # add moved block
                        essay_text[cursor_interval_from[0]:cursor_interval_from[1]] +
                        # skip moved block, still within move-impacted window
                        essay_text[cursor_interval_to[0]:cursor_interval_from[0]] + 
                        # all text proceeding move-impacted window
                        essay_text[cursor_interval_from[1]:]
                    )
      
            continue
        

        cursor_position_start_text_change = (
            cursor_position_after_event - len(text_change_log)
            )
        essay_text = (
            essay_text[:cursor_position_start_text_change] 
            + text_change_log
            + essay_text[cursor_position_start_text_change:]
            )
        
    return pd.DataFrame({'id': df['id'].unique(), 'essay': [essay_text]})

In [None]:
def enrich_logs(X, is_training_run):

    X = enrich_activity(X, is_training_run)
    print("Enriched activity")

    # live test data raise Exception during decode-encode attempt.
    # still, higher quality model should follow from 
    # higher-quality train data 
    if is_training_run:
        X = scrub_text_change(X)

    X = enrich_pauses(X)
    print("Enriched pauses")

    X = enrich_time_bursts(X, is_training_run)
    print("Enriched time bursts")

    X = enrich_activity_streaks(X)
    print("Enriched activity streaks")

    X = enrich_word_count(X)
    print("Enriched word count")

    X = enrich_cursor_position(X)
    print("Enriched cursor position")

    X = enrich_word_length(X)
    print("Enriched word length")

    X = enrich_punctuation(X)
    print("Enriched punctuation")

    X = enrich_time_windows(X)
    print("Enriched time windows")

    return subset_features(X)


In [None]:
def aggregate_essay_text_features(X):
    """
    Aggregates covering final writing product, not writing process narrowly.
    """

    essays_text = pd.concat(
        [concatenate_essay_from_logs(x) for _, x in X.groupby('id')], axis=0
        ).reset_index(drop=True)
    
    # two consecutive newlines constitute one effective
    # no paragraph breaks imply, all 1 paragraph
    essays_text['n_paragraphs'] = essays_text['essay'].str.count("[\n]+")
    essays_text.loc[essays_text['n_paragraphs'] == 0, 'n_paragraphs'] = 1
    essays_text['paragraphs'] = essays_text['essay'].str.split("[\n]+")
    essays_text['n_sentences_by_paragraph'] = (
        essays_text['paragraphs']
        .apply(lambda paragraphs: np.array([
            len(re.findall("[\.]+|[?]+|[!]+", p)) 
            for p in paragraphs
            ]) 
            )
        )
    # for bounds guidance, see overall distribution
    varnames_n_paragraphs_by_n_sentences_bin = []
    for geq_low, lt_high in [
        (0, 2),
        (2, 3),
        (3, 4),
        (4, 5),
        (5, 6),
        (6, 7),
        (7, 10),
        (10, 20),
        (20, 50)
        ]:

        bin_var = f'n_paragraphs_with_n_sentences_geq{geq_low}_lt{lt_high}'
        varnames_n_paragraphs_by_n_sentences_bin += [bin_var, bin_var + "_frac"]

        essays_text[bin_var] = (
            essays_text['n_sentences_by_paragraph']
            .apply(lambda x: ( (x >= geq_low) & (x < lt_high) ).sum() )
            )
        
        essays_text[bin_var + "_frac"] = (
            essays_text[bin_var] / essays_text['n_paragraphs']
            )


    # sentences split can leave last hanging ' ', 
    # if not scrubbed by search for 'q'
    essays_text['sentences'] = essays_text['essay'].str.split("[\.]+|[?]+|[!]+")
    essays_text['sentences'] = (
        essays_text['sentences']
        .apply(lambda sentences: [s for s in sentences if 'q' in s])
    )
    essays_text['n_sentences'] = (
        essays_text['sentences']
        .apply(lambda s_split: len(s_split))
    )

    essays_text['words_by_sentence'] = (
        essays_text['sentences']
        .apply(lambda sentences: [s.split() for s in sentences])
    )
    essays_text['i_words_by_sentence'] = (
        essays_text['words_by_sentence']
        .apply(lambda sentences: np.array([len(s) for s in sentences]))
    )

    # for bounds guidance, see overall distribution
    varnames_n_sentences_by_word_count_bin = []
    for geq_low, lt_high in [
        (0, 5),
        (5, 10),
        (10, 15),
        (15, 20),
        (20, 25),
        (25, 30),
        (30, 50),
        (50, 5000)
        ]:

        bin_var = f'n_sentences_words_geq{geq_low}_lt{lt_high}'
        varnames_n_sentences_by_word_count_bin += [bin_var, bin_var + "_frac"]

        essays_text[bin_var] = (
            essays_text['i_words_by_sentence']
            .apply(lambda x: ( (x >= geq_low) & (x < lt_high) ).sum() )
            )
        
        essays_text[bin_var + "_frac"] = (
            essays_text[bin_var] / essays_text['n_sentences']
            )


    essays_text['words'] = essays_text['essay'].str.split(" +", regex=True)
    essays_text["word_count_reconstructed"] = (
        essays_text
        ["words"]
        .apply(lambda x: len(x))
    )
    essays_text["words_length"] = (
        essays_text["words"]
        .apply(lambda x: np.array([len(a) for a in x]))
    )

    # for bounds guidance, see distribution of word lengths
    varnames_i_words_by_length_bin = []
    for geq_low, lt_high in [
        (0, 2),
        (2, 3),
        (3, 4),
        (4, 5),
        (5, 6),
        (6, 7),
        (7, 8),
        # "incomprehensible" is a reasonable, long (21-char) word
        (8, 25),
        (25, 500)
    ]:
        bin_var = f'words_length_geq{geq_low}_lt{lt_high}'
        varnames_i_words_by_length_bin += [bin_var, bin_var + "_frac"]

        essays_text[bin_var] = (
            essays_text['words_length']
            .apply(lambda x: ( (x >= geq_low) & (x < lt_high) ).sum() )
            )
        essays_text[bin_var + "_frac"] = (
            essays_text[bin_var] / essays_text['word_count_reconstructed']
            )


    essays_text['n_thought_delimiting_punctuation'] = (
        essays_text
        ['essay']
        .str
        .count("[\.]+|[?]+|[!]+|[,]+|[-]+|[;]+|[:]+|[—]+")
        )
    essays_text["words_per_thought_delimiting_punctuation_avg"] = (
        essays_text["word_count_reconstructed"] / 
        essays_text['n_thought_delimiting_punctuation']
    )

    essays_text['n_parenthetical_punctuation'] = (
        essays_text
        ['essay']
        .str
        .count("\(|\)|\[|\]|\*|{|}")
    )

    essays_text['n_quant_punctuation'] = (
        essays_text
        ['essay']
        .str
        .count("=|>|<|\$|\%|\+")
    )

    essays_text['n_apostrophe'] = (
        essays_text
        ['essay']
        .str
        .count("'")
    )

    essays_text['n_quotes'] = (
        essays_text
        ['essay']
        .str
        .count("\"")
    )

    essays_text['n_shortening_punctuation'] = (
        essays_text
        ['essay']
        .str
        .count("&|@")
    )


    for var in ['i_words_by_sentence', 'words_length']:
        essays_text[f"{var}_stddev"] = essays_text[var].apply(lambda x: x.std())


    aggregates_essay_text = essays_text[[
        'id',
        'n_paragraphs', 
        'n_sentences', 
        
        'n_thought_delimiting_punctuation',
        'n_parenthetical_punctuation',
        'n_quant_punctuation',
        'n_apostrophe',
        'n_quotes',
        'n_shortening_punctuation',

        "words_per_thought_delimiting_punctuation_avg",

        "i_words_by_sentence_stddev",
        "words_length_stddev"
        ]

        + varnames_n_paragraphs_by_n_sentences_bin

        + varnames_n_sentences_by_word_count_bin

        + varnames_i_words_by_length_bin
        
        ]
    aggregates_essay_text = aggregates_essay_text.set_index('id')

    return aggregates_essay_text


event_vars_sum = (
    ['activity_' + x for x in ACTIVITY_CATEGORIES] 
    + ['is_new_burst_start'] 
    + ['is_new_burst_start_' + x for x in ACTIVITY_CATEGORIES]
    + ["is_new_activity_streak_start_" + x for x in ACTIVITY_CATEGORIES]
    )

conti_vars_sum = (
    ['word_count_delta_event']
    + ["preceding_pause_time"]
    )

distribution_vars = [
    'latency_time', 
    'preceding_pause_time', 
    'cursor_position_delta',
    'word_count_delta_burst_thin',
    'activity_streak_length_thin',
    'cursor_position_vs_max'  
]


def aggregate_no_time_dependence_measures(X, is_training_run):
    """
    Aggregate measures irrespective of time dependence. 
    Ex: sum of inputs over entire essay.
    """

    # discretizing conti var allows sum of vars, as though they were events.
    # because discretization involves col expansion via one-hot encoding,
    # reduce dataset to small-as-possible
    # extracting non-float id allows ColumnTransformer's properly typed numpy result
    X_attributes = X[['id']]
    X_to_sum = X[event_vars_sum + ['word_count_delta_event'] + distribution_vars]
    X_orig_to_sum = X_to_sum[['preceding_pause_time', 'latency_time']].copy()

    if is_training_run:

        pipeline = ColumnTransformer(
            transformers=[(
                'discretizer', 
                preprocessing.KBinsDiscretizer(
                    n_bins=10, 
                    encode='onehot-dense', 
                    strategy='quantile'
                    ),
                distribution_vars
            )],
            remainder='passthrough',
            verbose_feature_names_out=False
            )

        # if nulls not explicitly handled, Exception raises
        pipeline.fit(X_to_sum.fillna(-1))
        with open("pipeline_no_time_dep_discretizer.pkl", "wb") as f:
            pickle.dump(pipeline, f)

    else:
        with open("pipeline_no_time_dep_discretizer.pkl", "rb") as f:
            pipeline = pickle.load(f)

    # follow pipeline fit nulls treatment
    X_to_sum = pipeline.transform(X_to_sum.fillna(-1))

    X_to_sum = pd.DataFrame(X_to_sum, columns=pipeline.get_feature_names_out())
    X_to_sum = pd.concat([X_attributes, X_to_sum, X_orig_to_sum], axis=1)
    cols_in = set(pipeline.feature_names_in_)
    cols_out = set(pipeline.get_feature_names_out())
    distribution_vars_discretized = cols_out.difference(cols_in)

    X_to_sum['nobs'] = 1
    # with distribution_vars discretized, everything sums
    sums_over_time = X_to_sum.groupby('id').agg(sum)
    for var in distribution_vars_discretized:
        sums_over_time[var + '_share_distr'] = (
            sums_over_time[var] / sums_over_time['nobs']
        )
    sums_over_time.drop(columns='nobs', inplace=True)
    sums_over_time['delete_insert_ratio'] = (
        sums_over_time['activity_Remove/Cut'] / 
        sums_over_time['activity_Input'] 
        )
    del X_to_sum


    expr = {}
    for var in distribution_vars:
        expr[f"{var}_stddev"] = (var, np.std)
    expr['preceding_pause_time_max'] = ('preceding_pause_time', 'max')
    expr['initial_pause_time_max'] = ('preceding_pause_time_start_window', 'max')
    expr["total_time"] = ('up_time', 'max')
    expr['is_time_beyond_expected_max'] = ('is_time_beyond_expected_max', 'max')

    distribution_summaries = X.groupby('id').agg(**expr)
    distribution_summaries['is_initial_pause_max_pause'] = (
        distribution_summaries['preceding_pause_time_max'] == 
        distribution_summaries['initial_pause_time_max']
        ).astype(int)


    aggregates_essay_text = aggregate_essay_text_features(X)


    mle_summary_subjects = []
    for X_subject in [x for _, x in X.groupby('id')]:

        subject_id = X_subject['id'].iloc[0]
        mle_by_var = {}
        for var in ['preceding_pause_time', 'latency_time']:
            shape, location, scale = lognorm.fit(X_subject[var].dropna())
            mle_by_var[f"{var}_lognorm_shape"] = shape
            mle_by_var[f"{var}_lognorm_location"] = location
            mle_by_var[f"{var}_lognorm_scale"] = scale

        mle_by_var = pd.DataFrame(mle_by_var, index=[subject_id])
        mle_by_var = mle_by_var.fillna(-1)

        mle_summary_subjects.append(mle_by_var)

    distr_params_over_time = pd.concat(mle_summary_subjects, axis=0)


    aggregates_over_time = pd.merge(
        sums_over_time, 
        distribution_summaries,
        how='left',
        left_index=True,
        right_index=True
        )

    aggregates_over_time = pd.merge(
        aggregates_over_time, 
        distr_params_over_time,
        how='left',
        left_index=True,
        right_index=True
        )
    
    aggregates_over_time = pd.merge(
        aggregates_over_time, 
        aggregates_essay_text,
        how='left',
        left_index=True,
        right_index=True
        )
    

    for var in event_vars_sum:

        aggregates_over_time[var + '_per_s'] = (
            1000 * (aggregates_over_time[var] / aggregates_over_time['total_time'])
            )

    aggregates_over_time = (
        aggregates_over_time
        .assign(
            keystroke_speed = lambda x: (x.activity_Input + x['activity_Remove/Cut']) / x.total_time,
            word_count_speed = lambda x: x.word_count_delta_event / x.total_time,
            pause_time_fraction = lambda x: x.preceding_pause_time / x.total_time
            )
        )
    
    
    return aggregates_over_time

In [None]:
def aggregate_time_variability_measures(
    X, aggregates_over_time, is_training_run
    ):
    """
    Tabulate author's measures by fixed time window (ex: 30-second increments),
    and derive features from that by-time window distribution.

    Use over-time aggregates to normalize select by-time window tabulations. 
    """

    # need to sum events, conti vars by fixed-time window.
    # ensure a writer's fixed-time windows are all used -- drop excess ones.
    # for events, normalize by overall average event rates, & overall sums.
    # for conti var, normalize by overall sums.

    # then, over time windows, compute percentiles. this is novel for event vars,
    # which lack percentiles over all time. p90_time_window

    sums_by_window = (
        X
        .groupby(['id', 'window_30s'])
        [event_vars_sum + conti_vars_sum]
        .agg(sum)
        .astype(float)
        .fillna(0)
        .reset_index(drop=False)
    )
    sums_by_window['delete_insert_ratio'] = (
        sums_by_window['activity_Remove/Cut'] / 
        sums_by_window['activity_Input'] 
        ).replace(np.inf, np.nan)


    # by default, every categorical time window ever observed across data
    # tabulates for every writer. instead, per writer, truncate to time windows
    # actually used.
    sums_by_window['has_activity'] = (
        sums_by_window
        [['activity_' + x for x in ACTIVITY_CATEGORIES]].sum(axis=1) 
        > 0
    )
    sums_by_window['idx_window_by_id'] = (
        sums_by_window
        .groupby('id')
        .cumcount()
    )
    sums_by_window['idx_has_activity'] = np.where(
        sums_by_window['has_activity'], 
        sums_by_window['idx_window_by_id'],
        np.nan
        )
    sums_by_window['idx_activity_max'] = (
        sums_by_window
        .groupby(['id'])
        ['idx_has_activity']
        .transform('max')
    )
    sums_by_window = (
        sums_by_window
        .loc[sums_by_window['idx_window_by_id'] <= sums_by_window['idx_activity_max']]
        .drop(columns=['has_activity', 'idx_has_activity', 'idx_activity_max'])
    )


    # for variability measure more comparable between writers, de-mean by writer. 
    # Ex: higher-throughput writer incurs higher stddev, 
    # because values have higher magnitude.

    # join method allows for merge on one index column, of multiple possible
    sums_by_window = sums_by_window.join(
        aggregates_over_time[[x + '_per_s' for x in event_vars_sum]],
        on='id',
        how='left'
        )
    for var in event_vars_sum:
        sums_by_window[var + '_time_norm'] = (
            sums_by_window[var] / 
            (sums_by_window[var + '_per_s'].replace(0, None) * 30)
            ).fillna(1)
    sums_by_window.drop(
        columns=[x + '_per_s' for x in event_vars_sum],
        inplace=True
        )

    sums_over_time_ren = aggregates_over_time[event_vars_sum + conti_vars_sum]
    sums_over_time_ren.columns = [
        x + "_total" for x in sums_over_time_ren.columns
        ]
    sums_by_window = sums_by_window.join(sums_over_time_ren, on='id', how='left')
    for var in event_vars_sum + conti_vars_sum:
        sums_by_window[var + '_frac_total'] = (
            sums_by_window[var] / 
            sums_by_window[var + '_total'].replace(0, None)
            ).fillna(1)
    sums_by_window.drop(
        columns=[x + '_total' for x in event_vars_sum + conti_vars_sum],
        inplace=True
        )


    expr = {}
    distr_vars = (
        event_vars_sum
        + conti_vars_sum
        + [var + '_time_norm' for var in event_vars_sum]
        + [var + '_frac_total' for var in event_vars_sum]
        + [var + '_frac_total' for var in conti_vars_sum]
        )
    X_attributes = sums_by_window[['id']]
    X_to_sum = sums_by_window[distr_vars]
    if is_training_run:

        pipeline = ColumnTransformer(
            transformers=[(
                'discretizer', 
                preprocessing.KBinsDiscretizer(
                    n_bins=10, 
                    encode='onehot-dense', 
                    strategy='quantile'
                    ),
                distr_vars
            )],
            remainder='passthrough',
            verbose_feature_names_out=False
            )

        # if nulls not explicitly handled, Exception raises
        pipeline.fit(X_to_sum.fillna(-1))
        with open("pipeline_time_dep_discretizer.pkl", "wb") as f:
            pickle.dump(pipeline, f)

    else:
        with open("pipeline_time_dep_discretizer.pkl", "rb") as f:
            pipeline = pickle.load(f)

    # follow pipeline fit nulls treatment
    X_to_sum = pipeline.transform(X_to_sum.fillna(-1))

    X_to_sum = pd.DataFrame(X_to_sum, columns=pipeline.get_feature_names_out())
    X_to_sum = pd.concat([X_attributes, X_to_sum], axis=1)
    cols_in = set(pipeline.feature_names_in_)
    cols_out = set(pipeline.get_feature_names_out())
    distribution_vars_discretized = list( cols_out.difference(cols_in) )

    X_to_sum['nobs'] = 1
    # with distribution_vars discretized, everything sums
    distr_summaries = X_to_sum.groupby('id').agg(sum)
    for var in distribution_vars_discretized:
        distr_summaries[var + '_share_distr'] = (
            distr_summaries[var] / distr_summaries['nobs']
        )
    distr_summaries.drop(columns='nobs', inplace=True)
    distr_summaries.columns = [
        x + '_time_window' for x in distr_summaries.columns
        ]
    

    entropy_by_window = (
        sums_by_window
        .groupby(['id'])
        [[var for var in sums_by_window.columns if 'frac_total' in var]]
        .agg(lambda x: entropy(x.value_counts()))
        )
    entropy_by_window.columns = [
        x + '_entropy' 
        for x in entropy_by_window.columns
        ]


    trend_by_window = (
        sums_by_window
        .sort_values(['id', 'idx_window_by_id'])
        .drop(columns=['window_30s'])
        .groupby(['id'])
        [['idx_window_by_id'] + event_vars_sum + conti_vars_sum]
        .corr()
        )
    trend_by_window = trend_by_window.fillna(0)
    # extract correlations strictly with time index
    trend_by_window = trend_by_window.xs('idx_window_by_id', level=1)
    trend_by_window.columns = [x + "_ttrend" for x in trend_by_window.columns]


    vari_by_window = pd.merge(
        distr_summaries,
        entropy_by_window,
        how='left',
        left_index=True,
        right_index=True
        )


    vari_by_window = pd.merge(
        vari_by_window,
        trend_by_window,
        how='left',
        left_index=True,
        right_index=True
        )     
    
    
    return vari_by_window

In [None]:
def feature_transform_pipeline(X_logs, is_training_run):

    X_logs_enriched = enrich_logs(X_logs, is_training_run)

    aggregates_over_time = aggregate_no_time_dependence_measures(
        X_logs_enriched, is_training_run
        )
    vari_by_window = aggregate_time_variability_measures(
        X_logs_enriched, aggregates_over_time, is_training_run
        )

    X_transform = pd.merge(
        aggregates_over_time,
        vari_by_window,
        how='left',
        left_index=True,
        right_index=True
        )
    
    return X_transform

In [None]:
# expect train_logs are too large for single batch processing
X_train_logs = extract(PATH_TRAIN_LOGS)

X_train_logs_groups = [x for _, x in X_train_logs.groupby('id')]
del X_train_logs

X_train_logs_chunk1 = X_train_logs_groups[0:1200]
X_train_logs_chunk2 = X_train_logs_groups[1200:]
del X_train_logs_groups

X_train_logs_chunk1 = pd.concat(X_train_logs_chunk1, axis=0)
X_train_logs_chunk2 = pd.concat(X_train_logs_chunk2, axis=0).reset_index(drop=True)


In [None]:
X_train_chunk1 = feature_transform_pipeline(X_train_logs_chunk1, True)
del X_train_logs_chunk1

In [None]:
# rare train cases in chunk2 can yield new discretized bins versus chunk1,
# if pipeline re-trained
X_train_chunk2 = feature_transform_pipeline(X_train_logs_chunk2, False)
del X_train_logs_chunk2

In [None]:
X_train = pd.concat([X_train_chunk1, X_train_chunk2], axis=0)
del X_train_chunk1, X_train_chunk2

In [None]:
# can't learn from zero-variance features
has_zero_var_col = (X_train.std() == 0).to_dict()
has_zero_var_col = [
    x for x, has_zero_var in has_zero_var_col.items()
    if has_zero_var
    ]
X_train = X_train.drop(columns=has_zero_var_col)

In [None]:
assert(all(X_train.notnull()))

In [None]:
# expect large universe of possible features --
# then, optuna runs very slowly, model fitting generally is an issue.
# that's besides concerns of noise features.
# use random forest for feature selection.
from sklearn.model_selection import train_test_split

y = pd.read_csv(PATH_TRAIN_OUTCOMES)
y.index = y["id"]
y = y.rename(columns={"score": "y"})
y = y.drop(columns="id")
XY = pd.merge(X_train, y, how="left", left_index=True, right_index=True)
y = XY["y"]
X = XY.drop(columns="y")

X, X_test, y, y_test = train_test_split(X, y, test_size=0.33, random_state=777)

In [None]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import permutation_importance

# following feature universe expansion to percentiles over all time & windows,
# tested top 500 features kept. 
# 750-tree gbm feature importance suggested, >0 importance for 385/500.
# so, extend pruning beyond top 500.  
N_TOP_FEATURES_KEEP = 500

model = RandomForestRegressor(
    n_estimators=1000,
    max_features="sqrt",
    max_depth=None,
)
model.fit(X, y.values)

result = permutation_importance(model, X, y, n_repeats=5, n_jobs=-1)
feature_imp = pd.DataFrame({
    'feature': X_test.columns,
    'score': result.importances_mean
    }).sort_values('score', ascending=False).reset_index(drop=True)
features_keep = feature_imp['feature'].iloc[0:N_TOP_FEATURES_KEEP]

In [None]:
X = X[features_keep]
X_test = X_test[features_keep]

In [None]:
X.to_pickle("./data/processed/X_train.pkl")
y.to_pickle("./data/processed/y_train.pkl")

X_test.to_pickle("./data/processed/X_test.pkl")
y_test.to_pickle("./data/processed/y_test.pkl")