# SGS simulation using Enron emails as the data source: day-by-day processing

## Initialization

In [1]:
using Pkg
# Pkg.activate(".")
# Pkg.instantiate()

# Pkg.add("PyCall")
# Pkg.add("DataFrames")
# Pkg.add("CSV")
# Pkg.add("Plots")
# Pkg.add("StatsPlots")
# Pkg.add("StatsBase")
# Pkg.add("GLM")
# Pkg.add("StatsModels")
# Pkg.add("Dates")
# Pkg.add("CSV")
# Pkg.add("EasyConfig")

In [2]:
include("src/sgs_store.jl")

# Import necessary packages
using PyCall
using DataFrames
# import polars as pl
using Dates
using CSV

# Import the fine_tune_model and parse_decoded_strings functions from the Python script
py"""
import sys
sys.path.append(".")
from SGS_Transformers import BertTokenizerWrapper, RobertaTokenizerWrapper, GPT2TokenizerWrapper
"""

In [None]:
redis   = pyimport("redis")

# Define a function to initialize the tokenizer
"""
    initialize_tokenizer()

Construct and return a new `RobertaTokenizerWrapper` (imported from the Python
module `SGS_Transformers` through PyCall).
"""
initialize_tokenizer() = py"RobertaTokenizerWrapper"()

# Define a function to filter the DataFrame by date
"""
    filter_dataframe_by_date(df, date)

Return the rows of `df` whose `Date` column equals `date` (all columns kept).
"""
function filter_dataframe_by_date(df, date)
    keep = df.Date .== date
    return df[keep, :]
end

# Define a function to process each column
"""
    process_column(r, tokenizer, filtered_df, column, _parent, chunk_size)

Ingest one column of `filtered_df` into the store and wrap the result as an entity.

The column is split into pieces of roughly `chunk_size` bytes (measured with
`Base.summarysize`) and handed to `Store.ingest_df_column` together with `tokenizer`
and a key derived from `_parent` and the column name. The returned values are
converted to `UInt32`, used to restore a `HllSet{10}`, and wrapped in an
`Entity.Instance{10}` stored under the `"b:col"` prefix.

# Arguments
- `r`: store connection passed through to `Store`/`Entity` (Redis client in this notebook).
- `tokenizer`: tokenizer passed through to `Store.ingest_df_column`.
- `filtered_df`: table whose `column` is processed.
- `column`: column identifier (converted with `string` for the key).
- `_parent`: parent identifier mixed into the column key.
- `chunk_size`: target chunk size in bytes; must be positive.

# Returns
The `Entity.Instance{10}` built from the restored HllSet.
"""
function process_column(r, tokenizer, filtered_df, column, _parent, chunk_size)
    values_in_col = filtered_df[:, column]
    key = Util.sha1_union([_parent, string(column)])

    # Number of pieces is derived from the column's in-memory byte size.
    n_chunks = ceil(Int, Base.summarysize(values_in_col) / chunk_size)
    pieces = Store.chunk_array(values_in_col, n_chunks)
    println(key, "; num_chunks: ", n_chunks)

    # NOTE(review): presumably token ids/hashes returned by the store — confirm.
    token_ids = Vector{UInt32}(Store.ingest_df_column(r, tokenizer, pieces, key))

    restored = HllSets.restore!(HllSets.HllSet{10}(), token_ids)
    println("hll: ", HllSets.id(restored), "; ", HllSets.count(restored))

    return Entity.Instance{10}(r, restored, prefix="b:col")
end

# Define a function to process the rows of the filtered DataFrame
"""
    _row_string(row)

Join the values of one row into a single space-separated string. `missing` values
are written as empty strings rather than the literal text `"missing"`.
"""
_row_string(row) = join((coalesce(v, "") for v in row), " ")

"""
    process_row(r, tokenizer, filtered_df, _parent)

Ingest every row of `filtered_df` (each row flattened to one space-separated string)
via `Store.ingest_df_rows`, keyed by a hash derived from `_parent`.

# Returns
An `Entity.Instance{10}` stored under the `"b:row"` prefix.

NOTE(review): the entity is built from an *empty* `HllSet{10}`. The ingest result
is not yet used to restore the set (that step was commented out in the original
notebook), so row entities carry no cardinality information.
"""
function process_row(r, tokenizer, filtered_df, _parent)
    rows_sha1 = Util.sha1_union([_parent])

    # Typed comprehension (Vector{String}) instead of pushing into an untyped `[]`.
    rows = [_row_string(row) for row in eachrow(filtered_df)]

    # Called for its side effect of storing the rows; its result is currently unused.
    Store.ingest_df_rows(r, tokenizer, rows, rows_sha1)

    hll = HllSets.HllSet{10}()
    return Entity.Instance{10}(r, hll, prefix="b:row")
end

# Define a function to process the DataFrame
"""
    process_dataframe(r, start, tokenizer, df, dates_vector, cols, _parent,
                      chunk_size, threshold, batch)

Process one batch of dates from `dates_vector`, beginning at index `start`.

For each date, the rows of `df` with that `Date` are selected. Every column in
`cols` is then ingested with `process_column`, and the whole rows are ingested with
`process_row`. Processing stops after the date at index `threshold` (end of the
current batch) or when `dates_vector` is exhausted, whichever comes first.

# Returns
`(i, threshold)`: `i` is the index of the next date to process. `threshold` is the
end index for the next batch; it is advanced by `batch` only when the current batch
was completed. Feed both back in to resume. `i > length(dates_vector)` means all
dates have been processed.
"""
function process_dataframe(r, start, tokenizer, df, dates_vector, cols, _parent,
                           chunk_size, threshold, batch)
    i = start
    # `<=` so the final date in `dates_vector` is also processed (1-based indexing).
    while i <= length(dates_vector)
        the_date = dates_vector[i]
        filtered_df = filter_dataframe_by_date(df, the_date)

        for column in cols
            process_column(r, tokenizer, filtered_df, column, _parent, chunk_size)
            println("Current Date:", the_date)
        end

        process_row(r, tokenizer, filtered_df, _parent)

        i += 1
        if i > threshold
            # Batch complete: move the window forward for the next call.
            threshold += batch
            break
        end
    end
    return i, threshold
end

In [None]:

# Main function to run the demo
"""
    _first_quoted(x)

Return the first single-quoted substring of `x` as a `String`
(e.g. `"x 'a@b.com' y"` → `"a@b.com"`), or `""` when `x` is `missing` or
contains no quoted part.
"""
_first_quoted(::Missing) = ""
function _first_quoted(x::AbstractString)
    m = match(r"'([^']*)'", x)   # matched once, reused for the capture
    return isnothing(m) ? "" : String(m.captures[1])
end

"""
    main(csv_file_path::String, start, chunk_size, threshold, batch)

Load the e-mail CSV at `csv_file_path`, normalize its `Date`, `From` and `To`
columns, and process one batch of distinct dates via `process_dataframe`.

# Arguments
- `csv_file_path`: CSV containing at least the columns `Date`, `From`, `To`,
  `Subject`, `content`, `user`. The path is also used as the parent key for the
  stored hashes.
- `start`: index (into the ascending distinct dates) of the first date to process.
- `chunk_size`: target chunk size in bytes, passed through to `process_column`.
- `threshold`: end index of the current batch.
- `batch`: number of dates per batch.

# Returns
`(next_start, next_threshold)` from `process_dataframe`, to be passed back into `main`.
"""
function main(csv_file_path::String, start, chunk_size, threshold, batch)
    tokenizer = initialize_tokenizer()

    # Redis server expected on localhost:6379, db 0.
    r = redis.Redis(host="localhost", port=6379, db=0)
    df = DataFrame(CSV.File(csv_file_path, header=true,
                            select=[:Date, :From, :To, :Subject, :content, :user]))

    # Reformat fields :Date, :From, and :To.
    # :Date is reduced to its calendar day. The DateFormat objects are built once,
    # not re-parsed from a string for every row.
    in_fmt  = Dates.DateFormat("yyyy-mm-dd HH:MM:SS")
    out_fmt = Dates.DateFormat("yyyy-mm-dd")
    df.Date = map(x -> Dates.format(Dates.DateTime(x, in_fmt), out_fmt), df.Date)
    # :From / :To keep only their first single-quoted value.
    df.From = map(_first_quoted, df.From)
    df.To   = map(_first_quoted, df.To)

    # Distinct days in ascending order; `start`/`threshold` index into this vector.
    dates_vector = sort!(unique(df.Date))

    cols = [:From, :To, :Subject, :content, :user]
    return process_dataframe(r, start, tokenizer, df, dates_vector, cols, csv_file_path,
                             chunk_size, threshold, batch)
end

In [7]:
# Input data and run parameters. `start` and `threshold` form the batch cursor that
# each run of `main` returns, so the next cell can be re-run batch after batch.
csv_file_path = "/home/alexmy/Downloads/POC/DATA/enron_05_17_2015_with_labels_v2.csv"
chunk_size    = 512_000   # target chunk size in bytes
threshold     = 10        # index of the last date in the first batch
batch         = 10        # number of dates per batch
start         = 1         # index of the first date to process

1

In [8]:
# Process one batch of dates; the returned cursor replaces the old one so that
# re-running this cell continues with the next batch.
(start, threshold) = main(csv_file_path, start, chunk_size, threshold, batch)
(start, threshold)

6f983ba3758e7233f7379a9c7b6ee565808a8de6; num_chunks: 1
hll: 9748514db03a7c8affd6cff2f1597705f09a40a4; 45
Current Date:1980-01-01
6bc47f481f9b458cf32e52dbd4d6731a5d198af5; num_chunks: 1
hll: 911e0d4a984ad93372d1598ed34836edad38cac0; 86
Current Date:1980-01-01
f6c9fedfe796b71638efc125e924040013ef5234; num_chunks: 1
hll: cff2d63545e5b9c453f5713acda5fa9483730b00; 229
Current Date:1980-01-01
65875368cc6392683f42a0e2938b5c0789485b97; num_chunks: 2
hll: a1966e48ce1bb578d1b63c1a3aff5b922981acef; 2289
Current Date:1980-01-01
981f459d81197edf542958361ef219372da6bd82; num_chunks: 1
hll: e4774580ffd7e39d75c731e24ad74c8f65b69829; 23
Current Date:1980-01-01
processing row . . .
6f983ba3758e7233f7379a9c7b6ee565808a8de6; num_chunks: 1
hll: f2a23e837cd0a755d58fd6258c714af11c07c52d; 2
Current Date:1986-04-26
6bc47f481f9b458cf32e52dbd4d6731a5d198af5; num_chunks: 1
hll: 1c4a447aec77086225c5490e8ee2ed6414876730; 3
Current Date:1986-04-26
f6c9fedfe796b71638efc125e924040013ef5234; num_chunks: 1
hll: da253ec

(11, 20)