### Loading Data

This section reads the CSV file emails.csv from the /FileStore/tables/emails directory using Apache Spark. dbutils.fs.ls() lists the files in that directory, and spark.read.csv() loads the file into a DataFrame. The parameters passed to spark.read.csv() are:
- path: the location of the CSV file
- header=True: uses the first row as column names
- inferSchema=True: infers column data types from the data
- quote='"': treats the double quote as the quoting character
- escape='"': treats a doubled quote inside a quoted field as a literal quote
- multiLine=True: allows a quoted field to span several lines, as raw email messages do
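
To see why the quote, escape, and multiLine options matter for this file, the sketch below parses a tiny, made-up CSV string with Python's standard csv module. That module follows the same convention: fields wrapped in double quotes, a literal quote written as two quotes, and newlines allowed inside quoted fields. The sample row is invented for illustration and is not taken from emails.csv.

```python
import csv
import io

# Made-up sample: the second field contains doubled quotes and embedded newlines.
raw = 'file,message\n"demo/1.","Subject: ""hello""\n\nBody text"\n'

# csv.reader keeps reading lines until the quoted field is closed.
rows = list(csv.reader(io.StringIO(raw)))
header, record = rows[0], rows[1]

print(header)            # column names taken from the first row
print(repr(record[1]))   # a single field that still contains its newlines
```

Without multi-line handling, each physical line would be treated as a separate record and the message would be split at its first newline.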

In [0]:
dbutils.fs.ls("/FileStore/tables/emails")

Out[1]: [FileInfo(path='dbfs:/FileStore/tables/emails/emails.csv', name='emails.csv', size=1426122219, modificationTime=1738864351000),
 FileInfo(path='dbfs:/FileStore/tables/emails/parsed_message.csv/', name='parsed_message.csv/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/emails/stopwords.txt', name='stopwords.txt', size=622, modificationTime=1738936718000)]

In [0]:
df = spark.read.csv(
    "/FileStore/tables/emails/emails.csv",
    header=True, 
    inferSchema=True, 
    quote='"', 
    escape='"', 
    multiLine=True 
)

### Data Exploration

In [0]:
print("Header:")
print(df.columns)

Header:
['file', 'message']


In [0]:
print("\nFirst few email messages:")
df.show(5, truncate=False)


First few email messages:

### Parsing Emails

This code defines a schema for a Spark DataFrame to store email data and a Python function parse_email to extract relevant information from a raw email message.
The schema includes columns for various email headers and the email body. The parse_email function uses regular expressions to extract the values of these headers and the body from the raw email message, and returns a dictionary containing the extracted data.

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField('Message-ID', StringType(), True),
    StructField('Date', StringType(), True),
    StructField('From', StringType(), True),
    StructField('To', StringType(), True),
    StructField('Subject', StringType(), True),
    StructField('Mime-Version', StringType(), True),
    StructField('Content-Type', StringType(), True),
    StructField('Content-Transfer-Encoding', StringType(), True),
    StructField('X-From', StringType(), True),
    StructField('X-To', StringType(), True),
    StructField('X-cc', StringType(), True),
    StructField('X-bcc', StringType(), True),
    StructField('X-Folder', StringType(), True),
    StructField('X-Origin', StringType(), True),
    StructField('X-FileName', StringType(), True),
    StructField('Body', StringType(), True)
])


In [0]:
import re

# Headers extracted with the pattern "<name>: <rest of the line>".
HEADER_FIELDS = [
    'Date', 'From', 'To', 'Subject', 'Mime-Version', 'Content-Type',
    'Content-Transfer-Encoding', 'X-From', 'X-To', 'X-cc', 'X-bcc',
    'X-Folder', 'X-Origin', 'X-FileName',
]

def parse_email(message):
    """Extract headers and body from a raw email message into a dict."""
    parsed_message = {}

    # Message-ID is wrapped in angle brackets, which are stripped.
    match = re.search(r'Message-ID: <(.+)>', message)
    if match:
        parsed_message['Message-ID'] = match.group(1)

    # Each remaining header takes the rest of the line after "<name>: ".
    # Note: the patterns are not anchored to the start of a line, so
    # 'To: ' can also match inside 'X-To: ' when no 'To' header precedes it.
    for field in HEADER_FIELDS:
        match = re.search(field + r': (.+)', message)
        if match:
            parsed_message[field] = match.group(1)

    # The body is everything after the first blank line.
    match = re.search(r'\n\n(.+)', message, re.DOTALL)
    if match:
        parsed_message['Body'] = match.group(1)

    return parsed_message
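
As a point of comparison, the same extraction could be sketched with Python's standard email package instead of regular expressions. This is an alternative technique, not the method used in this notebook. The function name parse_email_stdlib, the shortened header list, and the sample message are all hypothetical.

```python
from email import message_from_string

# Subset of the schema's headers, kept short for illustration.
HEADERS = ['Message-ID', 'Date', 'From', 'To', 'Subject', 'X-To']

def parse_email_stdlib(message):
    """Hypothetical alternative to parse_email based on the email package."""
    msg = message_from_string(message)
    # Header lookup is by exact header name; a missing header gives None.
    parsed = {name: msg[name] for name in HEADERS}
    if parsed['Message-ID'] is not None:
        parsed['Message-ID'] = parsed['Message-ID'].strip('<>')
    # For a non-multipart message the payload is the body text.
    parsed['Body'] = msg.get_payload()
    return parsed

# Made-up message with an 'X-To' header but no 'To' header.
sample = (
    "Message-ID: <123.demo@example.com>\n"
    "Date: Mon, 14 May 2001 16:39:00 -0700 (PDT)\n"
    "From: alice@example.com\n"
    "Subject: Forecast\n"
    "X-To: Bob Example\n"
    "\n"
    "Here is our forecast\n"
)
result = parse_email_stdlib(sample)
print(result['To'], '|', result['X-To'])
```

Because headers are looked up by name, a message with no To header yields None for To rather than picking up the value of X-To.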


### Restructuring Data

This code wraps parse_email in a Spark UDF (user-defined function), parse_email_udf, whose return type is the schema defined above, and applies it to the message column of df.
The result is stored in a new struct column, parsed_message. Selecting 'parsed_message.*' then expands every field of the struct into a top-level column named after the field, without the parsed_message. prefix, so no separate renaming step is needed. The following cells display the resulting DataFrame with show() and print its column names.

In [0]:

# Wrap the Python parser as a UDF that returns a struct matching `schema`.
parse_email_udf = udf(parse_email, schema)

# Add the parsed struct as a new column.
df_with_parsed_message = df.withColumn('parsed_message', parse_email_udf('message'))

# Flatten the struct: keep 'file' and promote every parsed field to a top-level column.
df_with_parsed_message = df_with_parsed_message.select('file', 'parsed_message.*')


In [0]:
df_with_parsed_message.show(truncate=False)


In [0]:
df_with_parsed_message.show(2)

+--------------------+--------------------+--------------------+--------------------+--------------------+-------+------------+--------------------+-------------------------+---------------+--------------------+----+-----+--------------------+--------+--------------------+--------------------+
|                file|          Message-ID|                Date|                From|                  To|Subject|Mime-Version|        Content-Type|Content-Transfer-Encoding|         X-From|                X-To|X-cc|X-bcc|            X-Folder|X-Origin|          X-FileName|                Body|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------+------------+--------------------+-------------------------+---------------+--------------------+----+-----+--------------------+--------+--------------------+--------------------+
|allen-p/_sent_mai...|18782981.10758553...|Mon, 14 May 2001 ...|phillip.allen@enr...|tim.belden@enron.com|   null| 

In [0]:
df_with_parsed_message.columns

Out[10]: ['file',
 'Message-ID',
 'Date',
 'From',
 'To',
 'Subject',
 'Mime-Version',
 'Content-Type',
 'Content-Transfer-Encoding',
 'X-From',
 'X-To',
 'X-cc',
 'X-bcc',
 'X-Folder',
 'X-Origin',
 'X-FileName',
 'Body']

### Saving Data

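This line of code writes the df_with_parsed_message DataFrame as CSV to /FileStore/tables/emails/parsed_message.csv. Spark creates a directory at that path containing the output part files, which is why parsed_message.csv/ appears as a directory in the file listing above. The parameters used are: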
- header=True: includes the column names as the first row in the CSV file
- mode="overwrite": overwrites any existing file at the specified location

In [0]:
df_with_parsed_message.write.csv("/FileStore/tables/emails/parsed_message.csv", header=True, mode="overwrite")

### Questions

@00804635 Mr Sachin . <br> **Easy** <br> 
1. What is the mean number of emails sent by each sender in the dataset, i.e., the number of emails sent divided by the number of unique senders in the dataset (based on the ‘From’ field)?

This calculates key statistics about email senders from the df_with_parsed_message DataFrame:

- Count the total number of emails (total_emails) by selecting the "From" column and applying the count() method.
- Count the number of unique email senders (unique_senders) by selecting the "From" column and applying the distinct().count() method.
- Calculate the mean number of emails sent per sender (mean_emails_per_sender) by dividing the total number of emails by the number of unique senders.
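
The three steps above can be sketched in plain Python on a made-up list of senders. The sketch assumes that, as in Spark, count() counts every row including nulls and distinct() keeps null as one distinct value. It therefore also shows the variant that drops missing senders first. The sample values are invented.

```python
# Made-up sender values; None stands in for a missing 'From' header.
senders = ['a@example.com', 'a@example.com', 'b@example.com', None]

# Mirrors select("From").count() and select("From").distinct().count().
total_emails = len(senders)          # every row, including None
unique_senders = len(set(senders))   # None counts as one distinct value
mean_all = total_emails / unique_senders

# Variant that ignores rows without a sender.
known = [s for s in senders if s is not None]
mean_known = len(known) / len(set(known))

print(mean_all, mean_known)
```

Which variant is appropriate depends on whether rows with an unparsed From field should count as emails from a sender.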

In [0]:
from pyspark.sql import functions as F

total_emails = df_with_parsed_message.select("From").count()

unique_senders = df_with_parsed_message.select("From").distinct().count()

mean_emails_per_sender = total_emails / unique_senders

In [0]:
print(f"Mean number of emails per sender: {mean_emails_per_sender}")

Mean number of emails per sender: 25.45262691853601


@00823251 Mr Mohammad Reza Haghighatju <br> **Medium** <br> 
1. Who are the top 10 senders by number of emails sent (based on the ‘From’ field), along with the number of emails they each sent?

This identifies the top 10 email senders in the df_with_parsed_message DataFrame:

- Filters data to include only valid email addresses in the "From" column.
- Groups filtered data by the "From" column and counts the number of emails sent by each sender.
- Sorts the resulting DataFrame in descending order by email count and limits to the top 10 senders.
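
A plain-Python sketch of the same filter, group, and sort logic on made-up data follows. The regular expression is an assumed equivalent of the SQL pattern '%@%.%' (an '@' followed somewhere later by a '.'), and the helper name top_senders is hypothetical.

```python
import re
from collections import Counter

# Assumed equivalent of SQL LIKE '%@%.%': an '@' followed later by a '.'.
VALID = re.compile(r'@.*\.', re.DOTALL)

def top_senders(values, n=10):
    """Count valid-looking addresses and return the n most frequent."""
    valid = [v for v in values if v is not None and VALID.search(v)]
    return Counter(valid).most_common(n)

# Made-up sample: one value is not an address and one is missing.
senders = (['a@example.com'] * 3 + ['b@example.com'] * 2
           + ['not-an-address', None, 'c@example.com'])

print(top_senders(senders, n=2))
```

The pattern is only a loose validity check: it rejects values with no '@' or no later '.', but it does not verify that the value is a well-formed address.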

In [0]:
from pyspark.sql.functions import col, count

df_filtered = df_with_parsed_message.filter(col("From").isNotNull() & col("From").like("%@%.%"))

top_senders = df_filtered.groupBy("From").agg(count("*").alias("email_count"))

top_senders = top_senders.orderBy(col("email_count").desc()).limit(10)

top_senders.show(truncate=False)

+-----------------------------+-----------+
|From                         |email_count|
+-----------------------------+-----------+
|kay.mann@enron.com           |16735      |
|vince.kaminski@enron.com     |14368      |
|jeff.dasovich@enron.com      |11411      |
|pete.davis@enron.com         |9149       |
|chris.germany@enron.com      |8801       |
|sara.shackleton@enron.com    |8777       |
|enron.announcements@enron.com|8587       |
|tana.jones@enron.com         |8490       |
|steven.kean@enron.com        |6759       |
|kate.symes@enron.com         |5438       |
+-----------------------------+-----------+



@00783884 Mr Nithin Puthan Veettil Kongadan <br> **Medium** <br>
3. What is the number of emails sent internally within Enron (based on both 'From' and 'To' fields containing @enron.com)?

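These cells extract the domain from the "From" and "To" email addresses in the df_with_parsed_message DataFrame and count the emails sent from and to the domain 'enron.com' separately.

- Extracts the domain from the "From" and "To" email addresses using the split function, taking the text after the first '@'.
- Filters the data to count the emails sent from and to the specified domain 'enron.com'.
- Prints the number of emails sent from, to, and the total exchanged with the 'enron.com' domain.

Two caveats apply. The "from" and "to" counts are independent, so an email that is both from and to enron.com is included in each, and their sum is not the number of internal emails the question asks for. Also, a "To" field that lists several recipients yields a "domain" such as 'enron.com, b', which does not equal 'enron.com' and is therefore not counted.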
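
A minimal plain-Python sketch of the condition stated in the question, where both fields contain @enron.com on the same email, is given below. It reads "containing" literally, so a "To" field with at least one @enron.com recipient qualifies. The helper name is_internal and the sample rows are hypothetical.

```python
DOMAIN = '@enron.com'

def is_internal(sender, recipients):
    """True when both fields are present and both contain @enron.com."""
    return (sender is not None and recipients is not None
            and DOMAIN in sender and DOMAIN in recipients)

# Made-up (From, To) pairs.
emails = [
    ('a@enron.com', 'b@enron.com'),
    ('a@enron.com', 'b@enron.com, c@enron.com'),
    ('a@enron.com', 'x@example.com'),
    ('x@example.com', 'b@enron.com'),
    ('a@enron.com', None),
]
internal_count = sum(is_internal(f, t) for f, t in emails)

# Taking the text after the first '@' does not give a bare domain
# when the field lists several recipients.
multi_domain = 'b@enron.com, c@enron.com'.split('@')[1]

print(internal_count, repr(multi_domain))
```

In PySpark, the same predicate could presumably be expressed as a single filter that combines Column.contains on the "From" and "To" columns with `&`, followed by count().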

In [0]:
from pyspark.sql.functions import col, split

df_with_parsed_message = df_with_parsed_message.withColumn('Domain', split(col('From'), '@')[1])

domain = 'enron.com'
emails_from_domain = df_with_parsed_message.filter(col('Domain') == domain).count()

print(f"Number of emails sent from {domain}: {emails_from_domain}")

Number of emails sent from enron.com: 426229


In [0]:
from pyspark.sql.functions import col, split

df_with_parsed_message = df_with_parsed_message.withColumn('Domain', split(col('To'), '@')[1])

domain = 'enron.com'
emails_to_domain = df_with_parsed_message.filter(col('Domain') == domain).count()

print(f"Number of emails sent To {domain}: {emails_to_domain}")

Number of emails sent To enron.com: 275955


In [0]:
print(f"Total Emails Exchange with {domain} : {emails_from_domain + emails_to_domain}")

Total Emails emails sent "To" and recived "from" with enron.com: 551910


@00792908 Mr Muhammad Hassan Mukhtar <br> **Difficult** <br>
1. What are the word frequencies of the top 100 words in the subject lines (using the separately provided stopwords.txt document to remove stop words)?

This analyzes the subject lines of emails in the df_with_parsed_message DataFrame, ignoring stopwords and non-alphabetic characters, and identifies the 100 most frequently occurring words. It uses Spark SQL functions to:

- Filter out rows with null subject lines
- Convert subject lines to lowercase and remove non-alphabetic characters
- Split subject lines into individual words
- Filter out empty strings and stopwords
- Count the occurrences of each word
- Sort the words by count in descending order and limit to the top 100
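
The pipeline above can be sketched in plain Python on a few made-up subject lines. The helper name top_words, the sample subjects, and the two-word stopword set are hypothetical. The real analysis uses stopwords.txt and runs in Spark.

```python
import re
from collections import Counter

def top_words(subjects, stopwords, n=100):
    """Count non-stopword alphabetic words across subject lines."""
    counts = Counter()
    for subject in subjects:
        if subject is None:          # skip missing subjects
            continue
        # Replace non-letters with spaces, then lowercase.
        cleaned = re.sub(r'[^a-zA-Z]', ' ', subject).lower()
        for word in cleaned.split(' '):
            # Consecutive spaces produce empty strings, which are dropped.
            if word and word not in stopwords:
                counts[word] += 1
    return counts.most_common(n)

subjects = ['Re: Gas meeting', 'FW: gas report', None, 'The meeting at 3pm']
stopwords = {'the', 'at'}

print(top_words(subjects, stopwords, n=2))
```

Because punctuation is replaced rather than treated specially, reply and forward prefixes such as "Re:" and "FW:" are counted as the words re and fw unless they are listed as stopwords.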

In [0]:
stopwords_path = "/FileStore/tables/emails/stopwords.txt"
stopwords_list = set(spark.read.text(stopwords_path).rdd.map(lambda r: r[0]).collect())

In [0]:
from pyspark.sql.functions import col, explode, split, lower, regexp_replace, count

df_words = df_with_parsed_message \
    .filter(col("Subject").isNotNull()) \
    .select(explode(split(lower(regexp_replace(col("Subject"), "[^a-zA-Z]", " ")), " ")).alias("word"))

df_filtered_words = df_words \
    .filter((col("word") != "") & (~col("word").isin(stopwords_list)))

df_word_counts = df_filtered_words.groupBy("word").agg(count("*").alias("count"))

df_top_words = df_word_counts.orderBy(col("count").desc()).limit(100)

df_top_words.show(100, truncate=False)

+------------+------+
|word        |count |
+------------+------+
|re          |158440|
|fw          |39187 |
|enron       |23567 |
|meeting     |16140 |
|new         |12013 |
|gas         |11168 |
|agreement   |10769 |
|report      |10766 |
|energy      |10197 |
|power       |10046 |
|update      |9553  |
|hourahead   |9069  |
|date        |8334  |
|hour        |8171  |
|request     |8123  |
|start       |7714  |
|e           |7075  |
|com         |6900  |
|conference  |6422  |
|call        |6162  |
|codesite    |5963  |
|credit      |5925  |
|deal        |5660  |
|fwd         |5632  |
|california  |5565  |
|letter      |5527  |
|draft       |5454  |
|eol         |5151  |
|schedule    |4987  |
|list        |4937  |
|trading     |4848  |
|revised     |4710  |
|daily       |4555  |
|information |4272  |
|access      |4202  |
|ferc        |3971  |
|contract    |3819  |
|weekly      |3730  |
|project     |3671  |
|news        |3616  |
|isda        |3556  |
|today       |3551  |
|master   