#### Names of people in the group

Please write the names of the people in your group in the next cell.

Name of person A: Dominik Ucher

Name of person B

In [0]:
# Loading modules that we need
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Add your imports below this line
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.functions import avg, stddev_pop

# Session handle used by every helper below (spark.sql / spark.read / spark.createDataFrame).
# getOrCreate() hands back an already running session when there is one.
spark = SparkSession.builder.appName("task4").getOrCreate()

In [0]:
def load_df(table_name: str) -> DataFrame:
    """Load a table stored in Parquet format (on DBFS) as a Spark DataFrame.

    Args:
        table_name: Path of the Parquet table to load, for example
            "/user/hive/warehouse/users.parquet".

    Returns:
        The DataFrame produced by ``spark.read.parquet(table_name)``.
    """
    return spark.read.parquet(table_name)

users_df = load_df("/user/hive/warehouse/users.parquet")
posts_df = load_df("/user/hive/warehouse/posts.parquet")

# Additional tables; nothing in the cells shown here reads them.
comments_df = load_df("/user/hive/warehouse/comments.parquet")
badges_df = load_df("/user/hive/warehouse/badges.parquet")

#### The Problem: Mining the Interests of Experts

In [0]:

# FUNCTIONS

def compute_pearsons_r(df: DataFrame, col1: str, col2: str) -> float:
    """Compute Pearson's correlation coefficient between two columns.

    Evaluates r = (E[XY] - E[X] * E[Y]) / (sigma_X * sigma_Y), using the
    population standard deviation, as one Spark aggregation.

    The DataFrame API is used instead of SQL text so that the function does
    not register (and thereby overwrite) a session-wide temp view named "df"
    and does not splice the column names into a query string.

    Args:
        df: DataFrame that contains both columns.
        col1: Name of the first column.
        col2: Name of the second column.

    Returns:
        The value of the aggregated expression for ``df``. When either column
        has zero variance the divisor is 0; presumably Spark then yields NULL
        (returned as None) under non-ANSI settings -- TODO confirm for the
        cluster configuration in use.
    """
    x, y = col(col1), col(col2)
    covariance = avg(x * y) - avg(x) * avg(y)
    spread = stddev_pop(x) * stddev_pop(y)

    # A global aggregation always produces exactly one row.
    result_row = df.agg((covariance / spread).alias("pearsons_r")).collect()[0]
    return result_row["pearsons_r"]

def _split_tags(tag_string: str) -> list:
    """Return the non-empty tag names found in a "<tag1><tag2>" style string."""
    # Whatever follows the final ">" is not a complete tag, so it is dropped.
    pieces = tag_string.split(">")[:-1]
    names = [piece.replace("<", "").strip() for piece in pieces]
    return [name for name in names if name]


def make_tag_graph(df: DataFrame) -> DataFrame:
    """Build the tag co-occurrence edge list for the rows of ``df``.

    Each row's ``Tags`` value may be a single "<a><b>" string or a list of
    such strings. For every string:

    * exactly one tag produces a single self-loop ``(tag, tag)``;
    * several tags produce both directions ``(u, v)`` and ``(v, u)`` for
      every pair of positions i < j.

    Both endpoints are stripped of surrounding whitespace (previously only
    the second one was) and empty tag names are skipped. No temp view is
    registered; the rows are read straight from ``df``.

    Args:
        df: DataFrame with a ``Tags`` column. All rows are collected to the
            driver.

    Returns:
        A DataFrame with string columns ``u`` and ``v``, or None when no pair
        could be produced (callers test for None).
    """
    from itertools import combinations

    data = []
    for row in df.collect():
        tags = row['Tags']
        if not tags:
            continue

        # Accept both a plain string and a list of strings.
        tag_strings = tags if isinstance(tags, list) else [tags]

        for tag_string in tag_strings:
            names = _split_tags(tag_string)
            if len(names) == 1:
                data.append((names[0], names[0]))
                continue
            for u, v in combinations(names, 2):
                data.append((u, v))
                data.append((v, u))

    if not data:
        return None

    return spark.createDataFrame(data, ["u", "v"])


def make_tag_graph2(df: DataFrame) -> DataFrame:
    """Build the tag co-occurrence edge list; alias of ``make_tag_graph``.

    This function used to be a copy of ``make_tag_graph`` that only accepted
    a plain string in the ``Tags`` column. ``make_tag_graph`` treats a plain
    string as a one-element list, so delegating gives the same result for
    string input and additionally accepts list-valued ``Tags``.

    Args:
        df: DataFrame with a ``Tags`` column.

    Returns:
        A DataFrame with columns ``u`` and ``v``, or None when no pair could
        be produced.
    """
    return make_tag_graph(df)


def get_nodes(df: DataFrame) -> DataFrame:
    """Return the distinct, non-empty node ids that occur in an edge list.

    Registers ``df`` as the temp view "df" and unions the ``u`` and ``v``
    columns, leaving out NULL and empty-string values.

    Args:
        df: DataFrame with columns ``u`` and ``v``.

    Returns:
        A DataFrame with the single column ``id``, one row per distinct value.
    """
    df.createOrReplaceTempView("df")

    node_query = (
        "SELECT DISTINCT u as id FROM df WHERE u IS NOT NULL AND u != '' "
        "UNION "
        "SELECT DISTINCT v as id FROM df WHERE v IS NOT NULL AND v != ''"
    )

    return spark.sql(node_query).select("id").distinct()


def get_edges(df: DataFrame) -> DataFrame:
    """Return the edges of a tag graph with self-loops removed.

    The filter and rename run inside Spark, so the rows are no longer
    collected to the driver and uploaded again. No temp view is registered.
    The unreachable trailing ``return`` of the earlier version is gone.

    Args:
        df: DataFrame with columns ``u`` and ``v``.

    Returns:
        A DataFrame with columns ``src`` and ``dst`` holding every row of
        ``df`` whose ``u`` differs from ``v``, or None when there is no such
        row (callers test for None).
    """
    # eqNullSafe keeps the semantics of Python's ``u != v``: a NULL on one
    # side only counts as "different", NULL on both sides counts as "equal".
    edges = df.where(~col("u").eqNullSafe(col("v"))).select(
        col("u").alias("src"),
        col("v").alias("dst"),
    )

    # take(1) returns a (possibly empty) list, so this is a cheap emptiness test.
    if not edges.take(1):
        return None

    return edges
  

def variableA(df: DataFrame, user: int) -> float:
    """Look up the reputation of one user.

    Registers ``df`` as the temp view "df" and reads ``Reputation`` (cast to
    INT) for the row whose ``Id`` equals ``user``.

    Args:
        df: DataFrame with ``Id`` and ``Reputation`` columns.
        user: Id of the user to look up.

    Returns:
        The ``Reputation`` value of the first matching row.

    Raises:
        IndexError: If no row matches ``user``.
    """
    df.createOrReplaceTempView("df")
    matching_rows = spark.sql(
        f"SELECT CAST(Reputation AS INT) AS Reputation FROM df WHERE Id={user}"
    ).collect()
    first_match = matching_rows[0]
    return first_match['Reputation']


def variableB(df: DataFrame, user: int, total_tags: int = 638) -> float:
    """Compute the tag-diversity figure for one user.

    Registers ``df`` as the temp view "df", selects the rows owned by
    ``user``, builds their tag graph and relates ``total_tags`` to the number
    of edges without self-loops.

    Args:
        df: DataFrame with ``OwnerUserId`` and ``Tags`` columns.
        user: Id of the user whose rows are analysed.
        total_tags: Node count used in the ratio. The default 638 was
            hard-coded before; presumably it is the number of distinct tags
            in the data set -- TODO confirm.

    Returns:
        ``total_tags / edge_count``, or 0 when the user has no tag graph or
        the graph has no edge between two different tags.
    """
    df.createOrReplaceTempView("df")
    questions = spark.sql(f"SELECT * FROM df WHERE OwnerUserId = {user}")

    tag_graph = make_tag_graph(questions)
    if tag_graph is None:
        return 0

    edge_graph = get_edges(tag_graph)
    if edge_graph is None:
        return 0

    # NOTE(review): Prototype3 computes edges / nodes, i.e. the inverse of
    # this ratio. Confirm which definition is intended before comparing
    # results across the prototypes.
    return total_tags / edge_graph.count()

# END OF FUNCTIONS



#### Info
Below you will see three different functions called Prototype1–3. I experimented with different implementations to see which one is the fastest. We use Prototype3 in the example because it computes the data the fastest; Prototype2 is slower and Prototype1 is the slowest.
You can also try this out yourself by uncommenting the calls to the other functions.

In [0]:
# 3 PROTOTYPE FUNCTIONS
# PROTOTYPE 3 is computing the fastest

def Prototype1(df1: DataFrame, df2: DataFrame) -> DataFrame:
    """Collect (Reputation, Diversity) samples, one ``variableB`` call per user.

    Registers the temp views "df1", "df2", "df1_filtered" and "df2_filtered".
    Users are read from ``df1`` (distinct Id with Reputation cast to float);
    posts are restricted to ``PostTypeId = 1``. Users whose diversity is 0 are
    skipped, and the loop stops as soon as more than five samples exist, so
    at most six rows are returned.

    Args:
        df1: Users DataFrame with ``Id`` and ``Reputation`` columns.
        df2: Posts DataFrame with ``OwnerUserId``, ``Tags`` and ``PostTypeId``.

    Returns:
        A DataFrame with columns ``Reputation`` and ``Diversity``.
    """
    df1.createOrReplaceTempView("df1")
    df2.createOrReplaceTempView("df2")
    users = spark.sql("SELECT DISTINCT Id, CAST(Reputation AS float) FROM df1")
    questions = spark.sql("SELECT OwnerUserId, Tags FROM df2 WHERE PostTypeId = 1")
    users.createOrReplaceTempView("df1_filtered")
    questions.createOrReplaceTempView("df2_filtered")

    samples = []
    for user_row in users.collect():
        reputation = user_row['Reputation']
        diversity = variableB(questions, user_row['Id'])
        if diversity == 0:
            continue
        samples.append((reputation, float(diversity)))
        # Early exit: the prototype only gathers a small sample.
        if len(samples) > 5:
            break

    return spark.createDataFrame(samples, ['Reputation','Diversity'])

def Prototype2(df1: DataFrame, df2: DataFrame) -> DataFrame:
    """Collect (Reputation, Diversity) samples from a pre-joined table.

    Registers the temp views "df1", "df2" and "df12". Users and posts are
    joined once and each user's ``Tags`` values are gathered into a list;
    ``variableB`` is then called per user on that joined table. Unlike the
    other two prototypes, ``Reputation`` is passed through without a cast.
    Users whose diversity is 0 are skipped, and the loop stops as soon as
    more than five samples exist, so at most six rows are returned.

    Args:
        df1: Users DataFrame with ``Id`` and ``Reputation`` columns.
        df2: Posts DataFrame with ``OwnerUserId`` and ``Tags`` columns.

    Returns:
        A DataFrame with columns ``Reputation`` and ``Diversity``.
    """
    df1.createOrReplaceTempView("df1")
    df2.createOrReplaceTempView("df2")
    per_user = spark.sql("SELECT OwnerUserId, Reputation, collect_list(Tags) AS Tags FROM df1 JOIN df2 ON df1.Id = df2.OwnerUserId GROUP BY OwnerUserId, Reputation")
    per_user.createOrReplaceTempView("df12")

    samples = []
    for user_row in per_user.collect():
        reputation = user_row['Reputation']
        diversity = variableB(per_user, user_row['OwnerUserId'])
        if diversity == 0:
            continue
        samples.append((reputation, float(diversity)))
        # Early exit: the prototype only gathers a small sample.
        if len(samples) > 5:
            break

    return spark.createDataFrame(samples, ['Reputation','Diversity'])
  

def Prototype3(df1: DataFrame, df2: DataFrame) -> DataFrame:
    """Collect (Reputation, Diversity) samples from one joined, aggregated table.

    Registers the temp views "df1", "df2" and "df12". Users and posts are
    joined once; all ``Tags`` strings of a user are concatenated into a single
    string. Diversity is ``edge_count / 638``, where the edges come from
    ``get_edges(make_tag_graph(...))`` applied to that string.

    Changes compared with the earlier version (results are unchanged):

    * The tag graph is built from the row that was already collected. Before,
      "df12" was queried again for every user, which re-ran the join and
      aggregation each time.
    * A user whose tags yield no graph is skipped; ``get_edges`` is not
      called with None.
    * The result uses an explicit schema, so an empty sample gives an empty
      DataFrame. Schema inference cannot handle an empty list.

    NOTE(review): because the tag strings of all posts are concatenated,
    pairs are also formed between tags of different posts, which Prototype1
    and Prototype2 do not do; the ratio is also the inverse of the one in
    ``variableB``. Confirm the intended definition.

    Args:
        df1: Users DataFrame with ``Id`` and ``Reputation`` columns.
        df2: Posts DataFrame with ``OwnerUserId`` and ``Tags`` columns.
            ``Id`` is assumed to be unique per user -- TODO confirm.

    Returns:
        A DataFrame with columns ``Reputation`` and ``Diversity``. The loop
        stops as soon as more than 50 samples exist, so at most 51 rows are
        returned.
    """
    # Presumably the number of distinct tags in the data set -- TODO confirm.
    nodes = 638

    df1.createOrReplaceTempView("df1")
    df2.createOrReplaceTempView("df2")
    df12 = spark.sql("SELECT OwnerUserId, CAST(Reputation AS float), concat_ws('', collect_list(Tags)) AS Tags FROM df1 JOIN df2 ON df1.Id = df2.OwnerUserId GROUP BY OwnerUserId, Reputation")
    df12.createOrReplaceTempView("df12")

    data = []
    for row in df12.collect():
        tags = row['Tags']
        if not tags:
            continue

        # make_tag_graph only reads the Tags column, so a one-row frame built
        # from the collected value replaces the per-user query on df12.
        user_tags = spark.createDataFrame([(tags,)], ["Tags"])
        tag_graph = make_tag_graph(user_tags)
        if tag_graph is None:
            continue

        edge = get_edges(tag_graph)
        if edge is None:
            continue

        data.append((row['Reputation'], edge.count() / nodes))
        # Early exit: the prototype only gathers a sample of users.
        if len(data) > 50:
            break

    return spark.createDataFrame(data, "Reputation double, Diversity double")
  

# Only Prototype3 is executed; the Prototype1/Prototype2 calls are kept
# commented out so they can be re-enabled for a timing comparison.
# pro1 = Prototype1(users_df,posts_df)
# pro2 = Prototype2(users_df,posts_df)
pro3 = Prototype3(users_df,posts_df)

# pro1.show()
# pro1pearson = compute_pearsons_r(pro1, "Reputation", "Diversity")
# print(f"Computed Pearsons Prototype1: {pro1pearson}")

# pro2.show()
# pro2pearson = compute_pearsons_r(pro2, "Reputation", "Diversity")
# print(f"Computed Pearsons Prototype2: {pro2pearson}")

# Display the (Reputation, Diversity) sample and the correlation computed from it.
pro3.show()
pro3pearson = compute_pearsons_r(pro3, "Reputation", "Diversity")
print(f"Computed Pearsons Prototype3: {pro3pearson}")


+----------+--------------------+
|Reputation|           Diversity|
+----------+--------------------+
|     213.0|0.009404388714733543|
|    2952.0|  1.0376175548589341|
|     173.0|0.009404388714733543|
|     325.0| 0.03134796238244514|
|     176.0|0.009404388714733543|
|     283.0|0.009404388714733543|
|     403.0|0.047021943573667714|
|     721.0|0.009404388714733543|
|     121.0|   0.109717868338558|
|     549.0| 0.31347962382445144|
|    3967.0|  2.0595611285266457|
|     971.0| 0.27586206896551724|
|     355.0|0.003134796238244514|
|     273.0|0.003134796238244514|
|    4832.0|   3.310344827586207|
|     123.0| 0.03134796238244514|
|     151.0|0.009404388714733543|
|     443.0|0.047021943573667714|
|     634.0|0.003134796238244514|
|    2182.0|   5.078369905956113|
+----------+--------------------+
only showing top 20 rows

Computed Pearsons Prototype3: 0.6877209756913499


#### Explanation
The value of the Pearson correlation coefficient (about 0.69) indicates a strong positive correlation between a user's expertise level and the diversity of their interests. This means that the higher a user's expertise level, the more likely they are to have a higher diversity of topics as well.

It is also considered a very strong correlation. We can also interpret the result as follows: expert users are likely to have general interests rather than specific ones, which may explain how they reached such a high expertise level.