# SGS simulation using Enron emails as a source. Day-by-day processing

## Initialization

In [1]:
include("src/sgs_store.jl")

using ..Entity
using ..HllSets
using .Util
using JSON3
using TextAnalysis

using PyCall
using CSV
using DataFrames
# using WordTokenizers
using Base.Threads
using Dates

redis   = pyimport("redis")
# Connect to Redis
r = redis.Redis(host="localhost", port=6379, db=0)

csv_file_path = "/home/alexmy/Downloads/POC/DATA/enron_05_17_2015_with_labels_v2.csv"
# Read only specific columns from the CSV file
df = DataFrame(CSV.File(csv_file_path, header=true, select=[:Date, :From, :To, :Subject, :content, :user]))

# Reformat fields :Date, f:From, and :To
df.Date = map(x -> Dates.format(Dates.DateTime(x, "yyyy-mm-dd HH:MM:SS"), "yyyy-mm-dd"), df.Date)
df.From = map(x -> ismissing(x) ? "" : (isnothing(match(r"'([^']*)'", x)) ? "" : match(r"'([^']*)'", x).captures[1]), df.From)
df.To   = map(x -> ismissing(x) ? "" : (isnothing(match(r"'([^']*)'", x)) ? "" : match(r"'([^']*)'", x).captures[1]), df.To)

# Replace all remaining missing values with "unknown"
for col in names(df)
    df[!, col] = coalesce.(df[!, col], "unknown")
end

# Extract distinct dates from the Date column, order them in ascending order, and convert to a vector
distinct_dates = unique(df.Date)
sorted_dates = sort(distinct_dates)
dates_vector = collect(sorted_dates)

println(dates_vector)

# println(first(df, 10))

["1980-01-01", "1986-04-26", "1986-05-01", "1997-01-01", "1997-03-03", "1997-03-05", "1997-03-06", "1997-03-07", "1997-03-11", "1997-03-16", "1997-03-20", "1997-03-21", "1997-03-31", "1997-04-07", "1997-04-10", "1997-04-11", "1997-04-15", "1997-04-17", "1997-04-18", "1997-04-25", "1997-04-29", "1997-05-01", "1997-05-13", "1997-05-14", "1997-05-15", "1997-05-16", "1997-05-22", "1997-05-28", "1997-05-29", "1997-06-04", "1997-06-09", "1997-06-10", "1997-06-12", "1997-06-16", "1997-06-17", "1997-06-18", "1997-06-20", "1997-06-23", "1997-06-25", "1997-06-26", "1997-06-27", "1997-06-30", "1997-07-01", "1997-07-02", "1997-07-15", "1997-07-16", "1997-07-17", "1997-07-22", "1997-07-24", "1997-07-25", "1997-07-28", "1997-07-29", "1997-07-30", "1997-07-31", "1997-08-01", "1997-08-04", "1997-08-05", "1997-08-06", "1997-08-07", "1997-08-08", "1997-08-14", "1997-08-18", "1997-08-20", "1997-08-21", "1997-08-22", "1997-08-23", "1997-08-25", "1997-08-26", "1997-08-27", "1997-08-28", "1997-08-29", "1997

## Simulation loop

In [2]:
# Import the Hugging Face Transformers library
transformers    = pyimport("transformers")
torch           = pyimport("torch")

# Import the fine_tune_model and parse_decoded_strings functions from the Python script
py"""
import sys
sys.path.append(".")
from fine_tune_sample import fine_tune_model
from tokenize_text import parse_decoded_strings, tokenize_text, decode_tokens
"""

# Function to split a long chunk into smaller chunks
function split_chunk(chunk, max_length)
    # Split the chunk into smaller chunks of max_length
    return [chunk[i:min(i+max_length-1, end)] for i in 1:max_length:length(chunk)]
end

function ingest_df_column(r, chunks, col_sha1, p=10)
    col_dataset     = zeros(2^p)
    col_json        = JSON3.write(col_dataset)

    # Ensure all elements in chunks are strings
    str_chunks      = [string(chunk) for chunk in chunks]
    
    # Split long chunks into smaller chunks
    max_length      = 1024
    split_chunks    = vcat([split_chunk(chunk, max_length) for chunk in str_chunks]...)

    # Tokenize each chunk
    ingested_chunks     = [py"tokenize_text"([chunk], max_length=max_length) for chunk in split_chunks]
    decoded_chunks      = [py"decode_tokens"(chunk) for chunk in ingested_chunks]
    # Convert decoded_chunks (Vector of arrays) to Vector{String}
    # decoded_strings     = [join(chunk, " ") for chunk in decoded_chunks]
    # Parse the decoded strings
    decoded_strings = py"parse_decoded_strings"(decoded_chunks)

    unique_tokens = Set()

    if !isempty(decoded_strings) 
        unique_tokens   = Set(decoded_strings)
        tokens          = JSON3.write(collect(unique_tokens))
        result_json     = r.fcall("ingest_01", 2, "", col_sha1, p, tokens)
        col_json        = r.fcall("bit_ops", 0, col_json, result_json, "OR")                        
    end
    return col_json, decoded_strings
end


2024-09-17 04:57:09.599646: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-17 04:57:09.621393: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-17 04:57:09.629053: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-09-17 04:57:09.644705: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


ingest_df_column (generic function with 2 methods)

In [4]:
cols            = [:From, :To, :Subject, :content, :user]
p::Int          = 10 
chunk_size::Int = 512000
_parent         = csv_file_path

i = 1

1

In [5]:
the_date    = dates_vector[i]
filtered_df = df[df.Date .== the_date, :]

# Generate a timestamp-based ID for the commit
timestamp_id = Dates.format(now(), "yyyy-mm-ddTHH:MM:SS")
println("Timestamp ID: $timestamp_id")

# Initialize a set to track new tokens for this commit
new_tokens_set = Set{String}()

for column in cols 
    println(column)
    
    col_values  = filtered_df[:, column]
    col_sha1    = Util.sha1([string(_parent), string(column)])

    println(col_sha1)

    column_size = Base.summarysize(col_values)
    num_chunks  = ceil(Int, column_size / chunk_size)
    chunks      = Store.chunk_array(col_values, num_chunks)
    
    data, sets  = ingest_df_column(r, chunks, col_sha1)

    println(data)

    println("decoded_string: ", sets)
    sets = Set(collect(sets))
    new_tokens_set  = union!(new_tokens_set, sets)  
    
    println("new: ", new_tokens_set)

    hll             = HllSets.HllSet{10}()
    # dataset         = JSON3.write(data)

    # println(dataset)

    hll             = HllSets.restore(hll, data)
    entity          = Entity.Instance{10}(hll)
    response, sha1  = Entity.store_entity(r, entity, prefix="wb:entity:attr" )

    println("Column entity instance: ", entity)

    Store.stack_push(r, "attr:", col_sha1, entity.sha1)
end

# Convert Set{String} to Vector{String}
new_tokens_list = collect(new_tokens_set)

# Import the fine_tune_model function from the Python script
py"""
import sys
sys.path.append(".")
from fine_tune_sample import fine_tune_model
"""

# Call the fine_tune_model function with the list of new tokens
py"fine_tune_model"(new_tokens_list)

# Process new tokens set
println("New tokens for commit $timestamp_id: ", new_tokens_set)

i = i + 1

Timestamp ID: 2024-09-17T04:57:57
From
404b305e787e9aaded3d27ae254750b3b5750b2b
[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0



new: Set([" \"sally.beckenr", " \"patti.thompsonenron.com\"", " \"harry.aroraenron.com\"", " \"john.arnoldenron.com\"", "AbstractString\"phillip.allenenron.com\"", " \"phillip.allenenron.com\"", " \"sally.beckenron.com\"", " \"don.baughmanenron.com\"", " \"eric.bassenron.com\""])
Column entity instance: 
Instance(
 sha1: 4a9b66603d1b6446899a1e7fcbbd2bd8378d4e61
 card: 1
 hll: HllSet{10}()
 grad: 0.0
 op: nothing)


To
176c6004fb453449a9ddcc9ef299aa438b6bb59f
[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,

LoadError: PyError ($(Expr(:escape, :(ccall(#= /home/alexmy/.julia/packages/PyCall/1gn3u/src/pyfncall.jl:43 =# @pysym(:PyObject_Call), PyPtr, (PyPtr, PyPtr, PyPtr), o, pyargsptr, kw))))) <class 'torch.OutOfMemoryError'>
OutOfMemoryError('CUDA out of memory. Tried to allocate 148.00 MiB. GPU 0 has a total capacity of 3.94 GiB of which 7.62 MiB is free. Process 493038 has 2.62 GiB memory in use. Process 802151 has 1.23 GiB memory in use. Including non-PyTorch memory, this process has 27.50 MiB memory in use. Of the allocated memory 0 bytes is allocated by PyTorch, and 0 bytes is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)')
  File "/home/alexmy/JULIA/SGS/SGS/./fine_tune_sample.py", line 25, in fine_tune_model
    trainer = Trainer(
  File "/home/alexmy/JULIA/SGS/SGS/venv/lib64/python3.10/site-packages/transformers/trainer.py", line 535, in __init__
    self._move_model_to_device(model, args.device)
  File "/home/alexmy/JULIA/SGS/SGS/venv/lib64/python3.10/site-packages/transformers/trainer.py", line 782, in _move_model_to_device
    model = model.to(device)
  File "/home/alexmy/JULIA/SGS/SGS/venv/lib64/python3.10/site-packages/transformers/modeling_utils.py", line 2905, in to
    return super().to(*args, **kwargs)
  File "/home/alexmy/JULIA/SGS/SGS/venv/lib64/python3.10/site-packages/torch/nn/modules/module.py", line 1174, in to
    return self._apply(convert)
  File "/home/alexmy/JULIA/SGS/SGS/venv/lib64/python3.10/site-packages/torch/nn/modules/module.py", line 780, in _apply
    module._apply(fn)
  File "/home/alexmy/JULIA/SGS/SGS/venv/lib64/python3.10/site-packages/torch/nn/modules/module.py", line 780, in _apply
    module._apply(fn)
  File "/home/alexmy/JULIA/SGS/SGS/venv/lib64/python3.10/site-packages/torch/nn/modules/module.py", line 805, in _apply
    param_applied = fn(param)
  File "/home/alexmy/JULIA/SGS/SGS/venv/lib64/python3.10/site-packages/torch/nn/modules/module.py", line 1160, in convert
    return t.to(
