In [None]:
!pip3 install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 29 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 40.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=c0a23a6cc5e0ee6676e6f42955c0de809dbab9f7a0532cb823806d133a5a829f
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [None]:
# Give Colab access to Google Drive.
# The mount point passed below is /content/drive/.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
import os

# Use the absolute path under the Drive mount point (/content/drive/) so this
# cell can be re-run safely. The original relative path
# 'drive/MyDrive/SearchEngine' only resolved while the working directory was
# still /content, i.e. on the first execution after a kernel restart.
os.chdir('/content/drive/MyDrive/SearchEngine')

In [None]:
"""
N-Gram Processing

Not only word counts, but N-Gram Counts
"""
# Minimum-size threshold used by the main script's content filter
# (documents must have more than N space-separated tokens).
N = 2


class NGramException(Exception):
  """Raised when content has fewer words than the requested N-gram size."""


def _n_grams(content, N):
  """Shared implementation behind N_2_Gram and N_3_Gram.

  Lower-cases `content`, splits it on single spaces and returns every run of
  N consecutive tokens joined back together with a single space.
  """
  # Splitting on ' ' (not arbitrary whitespace) means consecutive spaces yield
  # empty-string tokens, which are kept and become part of the grams.
  words = content.lower().split(' ')
  if N > len(words):
    raise NGramException('Trying to split into N-Grams on content with less than N words.')
  return [' '.join(words[start : start + N]) for start in range(len(words) - N + 1)]


def N_2_Gram(content, N=2):
  """Return the list of N-grams (2-grams by default) of `content`.

  Args:
    content: text to split; it is lower-cased and split on single spaces.
    N: number of consecutive tokens per gram.

  Returns:
    A list of strings, one per sliding window of N tokens, in order.

  Raises:
    NGramException: if `content` has fewer than N tokens.
  """
  return _n_grams(content, N)


def N_3_Gram(content, N=3):
  """Return the list of N-grams (3-grams by default) of `content`.

  Same contract as N_2_Gram; only the default window size differs.

  Raises:
    NGramException: if `content` has fewer than N tokens.
  """
  return _n_grams(content, N)

# content_map = {
#   'link1': 'q',
#   'link2': 'this is. like. another sentence or two'
# }

# content_map = {k : content_map[k] for k in content_map.keys() if len(content_map[k]) != 0 and len(content_map[k].split(' ')) > N}

# # Get (link, content) for each link
# raw_in = sc.parallelize([(l,c) for l,c in content_map.items()])

# # def get_lookup_for_single_words(N=1):

# if N == 1:
#   # Get (link, word) for each row
#   # We do this by splitting the text content and flattening on the values
#   tf_map_input = raw_in.flatMapValues(lambda content: content.lower().strip().split(' '))
# elif N == 2:
#   # Get (link, 2-grams) for each row
#   tf_map_input = raw_in.flatMapValues(NGram)

# tf_map_input.collect()

In [None]:
# Main Script
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from typing import List, Tuple
import json, math
import numpy as np

# Later cells use `sc`, and no other visible cell defines it, so create it
# here. getOrCreate() returns the already-running context when this cell is
# re-run instead of failing on a second SparkContext.
sc = SparkContext.getOrCreate(SparkConf().setMaster('local').setAppName('term frequency'))

with open('json_files/content.json', 'r') as content_json:
  content_map = json.load(content_json)

# Drop empty content and keep only documents with more than N space-separated
# tokens (N is defined in the N-gram cell; with N = 2 that means at least 3),
# so the 2-gram and 3-gram splitters never see content that is too short.
content_map = {
  link: text
  for link, text in content_map.items()
  if len(text) != 0 and len(text.split(' ')) > N
}

"""
Calculate the Term Frequency
Raw Input: (link, content)
Mapper Input: (link, word)
Mapper Output: ((link, word), 1)
Reducer: ((link, word), TF)
"""
# Distribute the documents as one (link, content) pair per record
raw_in = sc.parallelize(list(content_map.items()))

def get_lookup(N=1, max_links=100):
  """Build the TF-IDF lookup RDD for terms made of N words.

  Reads the module-level `raw_in` RDD of (link, content) pairs and the
  module-level `content_map` (used only for the total document count).

  Pipeline:
    (link, content) -> (link, term) -> ((link, term), TF)
    ((link, term), TF) -> (term, DF)
    TF-IDF = TF * log(total documents / DF)
    -> (term, [(link, TF-IDF), ...]) sorted by TF-IDF, highest first

  Args:
    N: number of words per term; 1, 2 and 3 are supported.
    max_links: maximum number of links kept per term. The highest-scoring
      links are kept. Pass None to keep every link.

  Returns:
    An RDD of (term, [(link, TF-IDF), ...]) records.

  Raises:
    ValueError: if N is not 1, 2 or 3.
  """
  if N == 1:
    # Get (link, word) for each row by splitting the text content and
    # flattening on the values
    tf_map_input = raw_in.flatMapValues(lambda content: content.lower().strip().split(' '))
  elif N == 2:
    # Get (link, 2-gram) for each row
    print('Calling flatMapValues(NGram)')
    tf_map_input = raw_in.flatMapValues(N_2_Gram)
  elif N == 3:
    # Get (link, 3-gram) for each row
    tf_map_input = raw_in.flatMapValues(N_3_Gram)
  else:
    # Fail early with a clear message; previously an unsupported N fell
    # through and crashed later with an UnboundLocalError.
    raise ValueError('get_lookup supports N = 1, 2 or 3, got %r' % (N,))

  print('Getting mapper output ((link, word), 1)')
  # Get ((link, word), 1) for each row
  tf_map_output = tf_map_input.map(lambda x: (x, 1))

  print('Reducing to ((link, word), TF)')
  # Sum the ones per key: ((link, word), TF)
  tf_red_output = tf_map_output.reduceByKey(lambda a, b: a + b)

  print('Mapping (word, 1)')
  # Each (link, word) key is unique at this point, so emitting (word, 1) per
  # record counts the documents that contain the word.
  df_map_output = tf_red_output.map(lambda tup: (tup[0][1], 1))

  print('Reducing to (word, DF)')
  df_red_output = df_map_output.reduceByKey(lambda a, b: a + b)

  print('Calling df_red_output.collect()')
  document_frequencies = df_red_output.collect()

  print('Storing word -> DF into dict')
  # Plain dict captured by the closure below
  word_to_df = dict(document_frequencies)

  # Total number of documents, for the IDF term
  total_documents = len(content_map)

  def calculate_TF_IDF(record):
    """Map ((link, word), TF) to (word, [(link, TF-IDF)])."""
    (link, word), tf = record
    # math.log returns a plain Python float, so the saved text output holds
    # bare numbers regardless of how NumPy prints its scalar types.
    idf = math.log(total_documents / int(word_to_df[word]))
    return (word, [(link, int(tf) * idf)])

  # One single-element link list per (word, link), ready to be merged by word
  tf_idf_rdd = tf_red_output.map(calculate_TF_IDF)

  def aggregateLinks(v1, v2):
    """Merge two link lists, keeping only the max_links highest TF-IDF entries.

    Keeping the top entries at every merge means the cap never discards a
    link that would have ranked above one that is kept. The inputs are not
    mutated.
    """
    merged = sorted(v1 + v2, key=lambda pair: pair[1], reverse=True)
    return merged[:max_links]

  # word -> [(link_0, tf-idf), ..., (link_n, tf-idf)]
  grouped_words = tf_idf_rdd.reduceByKey(aggregateLinks)

  def sort_links(record: Tuple):
    """Sort a (word, links) record's links by TF-IDF, highest first."""
    word, links = record
    return (word, sorted(links, key=lambda pair: pair[1], reverse=True))

  # Still needed after the merge: words that occur in a single document never
  # pass through aggregateLinks.
  lookup_table = grouped_words.map(sort_links)  # (word, [(link, TF-IDF), ...])

  return lookup_table

In [None]:
# Build the 1-gram lookup RDD of (word, [(link, TF-IDF), ...]) records.
lookup_1gram = get_lookup(N=1)

Getting mapper output ((link, word), 1)
Reducing to ((link, word), TF)
Mapping (word, 1)
Reducing to (word, DF)
Calling df_red_output.collect()
Storing word -> DF into dict


In [None]:
# collect() on the full lookup uses too much memory, so write the RDD out as
# text instead; get_map() later reads '1gram_lookup_2/part-00000' line by line.
# NOTE(review): Spark is expected to refuse to write into an output directory
# that already exists — confirm before re-running this cell.
lookup_1gram.saveAsTextFile('1gram_lookup_2')

In [None]:
# Build the 2-gram lookup RDD of (2-gram, [(link, TF-IDF), ...]) records.
lookup_2gram = get_lookup(N=2)

Calling flatMapValues(NGram)
Getting mapper output ((link, word), 1)
Reducing to ((link, word), TF)
Mapping (word, 1)
Reducing to (word, DF)
Calling df_red_output.collect()
Storing word -> DF into dict


In [None]:
# collect() on the full lookup uses too much memory, so write the RDD out as
# text instead; get_map() later reads '2gram_lookup_2/part-00000' line by line.
# NOTE(review): Spark is expected to refuse to write into an output directory
# that already exists — confirm before re-running this cell.
lookup_2gram.saveAsTextFile('2gram_lookup_2')

In [None]:
# Build the 3-gram lookup RDD of (3-gram, [(link, TF-IDF), ...]) records.
lookup_3gram = get_lookup(N=3)

Getting mapper output ((link, word), 1)
Reducing to ((link, word), TF)
Mapping (word, 1)
Reducing to (word, DF)
Calling df_red_output.collect()
Storing word -> DF into dict


In [None]:
# collect() on the full lookup uses too much memory, so write the RDD out as
# text under '3gram_lookup' instead, to be read back line by line.
# NOTE(review): Spark is expected to refuse to write into an output directory
# that already exists — confirm before re-running this cell.
lookup_3gram.saveAsTextFile('3gram_lookup')

In [None]:
import re

# Module-level pattern for the single-quoted word / N-gram in a saved lookup
# line; whitespace is allowed inside the quotes so multi-word grams match.
# (get_map assigns its own local word_pattern per N-gram size.)
word_pattern = r'\'[a-zA-Z0-9:\./!@#$%\^&\*\(\)\-_+=~`<>,\.\?/\s]+\''

def get_map(nGram=1):
  """Parse a saved lookup (Spark text output) back into a dict.

  Each line is searched for a single-quoted term followed by
  ('link', score) entries. The entries are kept as the raw matched strings,
  e.g. "('http://example.com', 1.5)".

  Args:
    nGram: 1, 2 or 3; selects which saved lookup directory is read
      ('1gram_lookup_2', '2gram_lookup_2' or '3gram_lookup').

  Returns:
    A dict mapping each term to its list of matched link strings. For any
    other nGram value a message is printed and {} is returned.

  Raises:
    FileNotFoundError: if the selected directory holds no part files.
  """
  import glob

  if nGram == 1:
    lookup_dir = '1gram_lookup_2'
    # Single word, optionally padded with whitespace inside the quotes
    word_pattern = r'\'\s*[a-zA-Z0-9:\./!@#$%\^&\*\(\)\-_+=~`<>,\.\?/]+\s*\''
  elif nGram == 2:
    lookup_dir = '2gram_lookup_2'
    # Whitespace is part of the character class so multi-word grams match
    word_pattern = r'\'[a-zA-Z0-9:\./!@#$%\^&\*\(\)\-_+=~`<>,\.\?/\s]+\''
  elif nGram == 3:
    # 3-grams are saved to their own directory; reading the 2-gram output
    # here would silently return 2-gram data under the 3-gram name.
    lookup_dir = '3gram_lookup'
    word_pattern = r'\'[a-zA-Z0-9:\./!@#$%\^&\*\(\)\-_+=~`<>,\.\?/\s]+\''
  else:
    print('Invalid nGram Value.')
    return {}
  link_pattern = r'\(\'[a-zA-Z0-9:\./!@#$%\^&\*\(\)\-_+=~`<>\.\?/]+\',\s*[\d\.]+\)'

  # Compile once instead of once per line
  word_prog = re.compile(word_pattern, flags=re.ASCII)
  link_prog = re.compile(link_pattern)

  # Read every part file, not just part-00000, so nothing is dropped when the
  # lookup was written from more than one partition.
  part_files = sorted(glob.glob(lookup_dir + '/part-*'))
  if not part_files:
    raise FileNotFoundError('No part files found under ' + lookup_dir)

  lookup = {}
  for part_file in part_files:
    with open(part_file, 'r') as lookup_file:
      for line in lookup_file:
        match = word_prog.search(line)
        if not match:
          continue
        # Strip the surrounding quotes from the match
        word = match.group(0)[1:-1]
        if not word:
          continue
        # Skip lines whose first quoted match contains 'http' — presumably
        # the search latched onto a link because the real key did not match.
        if 'http' in word:
          continue

        # Skip past the key: 2 + len(word) + 3 characters, presumably the
        # leading "('" and the "', " after the word (assumes the key sits at
        # the very start of the line).
        remainder = line[2 + len(word) + 3 : ]
        links = link_prog.findall(remainder)
        if not links:
          continue

        lookup[word] = links
  return lookup

In [None]:
# Rebinds lookup_1gram from the Spark RDD to the plain dict that get_map
# parses out of the saved text output.
lookup_1gram = get_map(nGram=1)
# lookup_1gram.values()

In [None]:
# Rebinds lookup_2gram from the Spark RDD to the plain dict that get_map
# parses out of the saved text output.
lookup_2gram = get_map(nGram=2)
# lookup_2gram.keys()
# lookup_2gram.values()

In [None]:
# Rebinds lookup_3gram from the Spark RDD to the plain dict returned by get_map.
lookup_3gram = get_map(nGram=3)

In [None]:
# Write each lookup out to its own JSON file so it can be loaded as a Python
# dict in the search function. Looping over (size, lookup) pairs keeps every
# file name paired with the matching dict.
for gram_size, lookup_dict in ((1, lookup_1gram), (2, lookup_2gram), (3, lookup_3gram)):
  with open('json_files/lookup_%dgram.json' % gram_size, 'w') as lookup_json:
    json.dump(lookup_dict, lookup_json, indent=4)