In [1]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

import json
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage
import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *



[0m

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [5]:
bucket_name = 'irproject_bucket' 
full_path = f"gs://{bucket_name}/"
paths=[]

client = storage.Client()
blobs = client.list_blobs(bucket_name)
for b in blobs:
    if b.name != 'graphframes.sh' and ('/' not in b.name or b.name.endswith('/')):
        paths.append(full_path+b.name)

parquetFile = spark.read.parquet(*paths)

wiki1000_body = parquetFile.select("id", "text").rdd
wiki1000_title = parquetFile.select("id", "title").rdd
wiki1000_anchor = parquetFile.select("id", "anchor_text").rdd

                                                                                

ID TO TITLES

In [6]:
ids_titles = wiki1000_title.collectAsMap()

                                                                                

In [7]:
with open('titles.json', 'w') as titles:
     json.dump(ids_titles, titles)

titles_src = "titles.json"
titles_dst = f'gs://{bucket_name}/titles/{titles_src}'
!gsutil cp $titles_src $titles_dst

Copying file://titles.json [Content-Type=application/json]...
==> NOTE: You are uploading one or more large file(s), which would run          
significantly faster if you enable parallel composite uploads. This
feature can be enabled by editing the
"parallel_composite_upload_threshold" value in your .boto
configuration file. However, note that if you do this large files will
be uploaded as `composite objects
<https://cloud.google.com/storage/docs/composite-objects>`_,which
means that any user who downloads such objects will need to have a
compiled crcmod installed (see "gsutil help crcmod"). This is because
without a compiled crcmod, computing checksums on composite objects is
so slow that gsutil disables downloads of composite objects.

\ [1 files][218.2 MiB/218.2 MiB]                                                
Operation completed over 1 objects/218.2 MiB.                                    


In [8]:
%cd -q /home/dataproc
!ls inverted_index.py
sc.addFile("/home/dataproc/inverted_index.py")
sys.path.insert(0,SparkFiles.getRootDirectory())
from inverted_index import InvertedIndex

inverted_index.py


In [9]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)
ps = PorterStemmer()

In [10]:
def get_tf(doc_id, text):
    # Tokenize, filter, and count in one step for efficiency
    tokens = (token.group() for token in RE_WORD.finditer(text.lower()))
    filtered_tokens = [token for token in tokens if token not in all_stopwords]
    
    # Count tokens and prepare output format
    token_counts = Counter(filtered_tokens)
    return [(token, (doc_id, count)) for token, count in token_counts.items()]

In [16]:
def get_DL(text):
    # Tokenize and filter using a generator expression for efficiency
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
    filtered_tokens = [token for token in tokens if token not in all_stopwords]
    
    # Count the number of filtered tokens directly
    return len(filtered_tokens)

BODY

In [12]:
#get tfs
body_tfs = wiki1000_body.flatMap(lambda x: get_tf(x[0], x[1]))

In [13]:
#sort pls
body_postings = body_tfs.groupByKey().mapValues(InvertedIndex.reduce_word_counts)
body_postings_filtered = body_postings.filter(lambda x: len(x[1])>50)

In [14]:
body_df = InvertedIndex.calculate_df(body_postings_filtered)
body_df_dict = body_df.collectAsMap()

                                                                                

In [17]:
body_DL = wiki1000_body.map(lambda x: (x[0], get_DL(x[1]))).collectAsMap()

                                                                                

In [18]:
body_tot_term = body_postings_filtered.mapValues(InvertedIndex.get_total_term).collectAsMap()

                                                                                

In [19]:
body_posting_locs_list = InvertedIndex.partition_postings_and_write(body_postings_filtered, bucket_name, "body").collect()

                                                                                

In [20]:
body_super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='body'):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            body_super_posting_locs[k].extend(v)

In [21]:
# Create inverted index instance
body_inv_index = InvertedIndex()
body_inv_index.DL = body_DL

# Adding the posting locations dictionary to the inverted index
body_inv_index.posting_locs = body_super_posting_locs

# Add the token - df dictionary to the inverted index
body_inv_index.df = body_df_dict
body_inv_index.term_total = body_tot_term

# write the global stats out
body_inv_index.write_index('.', 'body_index')
index_src = "body_index.pkl"
index_dst = f'gs://{bucket_name}/body/{index_src}'
!gsutil cp $index_src $index_dst

Copying file://body_index.pkl [Content-Type=application/octet-stream]...
- [1 files][ 69.0 MiB/ 69.0 MiB]                                                
Operation completed over 1 objects/69.0 MiB.                                     


In [None]:
# body_inv_index.read_posting_list('state', 'body', bucket_name)

TITLE

In [22]:
def get_title_tf(id, text):
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
    # filtered tokens
    tokens = [token for token in tokens if token not in all_stopwords]
    # use counter to count and map values with id 
    stems = []
    for token in tokens:
        stems.append(ps.stem(token))
    return list(map(lambda x: (x[0], (id, x[1])), list(Counter(stems).items())))

def get_title_DL(text):
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
    # filtered tokens
    tokens = [token for token in tokens if token not in all_stopwords]
    stems = []
    for token in tokens:
        stems.append(ps.stem(token))
    return len(stems)


In [23]:
#get tfs
title_tfs = wiki1000_title.flatMap(lambda x: get_title_tf(x[0], x[1]))

In [24]:
# #sort pls
title_postings = title_tfs.groupByKey().mapValues(InvertedIndex.reduce_word_counts)

In [25]:
title_df = InvertedIndex.calculate_df(title_postings)
title_df_dict = title_df.collectAsMap()

                                                                                

In [26]:
title_DL = wiki1000_title.map(lambda x: (x[0], get_title_DL(x[1]))).collectAsMap()

                                                                                

In [27]:
title_tot_term = title_tfs.groupByKey().mapValues(InvertedIndex.get_total_term).collectAsMap()

                                                                                

In [28]:
title_posting_locs_list = InvertedIndex.partition_postings_and_write(title_postings, bucket_name, "title").collect()

                                                                                

In [29]:
title_super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='title'):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            title_super_posting_locs[k].extend(v)

In [30]:
# Create inverted index instance
title_inv_index = InvertedIndex()
title_inv_index.DL = title_DL

# Adding the posting locations dictionary to the inverted index
title_inv_index.posting_locs = title_super_posting_locs

# Add the token - df dictionary to the inverted index
title_inv_index.df = title_df_dict
title_inv_index.term_total = title_tot_term

# write the global stats out
title_inv_index.write_index('.', 'title_index')
index_src2 = "title_index.pkl"
index_dst2 = f'gs://{bucket_name}/title/{index_src2}'
!gsutil cp $index_src2 $index_dst2

Copying file://title_index.pkl [Content-Type=application/octet-stream]...
| [1 files][126.1 MiB/126.1 MiB]                                                
Operation completed over 1 objects/126.1 MiB.                                    


In [31]:
title_inv_index.read_posting_list('anarchism', 'title', bucket_name)

[]

In [32]:
title_inv_index.read_posting_list('anarch', 'title', bucket_name)

[(12, 1),
 (14936, 1),
 (98514, 1),
 (105859, 1),
 (371351, 1),
 (470052, 1),
 (673063, 1),
 (805586, 1),
 (1063286, 1),
 (1249918, 1),
 (1325940, 1),
 (1332770, 1),
 (1433310, 1),
 (1596739, 1),
 (1596742, 1),
 (2052697, 1),
 (2141543, 1),
 (2274182, 1),
 (2287742, 1),
 (2382358, 1),
 (2553405, 1),
 (2722899, 1),
 (4117528, 1),
 (4398733, 1),
 (4532867, 1),
 (4687957, 1),
 (4977561, 1),
 (5601442, 1),
 (5658365, 1),
 (5773736, 1),
 (5879835, 1),
 (6244745, 1),
 (6759361, 1),
 (7066267, 1),
 (7080268, 1),
 (7475769, 1),
 (8360302, 1),
 (8364409, 1),
 (8603686, 1),
 (8784176, 1),
 (8817545, 1),
 (8874671, 1),
 (9265113, 1),
 (10304567, 1),
 (10403320, 1),
 (11630863, 1),
 (11893205, 1),
 (14526954, 1),
 (14603019, 1),
 (14728186, 1),
 (14946461, 1),
 (15280013, 1),
 (15475159, 1),
 (15481723, 1),
 (15484912, 1),
 (15499386, 1),
 (16032615, 1),
 (16102540, 1),
 (17846785, 1),
 (17878864, 1),
 (18017282, 1),
 (18328984, 1),
 (18330784, 1),
 (18809819, 1),
 (20264216, 1),
 (20679746, 1),
 

ANCHOR

In [33]:
def get_anchor_tf(id, rows):
    rows2 = list(map(lambda x: x[1], rows))
    text = ''
    for t in rows2:
        text+=t+" "
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
    # filtered tokens
    tokens = [token for token in tokens if token not in all_stopwords]
    # use counter to count and map values with id 
    return list(map(lambda x: (x[0], (id, x[1])), list(Counter(tokens).items())))

In [34]:
def get_anchor_DL(rows):
    rows2 = list(map(lambda x: x[1], rows))
    text = ''
    for t in rows2:
        text+=t+" "
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
    # filtered tokens
    tokens = [token for token in tokens if token not in all_stopwords]
    return len(tokens) # TODO Maybe not filtered

In [35]:
#get tfs
anchor_tfs = wiki1000_anchor.flatMap(lambda x: get_anchor_tf(x[0], x[1]))

In [36]:
# sort pls
anchor_postings = anchor_tfs.groupByKey().mapValues(InvertedIndex.reduce_word_counts)
anchor_postings_filtered = body_postings.filter(lambda x: len(x[1])>20)

In [37]:
anchor_df = InvertedIndex.calculate_df(anchor_postings_filtered)
anchor_df_dict = anchor_df.collectAsMap()

                                                                                

In [38]:
anchor_DL = wiki1000_anchor.map(lambda x: (x[0], get_anchor_DL(x[1]))).collectAsMap()

                                                                                

In [None]:
anchor_tot_term = anchor_postings_filtered.mapValues(InvertedIndex.get_total_term).collectAsMap()



In [27]:
anchor_posting_locs_list = InvertedIndex.partition_postings_and_write(anchor_postings_filtered, bucket_name, "anchor").collect()

                                                                                

In [28]:
anchor_super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name, prefix='anchor'):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            anchor_super_posting_locs[k].extend(v)

In [30]:
# Create inverted index instance
anchor_inv_index = InvertedIndex()
anchor_inv_index.DL = anchor_DL

# Adding the posting locations dictionary to the inverted index
anchor_inv_index.posting_locs = anchor_super_posting_locs

# Add the token - df dictionary to the inverted index
anchor_inv_index.df = anchor_df_dict
anchor_inv_index.term_total = anchor_tot_term

# write the global stats out
anchor_inv_index.write_index('.', 'anchor_index')
index_src3 = "anchor_index.pkl"
index_dst3 = f'gs://{bucket_name}/anchor/{index_src3}'
!gsutil cp $index_src3 $index_dst3

Copying file://anchor_index.pkl [Content-Type=application/octet-stream]...
- [1 files][ 91.4 MiB/ 91.4 MiB]                                                
Operation completed over 1 objects/91.4 MiB.                                     


In [None]:
# anchor_inv_index.read_posting_list('teresa', 'anchor', bucket_name)

PAGE RANK

In [None]:
def get_ids_from_anchors(id,anchorlist):
    return [(id, anchor[0]) for anchor in anchorlist]
  

def generate_graph(pages):
    # Flatten the list of anchor IDs for vertices and create distinct vertices
    verticesFromLinks = pages.flatMap(lambda x: [y[0] for y in x[1]]).distinct()
    verticeFromIds = pages.map(lambda x: x[0]).distinct()

    # Union of vertices from IDs and links ensures all unique vertices are considered
    vertices = verticesFromLinks.union(verticeFromIds).map(lambda x: (x, ))

    # Generate edges by flattening the anchor list with the corresponding page ID
    edges = pages.flatMap(lambda x: get_ids_from_anchors(x[0], x[1])).distinct()

    return edges, vertices

In [None]:
edges, vertices = generate_graph(wiki1000_anchor)
v_cnt, e_cnt = vertices.count(), edges.count()

In [None]:
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.select("id", "pagerank")

dictpr = {}
for row in pr.toPandas().iterrows():
    dictpr[int(row[1][0])]=row[1][1]

with open('pr.json', 'w') as pr:
     json.dump(dictpr, pr)

pr_src = "pr.json"
pr_dst = f'gs://{bucket_name}/pr/{pr_src}'
!gsutil cp $pr_src $pr_dst