In [1]:
import warnings
warnings.filterwarnings("ignore")
import os
import sys
#import tracemalloc

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

#Imports
from pyspark.sql import functions as psfunctions
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import split, udf, desc, concat, col, lit
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.window import Window
from pyspark.sql.functions import substring

#Initialise session
spark = SparkSession.builder.appName("cite").getOrCreate()
sc = spark.sparkContext
sqlc = SQLContext(sc)
sqlContext = HiveContext(sc)

#load the ralations into df
relation_df = spark.read.option("delimiter", "\t").csv("100k.txt")

#Rename columns
relation_df = relation_df.select(col("_c0").alias("subject"), col("_c1").alias("relation"), col("_c2").alias("object"))
relation_df = relation_df.withColumn("object",psfunctions.regexp_replace('object', ' .', ''))
relation_df.show()

#Count how many relations there are
relation_index_df = relation_df.select("relation").distinct()
relation_index_df = relation_index_df.withColumn("index", psfunctions.row_number().over(Window.orderBy("relation")))

#Creating a list containing the different df
relation_list = relation_index_df.select("relation").toPandas()
relation_list = list(relation_list['relation'])
relation_df_list = list(map(lambda rel : relation_df.filter(relation_df.relation == rel), relation_list))
relation_dict = dict()
relation_dict_reversed = dict()
for i,x in enumerate(relation_list):
    relation_dict[x] = i
    relation_dict_reversed[i] = x

#Creating a dict for all subject and objects
objects_list = relation_df.select("object").toPandas()
objects_list = list(objects_list["object"])
subjects_list = relation_df.select("subject").toPandas()
subjects_list = list(subjects_list["subject"])
so_list = objects_list + subjects_list
so_set = set(so_list)

#Index -> SO
so_dict = dict()

#SO -> Index
so_dict_reverse = dict()

for i, x in enumerate(so_set):
    so_dict_reverse[x] = i
    so_dict[i] = x

map_col = psfunctions.create_map([psfunctions.lit(x) for i in so_dict_reverse.items() for x in i])
relation_df_list_numbers = list(map(lambda df : df.withColumn("subject", map_col[psfunctions.col('subject')].cast("int")).withColumn('object', map_col[psfunctions.col('object')].cast("int")), relation_df_list))

#Dropping the relation column, as it is superfluous
relation_df_list = list(map(lambda df : df.drop("relation"), relation_df_list))
relation_df_list_numbers = list(map(lambda df : df.drop("relation"), relation_df_list_numbers))
relation_df_list[0].show()


+-----------+----------------+--------------------+
|    subject|        relation|              object|
+-----------+----------------+--------------------+
|wsdbm:City0|gn:parentCountry|     wsdbm:Country20|
|wsdbm:City1|gn:parentCountry|      wsdbm:Country0|
|wsdbm:City2|gn:parentCountry|      wsdbm:Country1|
|wsdbm:City3|gn:parentCountry|      wsdbm:Country6|
|wsdbm:City4|gn:parentCountry|     wsdbm:Country15|
|wsdbm:City5|gn:parentCountry|      wsdbm:Country1|
|wsdbm:City6|gn:parentCountry|     wsdbm:Country11|
|wsdbm:City7|gn:parentCountry|      wsdbm:Country3|
|wsdbm:City8|gn:parentCountry|      wsdbm:Country1|
|wsdbm:City9|gn:parentCountry|      wsdbm:Country0|
|wsdbm:User0|      sorg:email|"waxedinglesonarc...|
|wsdbm:User0|    wsdbm:userId|           "1806723"|
|wsdbm:User0|   wsdbm:follows|        wsdbm:User24|
|wsdbm:User0|   wsdbm:follows|        wsdbm:User27|
|wsdbm:User0|   wsdbm:follows|        wsdbm:User37|
|wsdbm:User0|   wsdbm:follows|       wsdbm:User110|
|wsdbm:User0

In [2]:
###HashJoin

#Imports
from collections import defaultdict
import pandas as pd
import copy
import time
import numpy as np
from IPython.display import display

def hashJoin(df1, key1, df2, key2):
    
    #Setup dict that stores all rows according to their key
    hash_map = defaultdict(list)
    df1 = df1.toPandas()
    df2 = df2.toPandas()
    for index, row in df1.iterrows():
        hash_map[row[key1]].append(list(row))
    
    #Defining column names for the returned/constructed df 
    #EXAMPLE: [subject, 1, 2, 3, ..., object]
    #Thus we can garantee multiple consecutive joines with joines on the (object, subject) pair
    col_list = df1.columns.tolist()
    col_list2 = df2.columns.tolist()
    col_list= col_list + col_list2
    col_len = len(col_list)
    col_list = [str(i) for i in range(0, col_len-1)]
    col_list[col_len-2] = "object"
    col_list[0] = "subject"
    
    #Calculate needed size for joined df
    counter = 0
    t1 = time.time()
    for index, row in df2.iterrows():
        for h in hash_map[row[key2]]:
            counter += 1
    
    #Creating temporary/join df that will be filled with the joined rows
    tmp_df = pd.DataFrame(index=np.arange(counter), columns = col_list)
    
    #Joining different rows by iterating trough the second df and finding and linking the matched rows in the hashmap
    row_counter = 0
    for index, row in df2.iterrows():
        t1 = time.time()
        for h in hash_map[row[key2]]:
            c = copy.deepcopy(h)
            c.pop()
            c = c + list(row)
            tmp_df.loc[row_counter] = c
            row_counter += 1
    return spark.createDataFrame(tmp_df)

In [3]:
#Performing the requested SQL expression with HASHJOIN from right to left
#Takes approx 30 Mins on i7-7700hq for 100k.txt
t1 = time.time()
likes_hasreview = hashJoin(relation_df_list[relation_dict["wsdbm:likes"]], "object", relation_df_list[relation_dict["rev:hasReview"]], "subject")
print("First Join took ", time.time()-t1, "seconds")

t1 = time.time()
friends_likes_hasreview = hashJoin(relation_df_list[relation_dict["wsdbm:friendOf"]], "object", likes_hasreview, "subject")
print("Second Join took ", time.time()-t1, "seconds")

t1 = time.time()
follows_friends_likes_hasreview_reversed = hashJoin(relation_df_list[relation_dict["wsdbm:follows"]], "object", friends_likes_hasreview, "subject")
print("Third Join took ", time.time()-t1, "seconds")

follows_friends_likes_hasreview_reversed.show()

First Join took  1.220749855041504 seconds
Second Join took  29.75323724746704 seconds
Third Join took  949.3976566791534 seconds
+-------------+------------+------------+--------------+--------------+
|      subject|           1|           2|             3|        object|
+-------------+------------+------------+--------------+--------------+
|wsdbm:User630| wsdbm:User9|wsdbm:User20|wsdbm:Product0|wsdbm:Review24|
| wsdbm:User26|wsdbm:User57|wsdbm:User20|wsdbm:Product0|wsdbm:Review24|
|wsdbm:User197|wsdbm:User57|wsdbm:User20|wsdbm:Product0|wsdbm:Review24|
|wsdbm:User351|wsdbm:User57|wsdbm:User20|wsdbm:Product0|wsdbm:Review24|
|wsdbm:User396|wsdbm:User57|wsdbm:User20|wsdbm:Product0|wsdbm:Review24|
|wsdbm:User704|wsdbm:User57|wsdbm:User20|wsdbm:Product0|wsdbm:Review24|
|wsdbm:User173|wsdbm:User67|wsdbm:User20|wsdbm:Product0|wsdbm:Review24|
| wsdbm:User95|wsdbm:User81|wsdbm:User20|wsdbm:Product0|wsdbm:Review24|
|wsdbm:User522|wsdbm:User81|wsdbm:User20|wsdbm:Product0|wsdbm:Review24|
|wsdbm

In [4]:
"""
#Performing the requested SQL expression with HASHJOIN from right to left
#Takes approx 30 Mins on i7-7700hq for 100k.txt
tracemalloc.start()
t1 = time.time()
likes_hasreview = hashJoin(relation_df_list[relation_dict["wsdbm:likes"]], "object", relation_df_list[relation_dict["rev:hasReview"]], "subject")
print("First Join took ", time.time()-t1, "seconds and ", tracemalloc.get_traced_memory(),"memory")
tracemalloc.stop()
tracemalloc.reset_peak()

tracemalloc.start()
t1 = time.time()
friends_likes_hasreview = hashJoin(relation_df_list[relation_dict["wsdbm:friendOf"]], "object", likes_hasreview, "subject")
print("Second Join took ", time.time()-t1, "seconds and ", tracemalloc.get_traced_memory(),"memory")
tracemalloc.stop()
tracemalloc.reset_peak()

tracemalloc.start()
t1 = time.time()
follows_friends_likes_hasreview_reversed = hashJoin(relation_df_list[relation_dict["wsdbm:follows"]], "object", friends_likes_hasreview, "subject")
print("Third Join took ", time.time()-t1, "seconds and ", tracemalloc.get_traced_memory(),"memory")
tracemalloc.stop()
tracemalloc.reset_peak()
follows_friends_likes_hasreview_reversed.show()
"""

'\n#Performing the requested SQL expression with HASHJOIN from right to left\n#Takes approx 30 Mins on i7-7700hq for 100k.txt\ntracemalloc.start()\nt1 = time.time()\nlikes_hasreview = hashJoin(relation_df_list[relation_dict["wsdbm:likes"]], "object", relation_df_list[relation_dict["rev:hasReview"]], "subject")\nprint("First Join took ", time.time()-t1, "seconds and ", tracemalloc.get_traced_memory(),"memory")\ntracemalloc.stop()\ntracemalloc.reset_peak()\n\ntracemalloc.start()\nt1 = time.time()\nfriends_likes_hasreview = hashJoin(relation_df_list[relation_dict["wsdbm:friendOf"]], "object", likes_hasreview, "subject")\nprint("Second Join took ", time.time()-t1, "seconds and ", tracemalloc.get_traced_memory(),"memory")\ntracemalloc.stop()\ntracemalloc.reset_peak()\n\ntracemalloc.start()\nt1 = time.time()\nfollows_friends_likes_hasreview_reversed = hashJoin(relation_df_list[relation_dict["wsdbm:follows"]], "object", friends_likes_hasreview, "subject")\nprint("Third Join took ", time.time(

In [5]:
"""
#Performing the requested SQL expression with HASHJOIN from left to right
#Takes much longer than the above solution

t1 = time.time()
follows_friends = hashJoin(relation_df_list[relation_dict["wsdbm:follows"]], "object", relation_df_list[relation_dict["wsdbm:friendOf"]], "subject")
print("First Join took ", time.time()-t1, "seconds")

t1 = time.time()
follows_friends_likes = hashJoin(follows_friends, "object", relation_df_list[relation_dict["wsdbm:likes"]], "subject")
print("Second Join took ", time.time()-t1, "seconds")

t1 = time.time()
follows_friends_likes_hasreview = hashJoin(follows_friends_likes, "object", relation_df_list[relation_dict["rev:hasReview"]], "subject")
print("Third Join took ", time.time()-t1, "seconds")

follows_friends_likes_hasreview.show()
"""

'\n#Performing the requested SQL expression with HASHJOIN from left to right\n#Takes much longer than the above solution\n\nt1 = time.time()\nfollows_friends = hashJoin(relation_df_list[relation_dict["wsdbm:follows"]], "object", relation_df_list[relation_dict["wsdbm:friendOf"]], "subject")\nprint("First Join took ", time.time()-t1, "seconds")\n\nt1 = time.time()\nfollows_friends_likes = hashJoin(follows_friends, "object", relation_df_list[relation_dict["wsdbm:likes"]], "subject")\nprint("Second Join took ", time.time()-t1, "seconds")\n\nt1 = time.time()\nfollows_friends_likes_hasreview = hashJoin(follows_friends_likes, "object", relation_df_list[relation_dict["rev:hasReview"]], "subject")\nprint("Third Join took ", time.time()-t1, "seconds")\n\nfollows_friends_likes_hasreview.show()\n'

In [6]:
###SortMergeJoin

def sort_merge_join(df1, key1, df2, key2):

    #Sorting the left and right df
    df1_sorted = df1.sort(key1, key2)
    df2_sorted = df2.sort(key2, key1)
    
    left_index = 0
    right_index = 0
    
    #Using pandas to modify and iterate the df
    df1_pandas = df1_sorted.toPandas()
    df2_pandas = df2_sorted.toPandas()
    
    #Getting the df row lengths
    df1_row_len = len(df1_pandas.index)
    df2_row_len = len(df2_pandas.index)

    #Determining the size of the temporary/joined df
    counter = 0
    while right_index < df2_row_len and left_index < df1_row_len:
        key = min(df2_pandas.loc[right_index].at[key2], df1_pandas.loc[left_index].at[key1])
        r_counter = 0
        l_counter = 0
        while right_index < df2_row_len and key == df2_pandas.loc[right_index].at[key2]:
            r_counter += 1
            right_index += 1
        while left_index < df1_row_len and key == df1_pandas.loc[left_index].at[key1]:
            l_counter += 1
            left_index += 1
        counter += l_counter * r_counter
        
    #Defining column names for the returned/constructed df 
    #EXAMPLE: [subject, 1, 2, 3, ..., object]
    #Thus we can garantee multiple consecutive joines with joines on the (object, subject) pair
    col_list = df1.columns
    col_list2 = df2.columns
    col_list= col_list + col_list2
    col_len = len(col_list)
    col_list = [str(i) for i in range(0, col_len-1)]
    col_list[col_len-2] = "object"
    col_list[0] = "subject"
    
    #Creating temporary/join df that will be filled with the joined rows
    tmp_df = pd.DataFrame(index=np.arange(counter), columns = col_list)
    
    #Iterating through both sorted df at the same time and joining rows when there are key matches
    left_index = 0
    right_index = 0
    row_counter = 0
    while right_index < df2_row_len and left_index < df1_row_len:
        key = min(df2_pandas.loc[right_index].at[key2], df1_pandas.loc[left_index].at[key1])
        right_group = []
        while right_index < df2_row_len and key == df2_pandas.loc[right_index].at[key2]:
            right_group.append(list(df2_pandas.loc[right_index]))
            right_index += 1
        left_group = []
        while left_index < df1_row_len and key == df1_pandas.loc[left_index].at[key1]:
            left_group.append(list(df1_pandas.loc[left_index]))
            left_index += 1
        for i in right_group:
            for o in left_group:
                c = o[:-1]
                c = c + i
                tmp_df.loc[row_counter] = c
                row_counter += 1
    return spark.createDataFrame(tmp_df.astype(int))

In [7]:
#Performing the requested SQL expression with SORTMERGEJOIN from right to left
#Takes approx 25 Mins on i7-7700hq for 100k.txt
t1 = time.time()
likes_hasreview = sort_merge_join(relation_df_list_numbers[relation_dict["wsdbm:likes"]], "object", relation_df_list_numbers[relation_dict["rev:hasReview"]], "subject")
likes_hasreview.show()
print("First Join took ", time.time()-t1, "seconds")

t1 = time.time()
friends_likes_hasreview = sort_merge_join(relation_df_list_numbers[relation_dict["wsdbm:friendOf"]], "object", likes_hasreview, "subject")
print("Second Join took ", time.time()-t1, "seconds")

t1 = time.time()
follows_friends_likes_hasreview_reversed = sort_merge_join(relation_df_list_numbers[relation_dict["wsdbm:follows"]], "object", friends_likes_hasreview, "subject")
print("Third Join took ", time.time()-t1, "seconds")

follows_friends_likes_hasreview_reversed.show()

+-------+---+------+
|subject|  1|object|
+-------+---+------+
|    245|522|    40|
|   6377|522|    40|
|   8377|522|    40|
|   8948|522|    40|
|   9994|522|    40|
|  10924|522|    40|
|    245|522|    90|
|   6377|522|    90|
|   8377|522|    90|
|   8948|522|    90|
|   9994|522|    90|
|  10924|522|    90|
|    245|522|   127|
|   6377|522|   127|
|   8377|522|   127|
|   8948|522|   127|
|   9994|522|   127|
|  10924|522|   127|
|    245|522|   484|
|   6377|522|   484|
+-------+---+------+
only showing top 20 rows

First Join took  2.7328715324401855 seconds
Second Join took  30.896827459335327 seconds
Third Join took  848.6176447868347 seconds
+-------+---+----+-----+------+
|subject|  1|   2|    3|object|
+-------+---+----+-----+------+
|   5397| 22|2876| 3944|     7|
|   5867| 22|2876| 3944|     7|
|   6082| 22|2876| 3944|     7|
|   6310| 22|2876| 3944|     7|
|  11975| 22|2876| 3944|     7|
|  12310| 22|2876| 3944|     7|
|   5397| 22|8323| 9104|    11|
|   5867| 22|8323|

In [8]:
"""
#Performing the requested SQL expression with SORTMERGEJOIN from right to left
#Takes approx 25 Mins on i7-7700hq for 100k.txt
tracemalloc.start()
t1 = time.time()
likes_hasreview = sort_merge_join(relation_df_list_numbers[relation_dict["wsdbm:likes"]], "object", relation_df_list_numbers[relation_dict["rev:hasReview"]], "subject")
print("First Join took ", time.time()-t1, "seconds and ", tracemalloc.get_traced_memory(),"memory")
tracemalloc.stop()
tracemalloc.reset_peak()

tracemalloc.start()
t1 = time.time()
friends_likes_hasreview = sort_merge_join(relation_df_list_numbers[relation_dict["wsdbm:friendOf"]], "object", likes_hasreview, "subject")
print("Second Join took ", time.time()-t1, "seconds and ", tracemalloc.get_traced_memory(),"memory")
tracemalloc.stop()
tracemalloc.reset_peak()

tracemalloc.start()
t1 = time.time()
follows_friends_likes_hasreview_reversed = sort_merge_join(relation_df_list_numbers[relation_dict["wsdbm:follows"]], "object", friends_likes_hasreview, "subject")
print("Third Join took ", time.time()-t1, "seconds and ", tracemalloc.get_traced_memory(),"memory")
tracemalloc.stop()
tracemalloc.reset_peak()
follows_friends_likes_hasreview_reversed.show()
"""

'\n#Performing the requested SQL expression with SORTMERGEJOIN from right to left\n#Takes approx 25 Mins on i7-7700hq for 100k.txt\ntracemalloc.start()\nt1 = time.time()\nlikes_hasreview = sort_merge_join(relation_df_list_numbers[relation_dict["wsdbm:likes"]], "object", relation_df_list_numbers[relation_dict["rev:hasReview"]], "subject")\nprint("First Join took ", time.time()-t1, "seconds and ", tracemalloc.get_traced_memory(),"memory")\ntracemalloc.stop()\ntracemalloc.reset_peak()\n\ntracemalloc.start()\nt1 = time.time()\nfriends_likes_hasreview = sort_merge_join(relation_df_list_numbers[relation_dict["wsdbm:friendOf"]], "object", likes_hasreview, "subject")\nprint("Second Join took ", time.time()-t1, "seconds and ", tracemalloc.get_traced_memory(),"memory")\ntracemalloc.stop()\ntracemalloc.reset_peak()\n\ntracemalloc.start()\nt1 = time.time()\nfollows_friends_likes_hasreview_reversed = sort_merge_join(relation_df_list_numbers[relation_dict["wsdbm:follows"]], "object", friends_likes_h

In [9]:
#The advanced Join, resp. task c) of the submission is done in a separate python file as jupyter notebook restricts multiprocessing.