## Project 2
### Sneha Ganganna (5579362) Aishwarya Dinni (5653414)

In [1]:
import csv
import sys
import time
import pyspark
import pandas as pd
import hashlib
import time
pd.set_option('display.max_rows', None)

a)The first task is to pre-process the data. It is required to partition the triples into relations by using vertically partitioned approach, namely for each distinct property, set up a table with ’Subject’ and ’Object’ as columns. Assume there are n properties in the triple store, then you need to construct n tables. One optional step before the pre-processing, is to build up a dictionary of all strings occurring in the triple store and transform the string values into integers. Since the comparison of integer is much faster than the comparison on string values, this optional step helps improve the efficiency of the join algorithm

In [2]:
# Function to fetch subject,object,property
def pre_process_data(df):
    
    df[0] = df[0].split(":")[1].strip()
    df[1] = df[1].split(":")[1].strip()
    df[2] = df[2].split(":")[1][:-1].strip() if len(df[2].split(":")) > 1 else df[2][:-1].strip()
    
    return df

In [3]:
# read 100k.txt into a pandas dataframe
rawDataDf = pd.read_csv("100k.txt", 
                     header = None, 
                     sep="\t", 
                     quotechar='"', 
                     skipinitialspace=True, 
                     names=['subject','property','object'])

rawDataDf = rawDataDf.apply(pre_process_data, axis=1)

In [4]:
print(rawDataDf.head())

  subject       property     object
0   City0  parentCountry  Country20
1   City1  parentCountry   Country0
2   City2  parentCountry   Country1
3   City3  parentCountry   Country6
4   City4  parentCountry  Country15


In [5]:
# group the DataFrame raw_df by the values in the "property" column, creating separate groups for each unique property value.
dictionaryDf = dict(tuple(rawDataDf.groupby('property', as_index=False)))


# Remove the "property" column from every DataFrame in the dictionary
dictionaryDf = {key: df.drop("property", axis=1).reset_index(drop=True) for key, df in dictionaryDf.items()}


## Sort Merge Join

In [6]:
def sortMergeJoin(leftDf, leftKey, rightDf, rightKey):
    
    # Perform Sorting based on keys for both DataFrames
    sortedLeftDf = leftDf.sort_values(by=leftKey, ascending=True)
    sortedRightDf = rightDf.sort_values(by=rightKey, ascending=True)
    
    # Merge the sorted DataFrames based on the specified keys using an inner join
    # The 'pd.merge' operation combines the DataFrames using the common 'leftKey' and 'rightKey' columns.
    # The 'how='inner'' parameter ensures that only common rows are included in the result.
    resultsDf = pd.merge(sortedLeftDf, sortedRightDf, left_on=leftKey, right_on=rightKey, how='inner')
   
    # The merged DataFrame containing the common rows based on the specified keys
    return resultsDf

In [7]:
# Sort Merge Join Results
start_time = time.time()


# Sort Merge Join between follows and friendOf DataFrames
# followsObjectect_friendOfSubjectect sort merge join Df
followsObject_friendOfSubject_smj_df = sortMergeJoin(
    dictionaryDf["follows"],"object",
    dictionaryDf["friendOf"],"subject")

# Sort Merge Join between likes and hasReview DataFrames
# likesObject_hasReviewSubject_sort merge join Df
likesObject_hasReviewSubject_smj_df = sortMergeJoin(
    dictionaryDf["likes"],"object",
    dictionaryDf["hasReview"],"subject")

# Sort Merge Join between the results of the previous two joins
# friendsOfObject_likesSubject_sort merge join Df
friendsOfObject_likesSubject_smj_df = sortMergeJoin(
    followsObject_friendOfSubject_smj_df,"object_y",
    likesObject_hasReviewSubject_smj_df,"subject_x")

# Extract only the required columns and rename them for clarity
# resultingDf= sortMergeJoin_QueryResult df
sortMergeJoin_QueryResult_df = friendsOfObject_likesSubject_smj_df[["subject_x_x",
                                     "object_x_x",
                                     "object_y_x",
                                     "object_x_y",
                                     "object_y_y"]]

sortMergeJoin_QueryResult_df.rename(columns = {'subject_x_x':'follows.subject',
                                       'object_x_x':'follows.object',
                                       'object_y_x':'friendOf.object',
                                       'object_x_y':'likes.object',
                                       'object_y_y':' hasReview.object'}, inplace = True)

end_time = time.time()

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  sortMergeJoin_QueryResult_df.rename(columns = {'subject_x_x':'follows.subject',


## Sort Merge Join Time Cost

In [8]:
print("Execution time for Sort merge Join: %s seconds" %(end_time - start_time))

Execution time for Sort merge Join: 5.063119649887085 seconds


In [9]:
print(sortMergeJoin_QueryResult_df.head())

  follows.subject follows.object friendOf.object likes.object   
0         User454        User964         User104     Product0  \
1         User454        User964         User104     Product0   
2         User454        User964         User104     Product0   
3         User454        User964         User104     Product0   
4         User454        User964         User104     Product0   

   hasReview.object  
0         Review478  
1        Review1458  
2        Review1452  
3        Review1392  
4        Review1344  


## Hash Join

In [10]:
def hashJoin(df1, key1, df2, key2):
    # Create hash keys for both DataFrames based on the specified keys
    df1["hash_key"] = df1[key1].apply(hash)
    df2["hash_key"] = df2[key2].apply(hash)
    
    # Group df1 based on the hash keys to improve join performance
    df1Partitions = dict(tuple(df1.groupby('hash_key')))
    for key in df1Partitions:
        df1Partitions[key].reset_index(drop = True, inplace = True)
    
     # Get the keys of the grouped DataFrame partitions
    dictKeys = df1Partitions.keys()
    resultAsList = []
    
    # Perform the hash join operation
    for i in range(len(df2)):
        if (df2.iloc[i]['hash_key']) in dictKeys:
            key = df2.iloc[i]['hash_key']
            joinRow = df2.iloc[[i]]
            joinedRow = pd.merge(df1Partitions[key],joinRow, on='hash_key',  how='left')
            resultAsList.append(joinedRow.to_dict('list'))
    
    listOfJoinedPartitions = []
    for dictionary in resultAsList:
        listOfJoinedPartitions.append(pd.DataFrame(dictionary))
    
    finalDf = pd.concat(listOfJoinedPartitions)
    finalDf = finalDf.drop('hash_key',axis=1)
    
    return finalDf

In [11]:
#Hash Join Results
start_time = time.time()

# Perform the hash join operations between 'follows' and 'friendOf' DataFrames
# followsObject_friendOfSubject_hashJoin_dataframe
followsObject_friendOfSubject_hj_df = hashJoin(
    dictionaryDf['follows'], 'object',
    dictionaryDf['friendOf'], 'subject')

# Perform the hash join operations between 'likes' and 'hasReview' DataFrames
#likesObject_hasReviewSubject_ hashJoin Df
likesObject_hasReviewSubject_hj_df = hashJoin(
    dictionaryDf['likes'], 'object',
    dictionaryDf['hasReview'], 'subject')

# Perform the hash join operations between the results of previous two joins
friendsOfObject_likesSubject_hj_df = hashJoin(
    followsObject_friendOfSubject_hj_df, 'object_y',
    likesObject_hasReviewSubject_hj_df, 'subject_x')

hashJoin_QueryResult_df = friendsOfObject_likesSubject_hj_df[["subject_x_x",
                                     "object_x_x",
                                     "object_y_x",
                                     "object_x_y",
                                     "object_y_y"]]

hashJoin_QueryResult_df.rename(columns = {'subject_x_x': 'follows.subject',
                                          'object_x_x': 'follows.object',
                                          'object_y_x': 'friendOf.object',
                                          'object_x_y': 'likes.object',
                                          'object_y_y': 'hasReview.object'}, inplace = True)
                               
end_time = time.time()

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  hashJoin_QueryResult_df.rename(columns = {'subject_x_x': 'follows.subject',


## Hash Join Time Cost

In [12]:
print("Execution time for Hash Join: %s seconds" %(end_time - start_time))

Execution time for Hash Join: 122.14708161354065 seconds


In [13]:
print(hashJoin_QueryResult_df.head())

  follows.subject follows.object friendOf.object likes.object hasReview.object
0         User630          User9          User20     Product0         Review24
1          User26         User57          User20     Product0         Review24
2         User197         User57          User20     Product0         Review24
3         User351         User57          User20     Product0         Review24
4         User396         User57          User20     Product0         Review24


## Improved Hash Join 

C) The third task is to design and implement an improvement algorithm regarding the running time.
There is no restrictions on the approaches. Possible candidates are: use radix join algorithm, use
a different hash function or hashing scheme, partition the data before the join operation, or use
parallel sorting algorithms. Other options are for instance building indexes on the data before the
query evaluation. Of course you might even invent a completely new join algorithm. In summary
everything is allowed as long as the result is correct. Describe your improvement approach in the
report.

In [14]:
def improvedHashJoin(df1, key1, df2, key2):
    
    df1["hash_key"] = df1[key1].apply(hash)
    df2["hash_key"] = df2[key2].apply(hash)
    
    df1Partitions = dict(tuple(df1.groupby('hash_key')))
    for key in df1Partitions:
        df1Partitions[key].reset_index(drop = True, inplace = True)

    df2Partitions = dict(tuple(df2.groupby('hash_key')))
    for key in df2Partitions:
        df2Partitions[key].reset_index(drop = True, inplace = True)
    
    resultAsList = []
    for key_2 in df2Partitions:
        for key_1 in df1Partitions:
            if key_1 == key_2:
                joinedPartitions = pd.merge(df1Partitions[key_1],df2Partitions[key_2], on='hash_key',  how='left')
                resultAsList.append(joinedPartitions.to_dict('list'))
    
    listOfJoinedPartitions = []
    for dictionary in resultAsList:
        listOfJoinedPartitions.append(pd.DataFrame(dictionary))
    
    finalDf = pd.concat(listOfJoinedPartitions)
    finalDf = finalDf.drop('hash_key',axis=1)
    return finalDf

In [15]:
#Improved Hash Join Results
start_time = time.time()

# Perform the improved hash join operations between 'follows' and 'friendOf' DataFrames
followsObject_friendOfSubject_ihj_df = improvedHashJoin(
    dictionaryDf['follows'], 'object',
    dictionaryDf['friendOf'], 'subject')

# Perform the improved hash join operations between 'likes' and 'hasReview' DataFrames
likesObject_hasReviewSubject_ihj_df = improvedHashJoin(
    dictionaryDf['likes'], 'object',
    dictionaryDf['hasReview'], 'subject')

# Perform the improved hash join operations between the results of the previous two joins
friendsOfObject_likesSubject_ihj_df = improvedHashJoin(
    followsObject_friendOfSubject_ihj_df, 'object_y',
    likesObject_hasReviewSubject_ihj_df, 'subject_x')

# Extract only the required columns and rename them for clarity
improvedHashJoin_QueryResult_df = friendsOfObject_likesSubject_ihj_df[["subject_x_x",
                                     "object_x_x",
                                     "object_y_x",
                                     "object_x_y",
                                     "object_y_y"]]

improvedHashJoin_QueryResult_df.rename(columns = {'subject_x_x': 'follows.subject',
                                          'object_x_x': 'follows.object',
                                          'object_y_x': 'friendOf.object',
                                          'object_x_y': 'likes.object',
                                          'object_y_y': 'hasReview.object'}, inplace = True)
                               
end_time = time.time()

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  improvedHashJoin_QueryResult_df.rename(columns = {'subject_x_x': 'follows.subject',


## Improved Hash Join Time Cost

In [16]:
print("Execution time for Improved Hash Join: %s seconds" %(end_time - start_time))

Execution time for Improved Hash Join: 49.30752444267273 seconds


In [17]:
print(improvedHashJoin_QueryResult_df.head())

  follows.subject follows.object friendOf.object likes.object hasReview.object
0          User75        User250         User602   Product110         Review37
1          User75        User250         User602   Product110        Review218
2          User75        User250         User602   Product110        Review219
3          User75        User250         User602   Product110        Review459
4          User75        User250         User602   Product110        Review584
