### INDUSTRIAL UNIVERSITY
### OF HO CHI MINH CITY

### Faculty of Information Technology
## MIDTERM EXAM
### Course: Introduction to Big Data
### Class/Course section: DHKHDL17A
* Time allowed: 75 minutes (not including the time taken to hand out the exam)
* Students may use course materials and look things up on the following websites:
  - https://spark.apache.org/
  - https://stackoverflow.com/
  - https://learn.microsoft.com/en-us/sql/t-sql
* Save your work in the format mssv_hovaten_gk.ipynb (student ID, then full name). For example, if your student ID is 12131411 and your full name is Nguyễn Văn A, submit the file as **12131411_NguyenVanA_gk.ipynb**
* Use the data file *email.csv*, and set the `dataPath` variable in the first cell to the correct path of the data file.
* Fill in every place marked **# YOUR CODE HERE** to meet the requirements of each function.
#### NOTE: DO NOT CHANGE ANYTHING ELSE

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.rdd import RDD
from pyspark.sql.types import Row

sc = SparkSession.builder \
    .appName("Email Data Processing") \
    .getOrCreate()

dataPath = "./data/email.csv"

In [2]:
dataPath = os.environ.get("DATA_MIDTERM") or dataPath

In [4]:
#0.5
def loadAndProcessCsv(filePath: str, spark: SparkSession) -> DataFrame:
    '''
    This function loads a CSV file into a Spark DataFrame, drops rows with null values,
    caches the result, and prints the schema. Read options are set so that the header
    row and the multi-line quoted `message` column are parsed correctly.
    
    Args:
        filePath (str): Path to the CSV file.
        spark (SparkSession): Active Spark session.
        
    Returns:
        DataFrame: Processed DataFrame.
    '''

    data = None
    # YOUR CODE HERE
    data = spark.read \
    .format("csv") \
    .option("header","true") \
    .option("inferSchema","true") \
    .option("samplingRatio",.01) \
    .option("multiLine","true") \
    .option("escape", '"') \
    .option("quote", '"') \
    .option("sep", ",") \
    .load(filePath)

    data = data.dropna()
    
    data.cache()
    
    data.printSchema()
    # raise NotImplementedError()
    return data

In [5]:
data = loadAndProcessCsv(dataPath,sc)

root
 |-- file: string (nullable = true)
 |-- message: string (nullable = true)



In [6]:
data.columns

['file', 'message']
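
The `multiLine`, `quote`, and `escape` options matter because each `message` value is a quoted field that spans many lines and may contain doubled quotes. As a rough local illustration (not the Spark reader itself), the standard-library `csv` module parses the same layout. The sample text and the `readEmailCsv` helper below are made up for this sketch and are not part of the exam.

```python
import csv
import io
from typing import Dict, List

# Hypothetical two-row sample in the same layout as email.csv (file, message).
SAMPLE_CSV = (
    'file,message\n'
    '"a/1.","From: alice@example.com\n'
    'Subject: He said ""hi""\n'
    '\n'
    'Body text"\n'
    '"a/2.",\n'
)

def readEmailCsv(text: str) -> List[Dict[str, str]]:
    """Parse CSV text and drop rows that have a missing or empty value."""
    # newline="" leaves embedded line breaks for the csv module to handle.
    reader = csv.DictReader(io.StringIO(text, newline=""))
    return [row for row in reader if all(row.values())]

rows = readEmailCsv(SAMPLE_CSV)
print(len(rows))           # number of complete rows
print(rows[0]["message"])  # multi-line message with "" unescaped to "
```

The second sample row has an empty `message`, so it is dropped, which loosely mirrors what `dropna()` does after Spark reads an empty field as null.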

In [10]:
# 0.5 pt
def createRDD(data: DataFrame)->RDD[Row]:
    '''
    This function converts a Spark DataFrame into an RDD of Rows.
    
    Args:
        data (DataFrame): Input DataFrame containing data loaded by Spark.
        
    Returns:
        RDD[Row]: An RDD containing all Rows from the input DataFrame.
    '''
    outRDD = None

    # YOUR CODE HERE
    # raise NotImplementedError()
    outRDD = data.rdd
    
    return outRDD

In [11]:
emailRDD = createRDD(data)
assert isinstance(emailRDD, RDD), "createRDD() does not return the correct data type (RDD)"
assert isinstance(emailRDD.first(),Row), "createRDD() contains elements that are not of type Row"

In [13]:
import email
from typing import List, Optional
from pyspark.sql.types import Row

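def splitEmailAddresses(emailString: Optional[str]) -> List[str]:
    '''
    The function splits a comma-separated string of email addresses into a list of unique addresses.
    The order of the returned addresses is not guaranteed.
    
    Args:
        emailString: A string containing email addresses separated by commas,
            or None when the header is missing.
        
    Returns:
        A list of unique email addresses (empty if the input is None or empty).
    '''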
    if emailString:
        addresses = emailString.split(',')
        uniqueAddresses = list(frozenset(map(lambda x: x.strip(), addresses)))
        return uniqueAddresses
    return []

def extractEmailDetailsFromRawText(rawEmail: str) -> Row:
    '''
    The function extracts relevant details from a raw email message string.
    
    Args:
        rawEmail: A string representing the raw email message.
        
    Returns:
        A Row object containing the extracted email details.
    '''
    emailMessage = email.message_from_string(rawEmail)
    emailContentParts = []
    for part in emailMessage.walk():
        if part.get_content_type() == 'text/plain':
            emailContentParts.append(part.get_payload())

    emailContent = ''.join(emailContentParts)

    fromAddresses = splitEmailAddresses(emailMessage.get("From"))
    toAddresses = splitEmailAddresses(emailMessage.get("To"))
    ccEmail = splitEmailAddresses(emailMessage.get("Cc"))
    return Row(
        Date=emailMessage.get("Date"),
        From=fromAddresses, 
        To=toAddresses, 
        Subject=emailMessage.get("Subject"), 
        CC=ccEmail, 
        Content=emailContent
    )

# Extract structured email details from the first email message
firstEmailData = data.first()
structuredEmail = extractEmailDetailsFromRawText(firstEmailData.message)
structuredEmail


Row(Date='Thu, 1 Feb 2001 08:00:00 -0800 (PST)', From=['tana.jones@enron.com'], To=['diane.ellstrom@enron.com', 'janie.aguayo@enron.com', 'susan.bailey@enron.com', 'cheryl.nelson@enron.com', 'rudwell.johnson@enron.com', 'anthony.campos@enron.com', 'dale.neuner@enron.com', 'francisco.leite@enron.com', 'robert.bruce@enron.com', 'laurel.adams@enron.com', 'brent.hendry@enron.com', 'carrie.southard@enron.com', 'kim.theriot@enron.com', 'nidia.mendoza@enron.com', 'tanya.rohauer@enron.com', 'samantha.boyd@enron.com', 'cheryl.johnson@enron.com', 'bob.bowen@enron.com', 'edward.sacks@enron.com', 'paul.radous@enron.com', 'mark.taylor@enron.com', 'tom.moran@enron.com', 'wendy.conwell@enron.com', 'marilyn.colbert@enron.com', 'karen.lambert@enron.com', 'diane.anderson@enron.com', 'scott.tackett@enron.com', 'tracy.ngo@enron.com', 'larry.hunter@enron.com', 'brant.reves@enron.com', 'kevin.meredith@enron.com', 'melissa.murphy@enron.com', 'carol.clair@enron.com', 'jason.moore@enron.com', 'veronica.espinoz
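
To see what the helper functions above rely on, the following minimal sketch parses a small synthetic message with the standard `email` module. The message text and addresses are invented for illustration; only `email.message_from_string`, `get`, `walk`, `get_content_type`, and `get_payload` come from the notebook.

```python
import email

# Synthetic message for illustration only; the addresses are made up.
RAW_EMAIL = (
    "Date: Thu, 1 Feb 2001 08:00:00 -0800 (PST)\n"
    "From: alice@example.com\n"
    "To: bob@example.com,\n"
    "\tcarol@example.com, bob@example.com\n"
    "Subject: Hello\n"
    "\n"
    "Body line one.\n"
)

msg = email.message_from_string(RAW_EMAIL)

# A header folded over several lines may keep its line break and tab,
# so each piece is stripped after splitting on commas.
toHeader = msg.get("To")
recipients = sorted({address.strip() for address in toHeader.split(",")})

# A missing header comes back as None rather than an empty string.
ccHeader = msg.get("Cc")

# A message without a Content-Type header is treated as text/plain.
body = "".join(
    part.get_payload()
    for part in msg.walk()
    if part.get_content_type() == "text/plain"
)
print(recipients, ccHeader, repr(body))
```

This is why `splitEmailAddresses` has to accept a missing value and strip whitespace around every address.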

In [14]:
# 0.5 pt
def createStructuredEmailRDD(emailRDD: RDD[Row]) -> RDD[Row]:
    '''
    The function takes an RDD of email messages and converts it into a new RDD containing structured email details.
    
    Args:
        emailRDD: An RDD where each Row contains an email message in raw text format.
        
    Returns:
        A new RDD where each element is a Row with structured email details such as Date, From, To, Subject, CC, and Content.
    '''
    structuredEmailRDD = None
    # YOUR CODE HERE
    # raise NotImplementedError()
    structuredEmailRDD = emailRDD.map(lambda x: extractEmailDetailsFromRawText(x.message))
    return structuredEmailRDD

In [15]:
structuredEmailRDD = createStructuredEmailRDD(emailRDD)
assert isinstance(structuredEmailRDD, RDD), "createStructuredEmailRDD() doesn't return an RDD"
assert isinstance(structuredEmailRDD.first(), Row), "createStructuredEmailRDD() elements are not of type Row"

In [16]:
#1.
def countNumberEmail(structuredEmailRDD: RDD[Row], k: int)->int:
    '''
    The function counts the number of emails with more than `k` email addresses in the CC field.
    
    Args:
    - structuredEmailRDD: RDD of Row objects, each containing an email's structured data.
    - k: The threshold for the number of email addresses in the CC field.
    
    Returns:
    - int: The count of emails with more than `k` email addresses in the CC field.
    '''
    count = -1
    # YOUR CODE HERE
    # raise NotImplementedError()
    count = structuredEmailRDD.filter(lambda row : len(row.CC) > k).count()

    return count

In [18]:
countNumberEmail(structuredEmailRDD,40)

25

In [22]:
# 1 pt
def countUniqueEmailDomains(structuredEmailRDD: RDD[Row], k: int) -> dict:
    '''
    This function counts how many emails come from each domain in the "From" field
    using `map` and `reduceByKey`, and returns the k most frequent domains.
    
    Args:
    - structuredEmailRDD: An RDD containing Row objects, each representing an email's structured data.
    - k: The number of most frequent domains to return.
    
    Returns:
    - dict: A dictionary showing the count of emails from each of the top k domains in the "From" field.
      Example:
        If k = 3, the result might look like:
        {
          'enron.com': 16452,
          'aol.com': 122,
          'hotmail.com': 101
        }
    '''

    results = {}
    # YOUR CODE HERE
    # raise NotImplementedError()
    # One element per sender address (the From field is a list).
    fromRDD = structuredEmailRDD.flatMap(lambda row: row.From)

    # Pair each domain (the text after the last '@') with 1, then sum per domain.
    domainPairs = fromRDD.map(lambda x: (x.split("@")[-1], 1))
    domainCountRDD = domainPairs.reduceByKey(lambda a, b: a + b)

    # Sort by count, descending, and bring only the top k pairs to the driver.
    topDomains = domainCountRDD.sortBy(lambda x: x[1], ascending=False).take(k)
    results = dict(topDomains)
    return results


In [23]:
countUniqueEmailDomains(structuredEmailRDD, 10)

{'enron.com': 16452,
 'aol.com': 122,
 'hotmail.com': 101,
 'txu.com': 76,
 'enron.com>': 66,
 'mailman.enron.com': 54,
 'nymex.com': 53,
 'haas.berkeley.edu': 48,
 'yahoo.com': 44,
 'nyiso.com': 41}
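
The `'enron.com>'` entry in the output above suggests that some `From` values are not bare addresses, for example values written as `Name <address>`. One possible way to normalize them before taking the domain is `email.utils.getaddresses`. The sketch below is an assumption about how such a cleanup could look, not part of the exam solution; `countSenderDomains` and the sample headers are hypothetical, and the lowercasing step is an extra choice the notebook does not make.

```python
from collections import Counter
from email.utils import getaddresses
from typing import Dict, Iterable

def countSenderDomains(fromHeaders: Iterable[str], k: int) -> Dict[str, int]:
    """Count sender domains after normalizing each header with getaddresses."""
    counts: Counter = Counter()
    for header in fromHeaders:
        # getaddresses returns (display name, address) pairs, without the
        # angle brackets or quoted display names around the address.
        for _name, address in getaddresses([header]):
            if "@" in address:
                counts[address.rsplit("@", 1)[1].lower()] += 1
    return dict(counts.most_common(k))

# Made-up header values for illustration.
sampleHeaders = [
    "alice@enron.com",
    "Bob Smith <bob@enron.com>",
    '"Doe, John" <john@enron.com>',
    "carol@aol.com",
]
print(countSenderDomains(sampleHeaders, 2))
```

In the notebook, the same idea would mean parsing the raw header once inside `extractEmailDetailsFromRawText` instead of splitting it on commas, which would also change the counts shown above.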

In [36]:
# 0.5 pt
def countEmailsByRecipient(structuredEmailRDD: RDD[Row], recipient: str) -> int:
    '''
    The function counts the emails whose "To" field contains a specific recipient.

    Args:
    - structuredEmailRDD: RDD of Row objects, each containing an email's structured data.
    - recipient: The email address of the recipient to filter by.

    Returns:
    - numEmails: The number of emails sent to the recipient.
    '''
    numEmails =  -1
    # YOUR CODE HERE
    # raise NotImplementedError()
    # flatEmail = structuredEmailRDD.flatMap(lambda x: x.To)
    numEmails = structuredEmailRDD.filter(lambda x: recipient in x['To']).count()
    # print(flatEmail.first())
    
    return numEmails

In [38]:
countEmailsByRecipient(structuredEmailRDD, "tana.jones@enron.com")

454

In [47]:
# 1.5 pt
def getTopKFrequentWordsInContentBySubject(structuredEmailRDD: RDD[Row], keyword: str, k: int) -> dict:
    '''
    This function filters the dataset to include only emails with a specific keyword in the subject line,
    and then returns the top k most frequent words found in the content of those filtered emails.

    Args:
    - structuredEmailRDD: RDD of Row objects, where each Row represents an email with structured data, such as subject and content.
    - keyword: The keyword to search for in the subject line (case-insensitive).
    - k: The number of most frequent words to return.

    Returns:
    - topKWordsDict: A dictionary containing the top k most frequent words found in the content of filtered emails.
                     The keys are the words, and the values are their frequencies, representing how often they appear in the content of the filtered emails.
                     example:
                    {'to': 12,
                     'the': 9,
                     'your': 8,
                     'a': 5,
                     'is': 4,
                     'survey': 4,
                     'and': 4,
                     'you': 4,
                     'of': 3,
                     'very': 3}
    '''
    topKWordsDict = {}
    # YOUR CODE HERE
    # raise NotImplementedError()
    needle = keyword.strip().lower()
    # Treat a missing Subject header as an empty string so the filter does not fail on None.
    filterSubjectRDD = structuredEmailRDD.filter(lambda row: needle in (row['Subject'] or '').lower())
    contentRDD = filterSubjectRDD.flatMap(lambda row: row.Content.lower().split())
    wordPairs = contentRDD.map(lambda x: (x, 1))
    wordCounts = wordPairs.reduceByKey(lambda a, b: a + b)
    # Sort by frequency and bring only the top k pairs to the driver.
    topKWordsDict = dict(wordCounts.sortBy(lambda x: x[1], ascending=False).take(k))
    
    return topKWordsDict

In [48]:
getTopKFrequentWordsInContentBySubject(structuredEmailRDD, "image", 10)

{'to': 12,
 'the': 9,
 'your': 8,
 'a': 5,
 'is': 4,
 'survey': 4,
 'and': 4,
 'you': 4,
 'of': 3,
 'very': 3}
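
For checking the Spark result on a handful of emails, the same filter-then-count logic can be sketched in plain Python with `collections.Counter`. The function name `topWordsForSubject`, the sample data, and the regular-expression tokenizer are assumptions made for this sketch. The notebook splits on whitespace instead, so punctuation stays attached to words there.

```python
import re
from collections import Counter
from typing import Dict, Iterable, Optional, Tuple

def topWordsForSubject(
    emails: Iterable[Tuple[Optional[str], str]], keyword: str, k: int
) -> Dict[str, int]:
    """Top-k word counts over the bodies whose subject contains keyword."""
    needle = keyword.strip().lower()
    counts: Counter = Counter()
    for subject, content in emails:
        # A missing subject is treated as an empty string.
        if needle in (subject or "").lower():
            # Assumption: a word is a run of letters, digits, or apostrophes.
            counts.update(re.findall(r"[a-z0-9']+", content.lower()))
    return dict(counts.most_common(k))

# Made-up (subject, content) pairs for illustration.
sampleEmails = [
    ("Image survey", "Please send the image. The image is due today."),
    ("Lunch", "The cafeteria is closed."),
    (None, "No subject here."),
]
print(topWordsForSubject(sampleEmails, "image", 2))
```

Only the first sample email matches the keyword, so the words of the other two bodies never reach the counter.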

In [84]:
# SQL Query
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, size, to_date, year, month
sc.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
# Convert RDD to DataFrame
dfEmais = structuredEmailRDD.toDF()
dfEmais = dfEmais.withColumn('DateTime', to_date(col('Date'), "EEE, d MMM yyyy HH:mm:ss Z"))
dfEmais = dfEmais.withColumn('Num_To', size(col('To')))
dfEmais = dfEmais.withColumn('Num_CC', size(col('CC')))

In [85]:
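# `sc` is a SparkSession here, so pass its SparkContext and the session itself explicitly.
sqlContext = SQLContext(sc.sparkContext, sc)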
tableName = "Emails"
dfEmais.createOrReplaceTempView(tableName)



In [86]:
# 0.5 pt
def getTopKRowsBySQL(sqlContext: SQLContext, tableName: str, k: int) -> DataFrame:
    '''
    This function queries the first k rows from a given table using SQLContext.

    Args:
    - sqlContext: An SQLContext object that provides the environment to run SQL queries on structured data.
    - tableName: The name of the table from which the rows will be selected.
    - k: The number of rows to return. It must be a positive integer.

    Returns:
    - result: A DataFrame containing the first k rows of data from the specified table.
                If the table contains fewer than k rows, the DataFrame will contain all available rows.
    '''
    # YOUR CODE HERE
    # raise NotImplementedError()
    if k <= 0:
        raise ValueError("k must be a positive integer")
    result = sqlContext.sql(f"select * from {tableName} LIMIT {k}")
    return result

def getTopKRowsByDFOperations(dataFrame: DataFrame, k: int) -> DataFrame:
    '''
    This function returns the first k rows of a DataFrame using DataFrame operations.

    Args:
    - dataFrame: The input DataFrame.
    - k: The number of rows to return. It must be a positive integer.

    Returns:
    - result: A DataFrame containing the first k rows of the input DataFrame.
                If the DataFrame contains fewer than k rows, the result will contain all available rows.
    '''
    # YOUR CODE HERE
    # raise NotImplementedError()
    if k <= 0:
        raise ValueError("k must be a positive integer")
    result = dataFrame.limit(k)
    return result

In [87]:
getTopKRowsBySQL(sqlContext,tableName,1).show()

+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+------+------+
|                Date|                From|                  To|         Subject|                  CC|             Content|  DateTime|Num_To|Num_CC|
+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+------+------+
|Thu, 1 Feb 2001 0...|[tana.jones@enron...|[carol.clair@enro...|Deutsche Bank AG|[larry.gagliardi@...|We have received ...|2001-02-01|    64|     3|
+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+------+------+



In [88]:
getTopKRowsByDFOperations(dfEmais,1).show()

+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+------+------+
|                Date|                From|                  To|         Subject|                  CC|             Content|  DateTime|Num_To|Num_CC|
+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+------+------+
|Thu, 1 Feb 2001 0...|[tana.jones@enron...|[carol.clair@enro...|Deutsche Bank AG|[larry.gagliardi@...|We have received ...|2001-02-01|    64|     3|
+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+----------+------+------+



In [90]:
# 1 pt, 6.5 pt
def countEmailsWithCCGreaterThanKBySQL(sqlContext: SQLContext, tableName: str, k: int) -> int:
    '''
    This function uses SQL to count the number of emails with more than k email addresses in the CC field.

    Args:
    - sqlContext: An SQLContext object that provides the environment to run SQL queries on structured data.
    - tableName: The name of the table that contains the email data.
    - k: The threshold number of email addresses in the CC field.

    Returns:
    - count: An integer representing the number of emails where the CC field has more than k email addresses.
    '''
    # YOUR CODE HERE
    # raise NotImplementedError()
    query = f"SELECT COUNT(*) FROM {tableName} WHERE size(CC) > {k}"
    count = sqlContext.sql(query).collect()[0][0]
    return count
    
def countEmailsWithCCGreaterThanKByDFOperations(dataFrame: DataFrame, k: int) -> int:
    '''
    This function counts the number of emails with more than k email addresses in the CC field using DataFrame operations.

    Args:
    - dataFrame: A DataFrame containing the email data.
    - k: The threshold number of email addresses in the CC field.

    Returns:
    - count: An integer representing the number of emails where the CC field has more than k email addresses.
    '''
    # YOUR CODE HERE
    # raise NotImplementedError()
    result = dataFrame.filter(size('CC') > k).count()
    return result


In [91]:
countEmailsWithCCGreaterThanKBySQL(sqlContext,tableName,50)

15

In [92]:
countEmailsWithCCGreaterThanKByDFOperations(dfEmais,50)

15
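
Both SQL helpers above interpolate `tableName` and `k` directly into the query text with an f-string. One hedged way to keep that safe is to validate the two values before building the string. The helper `buildCcCountQuery` and its identifier rule are assumptions for this sketch, and the query text itself is copied from the function above.

```python
import re

# Assumption: a table name is a plain identifier (letters, digits, underscore).
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

def buildCcCountQuery(tableName: str, k: int) -> str:
    """Build the COUNT query after validating both interpolated values."""
    if not _IDENTIFIER.fullmatch(tableName):
        raise ValueError(f"unsafe table name: {tableName!r}")
    # bool is a subclass of int, so it is rejected explicitly.
    if isinstance(k, bool) or not isinstance(k, int):
        raise TypeError("k must be an integer")
    return f"SELECT COUNT(*) FROM {tableName} WHERE size(CC) > {k}"

print(buildCcCountQuery("Emails", 50))
```

The returned string could then be passed to `sqlContext.sql(...)` exactly as in the notebook.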

In [112]:
# 1 pt, 7.5
def getDateRangeBySQL(sqlContext: SQLContext, tableName: str) -> tuple:
    '''
    This function retrieves the start and end dates from the email dataset using SQL queries.

    Args:
    - sqlContext: An SQLContext object that provides the environment to run SQL queries on structured data.
    - tableName: The name of the table that contains the email data.

    Returns:
    - (startDate, endDate): A tuple containing two elements: the earliest (start) date and the latest (end) date.
    '''
    # YOUR CODE HERE
    # raise NotImplementedError()
    # A single query returns both aggregates; first() yields a Row that unpacks into two values.
    startDate, endDate = sqlContext.sql(f"SELECT MIN(DateTime), MAX(DateTime) FROM {tableName}").first()
    
    return startDate, endDate

def getDateRangeByDFOperations(dataFrame: DataFrame) -> tuple:
    '''
    This function retrieves the start and end dates from the email dataset using DataFrame operations.

    Args:
    - dataFrame: A DataFrame containing the email data.

    Returns:
    - (startDate, endDate): A tuple containing two elements: the earliest (start) date and the latest (end) date.
    '''
    # YOUR CODE HERE
    # raise NotImplementedError()
    # Compute both aggregates in one job instead of two separate ones.
    startDate, endDate = dataFrame.selectExpr("min(DateTime)", "max(DateTime)").first()
    return startDate, endDate


In [113]:
getDateRangeBySQL(sqlContext,tableName)

(datetime.date(1, 8, 1), datetime.date(2012, 11, 28))

In [114]:
getDateRangeByDFOperations(dfEmais)

(datetime.date(1, 8, 1), datetime.date(2012, 11, 28))
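
A start date in year 1 suggests that at least one `Date` header in the data does not hold a sensible value. As a local cross-check of individual headers, `email.utils.parsedate_to_datetime` parses the same header format. This is a sketch under stated assumptions: `parseEmailDate`, `isPlausibleDate`, and the default year bounds are hypothetical, and the bounds should be adjusted to whatever range is considered valid for the dataset.

```python
from datetime import date
from email.utils import parsedate_to_datetime
from typing import Optional

def parseEmailDate(header: Optional[str]) -> Optional[date]:
    """Parse a Date header into a calendar date, or None if it is invalid."""
    if not header:
        return None
    try:
        return parsedate_to_datetime(header).date()
    except (TypeError, ValueError):
        # Older Python versions raise TypeError, newer ones ValueError.
        return None

def isPlausibleDate(
    day: Optional[date], firstYear: int = 1980, lastYear: int = 2012
) -> bool:
    """Assumed sanity rule: the year must fall inside [firstYear, lastYear]."""
    return day is not None and firstYear <= day.year <= lastYear

sampleHeaders = [
    "Thu, 1 Feb 2001 08:00:00 -0800 (PST)",
    "garbage",
    None,
]
parsed = [parseEmailDate(header) for header in sampleHeaders]
plausible = [day for day in parsed if isPlausibleDate(day)]
print(parsed, plausible)
```

Note that this sketch keeps the date in the sender's own UTC offset, while Spark's `to_date` works in the session time zone, so the two can differ by a day near midnight.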

In [117]:
# 1 pt, 8.5
def countEmailsInYearBySQL(sqlContext: SQLContext, tableName: str, year: int) -> int:
    '''
    This function calculates the number of emails sent in a given year by filtering on the year of the DateTime column using SQL.

    Args:
    - sqlContext: An SQLContext object that provides the environment to run SQL queries on structured data.
    - tableName: The name of the table that contains the email data.
    - year: The specific year to filter the emails by.

    Returns:
    - email_count: An integer representing the number of emails sent in the given year.
    '''
    # YOUR CODE HERE
    # raise NotImplementedError()
    query = f"""
    SELECT COUNT(*) FROM {tableName}
    WHERE YEAR(DateTime) = {year}
    """
    emailCount = sqlContext.sql(query).collect()[0][0]
    return emailCount

def countEmailsInYearByDFOperations(dataFrame: DataFrame, yearValue: int) -> int:
    '''
    This function calculates the number of emails sent in a given year by performing a filter operation on the DateTime column using DataFrame operations.

    Args:
    - dataFrame: A DataFrame containing the email data.
    - yearValue: The specific year to filter the emails by.

    Returns:
    - email_count: An integer representing the number of emails sent in the given year.
    '''
    # YOUR CODE HERE
    # raise NotImplementedError()
    filtered_df = dataFrame.filter(year(col('DateTime')) == yearValue)
    result = filtered_df.count()
    return result


In [118]:
countEmailsInYearBySQL(sqlContext,tableName,2000)

7462

In [119]:
countEmailsInYearByDFOperations(dfEmais,2000)

7462

In [128]:
# 1 pt
from pyspark.sql import DataFrame
from pyspark.sql.functions import desc
def countEmailsPerDayBySQL(sqlContext: SQLContext, tableName: str) -> DataFrame:
    '''
    This function calculates the number of emails sent per day by performing a group by operation on the DateTime column 
    and sorts the result in descending order by the number of emails.

    Args:
    - sqlContext: An SQLContext object that provides the environment to run SQL queries on structured data.
    - tableName: The name of the table that contains the email data.

    Returns:
    - resultDF: A DataFrame containing the number of emails sent per day, sorted in descending order by email count.
                With schema:
                    root
                     |-- Date: date (nullable = true)
                     |-- count: long (nullable = false)
    '''
    # YOUR CODE HERE
    # raise NotImplementedError()
    result = sqlContext.sql(f"SELECT DateTime as Date, COUNT(*) as count FROM {tableName} GROUP BY DateTime ORDER BY count DESC")
    return result

def countEmailsPerDayByDFOperations(dataFrame: DataFrame) -> DataFrame:
    '''
    This function calculates the number of emails sent per day by performing a group by operation on the DateTime column 
    and sorts the result in descending order by the number of emails.

    Args:
    - dataFrame: A DataFrame containing the email data.

    Returns:
    - resultDF: A DataFrame containing the number of emails sent per day, sorted in descending order by email count.
                With schema:
                    root
                     |-- DateTime: date (nullable = true)
                     |-- count: long (nullable = false)
    '''
    # YOUR CODE HERE
    # raise NotImplementedError()
    # groupBy().count() already names the aggregate column "count", so no rename is needed.
    result = dataFrame.groupBy('DateTime').count().orderBy(desc("count"))
    return result


In [129]:
countEmailsPerDayBySQL(sqlContext,tableName).show()

+----------+-----+
|      Date|count|
+----------+-----+
|2000-12-13|  140|
|2000-12-12|  128|
|2001-10-24|  111|
|2001-10-26|  108|
|2001-11-27|  107|
|2001-10-25|  105|
|2001-11-26|   99|
|2002-01-30|   95|
|2000-12-11|   95|
|2001-10-23|   94|
|2001-11-19|   93|
|2000-12-08|   91|
|2000-11-28|   91|
|2001-10-22|   88|
|2001-11-20|   88|
|2001-11-21|   88|
|2001-10-18|   86|
|2000-12-04|   84|
|2001-05-01|   83|
|2001-04-19|   83|
+----------+-----+
only showing top 20 rows



In [125]:
countEmailsPerDayByDFOperations(dfEmais).show()

+----------+-----+
|  DateTime|count|
+----------+-----+
|2000-12-13|  140|
|2000-12-12|  128|
|2001-10-24|  111|
|2001-10-26|  108|
|2001-11-27|  107|
|2001-10-25|  105|
|2001-11-26|   99|
|2002-01-30|   95|
|2000-12-11|   95|
|2001-10-23|   94|
|2001-11-19|   93|
|2000-12-08|   91|
|2000-11-28|   91|
|2001-10-22|   88|
|2001-11-20|   88|
|2001-11-21|   88|
|2001-10-18|   86|
|2000-12-04|   84|
|2001-05-01|   83|
|2001-04-19|   83|
+----------+-----+
only showing top 20 rows
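
Several days in the tables above share the same count (for example, three days with 88 emails). Sorting only by count leaves the order among such ties unspecified, so the SQL and DataFrame results are not guaranteed to list them identically. A minimal local sketch of a deterministic ordering, with the date as an assumed secondary key, looks like this. `countPerDay` and the sample dates are hypothetical.

```python
from collections import Counter
from datetime import date
from typing import Iterable, List, Tuple

def countPerDay(days: Iterable[date]) -> List[Tuple[date, int]]:
    """Count emails per day, sorted by count descending, then by date ascending."""
    counts = Counter(days)
    return sorted(counts.items(), key=lambda item: (-item[1], item[0]))

# Made-up send dates for illustration.
sampleDays = [
    date(2001, 10, 22),
    date(2001, 11, 20),
    date(2001, 10, 22),
    date(2001, 11, 20),
    date(2000, 12, 13),
    date(2000, 12, 13),
    date(2000, 12, 13),
]
print(countPerDay(sampleDays))
```

In Spark, the corresponding change would be to add the date column as a second sort key after the count.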

