# CMU 10405/10605 Machine Learning with Large Datasets
### This notebook contains the demo code from 10-405/605 Lecture 3: Workflows for Map-Reduce Systems.

#### **Part 1: Attach and Test Class Helper Library**

#### (1a) Install class helper library into your Databricks CE workspace
- The class helper library is called "nose"
- Install the library into your workspace as follows:
 - Step 1: Click on "Workspace", then on the dropdown and select "Create" and "Library"

<img src="https://raw.githubusercontent.com/10605/HW_imgs/main/S24/Recitation1/img1.png" width="500" alt="Drawing" />

 - Step 2: Select "Upload Python Egg or PyPI" and enter "nose" in the "PyPI Name" field


 - Step 3: Make sure the checkbox for auto-attaching the library to your cluster is selected

#### **Part 2: Test Spark functionality**

**(2a) Create a DataFrame and filter it**

When you run the next cell (with control-enter or shift-enter), you will see the following popup.

<img src="https://raw.githubusercontent.com/10605/HW_imgs/main/S24/Recitation1/img2.png" width="500" alt="Drawing" />

Select the checkbox and then click "Launch and Run". The display at the top of your notebook will change to "Pending" or "Starting".

<img src="https://raw.githubusercontent.com/10605/HW_imgs/main/S24/Recitation1/img3.png" width="250" alt="Drawing" />

Note that it may take a few seconds to a few minutes to start your cluster. Once your cluster is running, the display will change to "Attached".

<img src="https://raw.githubusercontent.com/10605/HW_imgs/main/S24/Recitation1/img4.png" width="250" alt="Drawing" />

Congratulations! You just launched your Spark cluster in the cloud!

In [0]:
# # YOU CAN MOST LIKELY IGNORE THIS CELL. This is only of use for running this notebook locally.

# # THIS CELL DOES NOT NEED TO BE RUN ON DATABRICKS.
# # Note that Databricks already creates a SparkContext for you, so this cell can be skipped.
# import findspark
# findspark.init()
# import pyspark
# from pyspark.sql import SparkSession, SQLContext

# spark = SparkSession.builder \
#     .appName("hw") \
#     .getOrCreate()

# sc = spark.sparkContext
# sqlContext = SQLContext(sc)

# print("spark context started")

In [0]:
# Check that Spark is working
from pyspark.sql import Row
data = [('Alice', 1), ('Bob', 2), ('Bill', 4)]
df = sqlContext.createDataFrame(data, ['name', 'age'])
fil = df.filter(df.age > 3).collect()   # filter: Transformation; collect: Action.
print(fil)

# If the Spark job doesn't work properly this will raise an AssertionError
assert fil == [Row(name='Bill', age=4)]

[Row(name='Bill', age=4)]


# Upload Data to DBFS

Click 'File' -> 'Upload Data to DBFS...':

<img src="https://raw.githubusercontent.com/XuChentianye/Shared/main/upload_to_dbfs_1.png" width="300" alt="Drawing" />

Drag the *Data* folder containing the data files to the box shown below: 

<img src="https://raw.githubusercontent.com/XuChentianye/Shared/main/upload_to_dbfs_2.png" width="500" alt="Drawing" />

Note that all files uploaded to Databricks will have any hyphens ("-") in their filenames automatically replaced with underscores ("_").

In [0]:
# Change the root to your path
root = 'dbfs:/FileStore/shared_uploads/chentiax@andrew.cmu.edu/data/'

In [0]:
%fs ls dbfs:/FileStore/shared_uploads/chentiax@andrew.cmu.edu/data/


path,name,size,modificationTime
dbfs:/FileStore/shared_uploads/chentiax@andrew.cmu.edu/data/aeffect_train-1.txt,aeffect_train-1.txt,1456373,1738679576000
dbfs:/FileStore/shared_uploads/chentiax@andrew.cmu.edu/data/aeffect_train.txt,aeffect_train.txt,1456373,1738679279000
dbfs:/FileStore/shared_uploads/chentiax@andrew.cmu.edu/data/bluecorpus-1.txt,bluecorpus-1.txt,335049,1738679582000
dbfs:/FileStore/shared_uploads/chentiax@andrew.cmu.edu/data/bluecorpus.txt,bluecorpus.txt,335049,1738679284000
dbfs:/FileStore/shared_uploads/chentiax@andrew.cmu.edu/data/bluecorpus_plus_docids-1.txt,bluecorpus_plus_docids-1.txt,342717,1738679575000
dbfs:/FileStore/shared_uploads/chentiax@andrew.cmu.edu/data/bluecorpus_plus_docids.txt,bluecorpus_plus_docids.txt,342717,1738679278000
dbfs:/FileStore/shared_uploads/chentiax@andrew.cmu.edu/data/brown_nolines-1.txt,brown_nolines-1.txt,6026059,1738679586000
dbfs:/FileStore/shared_uploads/chentiax@andrew.cmu.edu/data/brown_nolines.txt,brown_nolines.txt,6026059,1738679289000
dbfs:/FileStore/shared_uploads/chentiax@andrew.cmu.edu/data/citeseer_graph-1.txt,citeseer_graph-1.txt,262236,1738679578000
dbfs:/FileStore/shared_uploads/chentiax@andrew.cmu.edu/data/citeseer_graph.txt,citeseer_graph.txt,262236,1738679280000


# History of Big ML
## See `ngram_query.py` (no Spark required)

# Workflows in Spark
# 0. Matrix Multiplication
## `matmul.py`
### a. Import Libs

In [0]:
import collections
import json
import operator

### b. Define Named Tuple and Load Data

In [0]:
# Define named tuple for sparse matrix entries
Mat = collections.namedtuple('Mat', ['row', 'col', 'w'])

# Load data from JSON file
dbfs_file_path = root + 'matmul.json'
local_file_path = "/tmp/matmul.json"
try:
    dbutils.fs.cp(dbfs_file_path, f"file:{local_file_path}")
    with open(local_file_path, 'r') as fp:
        d = json.load(fp)
        print(d)
except Exception as e:
    print(f"Error: {e}")


{'a': [[0, 0, 1.0], [0, 5, 1.0], [0, 6, 1.0], [0, 8, 1.0], [0, 16, 1.0], [0, 18, 1.0], [0, 25, 1.0], [0, 26, 1.0], [1, 0, 1.0], [1, 4, 1.0], [1, 6, 1.0], [1, 12, 1.0], [1, 15, 1.0], [1, 24, 1.0], [2, 2, 1.0], [2, 8, 1.0], [2, 9, 1.0], [2, 12, 1.0], [2, 16, 1.0], [2, 18, 1.0], [3, 8, 1.0], [3, 11, 1.0], [3, 12, 1.0], [3, 21, 1.0], [3, 27, 1.0], [4, 0, 1.0], [4, 15, 1.0], [4, 20, 1.0], [4, 22, 1.0], [4, 28, 1.0], [5, 0, 1.0], [5, 2, 1.0], [5, 7, 1.0], [5, 14, 1.0], [5, 19, 1.0], [5, 21, 1.0], [5, 24, 1.0], [5, 26, 1.0], [5, 27, 1.0], [6, 1, 1.0], [6, 5, 1.0], [6, 12, 1.0], [6, 21, 1.0], [6, 26, 1.0], [7, 2, 1.0], [7, 13, 1.0], [7, 14, 1.0], [7, 16, 1.0], [7, 19, 1.0], [7, 28, 1.0], [8, 3, 1.0], [8, 12, 1.0], [8, 13, 1.0], [8, 16, 1.0], [8, 20, 1.0], [8, 22, 1.0], [8, 24, 1.0], [8, 26, 1.0], [9, 11, 1.0], [9, 14, 1.0], [9, 15, 1.0], [9, 18, 1.0], [9, 23, 1.0], [9, 25, 1.0], [9, 26, 1.0], [10, 5, 1.0], [10, 7, 1.0], [10, 17, 1.0], [10, 22, 1.0], [10, 29, 1.0], [11, 0, 1.0], [11, 2, 1.0], [

### c. Create RDDs for Matrices A and B

In [0]:
# Convert JSON data to RDDs
a = sc.parallelize([Mat(*e) for e in d['a']])
b = sc.parallelize([Mat(*e) for e in d['b']])


### d. Compute Sparse Matrix Multiplication

In [0]:
# Step 1: Map matrix entries by column
a_by_col = a.map(lambda e: (e.col, e))
b_by_col = b.map(lambda e: (e.col, e))

# Step 2: Compute products of a[i,k] * b[j,k]
def aik_bjk_product(join_result):
    k, (aik, bjk) = join_result
    return ((aik.row, bjk.row), aik.w * bjk.w)

ak_bk_prods = (
    a_by_col
    .join(b_by_col)
    .map(aik_bjk_product)
)

# Step 3: Sum up products to compute dot products of C
dotprods = ak_bk_prods.reduceByKey(operator.add)


### e. Convert Results Back to Sparse Matrix

In [0]:
# Convert dot products back to sparse matrix entries
def asMatEntry(dotprod):
    (row, col), weight = dotprod
    return Mat(row, col, weight)

c = dotprods.map(asMatEntry)


### f. Collect Results and Verify

In [0]:
# Collect results and sort them for comparison
spark_ans = sorted(c.collect())
reference_ans = sorted([Mat(*e) for e in d['r']])

# Verify correctness of results
print('correct?', spark_ans == reference_ans)


correct? True
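
The pipeline above can be mirrored in plain Python to see what it computes. Because both matrices are keyed by their column index, the joined products are `a[i,k] * b[j,k]`, so the result is A·Bᵀ rather than A·B. The sketch below is a local stand-in, not the Spark code itself; `sparse_a_bt` and the toy matrices are made up for illustration and are not the contents of `matmul.json`.

```python
import collections

Mat = collections.namedtuple('Mat', ['row', 'col', 'w'])

def sparse_a_bt(a_entries, b_entries):
    """Plain-Python stand-in for the join / map / reduceByKey pipeline."""
    # Step 1: index B's entries by column (plays the role of b_by_col)
    b_by_col = collections.defaultdict(list)
    for e in b_entries:
        b_by_col[e.col].append(e)
    # Steps 2 and 3: for every pair sharing column k, add a[i,k] * b[j,k]
    # into the accumulator for output cell (i, j)
    dotprods = collections.defaultdict(float)
    for aik in a_entries:
        for bjk in b_by_col.get(aik.col, []):
            dotprods[(aik.row, bjk.row)] += aik.w * bjk.w
    return sorted(Mat(i, j, w) for (i, j), w in dotprods.items())

# Toy 2x3 matrices (hypothetical values):
#   A = [[1, 0, 2], [0, 3, 0]]    B = [[4, 0, 0], [0, 5, 6]]
a_small = [Mat(0, 0, 1.0), Mat(0, 2, 2.0), Mat(1, 1, 3.0)]
b_small = [Mat(0, 0, 4.0), Mat(1, 1, 5.0), Mat(1, 2, 6.0)]
c_small = sparse_a_bt(a_small, b_small)
print(c_small)
```

Cell (1, 0) is absent from the output because no column is shared by row 1 of A and row 0 of B, which is the usual behavior of an inner join on sparse data.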


# 1. Word Counts and Phrase Finding
## `redvsblue.py`
### a. Import Libs

In [0]:
import collections
import functools
import math
import operator
import re

### b. Load Data

In [0]:
bg_lines = sc.textFile(root + "redcorpus.txt")
fg_lines = sc.textFile(root + "bluecorpus.txt")

### c. Define Tokenization Function

In [0]:
def tokenize(line):
    return re.findall(r'\w+', line.lower())

### d. Define Word Count Pipeline

In [0]:
# Generalized version of the 'wordcount' sequence of transformations 
def wc_pipe(lines):
    return lines.flatMap(tokenize).map(lambda x:(x, 1)).reduceByKey(operator.add)
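
For intuition, the three transformations in `wc_pipe` can be emulated locally on a list of strings. This is only a sketch: `wc_local` and the sample lines are hypothetical, and the real pipeline runs distributed over an RDD.

```python
import collections
import re

def tokenize(line):
    return re.findall(r'\w+', line.lower())

def wc_local(lines):
    """Local stand-in for flatMap(tokenize).map(...).reduceByKey(add)."""
    counts = collections.Counter()
    for line in lines:
        for token in tokenize(line):   # flatMap: one line -> many tokens
            counts[token] += 1         # (token, 1) pairs summed per key
    return dict(counts)

sample = ["Red state, blue state", "Blue wave"]
print(wc_local(sample))
```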

### e. Compute Word Count & Vocabulary Sizes

In [0]:
fg_word_count = wc_pipe(fg_lines)
bg_word_count = wc_pipe(bg_lines)

fg_vocab = fg_word_count.keys().count()
bg_vocab = bg_word_count.keys().count()

### f. Join Word Counts

In [0]:
wc_pairs = fg_word_count.join(bg_word_count)

### g. Define Scoring Function

In [0]:
def score_counted_pair(join_output):
    word, (fg_n, bg_n) = join_output
    p1 = (fg_n + 1.0/fg_vocab) / (fg_vocab + 1.0)
    p2 = (bg_n + 1.0/bg_vocab) / (bg_vocab + 1.0)
    return word, math.log(p1 / p2)
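
A quick worked example may help in reading the scores. The sketch below restates the same formula as a standalone function; the name `smoothed_log_ratio` and all counts are made up for illustration.

```python
import math

def smoothed_log_ratio(fg_n, bg_n, fg_vocab, bg_vocab):
    """Same arithmetic as score_counted_pair, with the globals passed in."""
    p1 = (fg_n + 1.0 / fg_vocab) / (fg_vocab + 1.0)
    p2 = (bg_n + 1.0 / bg_vocab) / (bg_vocab + 1.0)
    return math.log(p1 / p2)

# Hypothetical counts: a word seen 50 times in the foreground corpus
# and 5 times in the background corpus, and the reverse case.
fg_heavy = smoothed_log_ratio(50, 5, 1000, 1000)
bg_heavy = smoothed_log_ratio(5, 50, 1000, 1000)
print(fg_heavy, bg_heavy)
```

A positive score means the word is relatively more frequent in the foreground corpus and a negative score means it leans toward the background corpus. With equal vocabulary sizes the two cases are mirror images of each other.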

### h. Compute Scores and Sort Results

In [0]:
result = wc_pairs.map(score_counted_pair)
reds = result.sortBy(lambda ws: ws[1], ascending=True)
blues = result.sortBy(lambda ws: ws[1], ascending=False)


In [0]:
n = 20

print(f'top {n} most red:')
for word, score in reds.take(n):
    print(word, score)
print()

print(f'top {n} most blue:')
for word, score in blues.take(n):
    print(word, score)
print()


top 20 most red:
marriage -4.603860230052482
paul -4.316178671600118
ron -4.142515567479694
immigration -3.910714591489994
rangers -3.665592990121236
freedom -3.578581971372792
vrwc -3.3779122312198435
guard -3.2601298523380073
conservatives -3.2601298523380073
terrorists -3.1266612538917395
security -3.077871258607549
religion -3.077809477737151
argument -3.077809477737151
heroes -3.0265781508826013
qaeda -3.0265165566734464
kill -2.9999100046991294
choice -2.9908806911320633
boo -2.972511137015176
positions -2.9724497502072804
v -2.9724497502072804

top 20 most blue:
registration 4.299685643807176
local 4.248392435385082
advertising 4.076542500828816
email 3.7463016128757234
registered 3.7243227692812653
800 3.4730091706338726
voter 3.4142608242487995
sent 3.2134990813257356
rnc 3.1757589394783126
company 3.1366295100101547
mail 3.1365384273512498
forms 3.1365384273512498
florida 3.00870576857036
info 3.00870576857036
posted 2.962186035764293
alcohol 2.9133961827064767
oregon 2.91339

## `phrases.py`
### a. Import Libs

In [0]:
import collections
import functools
import math
import operator
import re

from pprint import pprint

### b. Load Corpora

In [0]:
# Two corpora
bg_lines = sc.textFile(root + "redcorpus.txt")
fg_lines = sc.textFile(root + "bluecorpus.txt")


### c. Define Utility Functions

In [0]:
# Extract 'phrases' and tokens
def bigrams(line):
    tokens = re.findall(r'\w+', line.lower())
    for i in range(0, len(tokens) - 1):
        yield (tokens[i], tokens[i + 1])

def tokens(line):
    return re.findall(r'\w+', line.lower())

# Generalized version of the 'wordcount' sequence of transformations 
def wc_pipe(lines, preprocess):
    return lines.flatMap(preprocess).map(lambda x:(x, 1)).reduceByKey(operator.add)
    
# Generalized code to get vocabulary size from wordcount-like RDD
def voc_size_pipe(sc, wc):
    n = wc.keys().count()
    return sc.broadcast(n)
    
# Generalized code to get sum of counts from a wordcount-like RDD
def total_count_pipe(sc, wc):
    n = wc.values().reduce(operator.add)
    return sc.broadcast(n)

# Score by smoothed log odds of Pr(x|corpus1) / Pr(x|corpus2)
def score_counted_pair(join_output, n1, n2):
    x, (fg_k, bg_k) = join_output
    p1 = (fg_k + 1.0/n1.value) /  (n1.value + 1.0)
    p2 = (bg_k + 1.0/n2.value) /  (n2.value + 1.0)
    return x, math.log( p1 / p2 )

# Inspect results utility function
def inspect(rdd, n, msg):
    print(f'top {n} {msg}:')
    for x, score in rdd.sortBy(lambda kv: kv[1], ascending=False).take(n):
        print(x, score)
    print()
    print(f'bottom {n} {msg}:')
    for x, score in rdd.sortBy(lambda kv: kv[1], ascending=True).take(n):
        print(x, score)
    print()
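
To see what the `bigrams` preprocessor feeds into `wc_pipe`, it can be run on a single line without Spark. The sample sentence below is made up.

```python
import re

def bigrams(line):
    tokens = re.findall(r'\w+', line.lower())
    for i in range(len(tokens) - 1):
        yield (tokens[i], tokens[i + 1])

pairs = list(bigrams("Get out the vote!"))
print(pairs)

# A line with fewer than two tokens yields no bigrams at all
print(list(bigrams("Vote")))
```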


### d. Word Count and Phrase Count

In [0]:
# Wordcount-like RDDs for phrases and words in foreground and background corpora
fg_phrase_count = wc_pipe(fg_lines, bigrams)
bg_phrase_count = wc_pipe(bg_lines, bigrams)
fg_word_count = wc_pipe(fg_lines, tokens)
bg_word_count = wc_pipe(bg_lines, tokens)


### e. Analyze Most Informative Words

In [0]:
word_pairs = fg_word_count.join(bg_word_count)
fg_word_v = voc_size_pipe(sc, fg_word_count)
bg_word_v = voc_size_pipe(sc, bg_word_count)

word_result = word_pairs.map(
    lambda p: score_counted_pair(p, fg_word_v, bg_word_v))

inspect(word_result, 10, 'foreground words')


top 10 foreground words:
registration 4.299685643807176
local 4.248392435385082
advertising 4.076542500828816
email 3.7463016128757234
registered 3.7243227692812653
800 3.4730091706338726
voter 3.4142608242487995
sent 3.2134990813257356
rnc 3.1757589394783126
company 3.1366295100101547

bottom 10 foreground words:
marriage -4.603860230052482
paul -4.316178671600118
ron -4.142515567479694
immigration -3.910714591489994
rangers -3.665592990121236
freedom -3.578581971372792
vrwc -3.3779122312198435
guard -3.2601298523380073
conservatives -3.2601298523380073
terrorists -3.1266612538917395



### f. Analyze Most Informative Phrases

In [0]:
phrase_pairs = fg_phrase_count.join(bg_phrase_count)
fg_phrase_v = voc_size_pipe(sc, fg_phrase_count)
bg_phrase_v = voc_size_pipe(sc, bg_phrase_count)

phrase_result = phrase_pairs.map(
    lambda p: score_counted_pair(p, fg_phrase_v, bg_phrase_v))

inspect(phrase_result, 10, 'foreground phrases')


top 10 foreground phrases:
('the', 'company') 2.9953770498667827
('thank', 'you') 2.946586952009818
('the', 'rnc') 2.946586952009818
('election', 'day') 2.8952937309149895
('had', 'a') 2.515804785758036
('john', 'kerry') 2.515804785758036
('like', 'he') 2.3487510906222697
('it', 'can') 2.3487510906222697
('this', 'group') 2.3487510906222697
('vote', 'in') 2.253454397854922

bottom 10 foreground phrases:
('and', 'ask') -4.479906757295156
('way', 'they') -3.7626623903994036
('are', 'those') -3.73796979564649
('dream', 'of') -3.712652006414399
('that', 'never') -3.712652006414399
('the', 'vrwc') -3.3449275596614476
('the', 'left') -3.1401333932563147
('never', 'have') -3.0936134409404956
('al', 'qaeda') -2.993530129017142
('can', 'find') -2.8216801607753776



### g. Incorporate Phraseness

In [0]:
phrase_dicts = phrase_result.map(
    lambda pair: dict(
        phrase=pair[0],
        x=pair[0][0],
        y=pair[0][1],
        redness=pair[1]))

def join_in(dict_rdd, join_slot, join_rdd, new_slot):
    by_join_key = dict_rdd.map(lambda d: (d[join_slot], d))
    with_joined_info = by_join_key.join(join_rdd)
    return with_joined_info.values().map(
        lambda join_pair: {new_slot:join_pair[1], **join_pair[0]})

phrase_dicts = join_in(phrase_dicts, 'phrase', fg_phrase_count, 'phrase_count')
phrase_dicts = join_in(phrase_dicts, 'x', fg_word_count, 'x_count')
phrase_dicts = join_in(phrase_dicts, 'y', fg_word_count, 'y_count')

pprint(phrase_dicts.take(10))


[{'phrase': ('there', 'are'),
  'phrase_count': 27,
  'redness': -0.8964147542823403,
  'x': 'there',
  'x_count': 165,
  'y': 'are',
  'y_count': 336},
 {'phrase': ('s', 'are'),
  'phrase_count': 2,
  'redness': 0.6440143919931691,
  'x': 's',
  'x_count': 590,
  'y': 'are',
  'y_count': 336},
 {'phrase': ('we', 'are'),
  'phrase_count': 22,
  'redness': -1.1477289270188036,
  'x': 'we',
  'x_count': 368,
  'y': 'are',
  'y_count': 336},
 {'phrase': ('when', 'are'),
  'phrase_count': 1,
  'redness': -0.04911886322055696,
  'x': 'when',
  'x_count': 112,
  'y': 'are',
  'y_count': 336},
 {'phrase': ('democrats', 'are'),
  'phrase_count': 1,
  'redness': -2.6881490293418784,
  'x': 'democrats',
  'x_count': 32,
  'y': 'are',
  'y_count': 336},
 {'phrase': ('companies', 'are'),
  'phrase_count': 1,
  'redness': -0.7422514173754531,
  'x': 'companies',
  'x_count': 20,
  'y': 'are',
  'y_count': 336},
 {'phrase': ('and', 'are'),
  'phrase_count': 1,
  'redness': -1.147711649967727,
  'x':
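
The `join_in` helper can be read as "look up one field of each record in another table and attach the result under a new key". A minimal local sketch of that idea follows, assuming the lookup table has unique keys (as a word-count RDD does); `join_in_local` and the counts are hypothetical.

```python
def join_in_local(dicts, join_slot, lookup, new_slot):
    """List/dict stand-in for the RDD-based join_in (inner-join semantics)."""
    return [
        {new_slot: lookup[d[join_slot]], **d}   # same dict merge as join_in
        for d in dicts
        if d[join_slot] in lookup               # inner join drops unmatched rows
    ]

rows = [{'phrase': ('thank', 'you'), 'x': 'thank', 'y': 'you'}]
word_counts = {'thank': 3, 'you': 40}           # made-up counts
rows = join_in_local(rows, 'x', word_counts, 'x_count')
rows = join_in_local(rows, 'y', word_counts, 'y_count')
print(rows)
```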

### h. Compute Phraseness Scores

In [0]:
def add_computed_field(dict_rdd, new_slot, fn):
    return dict_rdd.map(
        lambda d: {new_slot:fn(d), **d})

fg_word_v = voc_size_pipe(sc, fg_word_count)
fg_word_n = total_count_pipe(sc, fg_word_count)
fg_phrase_v = voc_size_pipe(sc, fg_phrase_count)
fg_phrase_n = total_count_pipe(sc, fg_phrase_count)

def score_phrasiness(d, word_v, word_n, phrase_v, phrase_n):
    fx = d['x_count']
    fy = d['y_count']
    fxy = d['phrase_count']
    px = (fx + 1.0/word_v.value) / (word_n.value + 1.0)
    py = (fy + 1.0/word_v.value) / (word_n.value + 1.0)
    pxy = (fxy + 1.0/phrase_v.value) / (phrase_n.value + 1.0)
    return math.log(pxy / (px * py))

phrase_dicts = add_computed_field(
    phrase_dicts,
    'phraseness',
    lambda d: score_phrasiness(d, fg_word_v, fg_word_n, fg_phrase_v, fg_phrase_n))
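
The phraseness score compares how often a bigram occurs with how often it would occur if its two words were independent. The sketch below restates the formula with plain numbers instead of broadcast variables; the function name `phraseness` and every count and corpus size are made up for illustration.

```python
import math

def phraseness(fx, fy, fxy, word_v, word_n, phrase_v, phrase_n):
    """Same arithmetic as score_phrasiness, with plain numbers as inputs."""
    px = (fx + 1.0 / word_v) / (word_n + 1.0)
    py = (fy + 1.0 / word_v) / (word_n + 1.0)
    pxy = (fxy + 1.0 / phrase_v) / (phrase_n + 1.0)
    return math.log(pxy / (px * py))

# Hypothetical corpus: 10,000 tokens, 2,000 distinct words,
# 9,999 bigram occurrences, 8,000 distinct bigrams.
sizes = dict(word_v=2000, word_n=10000, phrase_v=8000, phrase_n=9999)

# Two rare words that always appear together score high
collocation = phraseness(fx=10, fy=10, fxy=10, **sizes)
# Two frequent words that co-occur about as often as chance predicts score near 0
chance_pair = phraseness(fx=300, fy=500, fxy=15, **sizes)
print(collocation, chance_pair)
```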


### i. Filter and Inspect Good Phrases

In [0]:
good_phrase_dicts = (
    phrase_dicts
    .map(lambda d: (d['phrase'], d['phraseness']))
    .filter(lambda kv: kv[1]>2.5)
)

good_fg_phrase_count = fg_phrase_count.join(good_phrase_dicts).map(lambda kv:(kv[0],kv[1][0]))
good_phrase_pairs = good_fg_phrase_count.join(bg_phrase_count)

good_phrase_result = good_phrase_pairs.map(lambda p: score_counted_pair(p, fg_phrase_v, bg_phrase_v))

inspect(good_phrase_result, 10, 'foreground phrases')


top 10 foreground phrases:
('thank', 'you') 2.946586952009818
('the', 'rnc') 2.946586952009818
('election', 'day') 2.8952937309149895
('john', 'kerry') 2.515804785758036
('this', 'group') 2.3487510906222697
('like', 'he') 2.3487510906222697
('las', 'vegas') 2.2534411640106735
('also', 'be') 2.2534411640106735
('looks', 'like') 2.2534411640106735
('presidential', 'election') 2.2534411640106735

bottom 10 foreground phrases:
('dream', 'of') -3.712652006414399
('the', 'vrwc') -3.3449275596614476
('al', 'qaeda') -2.993530129017142
('a', 'rock') -2.757141761526959
('bin', 'laden') -2.757141761526959
('why', 'not') -2.5340177575233005
('interests', 'of') -2.5339986977692113
('national', 'guard') -2.5339986977692113
('one', 'man') -2.5339986977692113
('those', 'who') -2.485230333623989



# 2. PageRank Variants

In [0]:
import time
import operator

## PageRank SLOW
### a. Load and Preprocess Data

In [0]:
RESET = 0.15
NUM_ITERATIONS = 5

lines = sc.textFile(root + "citeseer_graph.txt")
edges = lines.map(lambda line:tuple(line.split()))
nodes = edges.keys().distinct()
outlinks_by_node = edges.groupByKey().mapValues(list).cache()

### b. Initialize Variables

In [0]:
pagerank_scores = nodes.map(lambda node:(node,1.0))

def messages(page_info):
    src, (current_score, outlinks) = page_info
    delta_from_src = (1 - RESET) * current_score/len(outlinks)
    return [(dst, delta_from_src) for dst in outlinks]

### c. Run PageRank Algorithm

In [0]:
for t in range(NUM_ITERATIONS):
    
    print(f'iteration {t + 1} of {NUM_ITERATIONS}')

    page_hops = pagerank_scores.join(outlinks_by_node).flatMap(messages)
    resets = pagerank_scores.mapValues(lambda _: RESET)
    pagerank_scores = page_hops.union(resets).reduceByKey(lambda a, b:a+b)

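# Transformations are lazy: collect() is the action that actually runs all
# of the iterations above, so this timing covers the whole computation.
start = time.time()
pr_items = pagerank_scores.collect()
print('scores collected:', time.time() - start, 'sec')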

iteration 1 of 5
iteration 2 of 5
iteration 3 of 5
iteration 4 of 5
iteration 5 of 5
scores collected: 38.00826811790466 sec
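
The update performed in each iteration can be sketched on a tiny in-memory graph. This is a plain-Python illustration of the same message-passing rule, not the Spark job; `pagerank_step` and the three-node graph are hypothetical.

```python
RESET = 0.15

def pagerank_step(scores, outlinks_by_node):
    """One iteration: every source node gets RESET plus its incoming messages."""
    new_scores = {node: RESET for node in scores}            # the 'resets' RDD
    for src, outlinks in outlinks_by_node.items():           # join + flatMap(messages)
        delta = (1 - RESET) * scores[src] / len(outlinks)
        for dst in outlinks:
            new_scores[dst] = new_scores.get(dst, 0.0) + delta   # reduceByKey(add)
    return new_scores

# Toy graph in which every node has at least one outlink
graph = {'a': ['b', 'c'], 'b': ['c'], 'c': ['a']}
scores = {node: 1.0 for node in graph}
for _ in range(5):
    scores = pagerank_step(scores, graph)
print(scores)
```

In this toy graph every node has outlinks, so the total score stays equal to the number of nodes from one iteration to the next.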


In [0]:
pr_items = sorted(pr_items)
print(pr_items[:3])
print(pr_items[-3:])

[('100157', 3.2322227078356076), ('100598', 0.6282520410223835), ('101570', 1.0)]
[('zhu98line', 0.9464543724279834), ('zini01caselp', 1.8369012014003159), ('zunino01representing', 1.7840892358274425)]


## PageRank OPT 1
### a. Load and Preprocess Data

In [0]:
RESET = 0.15
NUM_ITERATIONS = 5

lines = sc.textFile(root + "citeseer_graph.txt")
# Parse edges and extract nodes
edges = lines.map(lambda line: tuple(line.split()))
nodes = edges.keys().distinct()
outlinks_by_node = edges.groupByKey().mapValues(list).cache()


### b. Initialize Variables

In [0]:
# Initialize reset scores
resets = nodes.map(lambda node: (node, RESET))

# Initialize PageRank scores
pagerank_scores = {node: 1.0 for node in nodes.collect()}

# Define function to calculate outgoing messages for PageRank
def make_message_mapper(pagerank_scores):
    def outgoing_msgs_for_page(node_outlink_pair):
        node, outlinks = node_outlink_pair
        current_score = pagerank_scores[node]
        delta = (1 - RESET) * current_score / len(outlinks)
        return [(dst, delta) for dst in outlinks]
    return outgoing_msgs_for_page
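
The closure returned by `make_message_mapper` captures the current score dictionary, so the function handed to `flatMap` carries the scores with it. A small local sketch of that behavior, using a made-up two-node score table and applying the mapper to a plain list instead of an RDD:

```python
RESET = 0.15

def make_message_mapper(pagerank_scores):
    def outgoing_msgs_for_page(node_outlink_pair):
        node, outlinks = node_outlink_pair
        current_score = pagerank_scores[node]   # read from the captured dict
        delta = (1 - RESET) * current_score / len(outlinks)
        return [(dst, delta) for dst in outlinks]
    return outgoing_msgs_for_page

mapper = make_message_mapper({'a': 1.0, 'b': 2.0})   # hypothetical scores
adjacency = [('a', ['b']), ('b', ['a', 'c'])]
# Equivalent of flatMap: apply the mapper to each pair and flatten the results
msgs = [msg for pair in adjacency for msg in mapper(pair)]
print(msgs)
```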


### c. Run PageRank Algorithm

In [0]:
start = time.time()

for t in range(NUM_ITERATIONS):
    # Monitor progress during iterations
    scores = pagerank_scores.values()
    print(f'iteration {t + 1} of {NUM_ITERATIONS} time {time.time() - start}', 
          f'len {len(scores)} max {max(scores)} min {min(scores)} mean {sum(scores) / len(scores)}')

    # Calculate new PageRank scores
    page_hops = outlinks_by_node.flatMap(make_message_mapper(pagerank_scores))
    new_scores = page_hops.union(resets).reduceByKey(lambda a, b: a + b)
    pagerank_scores = dict(new_scores.collect())

print('scores collected:', time.time() - start, 'sec')


iteration 1 of 10 time 0.00010848045349121094 len 3264 max 1.0 min 1.0 mean 1.0
iteration 2 of 10 time 1.6913073062896729 len 3264 max 23.102023809523807 min 0.15858585858585858 mean 0.9999999999999949
iteration 3 of 10 time 2.7930562496185303 len 3264 max 15.090175658108501 min 0.25811109737285187 mean 0.9999999999999989
iteration 4 of 10 time 3.6397438049316406 len 3264 max 18.386811935049895 min 0.2538220541980962 mean 1.000000000000001
iteration 5 of 10 time 4.2982401847839355 len 3264 max 16.688147103654313 min 0.25297874109868224 mean 0.999999999999993
iteration 6 of 10 time 4.904214382171631 len 3264 max 17.699839279444824 min 0.2627073619811465 mean 0.9999999999999997
iteration 7 of 10 time 5.531569957733154 len 3264 max 17.125850463122426 min 0.26203268598358687 mean 0.999999999999999
iteration 8 of 10 time 6.102543592453003 len 3264 max 17.563852395825087 min 0.2666937453039226 mean 1.0000000000000033
iteration 9 of 10 time 6.710574388504028 len 3264 max 17.330013037591854 mi

In [0]:
# Sort and display top and bottom scores
pr_items = sorted(pagerank_scores.items())
print(pr_items[:3])   # First 3 entries in node-ID order (not the highest scores)
print(pr_items[-3:])  # Last 3 entries in node-ID order


[('100157', 3.0562646281949704), ('100598', 0.639154642276791), ('101570', 1.0)]
[('zhu98line', 0.9744159509929018), ('zini01caselp', 1.7892770350215748), ('zunino01representing', 1.7337550146261123)]


## PageRank OPT 2
### a. Load and Preprocess Data

In [0]:
RESET = 0.15
NUM_ITERATIONS = 5

lines = sc.textFile(root + "citeseer_graph.txt")
# Parse edges and extract nodes
edges = lines.map(lambda line: tuple(line.split()))
nodes = edges.keys().distinct()
outlinks_by_node = edges.groupByKey().mapValues(list).cache()

### b. Initialize Variables

In [0]:
# keep these in memory
pagerank_scores = {node:1.0 for node in nodes.collect()}

### c. Run PageRank Algorithm

In [0]:
# pass the pagerank_scores to the workers
# with an explicit closure
def make_message_mapper(pagerank_scores):
    def outgoing_msgs_for_page(node_outlink_pair):
        node, outlinks = node_outlink_pair
        current_score = pagerank_scores[node]
        delta = (1 - RESET) * current_score/len(outlinks)
        return [(dst, delta) for dst in outlinks]
    return outgoing_msgs_for_page

start = time.time()
for t in range(NUM_ITERATIONS):
    
    # monitor progress
    scores = pagerank_scores.values()
    print(f'iteration {t + 1} of {NUM_ITERATIONS} time {time.time() - start}', 
          f'len {len(scores)} max {max(scores)} min {min(scores)} mean {sum(scores)/len(scores)}')

    page_hops = outlinks_by_node.flatMap(make_message_mapper(pagerank_scores))
    new_scores = page_hops.combineByKey(
        createCombiner=lambda delta: delta + RESET,
        mergeValue=operator.add,
        mergeCombiners=operator.add)
    pagerank_scores = dict(new_scores.collect())

print('scores collected:',time.time() - start,'sec')
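
One detail of `combineByKey` is worth keeping in mind when reading this variant: `createCombiner` is called the first time a key is seen in each partition, not once per key overall. With `createCombiner=lambda delta: delta + RESET`, a node whose incoming messages are spread over several partitions therefore has `RESET` added more than once, and a node with no incoming messages gets no reset at all. The simulation below is a plain-Python sketch of those semantics; `combine_by_key_local` and the message values are made up for illustration.

```python
import operator

RESET = 0.15

def combine_by_key_local(partitions, create_combiner, merge_value, merge_combiners):
    """Simulate combineByKey: combine inside each partition, then merge across them."""
    per_partition = []
    for part in partitions:
        combined = {}
        for key, value in part:
            if key in combined:
                combined[key] = merge_value(combined[key], value)
            else:
                combined[key] = create_combiner(value)   # once per key per partition
        per_partition.append(combined)
    merged = {}
    for combined in per_partition:
        for key, acc in combined.items():
            merged[key] = merge_combiners(merged[key], acc) if key in merged else acc
    return merged

combiner_args = (lambda delta: delta + RESET, operator.add, operator.add)

# The same two messages for node 'x', laid out in one partition vs. two
one = combine_by_key_local([[('x', 0.25), ('x', 0.25)]], *combiner_args)
two = combine_by_key_local([[('x', 0.25)], [('x', 0.25)]], *combiner_args)
print(one['x'], two['x'])
```

In the one-partition layout `RESET` is added once, and in the two-partition layout it is added twice. This may account for the mean score above 1.0 in the output that follows, though that is a hypothesis rather than something the notebook states.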


iteration 1 of 10 time 0.00011277198791503906 len 3264 max 25.93490875620072 min 0.3308757341929992 mean 1.3452160572028102
iteration 2 of 10 time 0.2828865051269531 len 3264 max 26.419294983256023 min 0.33530587633879755 mean 1.3579097515635656
iteration 3 of 10 time 0.608527660369873 len 3264 max 26.69166835884662 min 0.3386224723993816 mean 1.3686993917702044
iteration 4 of 10 time 0.9296083450317383 len 3264 max 27.03069213612026 min 0.34170005033104534 mean 1.3778705859458524
iteration 5 of 10 time 1.2646379470825195 len 3264 max 27.238475164017558 min 0.3441369947337696 mean 1.3856661009951512
iteration 6 of 10 time 1.554229974746704 len 3264 max 27.478403576727537 min 0.34631828902719647 mean 1.3922922887870524
iteration 7 of 10 time 1.890955924987793 len 3264 max 27.633993468971397 min 0.3480967362365355 mean 1.3979245484101714
iteration 8 of 10 time 2.227386236190796 len 3264 max 27.804882236080495 min 0.34965905763527716 mean 1.4027119690898227
iteration 9 of 10 time 2.549894

In [0]:
pr_items = sorted(pagerank_scores.items())
print(pr_items[:3])
print(pr_items[-3:])

[('100157', 4.781652130050926), ('100598', 1.115376342327226), ('101570', 1.0)]
[('zhu98line', 1.1459769912864102), ('zini01caselp', 2.8894270730268703), ('zunino01representing', 2.7714805865702004)]
