## ENSF 612 - Assignment 1
### Author: Steven Duong (30022492)

## Question 1

### Original Source Code

In [0]:
from collections import defaultdict
filteredKV1 = readFile(filename1)
filteredKV2 = readFile(filename2)

# The following pyspark method is called to read two files, do word counts on each file, and to
# return the word counts

def readFile(filename):
    infile = sc.textFile(filename) # here assume that sc is SparkContext
    counts = infile.flatMap(lambda line: line.split(" ")).map(lambda word:
    (word, 1)).collect()
    return doFilter(counts1)

def doFilter(count):
    key_val = defaultdict(int)
    for item in counts:
        key = item[0]
        val = item[1]
        key_val[key] += int(val)
    filtered_key_val = dict()
    
    for k, v in key_val.items():
        if v >= 100:
            filtered_key_val[k] = v
            
    return filtered_key_val

### Issues

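1. In the readFile function, doFilter is called with the undefined name 'counts1', which raises a NameError at runtime. The fix is to pass 'counts', the variable that actually holds the word pairs, into the doFilter call. The doFilter function has a similar mismatch: its parameter is declared as 'count' but the loop iterates over 'counts', so the parameter is named 'counts' in the updated code.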

2. The use of the collect() method: collect() returns every element of the RDD to the driver node and holds it in the driver's memory, which is rarely a good idea for large datasets. In this case each file is 2 TB and the machine has 8 GB of RAM, so collecting the (word, 1) pairs for even one file would almost certainly exhaust the available memory. To fix this, we can replace collect() with reduceByKey, which sums the counts per word on the executors, so the full list of intermediate pairs never has to fit in the driver's memory.

3. The doFilter function uses for loops and Python dictionaries to compute the word count. This is inefficient because the whole computation runs on the driver, over data that collect() has already pulled into the driver's memory. For large datasets, the driver's memory (8 GB of RAM in this case) may not be sufficient to hold all the data, leading to a crash on a personal machine. A more efficient solution is to perform the word count in a distributed fashion using the Spark operations reduceByKey and filter, which allow the data to be processed in parallel across a cluster.
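
To make the reasoning in issues 2 and 3 concrete, here is a minimal pure-Python sketch of the idea behind reduceByKey: each partition first combines its own words into partial counts, and only those much smaller partial results are merged. This illustrates the concept only and is not how Spark is implemented; the helper names `combine_partition` and `merge_partials`, the sample partitions, and the lowered threshold are all hypothetical.

```python
from collections import Counter


def combine_partition(lines):
    """Count words within a single partition (the work done where the data lives)."""
    partial = Counter()
    for line in lines:
        for word in line.split(" "):
            partial[word] += 1
    return partial


def merge_partials(partials):
    """Merge the per-partition counts into one word -> count mapping."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total


# Hypothetical data: two partitions of a much larger file.
partitions = [
    ["spark makes big data simple", "spark is fast"],
    ["big data needs spark"],
]

partials = [combine_partition(p) for p in partitions]
word_counts = merge_partials(partials)

# Same idea as doFilter, with the threshold lowered from 100 to 2 for this tiny sample.
threshold = 2
frequent = {word: count for word, count in word_counts.items() if count >= threshold}
print(frequent)
```

Only the partial counts cross partition boundaries, which is why the intermediate (word, 1) pairs never need to be gathered in one place.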

### Updated Code

In [0]:
# Replaced the collect() method with the reduceByKey method.
def readFile(filename):
    infile = sc.textFile(filename)  # sc is assumed to be the SparkContext; reads the input file into an RDD of lines.
    counts = (
        infile.flatMap(lambda line: line.split(" "))  # split each line into words.
        .map(lambda word: (word, 1))  # emit one (word, 1) pair per word.
        .reduceByKey(lambda a, b: a + b)  # sum the 1s per word. KVP: (word, word count).
    )
    return doFilter(counts)  # pass the reduced word-count RDD as an argument.

# Implemented the filter Spark operation to allow for parallel data processing
# using cluster computing. This method filters the RDD 'counts' to keep only
# words with a count of at least 100.
def doFilter(counts):
    filtered = counts.filter(lambda x: x[1] >= 100)  # x is a (word, count) tuple; keep it if count >= 100.
    return filtered  # filtered word-count RDD containing only words with a count of 100 or more.

# readFile is called only after both functions are defined.
# Each call now returns a filtered word-count RDD instead of a Python dict.
filteredKV1 = readFile(filename1)
filteredKV2 = readFile(filename2)
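
One detail of the tokenization above may be worth checking against the real input: `line.split(" ")` splits on single spaces only, so repeated spaces produce empty strings and tabs are not treated as separators, whereas `line.split()` splits on any run of whitespace. The short sketch below shows the difference on a hypothetical sample line. Whether it matters depends on how the input files are formatted, which is not specified here.

```python
# Hypothetical line containing a double space, a tab, and a trailing space.
sample_line = "spark  is\tfast "

# Splitting on a single space keeps empty strings and leaves the tab inside a token.
tokens_single_space = sample_line.split(" ")

# Splitting with no argument treats any run of whitespace as one separator.
tokens_any_whitespace = sample_line.split()

print(tokens_single_space)
print(tokens_any_whitespace)
```

With `split(" ")`, the empty string would be counted as a word of its own, so swapping in `split()` or filtering out empty tokens is an option if that is not intended.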

## Question 2

### PySpark Programming

In [0]:
from pyspark.sql import SparkSession

# Creating a Spark session.
spark = SparkSession.builder.appName("StackOverflowAnalysis").getOrCreate()

# Function to load the CSV files.
def load_csv(filename):
    df = spark.read.format("csv").option("header", "true").option("multiline", "true").option('escape', "\"").load(filename)
    return df

# Function to return the total ViewCount (sum) for a file.
def total_view_count(df):
    total_vc = df.agg({"ViewCount": "sum"}).collect()
    return total_vc[0][0]

# Function to return the number of rows (the ViewCount sample size) for a file.
def view_count_samples(df):
    return df.count()

# Loading the three CSV files.
df_spark = load_csv("/FileStore/tables/SO_Spark.csv")
df_ml = load_csv("/FileStore/tables/SO_ML.csv")
df_security = load_csv("/FileStore/tables/SO_Security.csv")

# Creating a pyspark dataframe for the total view count sample size.
list_of_view_count_samples = [("SO-Spark", view_count_samples(df_spark)), ("SO-ML", view_count_samples(df_ml)), ("SO-Security", view_count_samples(df_security))]
df = spark.createDataFrame(list_of_view_count_samples, ['File Name', 'ViewCount Sample Size'])
df.show()

# Creating a pyspark dataframe for the total view count (sum).
list_of_view_counts = [("SO-Spark", total_view_count(df_spark)), ("SO-ML", total_view_count(df_ml)), ("SO-Security", total_view_count(df_security))]
df = spark.createDataFrame(list_of_view_counts, ['File Name', 'Total ViewCount (Sum)'])
df.show()


+-----------+---------------------+
|  File Name|ViewCount Sample Size|
+-----------+---------------------+
|   SO-Spark|                50000|
|      SO-ML|                50000|
|SO-Security|                50000|
+-----------+---------------------+

+-----------+---------------------+
|  File Name|Total ViewCount (Sum)|
+-----------+---------------------+
|   SO-Spark|         1.40958653E8|
|      SO-ML|         1.02120351E8|
|SO-Security|         1.63597256E8|
+-----------+---------------------+
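
The sums above are printed in scientific notation. A likely explanation, assuming the CSV columns are loaded as strings (no schema or `inferSchema` option is set in `load_csv`), is that Spark casts ViewCount to a double when summing. The values are still whole numbers, as this small pure-Python sketch shows by converting the displayed figures back to integers. The dictionary name `displayed_sums` is only for illustration.

```python
from decimal import Decimal

# Values copied from the "Total ViewCount (Sum)" table above.
displayed_sums = {
    "SO-Spark": "1.40958653E8",
    "SO-ML": "1.02120351E8",
    "SO-Security": "1.63597256E8",
}

# Decimal parses the exponent form exactly, so no floating-point rounding is involved.
total_views = {name: int(Decimal(value)) for name, value in displayed_sums.items()}

for name, total in total_views.items():
    print(f"{name}: {total:,}")
```

If integer output is preferred directly in the table, casting ViewCount to a long before aggregating is one option to consider.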

