In [None]:
import numpy as np
import pandas as pd
import os
from functools import partial
from datetime import datetime
from multiprocessing import Pool
from io import StringIO

In [24]:
# --- Notebook configuration ---
# Directory that holds the dataset CSV files read by this notebook.
folder = '../r6.2/'
# When True, only the first `test_data_size` rows of email.csv are loaded.
TEST=True
# Domain used to decide whether a recipient address counts as internal.
internal_domain = 'dtaa.com'
# Working-hours window; is_working_hours compares both bounds inclusively.
working_hours_start = datetime.strptime('09:00:00', '%H:%M:%S').time()
working_hours_end = datetime.strptime('17:00:00', '%H:%M:%S').time()
# os.path.join avoids the doubled separator that f'{folder}/users.csv' would
# produce, because `folder` already ends with '/'.
user_records = pd.read_csv(os.path.join(folder, 'users.csv'))
# Row cap applied to email.csv in TEST mode.
test_data_size = 20000

In [25]:
# Parallel-processing setting: the number of worker processes used by every
# multiprocessing Pool in this notebook. Only a single node is used (no
# multi-node distribution); the original run used the 20 cores available on
# the high-memory node.
# os.cpu_count() may return None when the core count cannot be determined, so
# fall back to one worker rather than letting None reach the chunk-size
# arithmetic in later cells.
total_processes = os.cpu_count() or 1

In [26]:
# Load the e-mail log; TEST mode caps the read at the first `test_data_size` rows.
if TEST:
    df = pd.read_csv(f'{folder}/email.csv', nrows=test_data_size)
else:
    df = pd.read_csv(f'{folder}/email.csv', nrows=None)

In [33]:
# Convert the 'date' column to a datetime data type.
# The conversion runs in parallel by dividing the dataset into chunks.
# Note: this step is mainly a trial of parallel processing. The chunks could be reused by later steps to make the processing even faster, but the data is already clean, so that was not pursued.

def process_chunk(chunk):
    """Parse the 'date' column of one DataFrame chunk into datetimes.

    The chunk is modified in place (its 'date' column is overwritten) and the
    same object is returned so it can be collected from ``pool.map``.

    NOTE: a later cell defines another function named ``process_chunk``; once
    that cell runs, this definition is replaced.
    """
    parsed_dates = pd.to_datetime(chunk['date'], format='mixed')
    chunk['date'] = parsed_dates
    # Perform other data processing on the chunk
    return chunk

# Rows per chunk: roughly one chunk per worker in TEST mode, fixed-size chunks
# otherwise. max(1, ...) keeps the slice step positive when there are more
# workers than test rows.
chunk_size = max(1, test_data_size // total_processes) if TEST else 100000

# Slice the in-memory frame directly. Round-tripping it through
# StringIO(df.to_csv()) also serialises the index, so the re-read chunks carry
# a spurious 'Unnamed: 0' column, and the whole frame is rendered to text and
# parsed again for no benefit.
# `or [df]` keeps an empty frame flowing through as a single (empty) chunk so
# the concat below always has something to concatenate.
chunk_iter = [df.iloc[start:start + chunk_size] for start in range(0, len(df), chunk_size)] or [df]

# Initialize a multiprocessing pool; each worker receives a pickled copy of its
# chunk, so the in-place edit made by process_chunk never touches `df` itself.
with Pool(total_processes) as pool:
    processed_chunks = pool.map(process_chunk, chunk_iter)

# Concatenate the processed chunks back into a single DataFrame
df = pd.concat(processed_chunks)

In [34]:
def is_working_hours(timestamp):
    """Tell whether a timestamp falls inside working hours on a weekday.

    Returns True only when ``timestamp.weekday()`` is Monday (0) through
    Friday (4) and its time of day lies between the module-level
    ``working_hours_start`` and ``working_hours_end``, both bounds inclusive.
    Returns False otherwise.
    """
    clock = timestamp.time()
    on_weekday = timestamp.weekday() in range(0, 5)
    # The time comparison is only evaluated for weekdays (short-circuit `and`).
    return bool(on_weekday and working_hours_start <= clock <= working_hours_end)

def has_attachments(attachments):
    """Return True when the attachments field names at least one attachment.

    Parameters
    ----------
    attachments : str or missing value
        Semicolon-separated attachment entries, or NaN/None when the e-mail
        has none.

    Returns
    -------
    bool
        False for a missing value and for a string that holds no non-blank
        entry (e.g. '' or ';'); True otherwise.
    """
    if pd.isna(attachments):
        return False
    # str.split always returns at least one element, so checking the length of
    # the split result can never be False; look for a non-blank entry instead.
    return any(part.strip() for part in attachments.split(';'))

def is_sent_email(activity):
    """Return True when the activity value equals "Send", otherwise False."""
    return bool(activity == "Send")

def is_recieved_email(activity):
    """Return True when the activity value equals "View", otherwise False.

    A "View" activity is what this notebook treats as a received e-mail.
    """
    return bool(activity == "View")

def count_attachments(attachments):
    """Count the attachment entries in a semicolon-separated field.

    Parameters
    ----------
    attachments : str or missing value
        Semicolon-separated attachment entries, or NaN/None when the e-mail
        has none.

    Returns
    -------
    int
        Number of non-blank entries; 0 for a missing value.
    """
    if pd.isna(attachments):
        return 0
    # Blank pieces (from an empty string or a stray/trailing ';') are not
    # attachments, so they are left out of the count.
    return sum(1 for part in attachments.split(';') if part.strip())

def concatenate_recipients_with_semicolon(row):
    """Join the non-missing 'to', 'cc' and 'bcc' values of a row with ';'.

    Fields are taken in the order to, cc, bcc; missing (NaN/None) fields are
    skipped. Returns an empty string when all three are missing.
    """
    present = (row[field] for field in ['to', 'cc', 'bcc'] if pd.notna(row[field]))
    return ';'.join(present)


# Count the distinct recipient addresses of one user group, restricted to
# working-hours or non-working-hours rows.
def count_distinct_emails(user_group, working_hours=True):
    """Count distinct recipient addresses in a user's e-mails.

    Parameters
    ----------
    user_group : tuple
        ``(key, DataFrame)`` pair as produced by iterating a groupby; only the
        DataFrame (element 1) is used.
    working_hours : bool
        Only rows whose 'WorkingHours' value equals this flag are considered.

    Returns
    -------
    int
        Number of distinct ';'-separated addresses across the to/cc/bcc fields
        of the selected rows.
    """
    frame = user_group[1]
    seen_addresses = set()
    for _, record in frame.iterrows():
        if record['WorkingHours'] == working_hours:
            joined = concatenate_recipients_with_semicolon(record)
            # Rows with no recipients at all contribute nothing.
            if joined:
                seen_addresses.update(joined.split(';'))
    return len(seen_addresses)
    
# Function to count distinct internal emails for a user group based on working hours
def count_distinct_internal_emails(user_group, working_hours=True):
    """Count distinct internal recipient addresses in a user's e-mails.

    Parameters
    ----------
    user_group : tuple
        ``(key, DataFrame)`` pair as produced by iterating a groupby; only the
        DataFrame (element 1) is used.
    working_hours : bool
        Only rows whose 'WorkingHours' value equals this flag are considered.

    Returns
    -------
    int
        Number of distinct addresses across to/cc/bcc of the selected rows
        whose domain part is exactly the module-level ``internal_domain``.
    """
    user_group_df = user_group[1]  # Access the DataFrame within the tuple
    # Compare against '@<domain>' as a suffix. A plain substring test would
    # also accept look-alikes such as 'x@notdtaa.com' or 'x@dtaa.com.evil.org'.
    internal_suffix = '@' + internal_domain
    distinct_emails = set()
    for _, row in user_group_df.iterrows():
        if working_hours != row['WorkingHours']:
            continue
        combined_recipient_emails = concatenate_recipients_with_semicolon(row)
        if not combined_recipient_emails:
            continue
        distinct_emails.update(
            email
            for email in combined_recipient_emails.split(';')
            # strip() so surrounding whitespace does not defeat the suffix test
            if email.strip().endswith(internal_suffix)
        )
    return len(distinct_emails)



In [35]:
# Derive the per-e-mail feature columns.

# Parallelize the work by splitting the DataFrame into chunks.
# Ceiling division yields at most `total_processes` chunks (floor division
# leaves an extra remainder chunk), and max(1, ...) keeps the slice step
# non-zero when `df` has fewer rows than there are workers.
chunk_size = max(1, -(-len(df) // total_processes))
# `or [df]` keeps an empty frame flowing through as a single (empty) chunk.
chunks = [df[i:i + chunk_size] for i in range(0, len(df), chunk_size)] or [df]

# Add the derived feature columns to one chunk of the DataFrame.
def process_chunk(chunk):
    """Attach the derived e-mail feature columns to a chunk.

    Adds, in this order: 'WorkingHours' (from 'date'), 'WithAttachement' and
    'AttachmentCount' (from 'attachments'), 'EmailSent' and 'EmailRecieved'
    (from 'activity'). The chunk is modified in place and returned.

    NOTE: this redefines ``process_chunk`` from the earlier date-parsing cell.
    """
    derived_columns = (
        ('WorkingHours', 'date', is_working_hours),
        ('WithAttachement', 'attachments', has_attachments),
        ('AttachmentCount', 'attachments', count_attachments),
        ('EmailSent', 'activity', is_sent_email),
        ('EmailRecieved', 'activity', is_recieved_email),
    )
    for target, source, transform in derived_columns:
        chunk[target] = chunk[source].apply(transform)
    return chunk

# Hand the chunks to the worker pool; one process_chunk call per chunk.
with Pool(processes=total_processes) as pool:
    results = pool.map(func=process_chunk, iterable=chunks)

# Stitch the processed chunks back together into a single DataFrame.
df = pd.concat(objs=results)

In [36]:
# One group per distinct value of the 'user' column.
grouped = df.groupby(by='user')

In [39]:
# Partial functions that pre-bind the `working_hours` flag of the two counters.

# Distinct recipients (any domain): working hours first, then non-working hours.
count_distinct_emails_working_hours, count_distinct_emails_non_working_hours = (
    partial(count_distinct_emails, working_hours=flag) for flag in (True, False)
)

# Distinct internal recipients: working hours first, then non-working hours.
count_distinct_internal_emails_working_hours, count_distinct_internal_emails_non_working_hours = (
    partial(count_distinct_internal_emails, working_hours=flag) for flag in (True, False)
)

In [41]:
# Apply the distinct email count functions to each user group using a Pool.
# pool.map is what actually distributes the per-user counting across workers;
# calling the count functions inline inside the `with` block would run them
# serially in the parent process and leave the pool idle.
# Caveat: keeping the results in dicts is not ideal for big data because it
# consumes a lot of memory. It worked for this dataset, so it was left as is.
user_groups = list(grouped)  # [(user, DataFrame), ...] in groupby order
group_keys = [user for user, _ in user_groups]
with Pool(total_processes) as pool:
    # pool.map preserves input order, so zipping with group_keys pairs each
    # count with the user it was computed for.
    distinct_email_counts_working_hours = dict(
        zip(group_keys, pool.map(count_distinct_emails_working_hours, user_groups)))
    distinct_email_counts_non_working_hours = dict(
        zip(group_keys, pool.map(count_distinct_emails_non_working_hours, user_groups)))
    distinct_internal_email_counts_working_hours = dict(
        zip(group_keys, pool.map(count_distinct_internal_emails_working_hours, user_groups)))
    distinct_internal_email_counts_non_working_hours = dict(
        zip(group_keys, pool.map(count_distinct_internal_emails_non_working_hours, user_groups)))

In [42]:
def calculate_user_attributes(user_group):
    """Build the feature dictionary for one user.

    Parameters
    ----------
    user_group : tuple
        ``(key, DataFrame)`` pair as produced by iterating a groupby. The
        DataFrame must carry the 'user', 'EmailSent', 'EmailRecieved',
        'WorkingHours', 'WithAttachement' and 'AttachmentCount' columns.

    Returns
    -------
    dict
        Day/night counts of sent and received e-mails, attachment totals,
        e-mails with attachments, plus the distinct-recipient counts looked
        up by ``key`` in the four module-level ``distinct_*_counts_*`` dicts.
    """
    user_key, frame = user_group

    # Boolean row masks shared by all of the counts below.
    sent = frame['EmailSent']
    received = frame['EmailRecieved']
    with_attachment = frame['WithAttachement']
    day = frame['WorkingHours']
    night = ~frame['WorkingHours']

    def rows_matching(mask):
        # Number of rows selected by a boolean mask.
        return len(frame[mask])

    def attachments_during(period):
        # Total attachment count over the rows selected by `period`.
        return frame[period]['AttachmentCount'].sum()

    sent_day = rows_matching(sent & day)
    sent_night = rows_matching(sent & night)

    received_day = rows_matching(received & day)
    received_night = rows_matching(received & night)

    attachments_day = attachments_during(day)
    attachments_night = attachments_during(night)

    sent_attached_day = rows_matching(with_attachment & sent & day)
    sent_attached_night = rows_matching(with_attachment & sent & night)

    received_attached_day = rows_matching(with_attachment & received & day)
    received_attached_night = rows_matching(with_attachment & received & night)

    # Distinct recipient counts were precomputed per user key.
    recipients_day = distinct_email_counts_working_hours[user_key]
    recipients_night = distinct_email_counts_non_working_hours[user_key]
    internal_recipients_day = distinct_internal_email_counts_working_hours[user_key]
    internal_recipients_night = distinct_internal_email_counts_non_working_hours[user_key]

    return {
        'User': frame['user'].iloc[0],
        'numAttachmentDay': attachments_day,
        'numAttachmentNight': attachments_night,
        'numSendDay': sent_day,
        'numSendNight': sent_night,
        'numRecievedDay': received_day,
        'numRecievedNight': received_night,
        'numEmailSentwithAttachDay': sent_attached_day,
        'numEmailSentwithAttachNight': sent_attached_night,
        'numEmailRecievedwithAttachDay': received_attached_day,
        'numEmailRecievedwithAttachNight': received_attached_night,
        'numdistinctRecipientsDay': recipients_day,
        'numdistinctRecipientsNight': recipients_night,
        'numinternalRecipientsDay': internal_recipients_day,
        'numinternalRecipientsNight': internal_recipients_night,
    }



In [43]:
# Compute the attribute dict of every user in parallel: each (user, DataFrame)
# group yielded by `grouped` is one call to calculate_user_attributes.
with Pool(processes=total_processes) as pool:
    user_attributes = pool.map(func=calculate_user_attributes, iterable=grouped)

In [44]:
# Build the per-user feature table: one row per attribute dict.
results = pd.DataFrame(data=user_attributes)

In [45]:
# Save the results to a CSV file (without the index column). TEST runs write
# to a differently named file than full runs.
output_csv = "test_cleaned_email_dataset.csv" if TEST else "cleaned_email_dataset.csv"
results.to_csv(output_csv, index=False)

In [47]:
# Display the per-user feature table as this cell's output.
results

Unnamed: 0,User,numAttachmentDay,numAttachmentNight,numSendDay,numSendNight,numRecievedDay,numRecievedNight,numEmailSentwithAttachDay,numEmailSentwithAttachNight,numEmailRecievedwithAttachDay,numEmailRecievedwithAttachNight,numdistinctRecipientsDay,numdistinctRecipientsNight,numinternalRecipientsDay,numinternalRecipientsNight
0,AAB0162,0,2,2,2,2,3,0,1,0,0,6,5,5,5
1,AAB0398,0,0,0,1,2,5,0,0,0,0,1,4,1,3
2,AAC0610,0,2,0,2,0,1,0,1,0,0,0,4,0,4
3,AAC0668,3,0,2,0,3,0,1,0,0,0,5,0,4,0
4,AAC3270,0,0,0,0,1,0,0,0,0,0,1,0,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3108,ZVB2656,0,0,0,1,1,0,0,0,0,0,1,3,1,3
3109,ZVS1637,0,0,1,0,0,2,0,0,0,0,3,1,3,1
3110,ZWS3625,3,0,1,0,0,0,1,0,0,0,3,0,3,0
3111,ZXM3086,0,0,0,2,0,5,0,0,0,0,0,5,0,1
