In [None]:
from pyspark.sql.functions import hour, when

# Define Work Hours (8 AM - 6 PM)
WORK_HOURS_START = 8
WORK_HOURS_END = 18

# Extract Hour from Timestamp and Categorize as Work or Off-Hours
logon_df = logon_df.withColumn("hour", hour(col("date")))
logon_df = logon_df.withColumn("work_hours", when((col("hour") >= WORK_HOURS_START) & (col("hour") <= WORK_HOURS_END), "work").otherwise("offhours"))

device_df = device_df.withColumn("hour", hour(col("date")))
device_df = device_df.withColumn("work_hours", when((col("hour") >= WORK_HOURS_START) & (col("hour") <= WORK_HOURS_END), "work").otherwise("offhours"))

http_df = http_df.withColumn("hour", hour(col("date")))
http_df = http_df.withColumn("work_hours", when((col("hour") >= WORK_HOURS_START) & (col("hour") <= WORK_HOURS_END), "work").otherwise("offhours"))

email_df = email_df.withColumn("hour", hour(col("date")))
email_df = email_df.withColumn("work_hours", when((col("hour") >= WORK_HOURS_START) & (col("hour") <= WORK_HOURS_END), "work").otherwise("offhours"))

file_df = file_df.withColumn("hour", hour(col("date")))
file_df = file_df.withColumn("work_hours", when((col("hour") >= WORK_HOURS_START) & (col("hour") <= WORK_HOURS_END), "work").otherwise("offhours"))


In [None]:
from pyspark.sql.functions import concat, lit

# Append Work/Off-Hours to Activity Names
logon_df = logon_df.withColumn("activity", concat(col("activity"), lit("_"), col("work_hours")))
device_df = device_df.withColumn("activity", concat(col("activity"), lit("_"), col("work_hours")))
http_df = http_df.withColumn("activity", concat(lit("http_access_"), col("work_hours")))  # URLs don't have predefined activity names
email_df = email_df.withColumn("activity", concat(lit("email_sent_"), col("work_hours")))  # Email content treated as sent activity
file_df = file_df.withColumn("activity", concat(lit("file_transfer_"), col("work_hours")))  # File access treated as transfer

# Apply Activity Encoding AFTER Work/Off-Hours Classification
logon_df = logon_df.withColumn("activity_encoded", activity_mapping_udf(col("activity")))
device_df = device_df.withColumn("activity_encoded", activity_mapping_udf(col("activity")))
http_df = http_df.withColumn("activity_encoded", activity_mapping_udf(col("activity")))
email_df = email_df.withColumn("activity_encoded", activity_mapping_udf(col("activity")))
file_df = file_df.withColumn("activity_encoded", activity_mapping_udf(col("activity")))


In [None]:
# Merge All Processed DataFrames into User-Day Sequences
user_day_sequences = (
    logon_df.union(device_df).union(http_df).union(email_df).union(file_df)
    .groupBy("user", "date")
    .agg(collect_list("activity_encoded").alias("activity_sequence"))
)

# Save Final Sequence Data for LSTM Training
user_day_sequences.write.csv("lstm_input.csv", header=True)

print("✅ Successfully classified activities as work/off-hours and saved sequences for LSTM!")


In [None]:
from pyspark.sql.functions import when

# Categorize sentiment
http_df = http_df.withColumn(
    "sentiment_category",
    when(col("sentiment_score") > 0.2, "positive")
    .when(col("sentiment_score") < -0.2, "negative")
    .otherwise("neutral")
)

# Append work/off-hours classification
http_df = http_df.withColumn("activity", concat(lit("http_"), col("sentiment_category"), lit("_"), col("work_hours")))

http_df.select("user", "date", "url", "sentiment_score", "activity").show(5)


In [None]:
from pyspark.sql.functions import collect_list

# Group activities per user-day
user_day_sequences = (
    logon_df.union(device_df).union(http_df).union(email_df).union(file_df)
    .groupBy("user", "date")
    .agg(collect_list("activity").alias("activity_sequence"))
)

user_day_sequences.show(5)


start


In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, hour, when

# Initialize Spark
spark = SparkSession.builder.appName("CERT-Preprocessing").getOrCreate()

# Load CSV files
logon_df = spark.read.csv("processed_logon.csv", header=True, inferSchema=True)
device_df = spark.read.csv("processed_device.csv", header=True, inferSchema=True)
http_df = spark.read.csv("processed_http.csv", header=True, inferSchema=True)
email_df = spark.read.csv("final_email.csv", header=True, inferSchema=True)
file_df = spark.read.csv("processed_file.csv", header=True, inferSchema=True)



In [2]:
# Convert timestamps to user-day format
logon_df = logon_df.withColumn("date", to_date(col("date")))
device_df = device_df.withColumn("date", to_date(col("date")))
http_df = http_df.withColumn("date", to_date(col("date")))
email_df = email_df.withColumn("date", to_date(col("date")))
file_df = file_df.withColumn("date", to_date(col("date")))



In [None]:
# Extract hour for work/off-hours classification
logon_df = logon_df.withColumn("hour", hour(col("date")))
device_df = device_df.withColumn("hour", hour(col("date")))
http_df = http_df.withColumn("hour", hour(col("date")))
email_df = email_df.withColumn("hour", hour(col("date")))
file_df = file_df.withColumn("hour", hour(col("date")))

# Classify activities into work or off-hours
logon_df = logon_df.withColumn("work_hours", when((col("hour") >= 8) & (col("hour") <= 18), "work").otherwise("offhours"))
device_df = device_df.withColumn("work_hours", when((col("hour") >= 8) & (col("hour") <= 18), "work").otherwise("offhours"))
http_df = http_df.withColumn("work_hours", when((col("hour") >= 8) & (col("hour") <= 18), "work").otherwise("offhours"))
email_df = email_df.withColumn("work_hours", when((col("hour") >= 8) & (col("hour") <= 18), "work").otherwise("offhours"))
file_df = file_df.withColumn("work_hours", when((col("hour") >= 8) & (col("hour") <= 18), "work").otherwise("offhours"))

from pyspark.sql.functions import concat, lit, col


# Append work/off-hours classification to activity names
logon_df = logon_df.withColumn("activity", col("activity") + "_" + col("work_hours"))
device_df = device_df.withColumn("activity", col("activity") + "_" + col("work_hours"))
http_df = http_df.withColumn(
    "activity",
    concat(lit("http_access_"), col("work_hours"))
)
http_df = http_df.withColumn(
    "activity",
    concat(lit("http_access_"), col("work_hours"))
)
file_df = file_df.withColumn("activity", col("activity") + "_" + col("work_hours"))



AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `activity` cannot be resolved. Did you mean one of the following? [`cc`, `content`, `id`, `size`, `to`].;
'Project [id#106, date#200, user#108, pc#109, to#110, cc#111, bcc#112, from#113, size#114, attachments#115, content_x#116, anomalous#117, content_y#118, cleaned_content_x#119, email_sentiment_x#120, content#121, cleaned_content_y#122, email_sentiment_y#123, email_activity#124, hour#530, work_hours#590, (('activity + _) + work_hours#590) AS activity#651]
+- Project [id#106, date#200, user#108, pc#109, to#110, cc#111, bcc#112, from#113, size#114, attachments#115, content_x#116, anomalous#117, content_y#118, cleaned_content_x#119, email_sentiment_x#120, content#121, cleaned_content_y#122, email_sentiment_y#123, email_activity#124, hour#530, CASE WHEN ((hour#530 >= 8) AND (hour#530 <= 18)) THEN work ELSE offhours END AS work_hours#590]
   +- Project [id#106, date#200, user#108, pc#109, to#110, cc#111, bcc#112, from#113, size#114, attachments#115, content_x#116, anomalous#117, content_y#118, cleaned_content_x#119, email_sentiment_x#120, content#121, cleaned_content_y#122, email_sentiment_y#123, email_activity#124, hour(cast(date#200 as timestamp), Some(Asia/Calcutta)) AS hour#530, work_hours#452]
      +- Project [id#106, date#200, user#108, pc#109, to#110, cc#111, bcc#112, from#113, size#114, attachments#115, content_x#116, anomalous#117, content_y#118, cleaned_content_x#119, email_sentiment_x#120, content#121, cleaned_content_y#122, email_sentiment_y#123, email_activity#124, hour#392, CASE WHEN ((hour#392 >= 8) AND (hour#392 <= 18)) THEN work ELSE offhours END AS work_hours#452]
         +- Project [id#106, date#200, user#108, pc#109, to#110, cc#111, bcc#112, from#113, size#114, attachments#115, content_x#116, anomalous#117, content_y#118, cleaned_content_x#119, email_sentiment_x#120, content#121, cleaned_content_y#122, email_sentiment_y#123, email_activity#124, hour(cast(date#200 as timestamp), Some(Asia/Calcutta)) AS hour#392, work_hours#313]
            +- Project [id#106, date#200, user#108, pc#109, to#110, cc#111, bcc#112, from#113, size#114, attachments#115, content_x#116, anomalous#117, content_y#118, cleaned_content_x#119, email_sentiment_x#120, content#121, cleaned_content_y#122, email_sentiment_y#123, email_activity#124, hour#255, CASE WHEN ((hour#255 >= 8) AND (hour#255 <= 18)) THEN work ELSE offhours END AS work_hours#313]
               +- Project [id#106, date#200, user#108, pc#109, to#110, cc#111, bcc#112, from#113, size#114, attachments#115, content_x#116, anomalous#117, content_y#118, cleaned_content_x#119, email_sentiment_x#120, content#121, cleaned_content_y#122, email_sentiment_y#123, email_activity#124, hour(cast(date#200 as timestamp), Some(Asia/Calcutta)) AS hour#255]
                  +- Project [id#106, to_date(date#107, None, Some(Asia/Calcutta), false) AS date#200, user#108, pc#109, to#110, cc#111, bcc#112, from#113, size#114, attachments#115, content_x#116, anomalous#117, content_y#118, cleaned_content_x#119, email_sentiment_x#120, content#121, cleaned_content_y#122, email_sentiment_y#123, email_activity#124]
                     +- Relation [id#106,date#107,user#108,pc#109,to#110,cc#111,bcc#112,from#113,size#114,attachments#115,content_x#116,anomalous#117,content_y#118,cleaned_content_x#119,email_sentiment_x#120,content#121,cleaned_content_y#122,email_sentiment_y#123,email_activity#124] csv


In [None]:
# Define 24 activity categories (12 work + 12 off-hours)
activity_mapping = {
    "logon_work": 0, "logoff_work": 1, "file_open_work": 2, "file_copy_work": 3,
    "file_delete_work": 4, "web_browsing_work": 5, "email_sent_work": 6, "usb_insert_work": 7,
    "email_attachment_work": 8, "logon_offhours": 9, "logoff_offhours": 10, "file_open_offhours": 11,
    "file_copy_offhours": 12, "file_delete_offhours": 13, "web_browsing_offhours": 14, "email_sent_offhours": 15,
    "usb_insert_offhours": 16, "email_attachment_offhours": 17, "http_access_work": 18, "http_access_offhours": 19,
    "file_transfer_work": 20, "file_transfer_offhours": 21, "external_email_work": 22, "external_email_offhours": 23
}

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Apply activity encoding
activity_mapping_udf = udf(lambda activity: activity_mapping.get(activity, 24), IntegerType())

logon_df = logon_df.withColumn("activity_encoded", activity_mapping_udf(col("activity")))
device_df = device_df.withColumn("activity_encoded", activity_mapping_udf(col("activity")))
http_df = http_df.withColumn("activity_encoded", activity_mapping_udf(col("activity")))
email_df = email_df.withColumn("activity_encoded", activity_mapping_udf(col("activity")))
file_df = file_df.withColumn("activity_encoded", activity_mapping_udf(col("activity")))

# Show transformed data
logon_df.select("user", "date", "activity", "activity_encoded").show(5)


start


In [None]:
# Define work hours
WORK_START = 7  # 9 AM
WORK_END = 18   # 5 PM

In [None]:
import pandas as pd

# Load the logon.csv file
logon_df = pd.read_csv("logon.csv").drop(columns=['id'])
device_df=pd.read_csv("device.csv").drop(columns=['id','pc'])
file_df=pd.read_csv("file.csv").drop(columns=['id','content'])
http_df = pd.read_csv("http.csv").drop(columns=['id','pc','url','content'])
email_df= pd.read_csv("email.csv").drop(columns=['id','pc','cc','bcc','size','content'])

# Display the first few rows
print(email_df.head())


In [7]:
email_df.drop(columns=['content_x','content_y','cleaned_content_x','cleaned_content_y','email_activity','email_sentiment_x'])

Unnamed: 0,date,user,to,from,attachments,anomalous,email_sentiment_y
0,01/02/2010 07:11:45,LAP0338,Dean.Flynn.Hines@dtaa.com;Wade_Harrison@lockhe...,Lynn.Adena.Pratt@dtaa.com,0,0,0.100000
1,01/02/2010 07:13:00,LAP0338,Penelope_Colon@netzero.com,Lynn_A_Pratt@earthlink.net,0,0,-0.191667
2,01/02/2010 07:13:17,LAP0338,Judith_Hayden@comcast.net,Lynn_A_Pratt@earthlink.net,0,0,0.050000
3,01/02/2010 08:10:07,HSB0196,Sawyer.A.Turner@sbcglobal.net,Hadley.S.Bowen@charter.net,0,0,0.000000
4,01/02/2010 08:33:56,HSB0196,Sawyer.Abel.Turner@dtaa.com,Hadley.Sonya.Bowen@dtaa.com,3,0,-0.208333
...,...,...,...,...,...,...,...
1115161,05/16/2011 20:46:19,OSH0655,Brewer-Armand@hotmail.com;Kay-Skinner@yahoo.co...,Harrison_Olympia@earthlink.net,2,0,-0.047500
1115162,05/16/2011 20:49:51,OSH0655,Anjolie.Bowman@gmail.com,Harrison_Olympia@earthlink.net,0,0,0.012037
1115163,05/16/2011 20:52:54,DID0650,Talley_Eagan@msn.com;Jonah.M.Wilder@aol.com;Ur...,Denise.I.Doyle@optonline.net,0,0,0.053571
1115164,05/16/2011 20:54:43,LAF0991,Hu.Akeem.Vincent@dtaa.com,Lucas.Ahmed.Ferrell@dtaa.com,1,0,0.000000


In [12]:
device_df=pd.read_csv("device.csv").drop(columns=['id'])

In [13]:
activity_mapping = {
    # ✅ Logon & Logoff
    "logon_work": 0, "logoff_work": 1,
    "logon_offhours": 15, "logoff_offhours": 16,

    # ✅ File Activities
    "file_copy_work": 2, "file_copy_offhours": 17, 

    # ✅ Web Browsing / HTTP
   
    

    # ✅ Email Activities
    "email_sent_work": 3, "email_sent_offhours": 18,
    "email_received_work": 4, "email_received_offhours": 29,
    "email_attachment_work": 5, "email_attachment_offhours": 20,
    "email_positive_work": 6, "email_positive_offhours": 21,
    "email_negative_work": 7, "email_negative_offhours": 22,
    "email_neutral_work": 8, "email_neutral_offhours": 23,

    # ✅ USB Device (Thumb Drive)
    "thumbdrive_connected_work": 9, "thumbdrive_connected_offhours": 24,
    "thumbdrive_disconnected_work": 10, "thumbdrive_disconnected_offhours": 25,

    # ✅ File Transfers
    
    
    "http_request_work": 11, "http_request_offhours": 26,
    "http_positive_work": 12, "http_positive_offhours": 27,
    "http_negative_work": 13, "http_negative_offhours": 28,
    "http_neutral_work": 14, "http_neutral_offhours": 29
}


In [14]:
# Convert 'date' column to datetime
logon_df['date'] = pd.to_datetime(logon_df['date'])
device_df['date'] = pd.to_datetime(device_df['date'])
email_df['date'] = pd.to_datetime(email_df['date'])
http_df['date'] = pd.to_datetime(http_df['date'])
file_df['date'] = pd.to_datetime(file_df['date'])


In [18]:
def classify_logon_event(row):
    hour = row['date'].hour  # Extract the hour from timestamp

    if row['activity'] == 'logon':
        return 'logon_work' if WORK_START <= hour < WORK_END else 'logon_offhours'
    elif row['activity'] == 'logoff':
        return 'logoff_work' if WORK_START <= hour < WORK_END else 'logoff_offhours'
    return None  # Ignore other cases

def classify_device_event(row):
    """Classifies device connection/disconnection based on work hours."""
    hour = pd.to_datetime(row['date']).hour  # Extract hour from timestamp
    
    if pd.notna(row['pc']):  # Only process rows where 'pc' is not empty
        if row['activity'] == 'connect':
            return 'thumbdrive_connected_work' if WORK_START <= hour < WORK_END else 'thumbdrive_connected_offhours'
        elif row['activity'] == 'disconnect':
            return 'thumbdrive_disconnected_work' if WORK_START <= hour < WORK_END else 'thumbdrive_disconnected_offhours'
    
    return None  # Ignore cases where 'pc' is empty

def classify_email_activity(row):
    """Assigns multiple activity types to each email event."""
    hour = pd.to_datetime(row['date']).hour  # Extract hour

    # Determine if work or off-hours
    is_work_hours = WORK_START <= hour < WORK_END

    activity_types = []  # Store multiple labels

    # Email Sent
    if row["from"]:  # If there is a sender
        activity_types.append("email_sent_work" if is_work_hours else "email_sent_offhours")

    # Email Received
    if row["to"]:  # If there are recipients
        activity_types.append("email_received_work" if is_work_hours else "email_received_offhours")

    # Email with Attachment
    if row["attachments"] > 0:
        activity_types.append("email_attachment_work" if is_work_hours else "email_attachment_offhours")

    # Email Sentiment Classification
    sentiment = row["email_sentiment_y"]
    if sentiment > 0.2:
        activity_types.append("email_positive_work" if is_work_hours else "email_positive_offhours")
    elif sentiment < -0.2:
        activity_types.append("email_negative_work" if is_work_hours else "email_negative_offhours")
    else:
        activity_types.append("email_neutral_work" if is_work_hours else "email_neutral_offhours")

    # Convert activity types to encoded values
    return [activity_mapping[act] for act in activity_types]

def classify_file_activity(row):
    """Encodes file copy activity based on work hours."""
    hour = pd.to_datetime(row['date']).hour  # Extract hour

    # Check if both 'pc' and 'filename' are not empty
    if pd.notna(row['pc']) and pd.notna(row['filename']):
        return activity_mapping["file_copy_work"] if WORK_START <= hour < WORK_END else activity_mapping["file_copy_offhours"]
    
    return None  # Ignore events where pc or filename is missing

def encode_http_activity(row):
    """Encodes HTTP request activity based on work hours and sentiment."""
    hour = pd.to_datetime(row['date']).hour  # Extract hour from timestamp
    is_work_hours = WORK_START <= hour < WORK_END

    activity_types = []  # Store multiple activity labels

    # General HTTP request activity
    activity_types.append("http_request_work" if is_work_hours else "http_request_offhours")

    # Browsing Sentiment Classification
    sentiment = row["sentiment"]
    if sentiment > 0.2:
        activity_types.append("http_positive_work" if is_work_hours else "http_positive_offhours")
    elif sentiment < -0.2:
        activity_types.append("http_negative_work" if is_work_hours else "http_negative_offhours")
    else:
        activity_types.append("http_neutral_work" if is_work_hours else "http_neutral_offhours")

    # Convert activity types to encoded values
    return [activity_mapping[act] for act in activity_types]

In [19]:
# Apply the function
logon_df['activity_type'] = logon_df.apply(classify_logon_event, axis=1)
device_df['activity_type'] = device_df.apply(classify_device_event, axis=1)
email_df['activity_sequence'] = email_df.apply(classify_email_activity, axis=1)
file_df['activity_type'] = file_df.apply(classify_file_activity, axis=1)
http_df['activity_sequence'] = http_df.apply(encode_http_activity, axis=1)

In [None]:

# Convert activity types to numeric values
logon_df['encoded_activity'] = logon_df['activity_type'].map(activity_mapping)
device_df['encoded_activity'] = device_df['activity_type'].map(activity_mapping)
file_df['encoded_activity'] = file_df['activity_type'].map(activity_mapping)



In [22]:
email_df.head()

Unnamed: 0,date,user,to,from,attachments,content_x,anomalous,content_y,cleaned_content_x,email_sentiment_x,cleaned_content_y,email_sentiment_y,email_activity,activity_sequence
0,2010-01-02 07:11:45,LAP0338,Dean.Flynn.Hines@dtaa.com;Wade_Harrison@lockhe...,Lynn.Adena.Pratt@dtaa.com,0,middle f2 systems 4 july techniques powerful d...,0,middle f2 systems 4 july techniques powerful d...,middle f2 systems 4 july techniques powerful d...,0.1,middle f2 systems 4 july techniques powerful d...,0.1,email_neutral,"[18, 29, 23]"
1,2010-01-02 07:13:00,LAP0338,Penelope_Colon@netzero.com,Lynn_A_Pratt@earthlink.net,0,slowly this uncinus winter beneath addition ex...,0,slowly this uncinus winter beneath addition ex...,slowly uncinus winter beneath addition exist p...,-0.191667,slowly uncinus winter beneath addition exist p...,-0.191667,email_neutral,"[18, 29, 23]"
2,2010-01-02 07:13:17,LAP0338,Judith_Hayden@comcast.net,Lynn_A_Pratt@earthlink.net,0,400 other difficult land cirrocumulus powered ...,0,400 other difficult land cirrocumulus powered ...,400 difficult land cirrocumulus powered probab...,0.05,400 difficult land cirrocumulus powered probab...,0.05,email_neutral,"[18, 29, 23]"
3,2010-01-02 08:10:07,HSB0196,Sawyer.A.Turner@sbcglobal.net,Hadley.S.Bowen@charter.net,0,enacted saw 63 recently 5 were representatives...,0,enacted saw 63 recently 5 were representatives...,enacted saw 63 recently 5 representatives safe...,0.0,enacted saw 63 recently 5 representatives safe...,0.0,email_neutral,"[18, 29, 23]"
4,2010-01-02 08:33:56,HSB0196,Sawyer.Abel.Turner@dtaa.com,Hadley.Sonya.Bowen@dtaa.com,3,attitude basis you collapse note found words r...,0,attitude basis you collapse note found words r...,attitude basis collapse note found words range...,-0.208333,attitude basis collapse note found words range...,-0.208333,email_negative,"[18, 29, 20, 22]"


In [23]:
# # Expand sequences (convert list from string format)
# email_df['activity_sequence'] = email_df['activity_sequence'].apply(eval)
# http_df['activity_sequence'] = http_df['activity_sequence'].apply(eval)

# Explode email & http sequences (each activity gets its own row)
email_df = email_df.explode('activity_sequence').rename(columns={'activity_sequence': 'activity_encoded'})
http_df = http_df.explode('activity_sequence').rename(columns={'activity_sequence': 'activity_encoded'})

In [24]:
email_df.head()

Unnamed: 0,date,user,to,from,attachments,content_x,anomalous,content_y,cleaned_content_x,email_sentiment_x,cleaned_content_y,email_sentiment_y,email_activity,activity_encoded
0,2010-01-02 07:11:45,LAP0338,Dean.Flynn.Hines@dtaa.com;Wade_Harrison@lockhe...,Lynn.Adena.Pratt@dtaa.com,0,middle f2 systems 4 july techniques powerful d...,0,middle f2 systems 4 july techniques powerful d...,middle f2 systems 4 july techniques powerful d...,0.1,middle f2 systems 4 july techniques powerful d...,0.1,email_neutral,18
0,2010-01-02 07:11:45,LAP0338,Dean.Flynn.Hines@dtaa.com;Wade_Harrison@lockhe...,Lynn.Adena.Pratt@dtaa.com,0,middle f2 systems 4 july techniques powerful d...,0,middle f2 systems 4 july techniques powerful d...,middle f2 systems 4 july techniques powerful d...,0.1,middle f2 systems 4 july techniques powerful d...,0.1,email_neutral,29
0,2010-01-02 07:11:45,LAP0338,Dean.Flynn.Hines@dtaa.com;Wade_Harrison@lockhe...,Lynn.Adena.Pratt@dtaa.com,0,middle f2 systems 4 july techniques powerful d...,0,middle f2 systems 4 july techniques powerful d...,middle f2 systems 4 july techniques powerful d...,0.1,middle f2 systems 4 july techniques powerful d...,0.1,email_neutral,23
1,2010-01-02 07:13:00,LAP0338,Penelope_Colon@netzero.com,Lynn_A_Pratt@earthlink.net,0,slowly this uncinus winter beneath addition ex...,0,slowly this uncinus winter beneath addition ex...,slowly uncinus winter beneath addition exist p...,-0.191667,slowly uncinus winter beneath addition exist p...,-0.191667,email_neutral,18
1,2010-01-02 07:13:00,LAP0338,Penelope_Colon@netzero.com,Lynn_A_Pratt@earthlink.net,0,slowly this uncinus winter beneath addition ex...,0,slowly this uncinus winter beneath addition ex...,slowly uncinus winter beneath addition exist p...,-0.191667,slowly uncinus winter beneath addition exist p...,-0.191667,email_neutral,29


In [40]:
for df in [logon_df, email_df, http_df, file_df, device_df]:
    df['date'] = pd.to_datetime(df['date']).dt.date

In [41]:
# Merge all activities into a single DataFrame
merged_df = pd.concat([logon_df, email_df, http_df, file_df, device_df])

  merged_df = pd.concat([logon_df, email_df, http_df, file_df, device_df])


In [42]:
merged_df.head()

Unnamed: 0,date,user,pc,activity,anomalous,activity_type,encoded_activity,to,from,attachments,content_x,content_y,cleaned_content_x,email_sentiment_x,cleaned_content_y,email_sentiment_y,email_activity,activity_encoded,sentiment,filename
0,2010-01-02,IRM0931,PC-7188,Logon,0,,,,,,,,,,,,,,,
1,2010-01-02,LAP0338,PC-5758,Logon,0,,,,,,,,,,,,,,,
2,2010-01-02,NOB0181,PC-3446,Logon,0,,,,,,,,,,,,,,,
3,2010-01-02,CTR0341,PC-6184,Logon,0,,,,,,,,,,,,,,,
4,2010-01-02,ATE0869,PC-1313,Logon,0,,,,,,,,,,,,,,,


In [43]:
daily_labels = merged_df.groupby(['user', 'date'])['anomalous'].max().reset_index()

In [44]:
user_sequences = merged_df.groupby(['user', 'date'])['activity_encoded'].apply(list).reset_index()


In [45]:
# Merge anomaly labels with user-day sequences
user_sequences = user_sequences.merge(daily_labels, on=['user', 'date'], how='left')

In [46]:
user_sequences.head()

Unnamed: 0,user,date,activity_encoded,anomalous
0,AAF0535,2010-01-04,"[nan, nan, 3, 4, 5, 8, 3, 4, 8, 3, 4, 5, 8, 11...",0
1,AAF0535,2010-01-05,"[nan, nan, 3, 4, 6, 3, 4, 5, 8, 3, 4, 8, 11, 1...",0
2,AAF0535,2010-01-06,"[nan, nan, 3, 4, 6, 3, 4, 7, 11, 14, 11, 12, 1...",0
3,AAF0535,2010-01-07,"[nan, nan, 3, 4, 5, 8, 3, 4, 6, 3, 4, 5, 8, 11...",0
4,AAF0535,2010-01-08,"[nan, nan, 3, 4, 8, 3, 4, 6, 3, 4, 5, 8, 11, 1...",0


In [76]:
import numpy as np
import pandas as pd
import ast  # To safely parse string lists

def process_activity_sequence(seq):
    """Convert string sequences to lists & replace only NaNs inside the sequence."""
    
    # Convert string representation of lists to actual lists
    if isinstance(seq, str):
        try:
            seq = ast.literal_eval(seq)  # Safer than eval(), converts "[6, 8, nan]" → [6, 8, nan]
        except:
            return [99]  # Fallback if parsing fails
    
    # If seq is a single NaN, return [99]
    if seq is None or (isinstance(seq, float) and np.isnan(seq)):
        return [99]

    # Ensure it's a list and replace NaNs inside the list
    if isinstance(seq, list):
        return [99 if (pd.isna(x) or x is None or (isinstance(x, float) and np.isnan(x))) else x for x in seq]
    
    # If it's still not a list, return [99] as a fallback
    return [99]




In [77]:
# Convert 'activity_sequence' column from string to list & fix NaNs
user_sequences['activity_encoded'] = user_sequences['activity_encoded'].apply(process_activity_sequence)

# # Ensure all sequences are valid lists
# user_sequences = user_sequences.dropna(subset=['activity_encoded'])

# Save cleaned dataset
user_sequences.to_csv("lstm_input_cleaned3.csv", index=False)