In [1]:
# Import PySpark

from pyspark.sql import SparkSession
# import functions
from pyspark.sql.functions import countDistinct, col, when, collect_set, row_number, collect_list, lag, udf, sqrt, hour, minute, second, from_unixtime, to_timestamp, mean, max, min, first
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.sql.types import ArrayType, DoubleType

import warnings
warnings.filterwarnings('ignore')

In [2]:
import pandas as pd
import numpy as np
from enum import Enum
import seaborn as sns
import matplotlib.pyplot as plt
import random

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader

In [3]:
# Create SparkSession
spark = SparkSession.builder \
                    .appName('csv-to-dataset') \
                    .config("spark.executor.memory", "4g") \
                    .getOrCreate()

# read a CSV file and convert it to a DataFrame
df = spark.read.csv('../data/Mouse_Event/Train_Mouse.csv', header=True, inferSchema=True)
df.show(3)
df.count()

+--------------------+--------------------+--------------------+-------------+----------+--------+--------+
|                 uid|          session_id|             user_id|    timestamp|event_type|screen_x|screen_y|
+--------------------+--------------------+--------------------+-------------+----------+--------+--------+
|18d23c24-c5ea-46f...|c4af3096-5a73-4d7...|-2416201411472988629|1655993031186|         2|  1087.0|   442.0|
|25f4ed5c-12d6-416...|c4af3096-5a73-4d7...|-2416201411472988629|1655993031135|         2|  1324.0|   468.0|
|29716657-2778-4f3...|c4af3096-5a73-4d7...|-2416201411472988629|1655993031102|         2|  1504.0|   487.0|
+--------------------+--------------------+--------------------+-------------+----------+--------+--------+
only showing top 3 rows



11633

In [4]:
# functions

# get randon index of 6-lenth session id array
def get_valid_data_index():
    def distance(row):
        idx_valid = []
        i = random.sample(list(range(6)), 2)
        return row.session_ids[i[0]]
    return distance

In [5]:
# Dataset Prepare

new_df = StringIndexer(inputCol="user_id", outputCol="i") \
            .fit(df) \
            .transform(df) \
            .alias('new_df')
max_index = new_df.agg({"i": "max"}).collect()[0][0] + 1

new_df = new_df.withColumn("user_label", (new_df["i"] / max_index) * 20) \
        .withColumn("event_2", when(new_df["event_type"] == 2, 1).otherwise(0)) \
        .withColumn("event_3", when(new_df["event_type"] == 3, 1).otherwise(0)) \
        .withColumn("event_4", when(new_df["event_type"] == 4, 1).otherwise(0)) \
        .withColumn("event_5", when(new_df["event_type"] == 5, 1).otherwise(0)) \
        .withColumn("timestamp_", to_timestamp(from_unixtime(new_df["timestamp"]/1000))) \
        .withColumn("hour", hour("timestamp_")) \
        .withColumn("minute", minute("timestamp_")) \
        .withColumn("second", second("timestamp_"))

print(new_df.columns)

group_df = new_df.groupby('session_id', 'hour', 'minute', 'second') \
            .agg(mean("screen_x").alias("mean_screen_x"), \
                 mean("screen_y").alias("mean_screen_y"), \
                 mean("event_type").alias("mean_event_type"), \
                 mean("event_2").alias("mean_event_2"), \
                 mean("event_3").alias("mean_event_3"), \
                 mean("event_4").alias("mean_event_4"), \
                 mean("event_5").alias("mean_event_5"), \
                 (max("screen_x") - min("screen_x")).alias("diff_screen_x"), \
                 (max("screen_y") - min("screen_y")).alias("diff_screen_y"), \
                 (max("timestamp") - min("timestamp")).alias("diff_time"), \
                 # first("timestamp").alias("timestamp"), \
                 # first("user_label").alias("user_label"), \
                ) \
            .withColumn('distance_xy', sqrt(col('diff_screen_x')**2 + col('diff_screen_y')**2)) \
            .withColumn('second_speed_x', col('diff_screen_x') / col('diff_time')) \
            .withColumn('second_speed_y', col('diff_screen_y') / col('diff_time')) \
            .withColumn('second_speed_xy', col('distance_xy') / col('diff_time')) 
print(group_df.columns)

merged_df = new_df.join(group_df, on=['session_id', 'hour', 'minute', 'second']) \
                  .orderBy(["user_label", "timestamp"]) #, ascending=[True, True])


['uid', 'session_id', 'user_id', 'timestamp', 'event_type', 'screen_x', 'screen_y', 'i', 'user_label', 'event_2', 'event_3', 'event_4', 'event_5', 'timestamp_', 'hour', 'minute', 'second']
['session_id', 'hour', 'minute', 'second', 'mean_screen_x', 'mean_screen_y', 'mean_event_type', 'mean_event_2', 'mean_event_3', 'mean_event_4', 'mean_event_5', 'diff_screen_x', 'diff_screen_y', 'diff_time', 'distance_xy', 'second_speed_x', 'second_speed_y', 'second_speed_xy']


In [6]:
%%time
# get shift data

shift_values = range(1, 21)

for shift in shift_values:
    col_name_x = f"mean_screen_x_{shift}"
    col_name_y = f"mean_screen_y_{shift}"
    col_name_second_x = f"second_speed_x_{shift}"
    col_name_second_y = f"second_speed_y_{shift}"
    col_name_second_xy = f"second_speed_xy_{shift}"
    col_name_event_type = f"mean_event_type_{shift}"
    merged_df = merged_df.withColumn(col_name_x, lag("mean_screen_x", shift).over(Window.partitionBy("session_id").orderBy("timestamp"))) \
                         .withColumn(col_name_y, lag("mean_screen_y", shift).over(Window.partitionBy("session_id").orderBy("timestamp"))) \
                         .withColumn(col_name_second_x, lag("second_speed_x", shift).over(Window.partitionBy("session_id").orderBy("timestamp"))) \
                         .withColumn(col_name_second_y, lag("second_speed_y", shift).over(Window.partitionBy("session_id").orderBy("timestamp"))) \
                         .withColumn(col_name_second_xy, lag("second_speed_xy", shift).over(Window.partitionBy("session_id").orderBy("timestamp"))) \
                         .withColumn(col_name_event_type, lag("mean_event_type", shift).over(Window.partitionBy("session_id").orderBy("timestamp"))) \

# merged_df = group_df.orderBy(["user_label", "timestamp"]).dropna()
merged_df.select('session_id', 'timestamp', 'hour', 'minute', 'second', 'mean_screen_x', 'mean_screen_x_1', 'mean_screen_x_2') \
        .show(10)

    

+--------------------+-------------+----+------+------+-----------------+---------------+---------------+
|          session_id|    timestamp|hour|minute|second|    mean_screen_x|mean_screen_x_1|mean_screen_x_2|
+--------------------+-------------+----+------+------+-----------------+---------------+---------------+
|018a54bb-b6d0-4d4...|1656339827092|  16|    23|    47|            964.0|           null|           null|
|018a54bb-b6d0-4d4...|1656339827521|  16|    23|    47|            964.0|          964.0|           null|
|018a54bb-b6d0-4d4...|1656339827541|  16|    23|    47|            964.0|          964.0|          964.0|
|018a54bb-b6d0-4d4...|1656339827574|  16|    23|    47|            964.0|          964.0|          964.0|
|018a54bb-b6d0-4d4...|1656339827758|  16|    23|    47|            964.0|          964.0|          964.0|
|018a54bb-b6d0-4d4...|1656339827841|  16|    23|    47|            964.0|          964.0|          964.0|
|018a54bb-b6d0-4d4...|1656339833170|  16|    2

# Normalizaton

In [7]:
train_features = [
    'mean_event_2', 'mean_event_3', 'mean_event_4', 'mean_event_5',
    'mean_screen_x',   'mean_screen_y',   'mean_event_type',   'second_speed_x',   'second_speed_y',   'second_speed_xy',
]

for i in range(1, 21):
    train_features.append('mean_screen_x_'+str(i))
    train_features.append('mean_screen_y_'+str(i))
    train_features.append('second_speed_x_'+str(i))
    train_features.append('second_speed_y_'+str(i))
    train_features.append('second_speed_xy_'+str(i))
    train_features.append('mean_event_type_'+str(i))

assembler = VectorAssembler(inputCols=train_features, outputCol='features')
data = assembler.transform(merged_df.dropna().na.fill(0))

# Normalize the vector column
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
scaler_model = scaler.fit(data)
data = scaler_model.transform(data)


# Split the Train and Valid dataset

In [8]:
%%time
dfi = df.groupby('user_id').agg(collect_set("session_id").alias("session_ids"))
rows = dfi.withColumn('for_valid', get_valid_data_index()(dfi)).select('for_valid').collect()
valid_index = list(np.array([row[0] for row in rows]))

valid_df = data.filter(col('session_id').isin(valid_index))
train_df = data.subtract(valid_df)

train_df.count(), valid_df.count()

CPU times: user 19.1 ms, sys: 8.09 ms, total: 27.2 ms
Wall time: 14.3 s


(3886, 831)

In [9]:
%%time

df_1_x = train_df.select(train_features).toPandas().to_numpy()
df_1_y = train_df.select('user_label').toPandas().to_numpy()[:,0]

df_2_x = valid_df.select(train_features).toPandas().to_numpy()
df_2_y = valid_df.select('user_label').toPandas().to_numpy()[:,0]

df_1_x.shape, df_1_y.shape
df_2_x.shape, df_2_y.shape

CPU times: user 433 ms, sys: 211 ms, total: 644 ms
Wall time: 20 s


((831, 130), (831,))

# NN

In [10]:
# Simple NN model
class MyModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(MyModel, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(0.5)
        # self.fc3 = nn.Linear(hidden_size, hidden_size)
        self.fc4 = nn.Linear(hidden_size, num_classes)
    
    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.relu(out)
        # out = self.fc3(out)
        out = self.relu(out)
        out = self.dropout(out)
        out = self.fc4(out)
        return out
    
    def compute_loss(self, output, target):
        loss = nn.CrossEntropyLoss()
        return loss(output, target)

In [33]:
train_tensor_x = torch.tensor(df_1_x, dtype=torch.float)
train_tensor_y = torch.tensor(df_1_y, dtype=torch.float)
train_ds = TensorDataset(train_tensor_x, train_tensor_y)

valid_tensor_x = torch.tensor(df_2_x, dtype=torch.float)
valid_tensor_y = torch.tensor(df_2_y, dtype=torch.float)
valid_ds = TensorDataset(valid_tensor_x, valid_tensor_y)

batch_size = 516
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=False)

In [44]:
model_NN = MyModel(df_1_x.shape[1], 512, 20)

In [45]:
# define your loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model_NN.parameters())

model_NN.train()
train_losses = []
valid_losses = []
num_epochs = 1000
best_loss = float('inf')
patience = 10
counter = 0

for epoch in range(num_epochs):
    running_loss = 0.0
    model_NN.train()
    for batch_x, batch_y in train_loader:
        # Clear the gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model_NN(batch_x.float())

        # Compute the loss
        # loss = criterion(outputs, batch_y)
        loss = model_NN.compute_loss(outputs, batch_y.long())
        running_loss += loss.item()

        # Backward pass
        loss.backward()

        # Update the parameters
        optimizer.step()
    

    # Print the loss
    avg_loss_t = running_loss / len(train_loader)
    train_losses.append(avg_loss_t)
    # print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_loss_t:.4f}")

    model_NN.eval()
    val_loss = 0.0
    with torch.no_grad():
        for batch_x, batch_y in valid_loader:
            outputs = model_NN(batch_x.float())
            loss = criterion(outputs, batch_y.long())
            val_loss += loss.item() # * batch_x.size(0)
    avg_loss_v = val_loss / len(valid_loader)
    valid_losses.append(avg_loss_v)
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {avg_loss_t:.4f}, Valid Loss: {avg_loss_v:.4f}")

    # check is validation loss has improved
    if avg_loss_v < best_loss:
        best_loss = avg_loss_v
        counter = 0
    else:
        counter += 1
    
    # check if early stopping criteria has been met 
    if counter >= patience:
        print(f"Validation loss did not improve for {patience} epochs. Stopping early...")
        break
    

Epoch 1/1000, Train Loss: 59.4163, Valid Loss: 14.2259
Epoch 2/1000, Train Loss: 9.2002, Valid Loss: 2.8192
Epoch 3/1000, Train Loss: 2.8989, Valid Loss: 2.6956
Epoch 4/1000, Train Loss: 2.7215, Valid Loss: 2.5870
Epoch 5/1000, Train Loss: 2.6732, Valid Loss: 2.5914
Epoch 6/1000, Train Loss: 2.6356, Valid Loss: 2.4536
Epoch 7/1000, Train Loss: 2.6088, Valid Loss: 2.4749
Epoch 8/1000, Train Loss: 2.5543, Valid Loss: 2.4068
Epoch 9/1000, Train Loss: 2.5093, Valid Loss: 2.3642
Epoch 10/1000, Train Loss: 2.4850, Valid Loss: 2.3193
Epoch 11/1000, Train Loss: 2.4610, Valid Loss: 2.2788
Epoch 12/1000, Train Loss: 2.4370, Valid Loss: 2.2270
Epoch 13/1000, Train Loss: 2.4100, Valid Loss: 2.2098
Epoch 14/1000, Train Loss: 2.3705, Valid Loss: 2.1750
Epoch 15/1000, Train Loss: 2.3458, Valid Loss: 2.1677
Epoch 16/1000, Train Loss: 2.3069, Valid Loss: 2.1241
Epoch 17/1000, Train Loss: 2.3015, Valid Loss: 2.1057
Epoch 18/1000, Train Loss: 2.2846, Valid Loss: 2.0909
Epoch 19/1000, Train Loss: 2.2566, 

# Check Proformance

In [46]:
from sklearn.metrics import classification_report

model_NN.eval()

predict_train_y = np.argmax(model_NN(train_tensor_x).detach().numpy(), axis=1)
print(f"confusion matrix on train dataset: \n {classification_report(train_tensor_y, predict_train_y)}")

predict_valid_y = np.argmax(model_NN(valid_tensor_x).detach().numpy(), axis=1)
print(f"confusion matrix on valid dataset: \n {classification_report(valid_tensor_y, predict_valid_y)}")

confusion matrix on train dataset: 
               precision    recall  f1-score   support

         0.0       0.82      0.91      0.87       391
         1.0       0.93      0.83      0.88       320
         2.0       0.95      0.78      0.86       332
         3.0       0.79      0.97      0.87       403
         4.0       1.00      1.00      1.00       323
         5.0       0.83      0.88      0.85       297
         6.0       0.96      0.84      0.90       123
         7.0       0.90      0.90      0.90       136
         8.0       0.96      0.92      0.94       146
         9.0       1.00      1.00      1.00        60
        10.0       0.99      0.93      0.96       197
        11.0       1.00      0.97      0.98       145
        12.0       0.95      0.95      0.95       278
        13.0       1.00      0.87      0.93        67
        14.0       0.99      0.98      0.98        92
        15.0       0.91      0.91      0.91       158
        16.0       1.00      1.00      1.00 