## Reading Data from Disk and Calculate Collab Distance

In [46]:
from pyspark.sql import Row
from pyspark.sql.types import *
import queue

### Read from Disk

In [47]:
author_df = spark.read.parquet("Data/authors-cat:stat.ML.parquet")
collab_df = spark.read.parquet("Data/collab-cat:stat.ML.parquet")

In [52]:
# id = 2 and id = 90
# author1 = 2
# author2 = 3338
# depth_max = 10

def collab_dist(author1, author2, depth_max = 10):
    
    # BFS
    fifo = queue.Queue()
    fifo.put(author1)

    # To track depth to stop search at max depth
    depth = queue.Queue()
    depth.put(0)

    # To find depth and the path backwards
    parents = {author1 : -1}

    while not fifo.empty():
        a = fifo.get() ; d = depth.get()
        print("AuthorID:", a)
        if a == author2:
            break

        if d >= depth_max:
            print("Max depth of %i is reached." % d)
            break

        # "src" in collab item is equal to author1, look for the authors in "dest"
        df_dest = collab_df.filter(collab_df.src == a).select(collab_df.columns[1])
        for i in [int(row.dest) for row in df_dest.collect()]:
            print("Next author: %i" % i)
            if i not in parents: #if already visited, don't add the queue
                fifo.put(i); depth.put(d + 1)
                parents[i] = a

        # "dest" in collab item is equal to author1, look for the authors in "src"
        df_src = collab_df.filter(collab_df.dest == a).select(collab_df.columns[0])
        for i in [int(row.src) for row in df_src.collect()]:
            print("Next author: %i" % i)
            if i not in parents:
                fifo.put(i); depth.put(d + 1)
                parents[i] = a

    # Calculate the depth.
    dist = 0           
    while parents[a] > 0:
        dist = dist + 1
        a = parents[a]
    
    return (dist, d, parents)
    

# print("Parents: %s" % parents)
# print("Dist:", dist)

In [54]:
collab_dist(2,700)

AuthorID: 2
Next author: 1
Next author: 3338
Next author: 671
AuthorID: 1
Next author: 2
Next author: 215
Next author: 700
AuthorID: 3338
Next author: 671
Next author: 2
AuthorID: 671
Next author: 2
Next author: 3338
AuthorID: 215
Next author: 216
Next author: 217
Next author: 881
Next author: 1465
Next author: 231
Next author: 1465
Next author: 3204
Next author: 3484
Next author: 145
Next author: 700
Next author: 701
Next author: 217
Next author: 701
Next author: 549
Next author: 231
Next author: 847
Next author: 270
Next author: 85
Next author: 880
Next author: 549
Next author: 231
Next author: 85
Next author: 1464
Next author: 700
Next author: 1
Next author: 549
Next author: 549
Next author: 2113
Next author: 231
Next author: 1464
Next author: 1175
Next author: 239
Next author: 2952
Next author: 2953
Next author: 740
Next author: 863
Next author: 2979
Next author: 700
Next author: 414
Next author: 889
AuthorID: 700


(2,
 2,
 {2: -1,
  1: 2,
  3338: 2,
  671: 2,
  215: 1,
  700: 1,
  216: 215,
  217: 215,
  881: 215,
  1465: 215,
  231: 215,
  3204: 215,
  3484: 215,
  145: 215,
  701: 215,
  549: 215,
  847: 215,
  270: 215,
  85: 215,
  880: 215,
  1464: 215,
  2113: 215,
  1175: 215,
  239: 215,
  2952: 215,
  2953: 215,
  740: 215,
  863: 215,
  2979: 215,
  414: 215,
  889: 215})

In [36]:
collab_dest = collab_df.filter(collab_df.dest == 2).select(collab_df.columns[0]).collect()
collab_dest[1].src
[int(row.mvv) for row in mvv_list.collect()]

3338

In [28]:
collab_df.filter(collab_df.src == 2).collect()

[]

In [42]:
df_dest = collab_df.filter(collab_df.src == a).select(collab_df.columns[1])
[int(row.src) for row in df_dest.collect()]

[]