# Problem 1, 30 points: $On the Flajolet-Martin Algorithm$


# Problem 2, 40 points: $Bloom Filters$

In [1]:
#download the dict and print the first 15 words

import nltk
nltk.download('words')
from nltk.corpus import words

word_list = words.words()
print(f'Dictionary length: {len(word_list)}')
print(word_list[:15])


Dictionary length: 236736
['A', 'a', 'aa', 'aal', 'aalii', 'aam', 'Aani', 'aardvark', 'aardwolf', 'Aaron', 'Aaronic', 'Aaronical', 'Aaronite', 'Aaronitic', 'Aaru']


[nltk_data] Downloading package words to /Users/zhengwan/nltk_data...
[nltk_data]   Package words is already up-to-date!


In [2]:
from nltk.corpus import movie_reviews
nltk.download('movie_reviews')

neg_reviews = []
pos_reviews = []

for fileid in movie_reviews.fileids('neg'):
    neg_reviews.extend(movie_reviews.words(fileid))

for fileid in movie_reviews.fileids('pos'):
    pos_reviews.extend(movie_reviews.words(fileid))


[nltk_data] Downloading package movie_reviews to
[nltk_data]     /Users/zhengwan/nltk_data...
[nltk_data]   Package movie_reviews is already up-to-date!


## Question 1

In [3]:
# 1. 
from bloom_filter import BloomFilter

# Initialize Bloom Filter 
word_filter = BloomFilter(max_elements=236736)
type(word_filter)


bloom_filter.bloom_filter.BloomFilter

In [4]:
# Add words from 'word_list' to the Bloom filter
for word in word_list:
    word_filter.add(word)

# Create a Python set built from the same list of words in the English dictionary.
word_set = set(word_list)

#test
'a' in word_filter, 'a' in word_set

(True, True)

## Question 2

In [5]:
from sys import getsizeof
print(f'the size of word_list: {getsizeof(word_list)}')
print(f'the size of word_filter: {getsizeof(word_filter)}')
print(f'the size of word_set: {getsizeof(word_set)}')

the size of word_list: 2115944
the size of word_filter: 48
the size of word_set: 8388824


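`sys.getsizeof` reports only the shallow size of each object, so these numbers are not directly comparable. The 48 bytes for `word_filter` cover just the Python wrapper object and exclude the bit array that stores the filter, while the figures for `word_list` and `word_set` exclude the strings they reference. A Bloom filter is still expected to be the most compact of the three, because it stores a fixed number of bits per element rather than the words themselves, but `getsizeof` alone does not demonstrate this.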
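
To estimate how much memory the filter's bit array needs, the standard Bloom filter sizing formulas $m = -n \ln p / (\ln 2)^2$ and $k = (m/n) \ln 2$ can be evaluated directly. This is only a sketch: the error rate of 0.1 is an assumption (no `error_rate` is passed in the notebook), and `bloom_bits` and `bloom_hashes` are hypothetical helper names, not part of the `bloom_filter` package.

```python
import math

def bloom_bits(max_elements: int, error_rate: float) -> int:
    """Optimal number of bits m = -n * ln(p) / (ln 2)^2 for a Bloom filter."""
    return math.ceil(-max_elements * math.log(error_rate) / math.log(2) ** 2)

def bloom_hashes(num_bits: int, max_elements: int) -> int:
    """Optimal number of hash functions k = (m / n) * ln 2, rounded."""
    return max(1, round(num_bits / max_elements * math.log(2)))

n_words = 236736          # dictionary length reported above
assumed_error_rate = 0.1  # assumption: not stated in the notebook

bits = bloom_bits(n_words, assumed_error_rate)
print(f"bits: {bits}, bytes: {bits // 8}, hash functions: {bloom_hashes(bits, n_words)}")
```

Under these assumptions the bit array takes roughly 140 KB, which is still far below the shallow sizes reported for the list and the set.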

## Question 3

In [6]:
# How fast is the main operation (membership testing)?
# Use %timeit with 3 repetitions as a basic benchmark; the number of loops per repetition is chosen automatically

%timeit -r 3 "California" in word_filter


14.6 µs ± 132 ns per loop (mean ± std. dev. of 3 runs, 100,000 loops each)


In [7]:
# 3.
%timeit -r 3 "California" in word_list
%timeit -r 3 "California" in word_set

# Comment:
# The set is the fastest (about 57 ns per lookup), the Bloom filter comes second
# (about 14.6 µs) and the list is the slowest (about 470 µs). The Bloom filter
# beats the linear scan of the list but not the hash lookup of the set.


470 µs ± 24.5 µs per loop (mean ± std. dev. of 3 runs, 1,000 loops each)
57 ns ± 0.695 ns per loop (mean ± std. dev. of 3 runs, 10,000,000 loops each)
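
Outside IPython, a similar comparison can be run with the standard-library `timeit` module. The sketch below uses a hypothetical list of 200,000 generated strings in place of the NLTK dictionary and probes for the last element, which is the worst case for the list scan.

```python
import timeit

# Hypothetical stand-in for the dictionary: 200,000 distinct strings.
items = [f"word{i}" for i in range(200_000)]
as_list = items
as_set = set(items)
probe = "word199999"  # worst case for the list: the last element

# Time 100 membership tests on each structure.
list_time = timeit.timeit(lambda: probe in as_list, number=100)
set_time = timeit.timeit(lambda: probe in as_set, number=100)

print(f"list: {list_time / 100 * 1e6:.1f} µs per lookup")
print(f"set:  {set_time / 100 * 1e6:.3f} µs per lookup")
```

The list lookup grows linearly with the number of words, while the set lookup is a single hash probe on average, which is why the gap between them is several orders of magnitude.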

## Question 4

In [8]:
# 4.
def check(lst: list, dictionary) -> int:
    '''
    Count the words in `lst` that are not found in `dictionary`.
    `dictionary` can be any of the 3 data structures we constructed
    (list, set or Bloom filter), since all of them support `in`.
    ---
    Output:
    The number of words which do not appear in the dictionary.
    '''
    missing = 0
    for word in lst:
        if word not in dictionary:
            missing += 1
    return missing

# Quick sanity check on a tiny list
test = ["ojbk", "bye", "jiumin"]
check(test, word_filter)

# Count once per corpus and reuse the results
neg_missing = check(neg_reviews, word_filter)
pos_missing = check(pos_reviews, word_filter)

print(f'the total number of words that do not appear in the dictionary is \
{neg_missing + pos_missing}')

print(f'the number of words that do not appear in the dictionary is \
{neg_missing} for negative ones, and {pos_missing} for positive ones')



the total number of words that do not appear in the dictionary is 408151
the number of words that do not appear in the dictionary is 193802 for negative ones, and 214349 for positive ones
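
Because a Bloom filter can report false positives but never false negatives, a count of "missing" words obtained from it can only be less than or equal to the exact count from a set. The sketch below illustrates this with a deliberately small, hand-rolled filter; `TinyBloom`, `count_missing` and the generated word lists are hypothetical and are not the `bloom_filter` package or the NLTK data.

```python
import hashlib

class TinyBloom:
    """Minimal Bloom filter for illustration (not the `bloom_filter` package)."""

    def __init__(self, num_bits: int, num_hashes: int):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, item: str):
        # Derive k bit positions from one SHA-256 digest via double hashing.
        digest = hashlib.sha256(item.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, item: str) -> None:
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, item: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))

def count_missing(words, dictionary) -> int:
    """Count the words for which `word in dictionary` is False."""
    return sum(1 for word in words if word not in dictionary)

vocabulary = [f"word{i}" for i in range(1000)]    # hypothetical dictionary
queries = [f"word{i}" for i in range(500, 3000)]  # 500 known, 2000 unknown

exact = set(vocabulary)
bloom = TinyBloom(num_bits=4096, num_hashes=3)    # deliberately small to force collisions
for word in vocabulary:
    bloom.add(word)

print("missing according to set:  ", count_missing(queries, exact))
print("missing according to Bloom:", count_missing(queries, bloom))
```

The difference between the two counts is the number of false positives, so a count taken from a Bloom filter is best read as a lower bound on the true number of unknown words.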


# Problem 3, 30 points: $Dead ends in PageRank computations$

### 1. Prove that $w(r') = w(r)$.

Since there are no dead ends, $ M $ is column-stochastic: every column sums to 1, because a node $ j $ with $ k $ out-links has $ m_{ij} = \frac{1}{k} $ for each of the $ k $ nodes $ i $ it links to and 0 elsewhere. That is, for every $ j $:

$ \sum_i m_{ij} = 1 $

By definition of the update $ r' = Mr $:

$ w(r') = \sum_i r'_i = \sum_i \sum_j m_{ij} r_j $

Swapping the order of summation:

$ w(r') = \sum_j r_j \sum_i m_{ij} $

And since the inner sum $\sum_i m_{ij}$ is 1 for all $j$:

$ w(r') = \sum_j r_j \cdot 1 = \sum_j r_j = w(r) $

**Conclusion: $ w(r) $ remains the same after the update.**
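
The conservation argument can be checked numerically on a small example. The 3-node matrix below is hypothetical; its columns each sum to 1, as the proof requires, and the starting vector is chosen with total weight 10 to show that the result does not depend on normalization.

```python
def mat_vec(matrix, vector):
    """Multiply a matrix (given as a list of rows) by a vector."""
    return [sum(m_ij * r_j for m_ij, r_j in zip(row, vector)) for row in matrix]

# Hypothetical 3-node graph with no dead ends: column j holds 1/k for each of
# the k out-links of node j, so every column sums to 1.
M = [
    [0.0, 0.5, 1.0],
    [0.5, 0.0, 0.0],
    [0.5, 0.5, 0.0],
]
r = [2.0, 5.0, 3.0]  # any non-negative vector works; here w(r) = 10

r_next = mat_vec(M, r)
print("w(r)  =", sum(r))
print("w(r') =", sum(r_next))
```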

### 2. Under what circumstances does $ w(r') = w(r) $?


Proof:

$ w(r') = \sum_i r'_i = \sum_i \left( \beta \sum_j m_{ij}r_j + \frac{1 - \beta}{n} \right) $

$ w(r') = \beta \sum_i \sum_j m_{ij}r_j + \sum_i \frac{1 - \beta}{n} $

$ w(r') = \beta \sum_j r_j \sum_i m_{ij} + (1 - \beta) $

Since $ M $ is a stochastic matrix, $ \sum_i m_{ij} = 1 $ for any j. Therefore:
$ w(r') = \beta \sum_j r_j \cdot 1 + (1 - \beta) $

$ w(r') = \beta \sum_j r_j + (1 - \beta) $

$ w(r') = \beta w(r) + (1 - \beta) $

For $ w(r') $ to equal $ w(r) $ the following must hold true:

$ \beta w(r) + (1 - \beta) = w(r) $

$ \beta w(r) + 1 - \beta = w(r) $

$ 1 - \beta = w(r)(1 - \beta) $

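Assuming $ \beta < 1 $ (otherwise there is no teleportation), both sides can be divided by $ 1 - \beta $, so this last equality holds if and only if $ w(r) = 1 $.

In conclusion, the sum of PageRank scores remains the same after an update with teleportation **if and only if the total PageRank score is normalized to 1 before the update**.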
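
The relation $w(r') = \beta w(r) + (1 - \beta)$ can also be verified numerically. The sketch below applies one teleporting update to a hypothetical 3-node column-stochastic matrix, once with a normalized vector and once with a vector of total weight 10; `teleport_step` is an illustrative helper name.

```python
def teleport_step(matrix, vector, beta):
    """One update r'_i = beta * sum_j m_ij r_j + (1 - beta) / n."""
    n = len(vector)
    return [
        beta * sum(m_ij * r_j for m_ij, r_j in zip(row, vector)) + (1 - beta) / n
        for row in matrix
    ]

# Hypothetical column-stochastic matrix (no dead ends).
M = [
    [0.0, 0.5, 1.0],
    [0.5, 0.0, 0.0],
    [0.5, 0.5, 0.0],
]
beta = 0.8

for r in ([0.2, 0.5, 0.3], [2.0, 5.0, 3.0]):
    r_next = teleport_step(M, r, beta)
    print(f"w(r) = {sum(r):.2f}, w(r') = {sum(r_next):.2f}")
```

Only the normalized vector keeps its total weight; the second one moves from 10 toward 1, as the formula predicts.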

### 3. Prove that $w(r')$ is also 1.

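With teleportation and dead ends, a walker at a live node teleports with probability $ 1 - \beta $, while a walker at a dead end teleports with probability 1. The PageRank update equation is then:

$ r' = \beta Mr + \frac{1 - \beta}{n} (E - uD^T)r + \frac{1}{n} uD^T r $

where $ M $ is the transition matrix, $ E $ is the $ n \times n $ matrix with all entries equal to 1, $ u $ is the vector with all entries equal to 1, and $ D $ is the indicator vector of dead ends ($ d_j = 1 $ if node $ j $ is a dead end and $ d_j = 0 $ otherwise).

Given $ w(r) = \sum_i r_i = 1 $, we calculate $ w(r') $ as:

$ w(r') = \sum_i r'_i $

$ w(r') = \sum_i \left( \beta \sum_j m_{ij}r_j + \frac{1 - \beta}{n} \sum_j (e_{ij} - u_id_j)r_j + \frac{1}{n} \sum_j u_i d_j r_j \right) $

The column of $ M $ for a live node sums to 1 and the column for a dead end is all zeros, so $ \sum_i m_{ij} = 1 - d_j $ for all $ j $. Together with $ \sum_i e_{ij} = n $ and $ \sum_i u_i = n $, we get:

$ w(r') = \beta \sum_j (1 - d_j) r_j + (1 - \beta) \sum_j (1 - d_j) r_j + \sum_j d_j r_j $

$ w(r') = \sum_j (1 - d_j) r_j + \sum_j d_j r_j $

$ w(r') = \sum_j r_j = w(r) = 1 $

Thus, the sum of PageRank scores is conserved after the update: $ w(r') = 1 $.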
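
A numeric check of the dead-end case, as a sketch under one stated assumption: a walker teleports with probability $1 - \beta$ from a live node and with probability 1 from a dead end. The matrix is hypothetical, with the third node as a dead end, and `step_with_dead_ends` is an illustrative helper name.

```python
def step_with_dead_ends(matrix, vector, beta):
    """One PageRank update where dead ends teleport with probability 1."""
    n = len(vector)
    # A node is dead when its column of M is all zeros (no out-links).
    dead = [all(matrix[i][j] == 0 for i in range(n)) for j in range(n)]
    live_mass = sum(r_j for r_j, is_dead in zip(vector, dead) if not is_dead)
    dead_mass = sum(r_j for r_j, is_dead in zip(vector, dead) if is_dead)
    # Every node receives the same teleport share from live and dead nodes.
    teleport = (1 - beta) / n * live_mass + dead_mass / n
    return [
        beta * sum(m_ij * r_j for m_ij, r_j in zip(row, vector)) + teleport
        for row in matrix
    ]

# Hypothetical 3-node graph in which the third node has no out-links.
M = [
    [0.0, 0.5, 0.0],
    [0.5, 0.0, 0.0],
    [0.5, 0.5, 0.0],
]
r = [0.2, 0.5, 0.3]  # w(r) = 1

r_next = step_with_dead_ends(M, r, beta=0.8)
print("w(r)  =", sum(r))
print("w(r') =", sum(r_next))
```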


# Problem 4, 60 points: (Implementing PageRank and HITS)

### PageRank Implementation

In [9]:
from pyspark import SparkContext, Broadcast
from operator import add
import numpy as np

sc = SparkContext()

n = 1000  # number of nodes
iterations = 40  # number of iterations
beta = 0.8

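def generateM(x, broadcasted_ranks_dict):
    # x = (source node, iterable of destination nodes)
    src, d_list = x
    deg = len(d_list)
    # A node without in-links drops out of `ranks` after the first iteration;
    # its rank is then just the teleport share (1 - beta) / n
    src_rank = broadcasted_ranks_dict.value.get(src, (1 - beta) / n)
    return [(dst, src, 1.0 / deg * src_rank) for dst in d_list]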

edges = sc.textFile("graph.txt").map(lambda x: x.split("\t")).map(lambda x: (int(x[0]), int(x[1])))
links = edges.distinct().groupByKey()

ranks = sc.parallelize([(i, 1.0 / n) for i in range(1, n + 1)])

for _ in range(iterations):
    # Broadcast the ranks dictionary to the workers
    broadcasted_ranks = sc.broadcast(dict(ranks.collect()))

    # Apply the generateM transformation using the broadcasted ranks
    M = links.flatMap(lambda x: generateM(x, broadcasted_ranks))

    # Proceed with the multiplication and update of ranks
    multiplications = M.map(lambda x: (x[0], x[2]))
    ranks = multiplications.reduceByKey(add).mapValues(lambda rank: beta * rank + (1 - beta) / n)

ranks = ranks.collect()
ranks.sort(key=lambda x: x[1], reverse=True)

print('Top 5 in descending order: ', [node for node, _ in ranks[:5]])
print('Bottom 5 in ascending order: ', [node for node, _ in ranks[-5:]])


23/11/27 13:42:29 WARN Utils: Your hostname, yongyuangendangzouxinzhongyoudangshiyelixiang.local resolves to a loopback address: 127.0.0.1; using 192.168.43.248 instead (on interface en0)
23/11/27 13:42:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/27 13:42:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

Top 5 in descending order:  [263, 537, 965, 243, 285]
Bottom 5 in ascending order:  [408, 424, 62, 93, 558]
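
As a cross-check of the Spark job, the same update $r' = \beta M r + \frac{1 - \beta}{n}$ can be written in plain Python and run on a small graph. This is a sketch only: `toy_graph` is hypothetical, the function assumes every node appears as a key and has at least one out-link, and it is not the distributed implementation above.

```python
def pagerank(out_links, beta=0.8, iterations=40):
    """Power iteration r' = beta * M r + (1 - beta) / n on a graph without dead ends."""
    nodes = sorted(out_links)
    n = len(nodes)
    ranks = {node: 1.0 / n for node in nodes}
    for _ in range(iterations):
        # Every node starts from its teleport share.
        new_ranks = {node: (1 - beta) / n for node in nodes}
        for src, dsts in out_links.items():
            # Each out-link carries an equal share of the source's rank.
            share = beta * ranks[src] / len(dsts)
            for dst in dsts:
                new_ranks[dst] += share
        ranks = new_ranks
    return ranks

# Hypothetical 4-node graph; every node has at least one out-link.
toy_graph = {1: [2, 3], 2: [3], 3: [1], 4: [3]}
ranks = pagerank(toy_graph)
for node, score in sorted(ranks.items(), key=lambda kv: kv[1], reverse=True):
    print(node, round(score, 4))
```

Node 4 has no in-links, so its score stays at the teleport share $(1 - \beta)/n$, and the scores sum to 1 after every iteration.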


In [None]:
def generateTuple(x):
    # Parse one "src dst" line and shift node IDs to 0-based indices
    src, dst = x.split()
    return (int(src)-1, int(dst)-1)

def generateM(x):
    # One (dst, src, 1/deg) entry per out-link of src
    src, dst_list = x
    deg = len(dst_list)
    return [(dst, src, 1./deg) for dst in dst_list]

def generateR(x):
    # Nodes without in-links receive only the teleport share
    r = np.full(n, (1 - beta) / n)
    for i,v in x:
        r[i] = v
    return r

data = sc.textFile("graph.txt")
data = data.map(lambda x: generateTuple(x)).distinct()# (s, d)
temp = data.groupByKey()# (s, ResultIterable d_list)
M = temp.flatMap(lambda x: generateM(x))# (d, s, v)
r = np.ones(n) / n
for _ in range(iterations):
    multiplications = M.map(lambda h : (h[0], h[2]*r[h[1]]))
    r_rdd = multiplications.reduceByKey(lambda x, y : x + y)
    r_rdd = r_rdd.map(lambda h: (h[0], beta*h[1]+(1-beta)/n))
    r = generateR(r_rdd.collect())
    
# Convert back to 1-based node IDs
ascending = np.argsort(r) + 1
descending = np.argsort(-r) + 1

print('top 5 in descending order: ', descending[0:5])# [263 537 965 243 285]
print('bottom 5 in ascending order: ', ascending[0:5])# [558  93  62 424 408]


In [10]:
sc.stop()

### HITS Implementation

In [11]:

from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry, IndexedRowMatrix
from pyspark.mllib.linalg import Vectors
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.mllib.linalg import DenseMatrix

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("HITS Algorithm") \
    .getOrCreate()


# `sc` was stopped in the previous cell, so the `links` RDD built for PageRank
# can no longer be used. Rebuild the adjacency list on the new session's context.
# `links` is an RDD of tuples representing the adjacency list of the graph
# For example: [(1, [2, 3]), (2, [3]), ...]
sc = spark.sparkContext
edges = sc.textFile("graph.txt").map(lambda x: x.split("\t")).map(lambda x: (int(x[0]), int(x[1])))
links = edges.distinct().groupByKey().mapValues(list)

# Convert links to entries of a coordinate matrix and then to IndexedRowMatrix
entries = links.flatMap(lambda x: [MatrixEntry(x[0] - 1, y - 1, 1) for y in x[1]])  # Subtracting 1 to make index start from 0
coord_matrix = CoordinateMatrix(entries)
L = coord_matrix.toIndexedRowMatrix()

# Transpose the matrix L to get Lt
Lt = L.toCoordinateMatrix().transpose().toIndexedRowMatrix()

# Number of nodes in the graph
n = L.numRows()

# Initialize h and a as Vectors of ones
h = Vectors.dense([1] * n)
a = Vectors.dense([1] * n)

# Function to normalize a vector so the largest value is 1
def normalize(v):
    max_value = max(v)
    return Vectors.dense([x / max_value for x in v])

# Function to convert a vector to an n x 1 matrix
def vector_to_matrix(v):
    return DenseMatrix(len(v), 1, v, isTransposed=True)

# Perform the iteration
for iteration in range(40):
    # Convert the h vector to a dense matrix
    h_matrix = vector_to_matrix(h.toArray())

    # Compute a = Lt * h as an IndexedRowMatrix
    a = Lt.multiply(h_matrix)

    # Extract vectors from rows, normalize, and convert back to DenseVector
    a = Vectors.dense(a.toBlockMatrix().toLocalMatrix().toArray().flatten())
    a = normalize(a)

    # Convert the a vector to a dense matrix
    a_matrix = vector_to_matrix(a.toArray())

    # Compute h = L * a as an IndexedRowMatrix
    h = L.multiply(a_matrix)

    # Extract vectors from rows, normalize, and convert back to DenseVector
    h = Vectors.dense(h.toBlockMatrix().toLocalMatrix().toArray().flatten())
    h = normalize(h)

In [None]:
# Convert Vectors to RDD and append the corresponding index as the node ID
h_rdd = spark.sparkContext.parallelize(h.toArray()).zipWithIndex().map(lambda x: (x[1]+1, x[0]))  # +1 if node IDs start at 1
a_rdd = spark.sparkContext.parallelize(a.toArray()).zipWithIndex().map(lambda x: (x[1]+1, x[0]))  # +1 if node IDs start at 1

# Find the 5 nodes with the highest hub  scores
top5_hub_nodes = h_rdd.top(5, key=lambda x: x[1])
# Find the 5 nodes with the lowest hub  scores
bottom5_hub_nodes = h_rdd.takeOrdered(5, key=lambda x: x[1])
# Find the 5 nodes with the highest authority  scores
top5_auth_nodes = a_rdd.top(5, key=lambda x: x[1])
# Find the 5 nodes with the lowest authority  scores
bottom5_auth_nodes = a_rdd.takeOrdered(5, key=lambda x: x[1])

print("Top 5 hub nodes:", top5_hub_nodes)
print()
print("Bottom 5 hub nodes:", bottom5_hub_nodes)
print()
print("Top 5 authority nodes:", top5_auth_nodes)
print()
print("Bottom 5 authority nodes:", bottom5_auth_nodes)


Top 5 hub nodes: [(840, 1.0), (155, 0.9499618624906543), (234, 0.8986645288972263), (389, 0.8634171101843789), (472, 0.8632841092495218)]

Bottom 5 hub nodes: [(23, 0.04206685489093652), (835, 0.057790593544330145), (141, 0.06453117646225177), (539, 0.0660265937341849), (889, 0.07678413939216452)]

Top 5 authority nodes: [(893, 1.0), (16, 0.9635572849634398), (799, 0.9510158161074015), (146, 0.9246703586198443), (473, 0.899866197360405)]

Bottom 5 authority nodes: [(19, 0.05608316377607618), (135, 0.06653910487622794), (462, 0.07544228624641901), (24, 0.08171239406816942), (910, 0.08571673456144875)]
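
The HITS iteration itself ($a = L^T h$, rescale so the largest value is 1, then $h = L a$, rescale again) can be sketched in plain Python on a small graph to sanity-check the matrix version. `toy_graph` and the `hits` helper are hypothetical and independent of the Spark code above.

```python
def hits(out_links, iterations=40):
    """HITS with max-normalization: a = L^T h, then h = L a."""
    nodes = sorted(set(out_links) | {d for dsts in out_links.values() for d in dsts})
    hubs = {node: 1.0 for node in nodes}
    auths = {node: 1.0 for node in nodes}
    for _ in range(iterations):
        # Authority of a node = sum of the hub scores of the nodes linking to it.
        auths = {node: 0.0 for node in nodes}
        for src, dsts in out_links.items():
            for dst in dsts:
                auths[dst] += hubs[src]
        top = max(auths.values())
        auths = {node: score / top for node, score in auths.items()}
        # Hub score of a node = sum of the authority scores of the nodes it links to.
        hubs = {src: sum(auths[dst] for dst in out_links.get(src, [])) for src in nodes}
        top = max(hubs.values())
        hubs = {node: score / top for node, score in hubs.items()}
    return hubs, auths

# Hypothetical 4-node graph given as an adjacency list.
toy_graph = {1: [2, 3], 2: [3], 3: [1], 4: [3]}
hubs, auths = hits(toy_graph)
print("hubs: ", {node: round(score, 3) for node, score in hubs.items()})
print("auths:", {node: round(score, 3) for node, score in auths.items()})
```

In this toy graph node 3 is linked to by three nodes and ends up as the top authority, while node 1, which links to it and to node 2, ends up as the top hub.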


# Problem 5, 40 points: (PageRank for Sports Analytics)

In [None]:
# Question 1

import pandas as pd

file_path = 'NCAA.csv'

df = pd.read_csv(file_path)

df.head()

|   | GameId | GameDate | NeutralSite | AwayTeam | HomeTeam | Team | Home | Score | AST | TOV | ... | Rebounds | ORB | DRB | FGA | FGM | 3FGM | 3FGA | FTA | FTM | Fouls |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 1/1/2019 13:00 | 0 | Notre Dame Fighting Irish | Virginia Tech Hokies | Notre Dame Fighting Irish | 0 | 66 | 13 | 11 | ... | 30 | 13 | 17 | 56 | 23 | 13 | 34 | 13 | 7 | 10 |
| 1 | 1 | 1/1/2019 13:00 | 0 | Notre Dame Fighting Irish | Virginia Tech Hokies | Virginia Tech Hokies | 1 | 81 | 19 | 7 | ... | 24 | 2 | 22 | 55 | 33 | 11 | 18 | 5 | 4 | 13 |
| 2 | 2 | 1/3/2019 19:00 | 0 | North Carolina State Wolfpack | Miami (FL) Hurricanes | Miami (FL) Hurricanes | 1 | 82 | 12 | 7 | ... | 27 | 9 | 18 | 61 | 28 | 10 | 25 | 29 | 16 | 14 |
| 3 | 2 | 1/3/2019 19:00 | 0 | North Carolina State Wolfpack | Miami (FL) Hurricanes | North Carolina State Wolfpack | 0 | 87 | 17 | 16 | ... | 50 | 17 | 33 | 68 | 31 | 11 | 30 | 18 | 14 | 23 |
| 4 | 3 | 1/5/2019 3:27 | 0 | Clemson Tigers | Duke Blue Devils | Clemson Tigers | 0 | 68 | 14 | 16 | ... | 35 | 9 | 26 | 63 | 27 | 6 | 15 | 12 | 8 | 16 |


In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 298 entries, 0 to 297
Data columns (total 22 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   GameId       298 non-null    int64 
 1   GameDate     298 non-null    object
 2   NeutralSite  298 non-null    int64 
 3   AwayTeam     298 non-null    object
 4   HomeTeam     298 non-null    object
 5   Team         298 non-null    object
 6   Home         298 non-null    int64 
 7   Score        298 non-null    int64 
 8   AST          298 non-null    int64 
 9   TOV          298 non-null    int64 
 10  STL          298 non-null    int64 
 11  BLK          298 non-null    int64 
 12  Rebounds     298 non-null    int64 
 13  ORB          298 non-null    int64 
 14  DRB          298 non-null    int64 
 15  FGA          298 non-null    int64 
 16  FGM          298 non-null    int64 
 17  3FGM         298 non-null    int64 
 18  3FGA         298 non-null    int64 
 19  FTA          298 non-null    

In [None]:
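# Per-game score difference: both rows of a game get
# (score of the first listed team) - (score of the second listed team)
scores = df.groupby('GameId')['Score'].diff().bfill() * -1
df['delta'] = scores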

# Initialize an empty list to store the edges.
edges = []

# Iterate through the DataFrame rows to build the edges list.
# We skip every other row since we're assuming the data is in pairs of away-home teams.
for idx, row in df.iterrows():
    if idx % 2 == 0:  # Process every pair of rows
        # Extract the necessary information from the row.
        away, home, relative_team = row[['AwayTeam', 'HomeTeam', 'Team']]
        delta = row['delta']

        # If the relative team is the away team, negate the delta.
        if relative_team == away:
            delta *= -1

        # Determine the winner, loser, and points based on the delta.
        if delta > 0:
            winner, loser, points = home, away, delta
        elif delta < 0:
            winner, loser, points = away, home, -delta
        else:  # If delta is zero, it's a draw, and we continue to the next iteration.
            continue

        # Append a tuple of loser, winner, and points to the edges list.
        edges.append((loser, winner, points))
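
The edge convention used here (an edge points from the loser to the winner and is weighted by the winning margin) can be illustrated without pandas. The game records and the `build_edges` helper below are hypothetical, not rows from `NCAA.csv`.

```python
# Hypothetical game records: (away_team, home_team, away_score, home_score).
games = [
    ("Team A", "Team B", 66, 81),
    ("Team C", "Team B", 87, 82),
    ("Team A", "Team C", 70, 70),  # tie: produces no edge
]

def build_edges(games):
    """Return (loser, winner, margin) tuples, skipping ties."""
    edges = []
    for away, home, away_score, home_score in games:
        margin = home_score - away_score
        if margin > 0:    # home team won
            edges.append((away, home, margin))
        elif margin < 0:  # away team won
            edges.append((home, away, -margin))
    return edges

print(build_edges(games))
```

Pointing edges at the winner means that PageRank mass flows toward teams that win, so a higher score corresponds to a stronger team.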


In [None]:
from igraph import Graph

game_graph = Graph.TupleList(edges, directed=True, edge_attrs=['weight'])


print(game_graph.summary())


IGRAPH DNW- 15 149 -- 
+ attr: name (v), weight (e)


In [None]:
# Question 3
import operator
# PageRank score of every vertex; no `weights` argument is passed,
# so the point margins stored on the edges are not used here
vectors = game_graph.pagerank()
# Map each team name to its score
e = dict(zip(game_graph.vs['name'], vectors))
# Sort the teams by score, highest first
sorted_eigen = sorted(e.items(), key=operator.itemgetter(1), reverse=True)
sorted_eigen

[('Duke Blue Devils', 0.18701215826953893),
 ('North Carolina Tar Heels', 0.15201923838621956),
 ('Virginia Cavaliers', 0.12424148988346527),
 ('Florida State Seminoles', 0.09481511469157541),
 ('Louisville Cardinals', 0.07752197728366199),
 ('Virginia Tech Hokies', 0.07310353575259473),
 ('Syracuse Orange', 0.0728390788302208),
 ('Clemson Tigers', 0.0366776772777835),
 ('Boston College Eagles', 0.0346795937311922),
 ('North Carolina State Wolfpack', 0.03397914060551954),
 ('Pittsburgh Panthers', 0.03379168787220402),
 ('Georgia Tech Yellow Jackets', 0.025552395760336584),
 ('Miami (FL) Hurricanes', 0.0194194243594586),
 ('Notre Dame Fighting Irish', 0.01755256066239208),
 ('Wake Forest Demon Deacons', 0.016794926633836707)]
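
If the winning margins should influence the ranking, one option is a weighted PageRank in which each loser splits its score among the teams that beat it in proportion to the margin. The sketch below is a plain-Python illustration on hypothetical edges. The damping factor of 0.85 and the uniform redistribution of score held by undefeated teams are assumptions of this sketch, and its results are not meant to reproduce the igraph output above.

```python
def weighted_pagerank(edges, damping=0.85, iterations=100):
    """PageRank on (loser, winner, points) edges, weighting each edge by points."""
    nodes = sorted({team for loser, winner, _ in edges for team in (loser, winner)})
    n = len(nodes)
    # Total margin by which each team lost; zero for undefeated teams.
    out_weight = {node: 0.0 for node in nodes}
    for loser, _, points in edges:
        out_weight[loser] += points
    ranks = {node: 1.0 / n for node in nodes}
    for _ in range(iterations):
        # Score held by teams with no out-edges is spread uniformly (assumption).
        dangling = sum(ranks[node] for node in nodes if out_weight[node] == 0)
        base = (1 - damping) / n + damping * dangling / n
        new_ranks = {node: base for node in nodes}
        for loser, winner, points in edges:
            new_ranks[winner] += damping * ranks[loser] * points / out_weight[loser]
        ranks = new_ranks
    return ranks

# Hypothetical results: A lost to B by 15 and to C by 2, B lost to C by 5.
edges = [("Team A", "Team B", 15), ("Team B", "Team C", 5), ("Team A", "Team C", 2)]
ranks = weighted_pagerank(edges)
for team, score in sorted(ranks.items(), key=lambda kv: kv[1], reverse=True):
    print(team, round(score, 4))
```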