#### Names of people in the group

Please write the names of the people in your group in the next cell.

Anne Torgersen

Aaryan Neupane

In [None]:
# We need to install 'ipython_unittest' to run unittests in a Jupyter notebook
!pip install -q ipython_unittest

In [None]:
# Loading modules that we need
from pyspark.sql.dataframe import DataFrame
from collections import Counter
from pyspark.sql.functions import desc, col
from numpy import sqrt
from itertools import combinations

In [None]:
# A helper function to load a table (stored in Parquet format) from DBFS as a Spark DataFrame 
def load_df(table_name: "name of the table to load") -> DataFrame:
    return spark.read.parquet(table_name)

users_df = load_df("/user/hive/warehouse/users")
posts_df = load_df("/user/hive/warehouse/posts")

#### Subtask 1: implementing two functions
Implement these two functions:
1. 'compute_pearsons_r' that receives a DataFrame and two column names and returns the [Pearson correlation coefficient](https://en.wikipedia.org/wiki/Pearson_correlation_coefficient) between the values of the two columns;
2. 'make_tag_graph' that receives as input the DataFrame containing the records related to 'questions' and returns a DataFrame with two columns 'u' and 'v'; row i of the resulting DataFrame is a tuple (u_i, v_i), where u_i and v_i are distinct tags that have appeared together in a question.

Please note that you should implement 'compute_pearsons_r' yourself, so you should not use the 'DataFrame.stat.corr' method. Nevertheless, you can use 'DataFrame.stat.corr' to verify the correctness of your implementation.
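For reference, and assuming that rows with a missing value in either column are skipped, the value 'compute_pearsons_r' computes is the sample Pearson correlation coefficient over the $n$ remaining rows, where $x_i$ and $y_i$ are the values of the two columns and $\bar{x}$, $\bar{y}$ are their means over those same rows:

$$
r = \frac{\sum_{i=1}^{n} (x_i - \bar{x})(y_i - \bar{y})}{\sqrt{\sum_{i=1}^{n} (x_i - \bar{x})^2 \, \sum_{i=1}^{n} (y_i - \bar{y})^2}}
$$

The numerator is the unnormalized covariance and the two sums under the square root are the unnormalized variances; the $1/n$ factors cancel, so they can be left out.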

In [None]:
def calc_avg_df(df: "a DataFrame", column: "Name of column") -> float:
    # Calculate the mean of the given column in the provided DataFrame
    return df.agg({column: "avg"}).collect()[0][0]

def compute_pearsons_r(df: "a DataFrame", col1: "name of column A", col2: "name of column B") -> float:
    # Keep only the rows where both values are present, so that the means and
    # the sums below are computed over the same set of (x, y) pairs
    pairs_df = df.select(col(col1), col(col2)).dropna()

    # Find the average values
    x_avg = calc_avg_df(pairs_df, col1)
    y_avg = calc_avg_df(pairs_df, col2)

    # Accumulators for the (unnormalized) covariance and variances
    cov = 0.0
    var_x = 0.0
    var_y = 0.0

    # Collect the pairs to the driver and accumulate the sums
    for x_val, y_val in pairs_df.collect():
        dx = float(x_val) - x_avg
        dy = float(y_val) - y_avg
        cov += dx * dy
        var_x += dx ** 2
        var_y += dy ** 2

    return float(cov / sqrt(var_x * var_y))

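def make_unique_tags(row: list) -> set:
    # Use a set so that duplicate pairs are stored only once
    tags = set()

    # A question with a single tag yields a self-loop (tag, tag), so that tags
    # that never co-occur with another tag still appear in the graph
    if len(row) == 1:
        tags.add((row[0], row[0]))
        return tags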
    
    tag_combinations = combinations(row, 2) # Generate combinations of pairs of tags from each row

    for pair in tag_combinations: # Add each pair and its reverse to the set of records
        tags.add(pair)
        tags.add((pair[1], pair[0]))

    return tags

def make_tag_graph(df: "DataFrame containing question data") -> DataFrame:
    # Select only the Tags column and bring the rows to the driver
    tag_rows = df.select(col("Tags")).collect()

    # Output list of (u, v) records; a pair that occurs in several questions appears several times
    records = list()

    # Iterate through the questions
    for post in tag_rows:
        # Turn a string such as "<bigdata><libsvm>" into ["bigdata", "libsvm"]
        tag_list = post["Tags"].replace("<", "").rstrip(">").split(">")
        unique_tags = make_unique_tags(tag_list)  # Tag pairs for this question
        records.extend(unique_tags)

    # Convert the list of records to a DataFrame
    result_df = spark.createDataFrame(records, ["u", "v"])

    return result_df
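The string handling in 'make_tag_graph' and 'make_unique_tags' can be checked in isolation, without Spark. The sketch below is a hypothetical plain-Python equivalent ('parse_tags' and 'tag_pairs' are illustrative names, not part of the assignment). It swaps in a regular expression for the replace/rstrip/split chain, and 'itertools.permutations', which yields every pair together with its reverse, for 'combinations' plus an explicit reversal.

```python
import re
from itertools import permutations

def parse_tags(tags: str) -> list:
    # Hypothetical helper: "<a><b>" -> ["a", "b"] (everything between < and >)
    return re.findall(r"<([^<>]+)>", tags)

def tag_pairs(tags: list) -> set:
    # Hypothetical helper mirroring make_unique_tags
    if len(tags) == 1:
        # A lone tag becomes a self-loop so that it still shows up as a node
        return {(tags[0], tags[0])}
    # permutations(tags, 2) yields every ordered pair of distinct positions,
    # i.e. each unordered pair and its reverse
    return set(permutations(tags, 2))

tags = parse_tags("<bigdata><libsvm><machine-learning>")
print(tags)                      # ['bigdata', 'libsvm', 'machine-learning']
print(len(tag_pairs(tags)))      # 6
print(tag_pairs(["databases"]))  # {('databases', 'databases')}
```

Three tags give six directed pairs, which matches the six 'bigdata'/'libsvm'/'machine-learning' edges in the 'get_edges' output of the test run.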

#### Subtask 2: implementing two functions
Implement these two functions:
1. 'get_nodes' that, given the result from execution of 'make_tag_graph', returns a DataFrame with one column named 'id' that includes the tags that have appeared in the tag graph;
2. 'get_edges' that, given the result from execution of 'make_tag_graph', returns a DataFrame with two columns 'src' and 'dst' where 'src' is the source node and 'dst' is the destination node.


Note that the term 'tag graph' in this context refers to the DataFrame returned by executing 'make_tag_graph'. Furthermore, 'src' and 'dst' are distinct, so 'src' != 'dst'.

In [None]:
def get_nodes(df: "DataFrame of the tag graph") -> DataFrame:
    # Every tag occurs in column u (pairs are stored in both directions and
    # single tags as self-loops), so the distinct values of u are the nodes
    unique_tags = df.select(col("u").alias("id")).distinct()
    return unique_tags

def get_edges(df: "DataFrame of the tag graph") -> DataFrame:
    # Drop self-loops (u == v) and rename the columns u and v to src and dst
    src_to_dst = df.filter(col("u") != col("v")).select(col("u").alias("src"), col("v").alias("dst"))
    return src_to_dst
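What 'get_nodes' and 'get_edges' compute can be modelled on a plain Python list of (u, v) records. The sketch below ('nodes_and_edges' is a hypothetical name) illustrates why reading only column 'u' is enough to get every node, and how the self-loops are removed; the sample records are taken from the 'display' output of the test run.

```python
def nodes_and_edges(records: list) -> tuple:
    # Hypothetical helper modelling get_nodes and get_edges on (u, v) tuples.
    # Each tag occurs as u in some record: co-occurring tags are stored in both
    # directions and lone tags as self-loops, so the distinct u values are all nodes.
    nodes = sorted({u for u, _ in records})
    # Self-loops only keep lone tags visible; they are not real edges
    edges = [(u, v) for u, v in records if u != v]
    return nodes, edges

records = [
    ("machine-learning", "machine-learning"),
    ("open-source", "education"),
    ("education", "open-source"),
    ("databases", "databases"),
]
nodes, edges = nodes_and_edges(records)
print(nodes)  # ['databases', 'education', 'machine-learning', 'open-source']
print(edges)  # [('open-source', 'education'), ('education', 'open-source')]
```

As in 'get_edges', edges that come from different questions are kept as duplicates; only the self-loops are dropped.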

In [None]:
# Loading 'ipython_unittest' so we can use '%%unittest_main' magic command
%load_ext ipython_unittest

#### Subtask 3: validating the implementation by running the tests

Run the cell below and make sure that all the tests run successfully.

In [None]:
%%unittest_main
class TestTask3(unittest.TestCase):
  
  error_threshold = 0.03
  
  def test_corr1(self):
    # Pearson correlation coefficient between 'user reputation' and 'upvotes' received by users
    result = compute_pearsons_r(users_df, "Reputation", "UpVotes")
    self.assertLessEqual(abs(result-0.5218138310114108), self.error_threshold)
    print(result)
  
  def test_corr2(self):
    # Pearson correlation coefficient between 'user reputation' and 'downvotes' received by users
    result = compute_pearsons_r(users_df, "Reputation", "DownVotes")
    self.assertLessEqual(abs(result-0.1473558141546844), self.error_threshold)
    print(result)

  def test_corr3(self):
    # Pearson correlation coefficient between 'question score' and the 'number of answers' it received
    result = compute_pearsons_r(posts_df[posts_df["PostTypeId"] == 1], "Score", "AnswerCount")
    self.assertLessEqual(abs(result-0.47855272641249674), self.error_threshold)
    print(result)
    
  def test_make_tag_graph(self):
    result = make_tag_graph(df=posts_df[posts_df["PostTypeId"] == 1])
    self.assertIsInstance(result, DataFrame)
    
    column_names = Counter(map(str.lower, ['u', 'v']))
    self.assertCountEqual(column_names, Counter(map(str.lower, result.columns)), "Missing column(s) or column name mismatch")
    
    display(result)
    
    self.assertEqual(result.count(), 228830)
    
  def test_get_nodes(self):
    result = make_tag_graph(df=posts_df[posts_df["PostTypeId"] == 1])
    n = get_nodes(result)
    self.assertEqual(n.count(), 638)
    n.show()

  def test_get_edges(self):
    result = make_tag_graph(df=posts_df[posts_df["PostTypeId"] == 1])
    e = get_edges(result)
    
    column_names = Counter(map(str.lower, ['src', 'dst']))
    self.assertCountEqual(column_names, Counter(map(str.lower, e.columns)), "Missing column(s) or column name mismatch")
    
    self.assertEqual(e.count(), 225290)
    e.show()



0.5218138821606071
0.14735583044990214
0.47530986340467524
+----------------+----------------+
|             src|             dst|
+----------------+----------------+
|     open-source|       education|
|       education|     open-source|
|     data-mining|     definitions|
|     definitions|     data-mining|
|         bigdata|machine-learning|
|          libsvm|         bigdata|
|          libsvm|machine-learning|
|machine-learning|         bigdata|
|machine-learning|          libsvm|
|         bigdata|          libsvm|
|         bigdata|     scalability|
|      efficiency|         bigdata|
|     performance|     scalability|
|     scalability|     performance|
|         bigdata|      efficiency|
|     scalability|      efficiency|
|      efficiency|     performance|
|     performance|      efficiency|
|      efficiency|     scalability|
|     scalability|         bigdata|
+----------------+----------------+
only showing top 20 rows

+----------------+
|              id|
+------------

u,v
machine-learning,machine-learning
open-source,education
education,open-source
data-mining,definitions
definitions,data-mining
databases,databases
bigdata,machine-learning
libsvm,bigdata
libsvm,machine-learning
machine-learning,bigdata


Success

......
----------------------------------------------------------------------
Ran 6 tests in 33.422s

OK
Out[35]: <unittest.runner.TextTestResult run=6 errors=0 failures=0>

#### Subtask 4: answering questions about Spark-related concepts

Please write a short description of the terms below, one to two short paragraphs for each term. Don't copy and paste; instead, write your own understanding.

1. What do the terms 'User-Defined Functions (UDFs)', 'Data Locality', 'Bucketing', 'Distributed Filesystem' mean in the context of Spark?

Write your descriptions in the next cell.

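User-Defined Functions (UDFs) are, as the name suggests, functions written by the user rather than provided by Spark. Users implement their own logic in a familiar language such as Python, Scala or Java and register it with Spark (in PySpark, for example, with 'pyspark.sql.functions.udf'). After that, the function can be applied to DataFrame columns in operations such as 'select' or 'withColumn', alongside Spark's built-in functions. The helper functions in this assignment are plain Python functions that run on the driver, but logic such as the tag parsing in 'make_tag_graph' could be wrapped as a UDF. UDFs are usually less efficient than Spark's built-in functions: the optimizer treats a UDF as a black box it cannot look into, and a Python UDF additionally requires the data to be serialized between the JVM executors and Python worker processes. UDFs should therefore only be used when the built-in functions cannot express the logic needed.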

Data locality in Spark refers to placing computation tasks as close as possible to the data blocks they process. Moving data between machines over the network is expensive, so it is cheaper to move the computation to where the data is stored. When the scheduler assigns a task, it therefore prefers an executor that already holds the data (in memory, or on the same node) and only falls back to less local placements, such as another node on the same rack or any node, after waiting a configurable time ('spark.locality.wait'). The locality level of each task (for example PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL or ANY) is shown in the Spark web UI, so you can check whether a task ran locally or not.
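The preference for local placement can be illustrated with a greedy toy model in plain Python. This is not Spark's scheduler: 'assign_tasks' and the host names are hypothetical, only two locality levels are modelled, and the model falls back to a non-local slot immediately instead of waiting 'spark.locality.wait' first.

```python
def assign_tasks(block_hosts: dict, free_hosts: list) -> dict:
    """Toy model: block_hosts maps a task to the hosts storing its input block;
    free_hosts lists hosts that have a free executor slot (one slot per entry)."""
    free = list(free_hosts)
    placement = {}
    # First pass: run tasks on a host that already stores their data
    for task, hosts in block_hosts.items():
        for host in hosts:
            if host in free:
                free.remove(host)
                placement[task] = (host, "NODE_LOCAL")
                break
    # Second pass: remaining tasks take any free slot; their data must cross the network
    for task in block_hosts:
        if task not in placement and free:
            placement[task] = (free.pop(0), "ANY")
    return placement

block_hosts = {"t0": ["hostA", "hostB"], "t1": ["hostC"], "t2": ["hostE"]}
print(assign_tasks(block_hosts, ["hostA", "hostC", "hostD"]))
# {'t0': ('hostA', 'NODE_LOCAL'), 't1': ('hostC', 'NODE_LOCAL'), 't2': ('hostD', 'ANY')}
```

Task t2 runs on hostD although its block lives on hostE, so its input has to be fetched over the network; this is the kind of task that shows a lower locality level in the web UI.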

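Bucketing is a technique for improving the performance of Spark applications. When a table is written with bucketing (in Spark, with 'DataFrameWriter.bucketBy' followed by 'saveAsTable'), a hash is computed from the value of the chosen bucketing column(s) in each row, and that hash modulo the number of buckets decides which bucket the row is written to. Rows with the same key therefore always end up in the same bucket. If two tables are bucketed on the join key with the same number of buckets, Spark can join or aggregate them bucket by bucket without first shuffling the data across the cluster, which improves the performance of operations such as joins.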
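The core idea can be sketched in plain Python: the rows of both tables are assigned to buckets by hashing their join key, and the join then only compares rows that share a bucket number. This is an illustration, not Spark's implementation: 'zlib.crc32' stands in for Spark's own hash function, and 'NUM_BUCKETS', 'bucket_of', 'bucketize' and 'bucketed_join' are hypothetical names.

```python
import zlib
from collections import defaultdict

NUM_BUCKETS = 4  # hypothetical; both tables must use the same number of buckets

def bucket_of(key: str) -> int:
    # Deterministic hash of the key value, reduced modulo the number of buckets
    return zlib.crc32(key.encode("utf-8")) % NUM_BUCKETS

def bucketize(rows: list, key_index: int = 0) -> dict:
    # Write-time step: every row goes to the bucket chosen by its key
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_of(row[key_index])].append(row)
    return buckets

def bucketed_join(left_rows: list, right_rows: list) -> list:
    left, right = bucketize(left_rows), bucketize(right_rows)
    result = []
    # Equal keys always share a bucket number, so bucket b of one table only
    # has to be compared with bucket b of the other (no reshuffling of rows)
    for b in range(NUM_BUCKETS):
        for l_row in left.get(b, []):
            for r_row in right.get(b, []):
                if l_row[0] == r_row[0]:
                    result.append((l_row[0], l_row[1], r_row[1]))
    return result

users = [("u1", "user one"), ("u2", "user two")]
posts = [("u1", "post-a"), ("u3", "post-c"), ("u1", "post-b")]
print(sorted(bucketed_join(users, posts)))
# [('u1', 'user one', 'post-a'), ('u1', 'user one', 'post-b')]
```

In Spark, the bucketing step happens once when the table is written, so later joins on the same key can reuse the layout.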

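A distributed file system (DFS) is a file system that stores and manages data across multiple storage devices or servers while presenting it as a single file system. Apache Spark, an open-source distributed data processing framework, does not come with its own storage layer; it reads its input from, and writes its output to, external storage such as a DFS. A common combination is Spark together with the Hadoop Distributed File System (HDFS), where HDFS stores the big data and Spark processes it. HDFS is designed to store very large files by splitting them into blocks that are spread across the machines and hard drives of a cluster. The tables in this notebook, for example, are loaded from DBFS, the Databricks File System.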

HDFS provides scalable storage, support for parallel computing, and high fault tolerance. Scalable storage means that it can handle very large volumes of data by adding more commodity machines to the cluster instead of upgrading to expensive hardware. Because the blocks of a file are spread over many machines, different blocks can be read and processed by multiple processors simultaneously, which is what enables parallel computing with frameworks such as Spark. HDFS achieves high fault tolerance by replicating each block on several DataNodes (three by default), so if one machine fails, the data can still be read from another replica.

HDFS has two major components: the NameNode and the DataNodes. The NameNode manages the metadata, such as file names and the locations of each file's blocks, while the DataNodes store the actual file contents in the form of blocks. There is usually one NameNode per cluster, whereas a DataNode runs on each machine. When a client wants to read a file, it first contacts the NameNode, which returns the locations of the file's blocks, so that the client can then read the blocks directly from the DataNodes.
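The division of labour between the NameNode and the DataNodes, together with block replication, can be illustrated with a small plain-Python model. It is a simplification: 'ToyNameNode' and 'read_file' are hypothetical names, replicas are placed round-robin rather than with HDFS's rack-aware policy, and the block size and replication factor are set to the HDFS defaults of 128 MB (134,217,728 bytes) and 3.

```python
import math

BLOCK_SIZE = 128 * 1024 * 1024  # HDFS default block size (dfs.blocksize)
REPLICATION = 3                 # HDFS default replication factor (dfs.replication)

class ToyNameNode:
    """Keeps only metadata: file -> block ids, block id -> DataNodes holding a replica."""

    def __init__(self, datanodes: list):
        self.datanodes = list(datanodes)  # assumes at least REPLICATION DataNodes
        self.files = {}
        self.block_locations = {}

    def add_file(self, name: str, size_bytes: int) -> list:
        blocks = []
        for i in range(math.ceil(size_bytes / BLOCK_SIZE)):
            block_id = f"{name}#blk{i}"
            # Round-robin placement on distinct DataNodes (real HDFS is rack-aware)
            n = len(self.datanodes)
            self.block_locations[block_id] = [self.datanodes[(i + r) % n] for r in range(REPLICATION)]
            blocks.append(block_id)
        self.files[name] = blocks
        return blocks

    def locate(self, name: str) -> list:
        # What the client receives: the block locations, not the data itself
        return [(b, self.block_locations[b]) for b in self.files[name]]

def read_file(namenode: ToyNameNode, name: str, dead_nodes=()) -> list:
    # The client asks the NameNode once, then picks a live DataNode for each block
    plan = []
    for block_id, replicas in namenode.locate(name):
        alive = [d for d in replicas if d not in dead_nodes]
        if not alive:
            raise IOError(f"all replicas of {block_id} are unavailable")
        plan.append((block_id, alive[0]))
    return plan

namenode = ToyNameNode(["dn1", "dn2", "dn3", "dn4"])
blocks = namenode.add_file("posts.parquet", 1024 ** 3)  # a 1 GiB file
print(len(blocks))  # 8
plan = read_file(namenode, "posts.parquet", dead_nodes={"dn1"})
print(plan[0])      # ('posts.parquet#blk0', 'dn2')
```

Even with 'dn1' down, every block is still readable because each one has two other replicas.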