In [65]:
!pip install -q pyspark

In [66]:
# TODO: if a token combines letters with numbers, remove the numbers

In [67]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Spark settings for the word-count / indexing job.
conf = (
    SparkConf()
    .setAppName('SparkWordCount')
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.executor.memory", "4g")
    .set("spark.executor.cores", "2")
)

# Reuse a running SparkContext if there is one, otherwise start one with `conf`.
sc = SparkContext.getOrCreate(conf=conf)

# NOTE(review): the SparkContext above already exists when this session is
# built, so the master / appName / ui.port given here may not take effect --
# confirm. Despite its name, `sqlContext` holds a SparkSession.
sqlContext = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

In [68]:
import nltk
from nltk.corpus import stopwords
from nltk.stem.lancaster import LancasterStemmer

# Fetch the stop-word corpus (a no-op message is printed when it is already present).
nltk.download('stopwords')

# Shared Lancaster stemmer instance used by preprocessing().
st = LancasterStemmer()

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [69]:
from google.colab import drive
# Mount Google Drive at /content/drive; the data paths used in this notebook
# all live under /content/drive/MyDrive.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [106]:
# `os` is needed by the lambda below; importing it here keeps the cell runnable
# on a fresh kernel instead of relying on an import made in a later cell.
import os

# Directory holding the wiki2022 chunk files.
path_to_chunks = '/content/drive/MyDrive/Colab Notebooks/wiki2022'

# Pair RDD keyed by file path, one element per file in the directory.
text_files_rdd = sc.wholeTextFiles(path_to_chunks)

# Keep only the paths, reduce each to its base name and collect the names.
file_names_list = text_files_rdd.keys().map(lambda path: os.path.basename(path)).collect()

print(file_names_list)

['wiki2022_small.000000', 'wiki2022_small.000002', 'wiki2022_small.000001', 'wiki2022_small.000005', 'wiki2022_small.000004', 'wiki2022_small.000003', 'wiki2022_small.000008', 'wiki2022_small.000007', 'wiki2022_small.000009', 'wiki2022_small.000006', 'wiki2022_small.000010', 'wiki2022_small.000012', 'wiki2022_small.000011', 'wiki2022_small.000014', 'wiki2022_small.000013', 'wiki2022_small.000017', 'wiki2022_small.000015', 'wiki2022_small.000016', 'wiki2022_small.000020', 'wiki2022_small.000019', 'wiki2022_small.000018', 'wiki2022_small.000021', 'wiki2022_small.000022', 'wiki2022_small.000024', 'wiki2022_small.000023', 'wiki2022_small.000025', 'wiki2022_small.000028', 'wiki2022_small.000027', 'wiki2022_small.000026', 'wiki2022_small.000029', 'wiki2022_small.000031', 'wiki2022_small.000030']


In [71]:
import os

# Directory with the small test documents (doc1.txt, doc2.txt, doc3.txt in the
# recorded output).
path_to_chunks = '/content/drive/MyDrive/Colab Notebooks/testing_files'

# RDD of (file URI, file contents) pairs, one per file in the directory.
text_files_rdd = sc.wholeTextFiles(path_to_chunks)

# Drop the contents, strip each URI down to its base name, and bring the
# resulting names back to the driver as a plain list.
file_uris = text_files_rdd.keys()
file_names_list = file_uris.map(os.path.basename).collect()

# Show which files were found.
print(file_names_list)

['doc1.txt', 'doc2.txt', 'doc3.txt']


In [72]:
from dateutil.parser import parse
import datetime
import re

# getting dates in consistent format

def _fix_date(date):
    """Rewrite a date string as "Month DD YYYY", or "Month DD" when it has no year.

    The string is parsed once with a default date whose year is 300. If the
    parsed year is still 300 the input is treated as carrying no year, so an
    input that explicitly names the year 300 is also formatted without a year.

    Args:
        date: Text to interpret as a date.

    Returns:
        The reformatted date string, or ``date`` unchanged when it cannot be
        parsed or formatted.
    """
    # Sentinel default: fields missing from the input are filled in from it.
    sentinel = datetime.datetime(300, 1, 1)
    try:
        dt = parse(date, default=sentinel)
        if dt.year != sentinel.year:
            return dt.strftime("%B %d %Y")
        return dt.strftime("%B %d")
    except (ValueError, OverflowError, TypeError):
        # Best effort: unparseable input is passed through untouched. Only the
        # parser's own failure modes are caught, so KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        return date

# getting rid of special characters

def remove_special_chars(word):
    """Strip every character that is neither a word character nor whitespace.

    Purely alphabetic input is returned as is; anything else goes through the
    regex substitution. Digits and underscores count as word characters and
    are therefore kept.
    """
    if word.isalpha():
        return word
    return re.sub(r'[^\w\s]', '', word)

# creating document ids

def extract_id(doc):
    """Return the digits following the first "curid=" in ``doc``, or None.

    The id is returned as a string, exactly as it appears in the text.
    """
    found = re.search(r"curid=(\d+)", doc)
    return found.group(1) if found else None

# recognizing dates

# Month name whose first letter may be upper or lower case (e.g. "March" or "march").
month = r"([Jj]anuary|[Ff]ebruary|[Mm]arch|[Aa]pril|[Mm]ay|[Jj]une|[Jj]uly|[Aa]ugust|[Ss]eptember|[Oo]ctober|[Nn]ovember|[Dd]ecember)"
# What must follow the month: one whitespace character, 1-4 digits (captured),
# an optional ordinal suffix (st/nd/rd/th), an optional comma, optional
# whitespace, and an optional 4-digit year (captured).
day_and_year = r"\s(\d{1,4})(?:st|nd|rd|th)?,?\s?(\d{4})?"
# Variant of day_and_year without the ordinal-suffix group (disabled).
#day_and_year = r"\s(\d{1,4}),?\s?(\d{4})?"

# Outer group so the complete month-style date is available as one capture.
expression = f'({month}{day_and_year})'

# Slash-separated numeric dates: 1-2 digits, "/", a second numeric field, "/",
# then a 2- or 4-digit final field, bounded by word boundaries on both sides.
slashed_dates = r"\b(\d{1,2})/(0?\d{1,2}|1[0-2])/(?:\d{2}|\d{4})\b"
# Outer group so the complete slashed date is available as one capture.
slashed_expression = f'({slashed_dates})'

# Matches either date style. The pattern has seven capture groups, so
# re.findall returns 7-tuples: index 0 is the whole month-style date and
# index 4 is the whole slashed date; the one that did not match is ''.
date_expression = expression + r"|" + slashed_expression

# stop words

# NLTK's English stop words plus two extra entries: the empty string and 'also'.
stop_words = stopwords.words("english") + ['', 'also']

In [73]:
#input_file = input_file.sample(True, .001)

In [74]:
# Preview a few elements only: collect() would pull the whole RDD to the driver,
# which is unsafe whenever text_files_rdd points at the full wiki2022 data.
text_files_rdd.take(3)

[('file:/content/drive/MyDrive/Colab Notebooks/testing_files/doc1.txt',
  'https://en.wikipedia.org/wiki?curid=1770 my name is charlie and i love cheese                                                                 \r\nhttps://en.wikipedia.org/wiki?curid=1773 the wolf jumped over the cheese barn\r\nhttps://en.wikipedia.org/wiki?curid=1769 cheese curds'),
 ('file:/content/drive/MyDrive/Colab Notebooks/testing_files/doc2.txt',
  'https://en.wikipedia.org/wiki?curid=1746 the cheese broke down the oregano\r\nhttps://en.wikipedia.org/wiki?curid=1892 what is up seasoning with this seasoning'),
 ('file:/content/drive/MyDrive/Colab Notebooks/testing_files/doc3.txt',
  'https://en.wikipedia.org/wiki?curid=124 my lunch is delicious oregano\r\nhttps://en.wikipedia.org/wiki?curid=196 the oregano cheese is smelling like oregano')]

In [75]:
def preprocessing(doc):
    """Turn one (text, docID) pair into a list of (stemmed_term, docID, 1) triples.

    Args:
        doc: 2-tuple of the raw line of text and the document id paired with it.

    Returns:
        One triple per token: every whitespace-separated word after the first,
        followed by every date found in the text (normalised by _fix_date).
        Each token has special characters removed and is then stemmed.
    """
    text, docID = doc

    # group(0) is the complete match whichever alternative of date_expression
    # fired, so slash-style dates are kept as well as month-name dates.
    dates = [found.group(0) for found in re.finditer(date_expression, text)]

    # The first token is skipped; in the sample data it is the article URL.
    words = text.split()[1:]
    words.extend(_fix_date(date) for date in dates)

    return [(st.stem(remove_special_chars(word)), docID, 1) for word in words]

In [76]:
def local_inverted_index(input_file):
    """Build a term -> [(docID, term_frequency), ...] index for one input RDD.

    Args:
        input_file: RDD of text lines; each line is paired with the id that
            extract_id finds in it and then tokenised by preprocessing.

    Returns:
        RDD of (term, postings) pairs, where postings is a list of
        (docID, count) tuples.
    """
    document = input_file.map(lambda line: (line, extract_id(line)))

    # (term, docID, 1) triples. They are not sorted here: reduceByKey below
    # regroups the data by key, so a sort at this point is discarded work, and
    # sorting triples whose docID is None alongside string ids raises TypeError.
    # NOTE: stop-word filtering (x[0] not in stop_words) is currently disabled.
    docIDs = document.flatMap(lambda doc: preprocessing(doc))

    # Term frequency per (term, docID).
    combining_docs = docIDs.map(lambda x: ((x[0], x[1]), x[2])) \
                           .reduceByKey(lambda x, y: x + y)

    # Regroup by term and concatenate the single-element posting lists.
    inverted_index = combining_docs.map(lambda x: (x[0][0], [(x[0][1], x[1])])) \
                                   .reduceByKey(lambda x, y: x + y)

    return inverted_index

In [None]:
def local_inverted_index(input_file):
    """Build a term -> [(docID, term_frequency), ...] index for one input RDD.

    This redefinition replaces the earlier ``local_inverted_index`` in the
    notebook when its cell is run.

    Args:
        input_file: RDD of text lines.

    Returns:
        RDD of (term, postings) pairs, where postings is a list of
        (docID, count) tuples.
    """
    # Mapping each line to document ID
    document = input_file.map(lambda x: (x, extract_id(x)))

    # Emit one (term, docID, 1) triple per token. No sortBy here: the
    # reduceByKey calls below regroup by key, so a prior sort is wasted, and it
    # raises TypeError when a None docID is compared with a string docID.
    docIDs = document.flatMap(lambda doc: preprocessing(doc))

    # Count occurrences per (term, docID), then gather the postings per term
    inverted_index = docIDs.map(lambda x: ((x[0], x[1]), x[2])) \
                           .reduceByKey(lambda x, y: x + y) \
                           .map(lambda x: (x[0][0], [(x[0][1], x[1])])) \
                           .reduceByKey(lambda x, y: x + y)

    return inverted_index


In [89]:
from math import log2

def inverse_doc_freq(doc_num, doc_freq):
    """Return the base-2 logarithm of ``doc_num / doc_freq``.

    Args:
        doc_num: Number of documents.
        doc_freq: Number of documents containing the term; must be non-zero.
    """
    ratio = doc_num / doc_freq
    return log2(ratio)

In [None]:
import os

# Directory containing the test documents.
path_to_chunks = '/content/drive/MyDrive/Colab Notebooks/testing_files'

# Line-oriented RDD over every file in that directory whose name starts with "test".
# NOTE(review): the directory listing recorded earlier in this notebook shows
# doc1.txt, doc2.txt and doc3.txt, none of which match "test*" -- confirm the glob.
text_files_rdd = sc.textFile(f"{path_to_chunks}/test*")

In [103]:
import shutil

# Delete the 'index2' directory (presumably output of an earlier
# saveAsTextFile run -- confirm). ignore_errors=True keeps the cell re-runnable
# when the directory does not exist, instead of raising FileNotFoundError.
shutil.rmtree('index2', ignore_errors=True)

In [107]:
# One (inverted index RDD, line count) tuple per chunk file.
list_of_rdds = []
# Not referenced by any other code visible in this notebook.
local_inv_indexes = []

for file_name in file_names_list:
    chunk_path = f'{path_to_chunks}/{file_name}'
    print(chunk_path)

    # Lines of this chunk and the index built from them.
    input_file = sc.textFile(chunk_path)
    inv_index = local_inverted_index(input_file)

    # The line count is stored next to the index; it is later used as the
    # document count when computing the inverse document frequency.
    line_count = input_file.count()
    list_of_rdds.append((inv_index, line_count))

/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000000
/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000002
/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000001
/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000005
/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000004
/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000003
/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000008
/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000007
/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000009
/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000006
/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000010
/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000012
/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000011
/content/drive/MyDrive/Colab Notebooks/wiki2022/wiki2022_small.000014
/content/drive/MyDri

In [None]:
import concurrent.futures

# One (inverted index RDD, line count) tuple per chunk file.
list_of_rdds = []
# Not referenced by any other code visible in this notebook.
local_inv_indexes = []

def process_file(file_name):
    """Build the inverted index for one chunk file.

    Args:
        file_name: Base name of a file inside ``path_to_chunks``.

    Returns:
        Tuple of (inverted index RDD, number of lines in the file).
    """
    chunk_path = f'{path_to_chunks}/{file_name}'
    print(chunk_path)
    chunk_rdd = sc.textFile(chunk_path)
    inv_index = local_inverted_index(chunk_rdd)
    return inv_index, chunk_rdd.count()

# Process files concurrently using ThreadPoolExecutor. executor.map yields the
# results in the order of file_names_list, so list_of_rdds[i] always belongs to
# file_names_list[i]; collecting with as_completed would order them by whichever
# job happened to finish first.
with concurrent.futures.ThreadPoolExecutor() as executor:
    list_of_rdds.extend(executor.map(process_file, file_names_list))

# Now list_of_rdds contains tuples of (inv_index, count) for each file,
# in the same order as file_names_list


In [None]:
def merge_rdds(rdd1, rdd2):
    """Return the result of ``rdd1.union(rdd2)``."""
    combined = rdd1.union(rdd2)
    return combined

# Union all per-chunk indexes in a single call instead of chaining one
# pairwise union per chunk.
merged = sc.union([index for index, _ in list_of_rdds])

# Give every distinct term an integer code equal to its position in sorted
# order. Only the terms are needed, so the posting lists are dropped before
# the shuffle rather than carried through a reduceByKey.
word_codes_rdd = merged.keys() \
                       .distinct() \
                       .sortBy(lambda term: term) \
                       .zipWithIndex()

# Preview a handful of (term, code) pairs; collecting the whole vocabulary
# would pull every term to the driver and into the notebook output.
word_codes_rdd.take(10)

In [104]:
def _index_record(entry, doc_count):
    """Flatten one joined (term, (postings, code)) entry into an output row.

    Row layout: (code, term, doc_freq, postings, total_tf, score), where
    doc_freq is the number of postings, total_tf is the sum of the posting
    counts and score is total_tf * inverse_doc_freq(doc_count, doc_freq).
    """
    term, (postings, code) = entry
    doc_freq = len(postings)
    total_tf = sum(posting[1] for posting in postings)
    return (code, term, doc_freq, postings, total_tf,
            total_tf * inverse_doc_freq(doc_count, doc_freq))


# Suffix of the next output directory ('index0', 'index1', ...).
num = 0

for chunk_index, doc_count in list_of_rdds:
    # join attaches each term's integer code: (term, (postings, code)).
    # doc_count is bound as a lambda default so the lazily evaluated map keeps
    # this iteration's value. Rows are ordered by their score (last field).
    # The result is not collected: the collected list was never used and
    # forced an extra full evaluation before the save below.
    inv_index = chunk_index.join(word_codes_rdd) \
                           .map(lambda entry, doc_count=doc_count: _index_record(entry, doc_count)) \
                           .sortBy(lambda row: row[5])

    # Write the rows as a single part file.
    inv_index_single_partition = inv_index.coalesce(1)
    output_file_path = 'index' + str(num)
    inv_index_single_partition.saveAsTextFile(output_file_path)
    num += 1
    # NOTE(review): this break stops after the first entry of list_of_rdds, so
    # only 'index0' is written -- confirm whether it is a debugging leftover.
    break