### Question C:
- Who are the top ten senders (based on the 'From' field) who received no emails themselves (based on the 'To' field)?

In [0]:
file_directory = "/FileStore/tables"
file_name = "/emails.csv"
file_path = file_directory + file_name
dbutils.fs.ls(file_path)

Out[1]: [FileInfo(path='dbfs:/FileStore/tables/emails.csv', name='emails.csv', size=1426122219, modificationTime=1739962882000)]

In [0]:
dbutils.fs.head(file_path)

[Truncated to first 65536 bytes]
Out[2]: '"file","message"\n"allen-p/_sent_mail/1.","Message-ID: <18782981.1075855378110.JavaMail.evans@thyme>\nDate: Mon, 14 May 2001 16:39:00 -0700 (PDT)\nFrom: phillip.allen@enron.com\nTo: tim.belden@enron.com\nSubject: \nMime-Version: 1.0\nContent-Type: text/plain; charset=us-ascii\nContent-Transfer-Encoding: 7bit\nX-From: Phillip K Allen\nX-To: Tim Belden <Tim Belden/Enron@EnronXGate>\nX-cc: \nX-bcc: \nX-Folder: \\Phillip_Allen_Jan2002_1\\Allen, Phillip K.\\\'Sent Mail\nX-Origin: Allen-P\nX-FileName: pallen (Non-Privileged).pst\n\nHere is our forecast\n\n "\n"allen-p/_sent_mail/10.","Message-ID: <15464986.1075855378456.JavaMail.evans@thyme>\nDate: Fri, 4 May 2001 13:51:00 -0700 (PDT)\nFrom: phillip.allen@enron.com\nTo: john.lavorato@enron.com\nSubject: Re:\nMime-Version: 1.0\nContent-Type: text/plain; charset=us-ascii\nContent-Transfer-Encoding: 7bit\nX-From: Phillip K Allen\nX-To: John J Lavorato <John J Lavorato/ENRON@enronXgate@ENRON>\nX-cc: \nX-


### README

### Assumptions Made
After reviewing the CSV file, it is clear that it has two header fields, `file` and `message`. The `message` field is the one of interest because it holds the full email, including the To and From fields and other metadata. The following assumptions were made:

- The From field contains the sender email address. 
- The To field contains the recipient email addresses.
- The To field can contain multiple recipients per email.
- For an email address to be considered, it needs to be a valid email address. 



### How The Code Works

- First, we read the email CSV file with `spark.read`. By default Spark expects each record to fit on one line, but an email usually spans many lines, so we pass the `multiLine` option to make Spark treat those lines as a single record. The `header` option tells Spark to take the column names from the first line. After reading the file, `printSchema` shows that the DataFrame has two string columns, `file` and `message`.



In [0]:
# Read the CSV: 
# CSV has headers "file" and "message"
df = spark.read.option("multiLine", True).option("header", True).csv(file_path)

df.printSchema()

root
 |-- file: string (nullable = true)
 |-- message: string (nullable = true)
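
The effect of reading multi-line records can be illustrated without Spark. The sketch below uses Python's standard `csv` module on a small, made-up sample shaped like `emails.csv` (the addresses and file names are hypothetical). It is only an analogy for what the `multiLine` option achieves, not a description of how Spark implements it.

```python
import csv
import io

# Hypothetical two-record sample in the same shape as emails.csv:
# a "file" column and a quoted "message" column that contains newlines.
sample_csv = (
    '"file","message"\n'
    '"allen-p/_sent_mail/1.","From: a@example.com\nTo: b@example.com\n\nHello"\n'
    '"allen-p/_sent_mail/2.","From: b@example.com\nTo: a@example.com\n\nHi"\n'
)

# Naive line splitting treats every physical line as its own record.
physical_lines = sample_csv.splitlines()

# A quote-aware parser keeps each quoted message together as one field.
records = list(csv.DictReader(io.StringIO(sample_csv)))

print(len(physical_lines))  # 9 physical lines (1 header + 4 per record)
print(len(records))         # 2 logical records
print(repr(records[0]["message"]))
```

Splitting on line breaks alone would cut each message into fragments, which is why the reader has to be told that a record may span several lines.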




- The cell below selects the `message` column from the DataFrame and converts it to an RDD of `Row` objects. We then apply `map` with a lambda that extracts the `message` field from each row, so the resulting RDD contains only the message text as strings.

In [0]:
# Convert DataFrame to an RDD of message: 
rdd_messages = df.select("message").rdd.map(lambda row: row["message"])

rdd_messages.take(5)

Out[4]: ["Message-ID: <18782981.1075855378110.JavaMail.evans@thyme>\nDate: Mon, 14 May 2001 16:39:00 -0700 (PDT)\nFrom: phillip.allen@enron.com\nTo: tim.belden@enron.com\nSubject: \nMime-Version: 1.0\nContent-Type: text/plain; charset=us-ascii\nContent-Transfer-Encoding: 7bit\nX-From: Phillip K Allen\nX-To: Tim Belden <Tim Belden/Enron@EnronXGate>\nX-cc: \nX-bcc: \nX-Folder: \\Phillip_Allen_Jan2002_1\\Allen, Phillip K.\\'Sent Mail\nX-Origin: Allen-P\nX-FileName: pallen (Non-Privileged).pst\n\nHere is our forecast\n\n ",
 "Message-ID: <15464986.1075855378456.JavaMail.evans@thyme>\nDate: Fri, 4 May 2001 13:51:00 -0700 (PDT)\nFrom: phillip.allen@enron.com\nTo: john.lavorato@enron.com\nSubject: Re:\nMime-Version: 1.0\nContent-Type: text/plain; charset=us-ascii\nContent-Transfer-Encoding: 7bit\nX-From: Phillip K Allen\nX-To: John J Lavorato <John J Lavorato/ENRON@enronXgate@ENRON>\nX-cc: \nX-bcc: \nX-Folder: \\Phillip_Allen_Jan2002_1\\Allen, Phillip K.\\'Sent Mail\nX-Origin: Allen-P\nX-File


- The cell below defines a Python function that accepts a message string, extracts the sender and the recipients, and validates that they are well-formed email addresses. The function returns the sender and the list of recipients as a tuple, or `None` when the message is not a string or has no valid sender.

In [0]:
from pyspark.sql import SparkSession
import re

# Regex pattern for validating emails: 
email_regex = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

# This block defines a function for extracting emails from the To and From field using regex. This function extracts sender and recipient(s). 
def extract_valid_emails_from_message(message):
   
    # Check if message is string: 
    if not isinstance(message, str):
        return None

    # Extract the sender only from From:
    from_match = re.search(r'^From:\s*([\w\.-]+@[\w\.-]+\.\w+)', message, re.MULTILINE)
    if not from_match:
        return None
    from_email = from_match.group(1).lower().strip()
    if not email_regex.match(from_email):
        return None

    # Extract recipients from the standard To:
    to_match = re.search(r'^To:\s*(.*)$', message, re.MULTILINE)
    recipients = []
    if to_match:
        to_field = to_match.group(1)
        # Use re.findall to extract all valid email addresses from the "To:" header line
        recipients = [email.lower().strip() 
                      for email in re.findall(r'[\w\.-]+@[\w\.-]+\.\w+', to_field)
                      if email_regex.match(email)
                    ]
    return (from_email, recipients)
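
One behaviour of the `To:` pattern above may be worth checking: `(.*)$` with `re.MULTILINE` captures only the first physical line of the header, so recipients on continuation lines of a folded `To:` header would not be picked up. The sketch below demonstrates this on a made-up message and shows one possible alternative that reads the whole header with Python's standard `email` module. The sample message, `EMAIL_PATTERN`, and the variable names are assumptions for illustration, not part of the original solution.

```python
import re
from email import message_from_string

# Same address pattern the function above passes to re.findall.
EMAIL_PATTERN = re.compile(r'[\w\.-]+@[\w\.-]+\.\w+')

# Hypothetical message whose To header is folded onto a second line.
sample_message = (
    "Message-ID: <1.example@thyme>\n"
    "From: alice@example.com\n"
    "To: bob@example.com,\n"
    "\tcarol@example.com\n"
    "Subject: forecast\n"
    "\n"
    "Here is our forecast\n"
)

# The pattern used in the function: stops at the end of the first To line.
first_line = re.search(r'^To:\s*(.*)$', sample_message, re.MULTILINE).group(1)
recipients_first_line = EMAIL_PATTERN.findall(first_line)

# Alternative sketch: let the email parser collect the full (folded) header.
to_header = message_from_string(sample_message)["To"] or ""
recipients_full_header = EMAIL_PATTERN.findall(to_header)

print(recipients_first_line)   # ['bob@example.com']
print(recipients_full_header)  # ['bob@example.com', 'carol@example.com']
```

Whether this matters for the final ranking depends on how many messages in the dataset have folded `To:` headers, which this notebook does not measure.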




- In the cell below, we apply `map` to the RDD with the function above, `extract_valid_emails_from_message`, which returns each message's sender and recipients as a tuple of the form `(sender, [recipients])`, or `None` when no valid sender is found. A `filter` then drops the `None` results. The resulting RDD is cached, because it is reused to build both the senders and the recipients RDDs, and assigned to a new variable.

In [0]:
# Extract (sender, [recipients]) pairs from each message.
# Since email_pairs is used in both senders_rdd and recipients_rdd, caching avoids re-processing.
email_pairs_rdd = (rdd_messages.map(extract_valid_emails_from_message)
                                  .filter(lambda x: x is not None)
                                  .cache())  # Cache for reuse
email_pairs_rdd.take(3)

Out[6]: [('phillip.allen@enron.com', ['tim.belden@enron.com']),
 ('phillip.allen@enron.com', ['john.lavorato@enron.com']),
 ('phillip.allen@enron.com', ['leah.arsdall@enron.com'])]

- In the cell below, we take the sender email address from each pair in the RDD above, map it to `(sender, 1)`, and use `reduceByKey` to sum these values into the number of emails sent by each sender. The result is assigned to a new variable.

In [0]:
# Build an RDD of senders with the total count of emails sent.
senders_rdd = email_pairs_rdd.map(lambda x: (x[0], 1)).reduceByKey(lambda a, b: a + b)

senders_rdd.take(20)

Out[7]: [('phillip.allen@enron.com', 2195),
 ('ina.rangel@enron.com', 404),
 ('1.11913372.-2@multexinvestornetwork.com', 3),
 ('messenger@ecm.bloomberg.com', 26),
 ('aod@newsdata.com', 17),
 ('critical.notice@enron.com', 24),
 ('market-reply@listserv.dowjones.com', 3),
 ('rebecca.cantrell@enron.com', 217),
 ('webmaster@earnings.com', 27),
 ('paul.kaufman@enron.com', 389),
 ('yild@zdemail.zdlists.com', 3),
 ('bounce-news-932653@lists.autoweb.com', 3),
 ('public.relations@enron.com', 412),
 ('stephanie.miller@enron.com', 311),
 ('tracy.arthur@enron.com', 9),
 ('sarah.novosel@enron.com', 565),
 ('bobregon@bga.com', 3),
 ('subscriptions@intelligencepress.com', 42),
 ('tim.heizenrader@enron.com', 89),
 ('rob_tom@freenet.carleton.ca', 3)]
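
As a plain-Python analogy for the `map` plus `reduceByKey` step, the sketch below counts sent emails per sender on a tiny, made-up list of `(sender, [recipients])` pairs. The data and variable names are hypothetical, and the loop runs on a single machine, whereas Spark combines values per key across partitions.

```python
from collections import defaultdict

# Hypothetical (sender, [recipients]) pairs shaped like email_pairs_rdd.
email_pairs = [
    ("alice@example.com", ["bob@example.com"]),
    ("alice@example.com", ["carol@example.com", "bob@example.com"]),
    ("news@example.org", ["alice@example.com"]),
]

# map step: one (sender, 1) pair per email.
sender_ones = [(sender, 1) for sender, _ in email_pairs]

# reduceByKey step: combine the values of equal keys with a + b.
sent_counts = defaultdict(int)
for sender, one in sender_ones:
    sent_counts[sender] = sent_counts[sender] + one

print(dict(sent_counts))
# {'alice@example.com': 2, 'news@example.org': 1}
```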


- In the cell below, we build the set of recipient email addresses from the same RDD of `(sender, [recipients])` tuples. `flatMap` emits each address in a recipient list as an individual element, `map` turns each address into a key-value pair `(email, 1)`, and `distinct` keeps one pair per unique recipient. The value 1 is only a placeholder: the key-value shape lets the recipients be matched against the senders by key in the next step.

In [0]:

# Build an RDD of recipients by flattening the list of recipients from each record.
recipients_rdd = email_pairs_rdd.flatMap(lambda x: x[1]) \
                                .map(lambda email: (email, 1)) \
                                .distinct()

recipients_rdd.take(20)

Out[8]: [('tim.belden@enron.com', 1),
 ('john.lavorato@enron.com', 1),
 ('leah.arsdall@enron.com', 1),
 ('randall.gay@enron.com', 1),
 ('greg.piper@enron.com', 1),
 ('david.l.johnson@enron.com', 1),
 ('john.shafer@enron.com', 1),
 ('joyce.teixeira@enron.com', 1),
 ('mark.scott@enron.com', 1),
 ('zimam@enron.com', 1),
 ('buck.buckner@honeywell.com', 1),
 ('stagecoachmama@hotmail.com', 1),
 ('keith.holst@enron.com', 1),
 ('david.delainey@enron.com', 1),
 ('paula.harris@enron.com', 1),
 ('ina.rangel@enron.com', 1),
 ('tim.heizenrader@enron.com', 1),
 ('pallen70@hotmail.com', 1),
 ('bs_stone@yahoo.com', 1),
 ('stouchstone@natsource.com', 1)]
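
The `flatMap`, `map`, and `distinct` chain can be pictured with ordinary Python collections. The sketch below uses made-up pairs (hypothetical addresses and variable names) and is meant only to show the shape of the data at each step.

```python
from itertools import chain

# Hypothetical (sender, [recipients]) pairs shaped like email_pairs_rdd.
email_pairs = [
    ("alice@example.com", ["bob@example.com"]),
    ("alice@example.com", ["carol@example.com", "bob@example.com"]),
    ("news@example.org", ["alice@example.com"]),
]

# flatMap step: every recipient list is flattened into individual addresses.
all_recipients = list(chain.from_iterable(recipients for _, recipients in email_pairs))

# map + distinct steps: pair each address with 1, then drop duplicate pairs.
recipient_pairs = {(email, 1) for email in all_recipients}

print(all_recipients)
# ['bob@example.com', 'carol@example.com', 'bob@example.com', 'alice@example.com']
print(sorted(recipient_pairs))
# [('alice@example.com', 1), ('bob@example.com', 1), ('carol@example.com', 1)]
```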

- The cell below removes every sender that appears in the recipients RDD. This ensures that `senders_only_rdd` contains only sender email addresses that never received an email.

In [0]:

# Remove any sender that appears in the recipients RDD.
senders_only_rdd = senders_rdd.subtractByKey(recipients_rdd)

senders_only_rdd.take(20)

Out[9]: [('market-reply@listserv.dowjones.com', 3),
 ('bobregon@bga.com', 3),
 ('subscriptions@intelligencepress.com', 42),
 ('ei_editor@ftenergy.com', 62),
 ('billc@greenbuilder.com', 3),
 ('matt@fastpacket.net', 3),
 ('jfreeman@ssm.net', 3),
 ('grensheltr@aol.com', 3),
 ('outlook-migration-team@enron.com', 225),
 ('ei_editor@platts.com', 38),
 ('anchordesk_daily@anchordesk.zdlists.com', 20),
 ('noreply@ccomad3.uu.commissioner.com', 877),
 ('edelivery@salomonsmithbarney.com', 5),
 ('showtimes@amazon.com', 19),
 ('dmallory@ftenergy.com', 7),
 ('important_phone_call@response.etracks.com', 1),
 ('bmg_support@adm.chtah.com', 2),
 ('gifts@info.iwon.com', 5),
 ('unsubscribe-i@networkpromotion.com', 4),
 ('e-mail.center@wsj.com', 20)]
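
In plain Python, the effect of `subtractByKey` resembles filtering a dictionary by a set of keys: a sender is kept only if its address does not occur as a key on the recipient side, and the recipient values are ignored. The sketch below uses made-up counts and addresses, and its names are hypothetical.

```python
# Hypothetical sent counts per sender, shaped like senders_rdd.
sent_counts = {
    "alice@example.com": 2,
    "news@example.org": 1,
    "digest@example.net": 5,
}

# Hypothetical (recipient, 1) pairs, shaped like recipients_rdd.
recipient_pairs = {("bob@example.com", 1), ("alice@example.com", 1)}

# Only the keys of the right-hand side matter for the subtraction.
recipient_keys = {email for email, _ in recipient_pairs}

# Keep each sender whose address never appears as a recipient key.
senders_only = {
    sender: count
    for sender, count in sent_counts.items()
    if sender not in recipient_keys
}

print(senders_only)
# {'news@example.org': 1, 'digest@example.net': 5}
```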

- In the cell below, we apply `takeOrdered` to `senders_only_rdd`. By default `takeOrdered` returns the smallest items first, so we pass a key function that negates the sent count. This orders the senders by sent count in descending order and returns the top 10 senders that are not recipients.

In [0]:

# Get the top 10 senders (by sent count) among those who never received an email.
top_10_senders = senders_only_rdd.takeOrdered(10, key=lambda x: -x[1])

print(top_10_senders)

[('no.address@enron.com', 5112), ('noreply@ccomad3.uu.commissioner.com', 877), ('owner-nyiso_tech_exchange@lists.thebiz.net', 712), ('owner-eveningmba@haas.berkeley.edu', 508), ('exchange.administrator@enron.com', 455), ('wsmith@wordsmith.org', 454), ('fool@motleyfool.com', 417), ('nytdirect@nytimes.com', 348), ('ecenter@williams.com', 340), ('pmadpr@worldnet.att.net', 317)]
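
The negated key can be tried out locally with `heapq.nsmallest`, which also returns the smallest items first. The sketch below uses three made-up `(sender, count)` tuples with hypothetical names, and shows how negating the count flips the order.

```python
import heapq

# Hypothetical (sender, count) tuples shaped like senders_only_rdd.
senders_only = [
    ("news@example.org", 1),
    ("digest@example.net", 5),
    ("alerts@example.com", 3),
]

# Smallest counts first: the default ordering on the count.
smallest_first = heapq.nsmallest(2, senders_only, key=lambda x: x[1])

# Negating the count makes the largest counts sort as the "smallest" keys.
largest_first = heapq.nsmallest(2, senders_only, key=lambda x: -x[1])

print(smallest_first)  # [('news@example.org', 1), ('alerts@example.com', 3)]
print(largest_first)   # [('digest@example.net', 5), ('alerts@example.com', 3)]
```

PySpark's RDD API also offers `top(num, key=...)`, which returns the largest items directly and could be used here with `key=lambda x: x[1]` instead of a negated key.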


- The cell below converts the top-10 result, a list of `(sender, count)` tuples, to a DataFrame with the columns `sender` and `count`. This step just makes the data easier to view.

In [0]:

# Convert the list of (sender, count) tuples to a DataFrame with columns "sender" and "count"
df_top_10_senders = spark.createDataFrame(top_10_senders, ["sender", "count"])

# Print the DataFrame
df_top_10_senders.show()


+--------------------+-----+
|              sender|count|
+--------------------+-----+
|no.address@enron.com| 5112|
|noreply@ccomad3.u...|  877|
|owner-nyiso_tech_...|  712|
|owner-eveningmba@...|  508|
|exchange.administ...|  455|
|wsmith@wordsmith.org|  454|
| fool@motleyfool.com|  417|
|nytdirect@nytimes...|  348|
|ecenter@williams.com|  340|
|pmadpr@worldnet.a...|  317|
+--------------------+-----+

