#### Names of people in the group

Please write the names of the people in your group in the next cell.

Name of person A Karl Edvin Undheim

Name of person B

In [0]:
# We need to install 'ipython_unittest' to run unittests in a Jupyter notebook
!pip install -q ipython_unittest

You should consider upgrading via the '/databricks/python3/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
# Loading modules that we need
from pyspark.sql.dataframe import DataFrame
from collections import Counter
from pyspark.sql.functions import desc
import math # for subtask 1.1
from pyspark.sql.types import * # for subtask 1.2

In [0]:
# A helper function to load a table (stored in Parquet format) from DBFS as a Spark DataFrame 
def load_df(table_name: "name of the table to load") -> DataFrame:
    return spark.read.parquet(table_name)

users_df = load_df("/user/hive/warehouse/users")
posts_df = load_df("/user/hive/warehouse/posts")

#### Subtask 1: implementing two functions
Implement these two functions:
1. 'compute_pearsons_r' that receives a DataFrame and two column names and returns the [Pearson correlation coefficient](https://en.wikipedia.org/wiki/Pearson_correlation_coefficient) between values of two columns;
2. 'make_tag_graph' that in the input receives the DataFrame containing the records related to 'questions' and returns a DataFrame with two columns 'u' and 'v'; the record for row i from the resulting DataFrame is a tuple (u_i, v_i). u_i and v_j are distinct tags and have appeared together for a question.

Please note that you should implement the 'compute_pearsons_r' yourself, so you should not use the 'DataFrame.stat.corr' method. Nevertheless, you can use 'DataFrame.stat.corr' to verify the correctness of your implementation.

In [0]:
def compute_pearsons_r(df: "a DataFrame", col1: "name of column A", col2: "name of column B") -> float:
  # Columns collected as arrays
  A = df.select(col1).collect()
  B = df.select(col2).collect()
  # Below are all the variables for the rearranged sample equation found on wikipedia.
  n=df.count()
  sumA = 0
  sumB = 0
  sumAB = 0
  sumAA = 0
  sumBB = 0
  
  # Simple iteration through each row in the two columns to get the unknowns
  for i in range(n):
    # If a row has NULL in either col A og col B this row is skipped
    if A[i][0] is None or B[i][0] is None:
      continue
    sumA+=A[i][0]
    sumB+=B[i][0]
    sumAB+=A[i][0]*B[i][0]
    sumAA+=A[i][0]**2
    sumBB+=B[i][0]**2
  
  # Rearranged sample equation
  r_AB = (n*sumAB-sumA*sumB)/(math.sqrt(n*sumAA-sumA**2)*math.sqrt(n*sumBB-sumB**2))
  return r_AB

def make_tag_graph(df: "DataFrame containing question data") -> DataFrame:
    # List to hold the rows
    tagPairs = []
    # Tags column collected as array
    df = df.select("Tags").collect()
    # schema for resulting dataframe
    tagsDf_schema = StructType([StructField('u', StringType(), False),
                                StructField('v', StringType(), False)])
    
    # Iteration through every question by index
    for i in range(len(df)):
      # The question's tags are put in a list.
      tags = df[i][0][1:-1].split("><")
      if len(tags)<2:
        # Tags appearing alone are included
        tagPairs += [(tags[0], '')]
      else:
        # Here I find all unordered combinations of tag pairs for the question.
        tagPairs += [(a, b) for idx, a in enumerate(tags) for b in tags[idx + 1:]]
        # I also need every tag pair found in previous line but switched to get all ordered combinations.
        tagPairs += [(b, a) for idx, a in enumerate(tags) for b in tags[idx + 1:]]
    tagsDf = spark.createDataFrame(data=tagPairs, schema = tagsDf_schema)
    
    return tagsDf

In [0]:
# Imprting GraphFrames graph library; make sure you have GraphFrames installed on the cluster
from graphframes import *

#### Subtask 2: implementing three functions
Impelment these three functions:
1. 'get_nodes' that, given the result from execution of 'make_tag_graph', returns a DataFrame with one column named 'id' that includes the tags that have appeared in the tag graph;
2. 'get_edges' that, given the result from execution of 'make_tag_graph', returns a DataFrame with two columns 'src' and 'dst' where 'src' is the source node and 'dst' is the destination node.
3. 'compute_pagerank' that receives a GraphFrames graph object in the input and computes the PageRank for nodes in the graph and returns the result as a DataFrame with two columns named 'id' and 'pagerank'; the rows in the in the resulting DataFrame should be sorted by the values of 'pagerank' column.

Note that the term 'tag graph' in this context refers to the DataFrame reuturned by executing 'make_tag_graph'. Furthermore, 'src' and 'dst' are distinct, so 'src' != 'dst'.

In [0]:
def get_nodes(df: "DataFrame of the tag graph") -> DataFrame:
  # Since the tag graph contains all ordered combinations of tag pairs, we know that a single column must contain every distinct tag.
  # I use a query to get all distinct tags from column u.
  df.createOrReplaceTempView("df");
  query = "SELECT DISTINCT df.u FROM df"
  sqlDf = spark.sql(query)
  # Rename column u to id
  return sqlDf.withColumnRenamed('u', 'id')

def get_edges(df: "DataFrame of the tag graph") -> DataFrame:
  # Resulting dataframe is the same as the tag graph but without single tags(since they have no destination)
  # and with column u and v renamed to src and dst respectively.
  return df.filter(df.v!='').withColumnRenamed('u', 'src').withColumnRenamed('v', 'dst')

def compute_pagerank(graph: "a Graphframes graph") -> DataFrame:
  # Here I use the graphframe function inDegrees to get the pagerank. Followed by sorting and renaming the columns
  df = graph.inDegrees
  return df.sort(df.inDegree.desc()).withColumnRenamed('inDegree', 'pagerank')

In [0]:
# Loading 'ipython_unittest' so we can use '%%unittest_main' magic command
%load_ext ipython_unittest

The ipython_unittest extension is already loaded. To reload it, use:
  %reload_ext ipython_unittest


#### Subtask 3: validating the implementation by running the tests

Run the cell below and make sure that all the tests run successfully.

In [0]:
%%unittest_main
class TestTask3(unittest.TestCase):
  
  error_threshold = 0.03
  
  def test_corr1(self):
    # Pearson correlation coefficient between 'user reputation' and 'upvotes' received by users
    result = compute_pearsons_r(users_df, "Reputation", "UpVotes")
    self.assertLessEqual(abs(result-0.5218138310114108), self.error_threshold)
    print(result)
  
  def test_corr2(self):
    # Pearson correlation coefficient between 'user reputation' and 'downvotes' received by users
    result = compute_pearsons_r(users_df, "Reputation", "DownVotes")
    self.assertLessEqual(abs(result-0.1473558141546844), self.error_threshold)
    print(result)

  def test_corr3(self):
    # Pearson correlation coefficient between 'question score' and the 'number of answers' it received
    result = compute_pearsons_r(posts_df[posts_df["PostTypeId"] == 1], "Score", "AnswerCount")
    self.assertLessEqual(abs(result-0.47855272641249674), self.error_threshold)
    print(result)
    
  def test_make_tag_graph(self):
    result = make_tag_graph(df=posts_df[posts_df["PostTypeId"] == 1])
    self.assertIsInstance(result, DataFrame)
    
    coulmn_names = Counter(map(str.lower, ['u', 'v']))
    self.assertCountEqual(coulmn_names, Counter(map(str.lower, result.columns)), "Missing column(s) or column name mismatch")
    
    display(result)
    
    self.assertEqual(result.count(), 228830)
    
  def test_get_nodes(self):
    result = make_tag_graph(df=posts_df[posts_df["PostTypeId"] == 1])
    n = get_nodes(result)
    self.assertEqual(n.count(), 638)
    n.show()

  def test_get_edges(self):
    result = make_tag_graph(df=posts_df[posts_df["PostTypeId"] == 1])
    e = get_edges(result)
    
    coulmn_names = Counter(map(str.lower, ['src', 'dst']))
    self.assertCountEqual(coulmn_names, Counter(map(str.lower, e.columns)), "Missing column(s) or column name mismatch")
    
    self.assertEqual(e.count(), 225290)
    e.show()
    
  def test_compute_pagerank(self):
    result = make_tag_graph(df=posts_df[posts_df["PostTypeId"] == 1])
    n = get_nodes(result)
    e = get_edges(result)
    g = GraphFrame(n, e)
    ranks = compute_pagerank(g)
    self.assertEqual(ranks.first()[0], 'machine-learning')
    ranks.show()

+-------------------+--------+
|                 id|pagerank|
+-------------------+--------+
|   machine-learning|   23237|
|             python|   13930|
|      deep-learning|   10944|
|     neural-network|    9767|
|     classification|    6815|
|              keras|    6615|
|         tensorflow|    4878|
|       scikit-learn|    4870|
|                nlp|    4640|
|        time-series|    3448|
|                cnn|    3176|
|         regression|    3158|
|            dataset|    2865|
|        data-mining|    2790|
|               lstm|    2688|
|predictive-modeling|    2652|
|                  r|    2643|
|         clustering|    2594|
|             pandas|    2374|
|         statistics|    2177|
+-------------------+--------+
only showing top 20 rows

0.5218138159588875
0.14735580966688025
0.4753098634046644
+----------------+----------------+
|             src|             dst|
+----------------+----------------+
|       education|     open-source|
|     open-source|       edu

u,v
machine-learning,
education,open-source
open-source,education
data-mining,definitions
definitions,data-mining
databases,
machine-learning,bigdata
machine-learning,libsvm
bigdata,libsvm
bigdata,machine-learning


#### Subtask 4: answering to questions about Spark related concepts

Please write a short description for the terms below---one to two short paragraphs for each term. Don't copy-paste; instead, write your own understanding.

1. What do the terms 'User-Defined Functions (UDFs)', 'Data Locality', 'Bucketing', 'Distributed Filesystem' mean in the context of Spark?

Write your descriptions in the next cell.

- User-Defined-Funtions are functions that the user can define themselves to be used on a dataframe or dataset. This is useful if you want a function to do something which is not provided in the framework for the datatype. This function can then be reused throughout your code.

- Data locality in spark means that it wants to execute the code as close as possible to where the data is located. This is because it makes the computation much faster. If they are far from another we then want to move them together. Since the code is generally much smaller than the data, the code is transferred to the data if possible.

- Bucketing is an optimization technique in Spark used to decide how to partition data such that data shuffle(when data is rearranged between partitions) is minimized. The idea is that data is partitioned into buckets based on a given column(bucketing column), thus shuffling the data beforehand instead of during a query.  

- As we know a distributed filesystem is a filesystem distributed across different servers or locations connected by a network. Spark is simply a processing engine and it uses data from a distributed filesystem, such as Hadoop distributed file system. In this case we are using DBFS(Databricks File System).