<a href="https://colab.research.google.com/github/elorberb/Search-Engine/blob/etay/seach_engine.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

## General imports

The `inverted_index_colab` import requires the `inverted_index_colab.py` file. You should upload the file and then run this cell.

In [1]:
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from timeit import timeit
from pathlib import Path
import pickle
import pandas as pd
import numpy as np
from google.cloud import storage

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')

from inverted_index_colab import *


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


## Installing, importing, and initializing PySpark


The following code installs PySpark and its dependencies in Colab. In addition, we install GraphFrames, a PySpark package for dealing with graphs in a distributed fashion. Colab notebooks run on a single machine, so we will work in local mode, i.e. there is no cluster of machines and both the master and worker processes run on a single machine. This will help us debug and iron out the code we will use in the second half of this assignment on an actual cluster in GCP.

The installation in the next cell should take about 1 minute in a fresh environment. Don't worry about going over the 90-second limit for the assignment, because in our testing environment all of these requirements will already be met.

In [2]:
# These will already be installed in the testing environment so disregard the 
# amount of time (~1 minute) it takes to install. 
!pip install -q pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
!pip install -q graphframes
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
graphframes_jar = 'https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.2-s_2.12/graphframes-0.8.2-spark3.2-s_2.12.jar'
spark_jars = '/usr/local/lib/python3.7/dist-packages/pyspark/jars'
!wget -N -P $spark_jars $graphframes_jar

openjdk-8-jdk-headless is already the newest version (8u352-ga-1~18.04).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 21 not upgraded.
--2023-01-07 10:00:13--  https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.2-s_2.12/graphframes-0.8.2-spark3.2-s_2.12.jar
Resolving repos.spark-packages.org (repos.spark-packages.org)... 108.156.83.69, 108.156.83.116, 108.156.83.15, ...
Connecting to repos.spark-packages.org (repos.spark-packages.org)|108.156.83.69|:443... connected.
HTTP request sent, awaiting response... 304 Not Modified
File ‘/usr/local/lib/python3.7/dist-packages/pyspark/jars/graphframes-0.8.2-spark3.2-s_2.12.jar’ not modified on server. Omitting download.



In [3]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from graphframes import *

In [4]:
# Initializing spark context
# create a spark context and session

conf = SparkConf().set("spark.ui.port", "4050")
conf.set("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.2-s_2.12")
# conf = SparkConf().set("spark.ui.port", "4050")
sc = pyspark.SparkContext(conf=conf)
sc.addPyFile(str(Path(spark_jars) / Path(graphframes_jar).name))
spark = SparkSession.builder.getOrCreate()

You can easily check the current version and get the link to the web interface. In the Spark UI, you can monitor the progress of your job and debug performance bottlenecks (if your Colab is running with a **local runtime**).

In [5]:
spark

## Copy some wiki data

As mentioned above, all wiki dumps were preprocessed and placed in a shared Google Storage bucket. To access the files in Colab, you will first have to authenticate with Google Storage, and then copy the data to your local environment.

**IMPORTANT NOTE**

Before you start working with the wiki data, you first have to go over the "Working with GCP.pdf" provided to you under the gcp folder in the same zip file as this folder. In that PDF you will redeem your credits for GCP and create your instance. You have to follow this procedure because the files will be mounted for you through GCP.
Notice that you have to change the `project_id` variable in the second cell below.

In GCP, the storage will already be mounted on the cluster machines and we will show you how to access it.


In [6]:
# Authenticate your user
# The authentication should be done with the email connected to your GCP account
from google.colab import auth
auth.authenticate_user()

In [7]:
# Copy one of the wikidump files
import os
from pathlib import Path
from google.colab import auth
## CHANGE project_id to the id of the project you created in GCP
project_id = 'dataretrivel3'
!gcloud config set project {project_id}

data_bucket_name = 'wikidata20210801_preprocessed'
try:
    if os.environ["wikidata20210801_preprocessed"] is not None:
        pass  
except:
      !mkdir wikidumps
      !gsutil -u {project_id} cp gs://{data_bucket_name}/multistream1_preprocessed.parquet "wikidumps/" 


Updated property [core/project].
mkdir: cannot create directory ‘wikidumps’: File exists
Copying gs://wikidata20210801_preprocessed/multistream1_preprocessed.parquet...
\ [1 files][316.7 MiB/316.7 MiB]                                                
Operation completed over 1 objects/316.7 MiB.                                    


# Processing wikipedia

Now that we completed the setup and have some data in our local environment, we are ready to process it using PySpark. 

## A 2-minute intro to PySpark

One of the fundamental data structures in Spark is the **Resilient Distributed Dataset (RDD)**. It is an immutable distributed collection of objects that is partitioned across machines, which makes parallel processing natural and easy. RDDs support two types of operations: *transformations* (e.g. selecting a subset of fields from each element or projecting each element from A->B), and *actions* (e.g. save to disk or `collect` elements into the master node's memory). Watch out for `collect` operations that send too much data to the master node: it will run out of memory and crash without a trace.

Spark employs **lazy evaluation** (like many of us...), meaning that it accumulates the things we ask it to do into an execution plan, but doesn't do any calculation until a result is actually needed. When is a result needed? When we save something to disk, ask Spark to show the data, or use other actions. But make no mistake, Spark is not a slacker. It employs lazy evaluation because it allows optimizations that wouldn't be possible otherwise, and it often saves users from unnecessary data reads, transfers across machines, and other costly operations. As we will see in a bit, the RDD data structure lends itself to simple and efficient MapReduce operations.
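
To make the idea of lazy evaluation concrete, here is a minimal plain-Python analogy (not Spark code): Python's built-in `map` and `filter` play the role of transformations, and `list` plays the role of an action. The `calls` list and the `square` helper are names invented for this sketch; Spark's real execution plan is considerably more sophisticated.

```python
# Plain-Python analogy for lazy evaluation; lazy iterators stand in for RDD transformations.
calls = []  # records every element that actually gets processed


def square(x):
    calls.append(x)  # side effect so we can observe when work really happens
    return x * x


numbers = range(1, 6)

# "Transformations": building the pipeline does no work yet.
squared = map(square, numbers)
evens = filter(lambda v: v % 2 == 0, squared)
work_before_action = len(calls)

# "Action": materializing the result forces the whole pipeline to run.
result = list(evens)
work_after_action = len(calls)

print(work_before_action, work_after_action, result)
```

Nothing is computed while the pipeline is being assembled; all five elements are processed only when the final list is requested.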

Let's look at our data before transforming it into an RDD.

In [9]:
from pathlib import Path 
import os

try:
    if os.environ["wikidata20210801_preprocessed"] is not None:
      path = os.environ["wikidata20210801_preprocessed"]+"/wikidumps/*"
except:
      path = "wikidumps/*"

parquetFile = spark.read.parquet(path)
parquetFile.show()

+---+--------------------+--------------------+--------------------+
| id|               title|                text|         anchor_text|
+---+--------------------+--------------------+--------------------+
| 12|           Anarchism|'''Anarchism''' i...|[{23040, politica...|
| 25|              Autism|'''Autism''' is a...|[{492271, Clinica...|
| 39|              Albedo|thumb|upright=1.3...|[{679294, diffuse...|
|290|                   A|'''A''', or '''a'...|[{290, See below}...|
|303|             Alabama|'''Alabama''' () ...|[{351590, Yellowh...|
|305|            Achilles|thumb|260px|Ancie...|[{1076007, potter...|
|307|     Abraham Lincoln|'''Abraham Lincol...|[{1827174, Alexan...|
|308|           Aristotle|'''Aristotle''' (...|[{1389981, bust},...|
|309|An American in Paris|'''''An American ...|[{13066, George G...|
|316|Academy Award for...|The '''Academy Aw...|[{39842, Academy ...|
|324|      Academy Awards|The '''Academy Aw...|[{649481, film in...|
|330|             Actrius|'''''Act

In [10]:
# take the 'text' and 'id' of the first 1000 rows and create an RDD from it
doc_text_pairs = parquetFile.limit(1000).select("text", "id").rdd
doc_title_pairs = parquetFile.limit(1000).select("title", "id").rdd
doc_anchor_pairs = parquetFile.limit(1000).select("anchor_text", "id").rdd

In [11]:
!mkdir 'text'
!mkdir 'title'
!mkdir 'anchor'

mkdir: cannot create directory ‘text’: File exists
mkdir: cannot create directory ‘title’: File exists
mkdir: cannot create directory ‘anchor’: File exists


## Word counts

### Term frequency

In [12]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ['category', 'references', 'also', 'links', 'external', 'see', 'thumb']
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

all_stopwords = english_stopwords.union(corpus_stopwords)

def word_count(text, id):
  ''' Count the frequency of each word in `text` (tf) that is not included in 
  `all_stopwords` and return entries that will go into our posting lists. 
  Parameters:
  -----------
    text: str
      Text of one document
    id: int
      Document id
  Returns:
  --------
    list of tuples
      A list of (token, (doc_id, tf)) pairs 
      for example: [("anarchism", (12, 5)), ...]
  '''
  tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
  # YOUR CODE HERE
  # remove stop words
  tokens = [token for token in tokens if token not in all_stopwords]
  # count tokens
  tokens_count = Counter(tokens)
  # build list of tuples_list
  tuples_list = [(token, (id, tokens_count[token])) for token in tokens_count]
  return tuples_list    
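
As a quick sanity check of the tokenize, filter, and count steps, the following sketch runs the same logic on a single sentence without NLTK or Spark. The `sample_stopwords` set and the `sample_word_count` name are made up for this illustration; the real cell uses `all_stopwords`.

```python
import re
from collections import Counter

RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)
# Hypothetical, tiny stopword set used only for this illustration.
sample_stopwords = frozenset(['the', 'and', 'see', 'also'])


def sample_word_count(text, doc_id):
    # tokenize the lower-cased text (tokens are at least 3 characters long)
    tokens = [m.group() for m in RE_WORD.finditer(text.lower())]
    # drop stopwords, then count what is left
    tokens = [t for t in tokens if t not in sample_stopwords]
    return [(token, (doc_id, tf)) for token, tf in Counter(tokens).items()]


pairs = sample_word_count("The cat and the hat. See also: the Cat!", 7)
print(pairs)  # [('cat', (7, 2)), ('hat', (7, 1))]
```

Note that tokens come out lower-cased, so "Cat" and "cat" are counted together.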

In [13]:
word_counts_text = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1]))
word_counts_title = doc_title_pairs.flatMap(lambda x: word_count(x[0], x[1]))
id_to_anchor = doc_anchor_pairs.flatMap(lambda x: x[0])
id_to_text = id_to_anchor.distinct().groupByKey().mapValues(lambda x: " ".join(x))
word_counts_anchor = id_to_text.flatMap(lambda x: word_count(x[1], x[0]))     # -> [(token, (doc_id, tf)), ]

In [14]:
def reduce_word_counts(unsorted_pl):
  ''' Returns a sorted posting list by wiki_id.
  Parameters:
  -----------
    unsorted_pl: list of tuples
      A list of (wiki_id, tf) tuples 
  Returns:
  --------
    list of tuples
      A sorted posting list.
  '''
  # YOUR CODE HERE
  return sorted(unsorted_pl)

In [15]:
postings_text = word_counts_text.groupByKey().mapValues(reduce_word_counts)
postings_title = word_counts_title.groupByKey().mapValues(reduce_word_counts)
postings_anchor = word_counts_anchor.groupByKey().mapValues(reduce_word_counts)
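
The `groupByKey().mapValues(reduce_word_counts)` pattern can be pictured in plain Python as follows. This is only a single-machine sketch with made-up input pairs; `build_postings` is a hypothetical helper, and Spark performs the grouping across partitions.

```python
from collections import defaultdict

# Hypothetical (token, (doc_id, tf)) pairs, as word_count might emit them for three documents
pairs = [
    ('cat', (9, 1)), ('hat', (9, 2)),
    ('cat', (3, 4)),
    ('cat', (5, 2)), ('dog', (5, 1)),
]


def build_postings(pairs):
    grouped = defaultdict(list)
    for token, posting in pairs:
        grouped[token].append(posting)  # the "groupByKey" step
    # the "mapValues" step: sort each posting list by doc_id
    return {token: sorted(pl) for token, pl in grouped.items()}


postings = build_postings(pairs)
print(postings['cat'])  # [(3, 4), (5, 2), (9, 1)]
```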

### Document frequency and filtering

Keep only the tokens whose posting lists contain more than 50 documents.

In [16]:
postings_filtered_text = postings_text.filter(lambda x: len(x[1])>50)

### Calculate DF

In [17]:
def calculate_df(postings):
  ''' Takes a posting list RDD and calculates the df for each token.
  Parameters:
  -----------
    postings: RDD
      An RDD where each element is a (token, posting_list) pair.
  Returns:
  --------
    RDD
      An RDD where each element is a (token, df) pair.
  '''
  # YOUR CODE HERE
  return postings.mapValues(lambda x: len(x))

In [18]:
# global statistics
w2df_text = calculate_df(postings_filtered_text)
w2df_title = calculate_df(postings_title)
w2df_anchor = calculate_df(postings_anchor)

In [19]:
dfs_text = w2df_text.collectAsMap()
dfs_title = w2df_title.collectAsMap()
dfs_anchor = w2df_anchor.collectAsMap()
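
For intuition, the filtering and df steps boil down to the following on a small in-memory dictionary. The posting lists and the `MIN_DF` threshold here are invented for the sketch; the notebook itself keeps text tokens that appear in more than 50 documents.

```python
# Hypothetical posting lists: token -> sorted list of (doc_id, tf)
postings = {
    'cat': [(3, 4), (5, 2), (9, 1)],
    'hat': [(9, 2)],
    'dog': [(5, 1), (9, 7)],
}
MIN_DF = 1  # illustration only; the notebook uses 50 for the text index

# keep only tokens that appear in more than MIN_DF documents
filtered = {token: pl for token, pl in postings.items() if len(pl) > MIN_DF}
# the document frequency of a token is the length of its posting list
df = {token: len(pl) for token, pl in filtered.items()}

print(df)  # {'cat': 3, 'dog': 2}
```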

## Partitioning and writing the index

In [20]:
NUM_BUCKETS = 124
def token2bucket_id(token):
  return int(_hash(token),16) % NUM_BUCKETS

def partition_postings_and_write(postings, dir_path):
    ''' A function that partitions the posting lists into buckets, writes out 
    all posting lists in a bucket to disk, and returns the posting locations for 
    each bucket. Partitioning should be done through the use of `token2bucket_id` 
    above. Writing to disk should use the function  `write_a_posting_list`, a 
    static method implemented in inverted_index_colab.py under the InvertedIndex 
    class. 
    Parameters:
    -----------
    postings: RDD
        An RDD where each item is a (w, posting_list) pair.
    dir_path: str
        Directory that is passed on to `write_a_posting_list` (e.g. 'text').
    Returns:
    --------
    RDD
        An RDD where each item is a posting locations dictionary for a bucket. The
        posting locations maintain a list for each word of file locations and 
        offsets its posting list was written to. See `write_a_posting_list` for 
        more details.
    '''
    token_to_bucket_rdd = postings.map(lambda item: (token2bucket_id(item[0]), item)).groupByKey()
    res = token_to_bucket_rdd.map(lambda item: InvertedIndex.write_a_posting_list(item, dir_path))
    return res
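
The bucketing step can be tried in isolation. The sketch below reuses `_hash` and `token2bucket_id` as defined in this notebook and groups a few made-up `(token, posting_list)` pairs by bucket id with a plain dictionary; the actual writing to disk through `InvertedIndex.write_a_posting_list` is left out.

```python
import hashlib
from collections import defaultdict

NUM_BUCKETS = 124


def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()


def token2bucket_id(token):
    # stable hash of the token, folded into the range [0, NUM_BUCKETS)
    return int(_hash(token), 16) % NUM_BUCKETS


# Hypothetical (token, posting_list) pairs
postings = [('cat', [(3, 4)]), ('hat', [(9, 2)]), ('dog', [(5, 1)])]

buckets = defaultdict(list)
for token, pl in postings:
    buckets[token2bucket_id(token)].append((token, pl))

for bucket_id, items in sorted(buckets.items()):
    print(bucket_id, [token for token, _ in items])
```

Because the hash depends only on the token, the same token always lands in the same bucket, regardless of which machine processes it.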

The next cell uses the `collect` operator to aggregate the posting location information from the multiple buckets/indices into a single dictionary. We know in this case that the posting locations info is relatively small so this is not going to crash our master node. 

In [21]:
posting_locs_list_text = partition_postings_and_write(postings_filtered_text, 'text').collect()
posting_locs_list_title = partition_postings_and_write(postings_title, 'title').collect()
posting_locs_list_anchor = partition_postings_and_write(postings_anchor, 'anchor').collect()

In [22]:
# merge the posting locations of all buckets into a single dict for text
super_posting_locs_text = defaultdict(list)
for posting_loc in posting_locs_list_text:
  for k, v in posting_loc.items():
    super_posting_locs_text[k].extend(v)

In [23]:
# merge the posting locations of all buckets into a single dict for title
super_posting_locs_title = defaultdict(list)
for posting_loc in posting_locs_list_title:
  for k, v in posting_loc.items():
    super_posting_locs_title[k].extend(v)

In [24]:
# merge the posting locations of all buckets into a single dict for anchor
super_posting_locs_anchor = defaultdict(list)
for posting_loc in posting_locs_list_anchor:
  for k, v in posting_loc.items():
    super_posting_locs_anchor[k].extend(v)
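
The three merge loops above follow the same pattern, which can be sketched on toy data. The file names and offsets below are invented; the real values come from `write_a_posting_list`.

```python
from collections import defaultdict

# Hypothetical per-bucket results: token -> list of (file_name, offset) locations
posting_locs_list = [
    {'cat': [('bucket_a_000.bin', 0), ('bucket_a_001.bin', 0)],
     'dog': [('bucket_a_001.bin', 18)]},
    {'hat': [('bucket_b_000.bin', 0)]},
]

super_posting_locs = defaultdict(list)
for posting_loc in posting_locs_list:
    for token, locs in posting_loc.items():
        # extend keeps all locations of a token, in the order they were written
        super_posting_locs[token].extend(locs)

print(dict(super_posting_locs))
```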

Now we put everything together (posting locations and df) to build each index and write it to disk.

#### Text Index

In [25]:
# Create inverted_text index instance
inverted_text = InvertedIndex()
# Adding the posting locations dictionary to the inverted_text index
inverted_text.posting_locs = super_posting_locs_text
# Add the token - df dictionary to the inverted_text index
inverted_text.df = dfs_text
# write the global stats out
inverted_text.write_index('.', 'index_text')

#### Title Index

In [26]:
# Create inverted_title index instance
inverted_title = InvertedIndex()
# Adding the posting locations dictionary to the inverted_title index
inverted_title.posting_locs = super_posting_locs_title
# Add the token - df dictionary to the inverted_title index
inverted_title.df = dfs_title
# write the global stats out
inverted_title.write_index('.', 'index_title')

#### Anchor Index

In [27]:
# Create inverted_anchor index instance
inverted_anchor = InvertedIndex()
# Adding the posting locations dictionary to the inverted_anchor index
inverted_anchor.posting_locs = super_posting_locs_anchor
# Add the token - df dictionary to the inverted_anchor index
inverted_anchor.df = dfs_anchor
# write the global stats out
inverted_anchor.write_index('.', 'index_anchor')

In [28]:
TUPLE_SIZE = 6       
TF_MASK = 2 ** 16 - 1 # Masking the 16 low bits of an integer
from contextlib import closing

def read_posting_list(inverted, w):
  with closing(MultiFileReader()) as reader:
    locs = inverted.posting_locs[w]
    b = reader.read(locs, inverted.df[w] * TUPLE_SIZE)
    posting_list = []
    for i in range(inverted.df[w]):
      doc_id = int.from_bytes(b[i*TUPLE_SIZE:i*TUPLE_SIZE+4], 'big')
      tf = int.from_bytes(b[i*TUPLE_SIZE+4:(i+1)*TUPLE_SIZE], 'big')
      posting_list.append((doc_id, tf))
    return posting_list
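
`read_posting_list` decodes each posting from 6 bytes: 4 bytes of doc id followed by 2 bytes of tf. The round trip below illustrates that layout. The encoder is an assumption written to match the decoder shown above (the actual writer lives in `inverted_index_colab.py` and is not shown here), and both function names are hypothetical.

```python
TUPLE_SIZE = 6
TF_MASK = 2 ** 16 - 1  # Masking the 16 low bits of an integer


def encode_posting_list(posting_list):
    # Assumed layout: 4-byte doc_id followed by 2-byte tf, big-endian
    return b''.join(
        ((doc_id << 16) | (tf & TF_MASK)).to_bytes(TUPLE_SIZE, 'big')
        for doc_id, tf in posting_list
    )


def decode_posting_list(b):
    # Same slicing logic as read_posting_list, applied to an in-memory buffer
    posting_list = []
    for i in range(len(b) // TUPLE_SIZE):
        doc_id = int.from_bytes(b[i*TUPLE_SIZE:i*TUPLE_SIZE+4], 'big')
        tf = int.from_bytes(b[i*TUPLE_SIZE+4:(i+1)*TUPLE_SIZE], 'big')
        posting_list.append((doc_id, tf))
    return posting_list


original = [(12, 5), (308, 70000), (1827174, 1)]
encoded = encode_posting_list(original)
decoded = decode_posting_list(encoded)
print(len(encoded), decoded)  # 18 [(12, 5), (308, 4464), (1827174, 1)]
```

Under this assumed encoding, a tf larger than 65535 does not survive the round trip (70000 comes back as 4464), which is the price of storing it in two bytes.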

In [33]:
post_gen = get_posting_gen(inverted_text)

## PageRank

In this section, your task is to compute PageRank for wiki articles using the anchor text we extracted from the MediaWiki markup. Let's select the 'id' and 'anchor_text' fields into a new RDD:

In [None]:
pages_links = spark.read.parquet(path).limit(1000).select("id", "anchor_text").rdd

In [None]:
pages_links.take(1)[0][1][2]

**YOUR TASK (20 POINTS):** Complete the implementation of `generate_graph`, which generates the graph of internal wikipedia links (wiki articles pointing to other wiki articles). Use PySpark to compute the vertices (pages) and edges (links) of the graph. Multiple links from page A to page B need to be represented by a single edge (edges are not weighted). You may use the built-in function `distinct` of the RDD object to identify distinct elements, but be mindful of its impact on computation time. 

In [None]:
def generate_graph(pages):
  ''' Compute the directed graph generated by wiki links.
  Parameters:
  -----------
    pages: RDD
      An RDD where each row consists of one wikipedia article with 'id' and 
      'anchor_text'.
  Returns:
  --------
    edges: RDD
      An RDD where each row represents an edge in the directed graph created by
      the wikipedia links. The first entry should be the source page id and the 
      second entry is the destination page id. No duplicates should be present. 
    vertices: RDD
      An RDD where each row represents a vertex (node) in the directed graph 
      created by the wikipedia links. No duplicates should be present. 
  '''
  # YOUR CODE HERE
  def get_ids(rows):
    """return the distinct destination page ids linked from one page"""
    ids = list({row[0] for row in rows})
    return ids

  def get_edges(ids):
    """return the edges of one page as (source_id, destination_id) tuples"""
    edges = [(ids[0], dst_id) for dst_id in ids[1]]
    return edges

  def get_vertices(edge):
    """return both endpoints of one edge as single-element tuples (id,)"""
    vertices = [(i,) for i in edge]
    return vertices

  ids = pages.mapValues(get_ids)
  edges = ids.flatMap(get_edges)
  vertices = edges.flatMap(get_vertices).distinct()

  return edges, vertices
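
The same graph construction can be checked by hand on a tiny, made-up set of pages using plain Python sets. This is only a single-machine illustration of the logic, not a replacement for the RDD version; the page ids and anchor texts are invented.

```python
# Hypothetical pages: (page_id, [(destination_id, anchor_text), ...])
pages = [
    (1, [(2, 'two'), (3, 'three'), (2, 'two again')]),
    (2, [(3, 'three')]),
    (3, []),
    (4, []),
]

# one edge per (source, destination) pair, however many links there are
edges = sorted({(src, dst) for src, links in pages for dst, _anchor in links})
# vertices are the endpoints of the edges
vertices = sorted({page_id for edge in edges for page_id in edge})

print(edges)     # [(1, 2), (1, 3), (2, 3)]
print(vertices)  # [1, 2, 3]
```

Since vertices are derived from the edges, a page with no incoming or outgoing links (page 4 here) does not show up in the graph.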

In [None]:
# test graph construction efficiency (10 points)
t_start = time()
# construct the graph for a small sample of (1000) pages
edges, vertices = generate_graph(pages_links)
# time the actual execution
v_cnt, e_cnt = vertices.count(), edges.count()
graph_const_time = time() - t_start
# test that the implementation is efficient (<20 seconds)
assert graph_const_time < 20

In [None]:
# test that we get the right number of vertices (5 points)
assert v_cnt == 114966

In [None]:
# test that we get the right number of edges (5 points)
assert e_cnt == 183727

Let's compute PageRank in a distributed fashion (using the GraphFrames package for PySpark) and show the top 20 pages.
You will have to implement the PageRank computation in the next cell.

In [None]:
#BEGIN YOUR CODE HERE
edgesDF = edges.toDF(['src', 'dst']).repartition(4, 'src')
verticesDF = vertices.toDF(['id']).repartition(4, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=10)
pr = pr_results.vertices.select("id", "pagerank")
pr = pr.sort(col('pagerank').desc())
pr.repartition(1).write.csv('pr', compression="gzip")
pr.show()
#END OF CODE
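
To get a feel for what `pageRank` computes, here is a small textbook-style power iteration in plain Python. It is a simplified sketch, not GraphFrames' implementation: the `pagerank` function and the toy edge list are made up for the illustration, and GraphFrames may differ in normalization and in how it treats pages without outgoing links. The parameter names mirror `resetProbability` and `maxIter` from the cell above.

```python
def pagerank(edges, reset_probability=0.15, max_iter=10):
    vertices = sorted({v for edge in edges for v in edge})
    n = len(vertices)
    out_links = {v: [] for v in vertices}
    for src, dst in edges:
        out_links[src].append(dst)

    ranks = {v: 1.0 / n for v in vertices}  # start from a uniform distribution
    for _ in range(max_iter):
        # rank held by pages without out-links is spread evenly over all pages
        dangling = sum(ranks[v] for v in vertices if not out_links[v])
        base = reset_probability / n + (1 - reset_probability) * dangling / n
        new_ranks = {v: base for v in vertices}
        for src in vertices:
            for dst in out_links[src]:
                # each page passes an equal share of its rank along every out-link
                new_ranks[dst] += (1 - reset_probability) * ranks[src] / len(out_links[src])
        ranks = new_ranks
    return ranks


# Hypothetical toy graph: pages 2, 3 and 4 all link to page 1
edges = [(1, 2), (1, 3), (2, 1), (3, 1), (4, 1)]
ranks = pagerank(edges)
top = max(ranks, key=ranks.get)
print(top, ranks)
```

In this toy graph page 1 ends up with the highest score because three pages link to it, while page 4, which nobody links to, keeps only the reset share.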

**YOUR TASK (5 POINTS)**: Complete the implementation of `get_top_pr_page_titles` to return the titles of a few of the top pages as ranked by PageRank and printed above. To get the title for each page id listed above, navigate to the url https://en.wikipedia.org/?curid=wiki_id_here by replacing the `wiki_id_here` part with the page id. For example, if you navigate to https://en.wikipedia.org/?curid=25507 for the page with id 25507 you'd see that its title is 'Roman Empire'.

In [None]:
def get_top_pr_page_titles():
  ''' Returns the titles of the first, fourth, and fifth pages as ranked above 
      by PageRank.
  Returns:
  --------
    list of three strings.
  '''
  # YOUR CODE HERE
  # links:
  # "https://en.wikipedia.org/?curid=606848"
  # "https://en.wikipedia.org/?curid=148363"
  # "https://en.wikipedia.org/?curid=308"  
  return ["Catholic Church", "Ancient Greek", "Aristotle"]

In [None]:
assert [_hash(x.lower()) for x in get_top_pr_page_titles()] == \
  ['52bb7dbc02', '3e920ea944', '3d007b19ee']

Do the top-ranked pages make sense to you? Why do you think these pages received such high scores from PageRank?

# GCP

The second part of the assignment involves running index creation in GCP and reporting results in this notebook. Follow [this](https://docs.google.com/document/d/1HTZVDMQ1gyeq8dlyzmomOkxjSo1Td76PsCGlFkQohps/edit?usp=sharing) step-by-step guide in order to set up your cluster environment on GCP so you can execute pyspark code on a cluster of machines. 

The code for the GCP notebook is on Moodle under assignment 3. You will have to copy the code you completed here into the relevant sections.

In the current notebook we used MapReduce to create the index for only one wiki dump. Using GCP, we will be able to create the index on the entire English Wikipedia. The GCP part of this assignment will be the first time we work with the entire corpus. You will need to follow the notebook and complete the code as you did here (the same code you completed in this notebook can be used in the GCP notebook). 
At the end of the GCP assignment there are questions that you will need to answer. These are not the tests we have used so far, so you should follow the instructions in the notebook.