In [1]:
%load_ext autoreload
%autoreload 2

In [23]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

In [3]:
from shared.paths import DatasetPath

DS = DatasetPath('enron-mail-20150507')

In [4]:
import pandas as pd

df = pd.read_csv(DS.raw('emails.csv'))
df.head(5)

Unnamed: 0,file,message
0,allen-p/_sent_mail/1.,Message-ID: <18782981.1075855378110.JavaMail.e...
1,allen-p/_sent_mail/10.,Message-ID: <15464986.1075855378456.JavaMail.e...
2,allen-p/_sent_mail/100.,Message-ID: <24216240.1075855687451.JavaMail.e...
3,allen-p/_sent_mail/1000.,Message-ID: <13505866.1075863688222.JavaMail.e...
4,allen-p/_sent_mail/1001.,Message-ID: <30922949.1075863688243.JavaMail.e...


In [5]:
import pathlib
import pandas as pd
from tqdm import tqdm
import os

if str(DS) == 'test':
    files = []
    path = pathlib.Path(DS.raw())
    for file in tqdm(path.glob('**/*')):
        if not os.path.isfile(file):
            continue

        try:
            with open(file, 'r') as f:
                files.append({
                    'file': str(file),
                    'message': f.read()
                })
        except Exception as e:
            print(e)

    df = pd.DataFrame(files)

In [6]:
# Convert to linux line endings

WINDOWS_LINE_ENDING = '\r\n'
UNIX_LINE_ENDING = '\n'

df['message'] = df.message.map(lambda x: x.replace(WINDOWS_LINE_ENDING, UNIX_LINE_ENDING))

# Enron Dataset Preprocessing
Credit: https://www.kaggle.com/oalvay/enron-emails-complete-preprocessing

In [7]:
import re
import pandas as pd


def info_part(i):
    """split infomation part out"""
    return i.split('\n\n', 1)[0]


def content_part(i):
    """split content part out"""
    return i.split('\n\n', 1)[1]


df['pre_info'] = df.message.map(info_part)
df['content'] = df.message.map(content_part)
df['test_true'] = True

words2split = ['Message-ID: ', 'Date: ', 'From: ', 'To: ', 'Subject: ', 'Cc: ', 'Mime-Version: ', 'Content-Type: ',
               'Content-Transfer-Encoding: ', 'Bcc: ', 'X-From: ', 'X-To: ', 'X-cc: ', 'X-bcc: ', 'X-Folder: ',
               'X-Origin: ',
               'X-FileName: ']
features_naming = [i[:-2] for i in words2split]
split_condition = '|'.join(words2split)


# Some emails' subject confuse the string-spliting function, so I make a little change
def duplicated_info(i):
    return i.replace(' Date: ', ' Date- ').replace(' Subject: ', ' Subject2: ').replace(' To: ',
                                                                                        ' To- ').replace(' (Subject: ',
                                                                                                         ' (Subject- ')


df['pre_info'] = df['pre_info'].map(duplicated_info)


# let's check how many categories are there in these emails
def num_part(i):
    return len(re.split(split_condition, i))


df['num_info'] = df['pre_info'].map(num_part)


# around 20k emails do not have the 'To: ' category, so I add one
def add_to(i):
    return i.replace('\nSubject: ', '\nTo: \nSubject: ')


temp_condition = (df['num_info'] == 17) | (df['num_info'] == 15)
df.loc[temp_condition, 'pre_info'] = df.loc[temp_condition, 'pre_info'].map(add_to)

# similar way to deal with the "Cc:" and "Bcc:" categories
temp_condition = (df['num_info'] == 16) | (df['num_info'] == 15)


def add_bcc(i):
    return i.replace('\nX-From: ', '\nBcc: \nX-From: ')


df.loc[temp_condition, 'pre_info'] = df.loc[temp_condition, 'pre_info'].map(add_bcc)


def add_cc(i):
    return i.replace('\nMime-Version: ', '\nCc: \nMime-Version: ')


df.loc[temp_condition, 'pre_info'] = df.loc[temp_condition, 'pre_info'].map(add_cc)

df['num_info'] = df['pre_info'].map(num_part)
df['num_info'].value_counts()

df_remove = df.loc[df['num_info'] != 18].copy()
df = df.loc[df['num_info'] == 18].copy()

global feature_idx


def info_split(i):
    ## split the i th part out and remove \n for the feature
    return re.split(split_condition, i)[feature_idx + 1][:-1]


def info_split_last(i):
    ## no need to remove \n for last category -- X-FileName
    return re.split(split_condition, i)[feature_idx + 1]


for feature_idx in range(len(words2split)):
    if feature_idx != len(words2split) - 1:
        df[features_naming[feature_idx]] = df['pre_info'].map(info_split)
    else:
        df[features_naming[feature_idx]] = df['pre_info'].map(info_split_last)

df_remove2 = df.loc[df['Content-Transfer-Encoding'] == 'text/plain; charset=us-asci']
df = df.loc[df['Content-Transfer-Encoding'] != 'text/plain; charset=us-asci']

df.loc[df["content"].str.contains("-------------"), "content"]


def split_other_content(i):
    """split other forms of contents out"""
    return i.split('-------------', 1)[0]


df["has_other_content"] = df["content"].str.contains("-------------")
df["if_forwarded"] = df["content"].str.contains("------------- Forwarded")
df['content'] = df.content.map(split_other_content)

df = df.drop(['pre_info', 'test_true', 'num_info'], axis=1).set_index("file")

# Parse what we can from the emails

In [8]:
df['Date'] = pd.to_datetime(df['Date'])
df['folder'] = df.index.map(lambda x: x.split('/')[-2])


def clean_cc(x):
    tokens = [i.strip() for i in x.split(',')]
    return ','.join(tokens)


df['From'] = df['From'].map(clean_cc).str.lower()
df['To'] = df['To'].map(clean_cc).str.lower()
df['Cc'] = df['Cc'].map(clean_cc).str.lower()
df['Bcc'] = df['Bcc'].map(clean_cc).str.lower()

df.drop(columns=['message'], inplace=True)

In [9]:
df

Unnamed: 0_level_0,content,Message-ID,Date,From,To,Subject,Cc,Mime-Version,Content-Type,Content-Transfer-Encoding,...,X-From,X-To,X-cc,X-bcc,X-Folder,X-Origin,X-FileName,has_other_content,if_forwarded,folder
file,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
allen-p/_sent_mail/1.,Here is our forecast\n\n,<18782981.1075855378110.JavaMail.evans@thyme>,2001-05-14 16:39:00-07:00,phillip.allen@enron.com,tim.belden@enron.com,,,1.0,text/plain; charset=us-ascii,7bit,...,Phillip K Allen,Tim Belden <Tim Belden/Enron@EnronXGate>,,,"\Phillip_Allen_Jan2002_1\Allen, Phillip K.\'Se...",Allen-P,pallen (Non-Privileged).pst,False,False,_sent_mail
allen-p/_sent_mail/10.,Traveling to have a business meeting takes the...,<15464986.1075855378456.JavaMail.evans@thyme>,2001-05-04 13:51:00-07:00,phillip.allen@enron.com,john.lavorato@enron.com,Re:,,1.0,text/plain; charset=us-ascii,7bit,...,Phillip K Allen,John J Lavorato <John J Lavorato/ENRON@enronXg...,,,"\Phillip_Allen_Jan2002_1\Allen, Phillip K.\'Se...",Allen-P,pallen (Non-Privileged).pst,False,False,_sent_mail
allen-p/_sent_mail/100.,test successful. way to go!!!,<24216240.1075855687451.JavaMail.evans@thyme>,2000-10-18 03:00:00-07:00,phillip.allen@enron.com,leah.arsdall@enron.com,Re: test,,1.0,text/plain; charset=us-ascii,7bit,...,Phillip K Allen,Leah Van Arsdall,,,\Phillip_Allen_Dec2000\Notes Folders\'sent mail,Allen-P,pallen.nsf,False,False,_sent_mail
allen-p/_sent_mail/1000.,"Randy,\n\n Can you send me a schedule of the s...",<13505866.1075863688222.JavaMail.evans@thyme>,2000-10-23 06:13:00-07:00,phillip.allen@enron.com,randall.gay@enron.com,,,1.0,text/plain; charset=us-ascii,7bit,...,Phillip K Allen,Randall L Gay,,,\Phillip_Allen_Dec2000\Notes Folders\'sent mail,Allen-P,pallen.nsf,False,False,_sent_mail
allen-p/_sent_mail/1001.,Let's shoot for Tuesday at 11:45.,<30922949.1075863688243.JavaMail.evans@thyme>,2000-08-31 05:07:00-07:00,phillip.allen@enron.com,greg.piper@enron.com,Re: Hello,,1.0,text/plain; charset=us-ascii,7bit,...,Phillip K Allen,Greg Piper,,,\Phillip_Allen_Dec2000\Notes Folders\'sent mail,Allen-P,pallen.nsf,False,False,_sent_mail
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
zufferli-j/sent_items/95.,This is a trade with OIL-SPEC-HEDGE-NG (John L...,<26807948.1075842029936.JavaMail.evans@thyme>,2001-11-28 13:30:11-08:00,john.zufferli@enron.com,kori.loibl@enron.com,Trade with John Lavorato,,1.0,text/plain; charset=us-ascii,7bit,...,"Zufferli, John </O=ENRON/OU=NA/CN=RECIPIENTS/C...","Loibl, Kori </O=ENRON/OU=NA/CN=RECIPIENTS/CN=K...",,,"\ExMerge - Zufferli, John\Sent Items",ZUFFERLI-J,john zufferli 6-26-02.PST,False,False,sent_items
zufferli-j/sent_items/96.,Some of my position is with the Alberta Term b...,<25835861.1075842029959.JavaMail.evans@thyme>,2001-11-28 12:47:48-08:00,john.zufferli@enron.com,john.lavorato@enron.com,Gas Hedges,,1.0,text/plain; charset=us-ascii,7bit,...,"Zufferli, John </O=ENRON/OU=NA/CN=RECIPIENTS/C...","Lavorato, John </O=ENRON/OU=NA/CN=RECIPIENTS/C...",,,"\ExMerge - Zufferli, John\Sent Items",ZUFFERLI-J,john zufferli 6-26-02.PST,False,False,sent_items
zufferli-j/sent_items/97.,2\n\n -----Original Message-----\nFrom: \tDouc...,<28979867.1075842029988.JavaMail.evans@thyme>,2001-11-28 07:20:00-08:00,john.zufferli@enron.com,dawn.doucet@enron.com,RE: CONFIDENTIAL,,1.0,text/plain; charset=us-ascii,7bit,...,"Zufferli, John </O=ENRON/OU=NA/CN=RECIPIENTS/C...","Doucet, Dawn </O=ENRON/OU=NA/CN=RECIPIENTS/CN=...",,,"\ExMerge - Zufferli, John\Sent Items",ZUFFERLI-J,john zufferli 6-26-02.PST,False,False,sent_items
zufferli-j/sent_items/98.,Analyst\t\t\t\t\tRank\n\nStephane Brodeur\t\t\...,<22052556.1075842030013.JavaMail.evans@thyme>,2001-11-27 11:52:45-08:00,john.zufferli@enron.com,jeanie.slone@enron.com,Calgary Analyst/Associate,,1.0,text/plain; charset=us-ascii,7bit,...,"Zufferli, John </O=ENRON/OU=NA/CN=RECIPIENTS/C...","Slone, Jeanie </O=ENRON/OU=NA/CN=RECIPIENTS/CN...",,,"\ExMerge - Zufferli, John\Sent Items",ZUFFERLI-J,john zufferli 6-26-02.PST,False,False,sent_items


In [11]:
from shared.paths import TMP_PATH

TMP_PATH.mkdir(exist_ok=True)
df.to_parquet(str(TMP_PATH.joinpath('emails.parquet')))

In [12]:
del df

# Convert back to spark dataframe

In [13]:
spark = (SparkSession.builder
         .appName(f'{DS}_preprocess')
         .config('spark.sql.legacy.timeParserPolicy', 'LEGACY')
         .config("spark.executor.memory", "8g")
         .config("spark.driver.memory", "8g")
         .config("spark.memory.offHeap.enabled", True)
         .config("spark.memory.offHeap.size", "16g")
         .getOrCreate())

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/06 11:45:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [14]:
df = spark.read.parquet(str(TMP_PATH.joinpath('emails.parquet')))

                                                                                

In [15]:
df = (
    df
        .withColumn('To', F.regexp_replace(F.col('To'), '"', ''))
        .withColumn('From', F.regexp_replace(F.col('From'), '"', ''))
        .withColumn('Cc', F.regexp_replace(F.col('Cc'), '"', ''))
        .withColumn('Bcc', F.regexp_replace(F.col('Bcc'), '"', ''))
)

In [16]:
df.head(5)

                                                                                

[Row(content='Here is our forecast\n\n ', Message-ID='<18782981.1075855378110.JavaMail.evans@thyme>', Date=datetime.datetime(2001, 5, 15, 1, 39), From='phillip.allen@enron.com', To='tim.belden@enron.com', Subject='', Cc='', Mime-Version='1.0', Content-Type='text/plain; charset=us-ascii', Content-Transfer-Encoding='7bit', Bcc='', X-From='Phillip K Allen', X-To='Tim Belden <Tim Belden/Enron@EnronXGate>', X-cc='', X-bcc='', X-Folder="\\Phillip_Allen_Jan2002_1\\Allen, Phillip K.\\'Sent Mail", X-Origin='Allen-P', X-FileName='pallen (Non-Privileged).pst', has_other_content=False, if_forwarded=False, folder='_sent_mail', file='allen-p/_sent_mail/1.'),
 Row(content="Traveling to have a business meeting takes the fun out of the trip.  Especially if you have to prepare a presentation.  I would suggest holding the business plan meetings here then take a trip without any formal business meetings.  I would even try and get some honest opinions on whether a trip is even desired or necessary.\n\nAs f

In [30]:
df_nodes_users = (
    df.select(
        F.explode(F.concat(
            (F.split(F.col('To'), ',')),
            (F.split(F.col('From'), ',')),
            (F.split(F.col('Cc'), ',')),
            (F.split(F.col('Bcc'), ',')),
        )).alias('email')
    )
        .distinct()
        .filter("email != ''")
        .withColumn('is_internal', F.col('email').like('%enron%'))
        .withColumn('id', F.col('email'))
        .dropDuplicates(['id'])
        .withColumnRenamed('id', 'tid')
        .withColumnRenamed('email', 'name')
        .coalesce(1)
        .withColumn('id', F.monotonically_increasing_id())
)
print('User count: ' + str(df_nodes_users.count()))
df_nodes_users.show(5)

                                                                                

User count: 87678




+--------------------+-----------+--------------------+---+
|                name|is_internal|                 tid| id|
+--------------------+-----------+--------------------+---+
| #2.martin@enron.com|       true| #2.martin@enron.com|  0|
|#23.training@enro...|       true|#23.training@enro...|  1|
|#24.training@enro...|       true|#24.training@enro...|  2|
|#25.training@enro...|       true|#25.training@enro...|  3|
|#26.training@enro...|       true|#26.training@enro...|  4|
+--------------------+-----------+--------------------+---+
only showing top 5 rows



                                                                                

In [33]:
df_nodes_emails = (
    df.select(
        F.col('Message-ID').alias('eid'),
        F.col('content'),
        F.col('From').like('%enron%').alias('is_internal'),
        F.split(F.col('From'), ',').alias('From'),
        F.split(F.col('To'), ',').alias('To'),
        F.split(F.col('Cc'), ',').alias('Cc'),
        F.split(F.col('Bcc'), ',').alias('Bcc'),
        F.unix_timestamp(F.col('Date').cast(T.TimestampType())).alias('timestamp_from'),
        F.col('Subject').alias('name'),
        F.col('folder'),
        F.col('if_forwarded'),
    ).distinct()
    .dropDuplicates(['eid'])
    .coalesce(1)
    .withColumn('id', F.monotonically_increasing_id())
)
print('Email count: ' + str(df_nodes_emails.count()))
df_nodes_emails.show(5)

Email count: 517398




+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+------------------+------------+---+
|                 eid|             content|is_internal|                From|                  To|                  Cc|                 Bcc|timestamp_from|                name|            folder|if_forwarded| id|
+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+------------------+------------+---+
|<10001590.1075860...| We have not yet ...|       true|[louise.kitchen@e...|[bryan.seyfried@e...|[david.forster@en...|[david.forster@en...|     950873640|Re: Updated Enron...|credit_derivatives|        true|  0|
|<1000163.10758596...|Jennifer,\n\nPlea...|       true|[debra.perlingier...|[jennifer.fraser@...|[jeffrey.hodge@en...|[jeffrey.hodge@en...|     97620000

                                                                                

In [42]:
df_edges_from = (
    df_nodes_emails.select(
        F.explode(F.col('From')).alias('From'),
        F.col('id').alias('dst'),
        F.col('timestamp_from')
    ).filter("From != ''")
    .join(
        df_nodes_users.select('tid', F.col('id').alias('src')),
        F.col('From') == F.col('tid'),
        'inner'
    )
    .drop('tid', 'From')
    .distinct()
)
print(df_edges_from.count())
df_edges_from.show(5)

df_edges_to = (
    df_nodes_emails.select(
        F.col('id').alias('src'),
        F.explode(F.col('To')).alias('To'),
        F.col('timestamp_from')
    ).filter("To != ''")
        .join(
            df_nodes_users.select('tid', F.col('id').alias('dst')),
            F.col('To') == F.col('tid'),
            'inner'
        )
        .drop('tid', 'To')
        .distinct()
)
print(df_edges_to.count())
df_edges_to.show(5)

df_edges_cc = (
    df_nodes_emails.select(
        F.col('id').alias('src'),
        F.explode(F.col('Cc')).alias('Cc'),
        F.col('timestamp_from')
    ).filter("Cc != ''")
        .join(
            df_nodes_users.select('tid', F.col('id').alias('dst')),
            F.col('Cc') == F.col('tid'),
            'inner'
        )
        .drop('tid', 'Cc')
        .distinct()
)
print(df_edges_cc.count())
df_edges_cc.show(5)

                                                                                

517398


                                                                                

+---+--------------+-----+
|dst|timestamp_from|  src|
+---+--------------+-----+
|  0|     950873640|26432|
|  1|     976200000|25164|
|  2|     976637280|19077|
|  3|     971169960| 6106|
|  4|    1002840789|83083|
+---+--------------+-----+
only showing top 5 rows



                                                                                

3101175


                                                                                

+---+--------------+-----+
|src|timestamp_from|  dst|
+---+--------------+-----+
|  0|     950873640|79045|
|  1|     976200000|73668|
|  2|     976637280|17238|
|  2|     976637280|85545|
|  2|     976637280|29138|
+---+--------------+-----+
only showing top 5 rows



                                                                                

561305


[Stage 275:>                                                        (0 + 1) / 1]

+---+--------------+-----+
|src|timestamp_from|  dst|
+---+--------------+-----+
|  0|     950873640|25166|
|  0|     950873640|42555|
|  0|     950873640|36933|
|  1|     976200000|38251|
|  5|     985107360|27326|
+---+--------------+-----+
only showing top 5 rows



                                                                                

In [43]:
df_nodes_users.write.parquet(DS.processed_str('node__User'), mode='overwrite')
df_nodes_emails.write.parquet(DS.processed_str('node__Email'), mode='overwrite')

df_edges_from.write.parquet(DS.processed_str('edge__User_SENT_Email'), mode='overwrite')
df_edges_to.write.parquet(DS.processed_str('edge__Email_ADDRESSEDTO_User'), mode='overwrite')
df_edges_cc.write.parquet(DS.processed_str('edge__Email_ADDRESSEDCC_User'), mode='overwrite')

                                                                                