## Data Ingestion

In [1]:
import s3fs
import json

fs = s3fs.S3FileSystem(profile='default')

log_lst = []
for fname in fs.ls("msk-tap-raw-logs-sink/topics/raw/partition=0"):
    with fs.open(fname, 'rb') as f:
        for line in f:
            log_lst.append(json.loads(line))

# with fs.open('s3://odni-model-weights/weights/model_5_dict.pth', 'rb') as f:
#     # Test reading the file or just print a message that it's accessible
#     print("Successfully accessed the file.")

In [2]:
print("# logs: ", len(log_lst))

# logs:  9400


## Data Preprocessing

Inspect the structure of a single log, then isolate the click events specifically.

In [3]:
i = 3
print((log_lst[i].keys()))
print((log_lst[i]["details"].keys()))
print((log_lst[i]["type"]))

dict_keys(['logType', 'userAction', 'scrnRes', 'microTime', 'pageTitle', 'sessionID', 'type', 'clientTime', 'userId', 'target', 'path', 'toolVersion', 'browser', 'useraleVersion', 'pageUrl', 'location', 'details', 'pageReferrer', 'toolName'])
dict_keys(['ctrl', 'meta', 'shift', 'alt', 'clicks'])
click


In [4]:
clicks_lst = []
for log in log_lst:
    if log["type"] == "click":
        clicks_lst.append(log)

In [5]:
print("# click logs: ", len(clicks_lst))

# click logs:  97


## Implement Anomaly Detector using River

Inspiration: https://medium.com/spikelab/anomalies-detection-using-river-398544d3536

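Note: Parameters like the number of trees (n_trees), the height of trees (height), and window size (window_size) significantly impact the model's sensitivity, and inappropriate values can lead to poor anomaly detection. In particular, River's HalfSpaceTrees returns a score of 0 until its first window of window_size observations has been processed, so a window_size larger than the number of logs (the default of 250 here versus 97 click logs) produces all-zero scores.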

How the model works:
*   Constructs "half-space trees" by randomly choosing a feature at each node and splitting its range of values into two "half-spaces"
*   Each data point enters at the root; at each node, the value of the selected feature is compared against that node's split criterion, and the point moves to the left or right child accordingly
*   The point eventually trickles down to a leaf node, where a count of the data points that have reached that leaf is recorded
*   A data point that lands in an infrequently visited leaf is considered anomalous, in proportion to how infrequently that leaf is visited

In sum, the model relies on **Density Estimation**: regions of the feature space that have lower density (i.e., fewer data points have reached the corresponding leaf nodes) are more likely to represent anomalies.
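
The steps above can be illustrated with a small pure-Python sketch. This is a simplified illustration, not River's implementation: it leaves out the sliding window and the depth-weighted scoring, and every name in it (`Node`, `build_tree`, `walk`, `learn`, `leaf_mass`) is hypothetical. It assumes all features are already scaled to the range [0, 1].

```python
import random

class Node:
    """One node of a toy half-space tree (hypothetical helper, not River's class)."""
    def __init__(self, feature=None, split=None, left=None, right=None):
        self.feature = feature  # feature tested at this node (None for a leaf)
        self.split = split      # midpoint of the feature's current range
        self.left = left
        self.right = right
        self.mass = 0           # number of points that passed through this node

def build_tree(ranges, height, rng):
    """Recursively halve the range of a randomly chosen feature."""
    if height == 0:
        return Node()
    feature = rng.choice(sorted(ranges))
    lo, hi = ranges[feature]
    split = (lo + hi) / 2
    left = build_tree({**ranges, feature: (lo, split)}, height - 1, rng)
    right = build_tree({**ranges, feature: (split, hi)}, height - 1, rng)
    return Node(feature, split, left, right)

def walk(node, x):
    """Yield every node that x visits on its way from the root to a leaf."""
    while True:
        yield node
        if node.feature is None:  # reached a leaf
            return
        node = node.left if x[node.feature] < node.split else node.right

def learn(trees, x):
    """Increment the mass of every node x passes through, in every tree."""
    for tree in trees:
        for node in walk(tree, x):
            node.mass += 1

def leaf_mass(trees, x):
    """Average number of learned points sharing x's leaf; low values mean a rare region."""
    total = 0
    for tree in trees:
        *_, leaf = walk(tree, x)
        total += leaf.mass
    return total / len(trees)

rng = random.Random(0)
trees = [build_tree({"x": (0.0, 1.0), "y": (0.0, 1.0)}, height=3, rng=rng)
         for _ in range(10)]

# Learn a dense cluster of made-up points in the lower-left corner
for _ in range(200):
    learn(trees, {"x": rng.uniform(0.15, 0.24), "y": rng.uniform(0.15, 0.24)})

inlier_mass = leaf_mass(trees, {"x": 0.2, "y": 0.2})   # inside the cluster
outlier_mass = leaf_mass(trees, {"x": 0.9, "y": 0.9})  # far from the cluster
print(inlier_mass, outlier_mass)
```

The point far from the cluster ends up in leaves that no learned point has reached, which is the low-density signal the model turns into a high anomaly score.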

In [10]:
from river import compose
from river import feature_extraction as fx
from river import anomaly
from river import preprocessing
from river import stats
from river import drift
import numbers

# Define a function to build the model
def build_model(n_trees=25, height=10, window_size=250):
    # Define the feature extraction pipeline
    features_pipeline = compose.TransformerUnion(
        compose.Select('logType', 'userAction', 'scrnRes', 'pageTitle', 'sessionID', 'type', 
                       'clientTime', 'userId', 'target', 'path', 'toolVersion', 'browser', 
                       'useraleVersion', 'pageUrl', 'location', 'details', 'pageReferrer', 
                       'toolName'),
        
        # Using Mean as a replacement for RollingMean
        fx.Agg(on='microTime', by='sessionID', how=stats.Mean())
        # Add more features and aggregations as needed
    )

    # Categorical features processing
    categorical_features = compose.Pipeline(
        compose.SelectType(str),
        preprocessing.OneHotEncoder()
    )

    # Numerical features processing
    numerical_features = compose.Pipeline(
        compose.SelectType(numbers.Number),
        preprocessing.MinMaxScaler()
    )

    # Combining everything into a single pipeline
    model = compose.Pipeline(
        features_pipeline,
        numerical_features + categorical_features,
        anomaly.HalfSpaceTrees(n_trees=n_trees, height=height, window_size=window_size)
    )

    # Add a drift detector to the model (e.g., ADWIN)
    adwin_drift_detector = drift.ADWIN()
    return model, adwin_drift_detector
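
One thing to keep in mind with the pipeline above: `compose.SelectType` only passes values of the requested type, so a field such as `details`, which holds a nested dictionary (see its keys printed earlier), does not reach the model as written. A possible workaround, sketched here as an assumption rather than something the notebook currently does, is to flatten each log before it enters the pipeline. The `flatten` helper and the `_` separator are hypothetical choices.

```python
def flatten(record, parent_key="", sep="_"):
    """Flatten nested dictionaries into a single level with joined key names."""
    flat = {}
    for key, value in record.items():
        name = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested dictionaries such as log["details"]
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat

# Made-up example shaped like a small part of a click log
example = {"type": "click", "details": {"ctrl": False, "clicks": 1}}
flat_example = flatten(example)
print(flat_example)
# {'type': 'click', 'details_ctrl': False, 'details_clicks': 1}
```

If this approach were adopted, the names passed to `compose.Select` would need to use the flattened keys (for example `details_clicks`) instead of `details`.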

In [11]:
# Build the model and drift detector
model, drift_detector = build_model()

# Example of training the model on your 'clicks_lst' data
for log in clicks_lst:
    model.learn_one(log)  # No reassignment needed

# Example of using the model to detect anomalies and monitor for concept drift
for i, log in enumerate(clicks_lst):
    # Get anomaly score
    score = model.score_one(log)
    print(f"Anomaly score for log {i}: {score}")

    # Update the drift detector with the anomaly score
    drift_detector.update(score)
    if drift_detector.drift_detected:
        # React to the drift, e.g., retrain the model, log the drift, etc.
        print("Change detected at index", i)
        # Reset the drift detector
        drift_detector.reset()

    # Define a threshold to decide if a log is anomalous (if needed)
    # if score > some_threshold:
    #     print("Anomaly detected!")

Anomaly score for log 0: 0
Anomaly score for log 1: 0
Anomaly score for log 2: 0
Anomaly score for log 3: 0
Anomaly score for log 4: 0
Anomaly score for log 5: 0
Anomaly score for log 6: 0
Anomaly score for log 7: 0
Anomaly score for log 8: 0
Anomaly score for log 9: 0
Anomaly score for log 10: 0
Anomaly score for log 11: 0
Anomaly score for log 12: 0
Anomaly score for log 13: 0
Anomaly score for log 14: 0
Anomaly score for log 15: 0
Anomaly score for log 16: 0
Anomaly score for log 17: 0
Anomaly score for log 18: 0
Anomaly score for log 19: 0
Anomaly score for log 20: 0
Anomaly score for log 21: 0
Anomaly score for log 22: 0
Anomaly score for log 23: 0
Anomaly score for log 24: 0
Anomaly score for log 25: 0
Anomaly score for log 26: 0
Anomaly score for log 27: 0
Anomaly score for log 28: 0
Anomaly score for log 29: 0
Anomaly score for log 30: 0
Anomaly score for log 31: 0
Anomaly score for log 32: 0
Anomaly score for log 33: 0
Anomaly score for log 34: 0
Anomaly score for log 35: 0
...

## Generating Anomalous Data

Things to try:
*   Build a web crawler that performs one or many workflows in a reasonable fashion; try the playwright library to asynchronously open a browser and run any number of steps
*   Normal workflow: enter user information, log in, navigate to the repo, review files, log out, etc., with wait time between steps
*   Make another workflow that's weird: log in, click on weird places, click the repo, click a button a bunch of times, all with no wait time
*   Could log it to Kafka or do it locally
*   First, work on developing the workflow bots; Evan is going to send the docs and example scripts, and I'll get one working before we figure out uploading to and extracting from buckets
*   Make a new repository for this, installable with pip
*   Not sure if it'll work with userale, since it uses a separate browser; the question is whether that browser uses the plugin as well
*   Ensure the plugin is installed on chromium; Evan will investigate whether it is
*   Rageclick, anomalous click me

All developments on this front exist in the test-userale-plugin.py file
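
As a purely local stand-in while the browser bots are being developed, synthetic click logs can mimic the two workflows described above: one with realistic wait times and one rage-click session with almost none. The sketch below is an assumption-laden illustration and not the contents of test-userale-plugin.py. The `make_session` and `mean_gap_ms` helpers, the target names, and the timing ranges are all made up; only the field names are taken from the click logs printed earlier, and only a subset of them is generated.

```python
import random

def make_session(session_id, n_clicks, wait_range_ms, targets, seed=None):
    """Return synthetic click logs; all values are invented for illustration."""
    rng = random.Random(seed)
    t = 0
    logs = []
    for _ in range(n_clicks):
        t += rng.randint(*wait_range_ms)  # wait time before this click, in ms
        logs.append({
            "type": "click",
            "sessionID": session_id,
            "clientTime": t,
            "target": rng.choice(targets),
            "details": {"ctrl": False, "meta": False, "shift": False,
                        "alt": False, "clicks": 1},
        })
    return logs

def mean_gap_ms(logs):
    """Average time between consecutive clicks in a session."""
    times = [log["clientTime"] for log in logs]
    return (times[-1] - times[0]) / (len(times) - 1)

# "Normal" workflow: several different targets, realistic pauses between clicks
normal_logs = make_session(
    "normal-0", 10, (800, 5000),
    ["input#user", "button#login", "a.repo", "a.file", "button#logout"], seed=0)

# "Weird" workflow: the same button clicked many times with almost no wait
rage_logs = make_session("rage-0", 30, (20, 120), ["button#login"], seed=1)

print(mean_gap_ms(normal_logs), mean_gap_ms(rage_logs))
```

Logs produced this way lack most of the fields real UserALE records carry, so they are only suited to checking that the detector reacts to timing and target patterns, not to evaluating it.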