In [None]:
# installing open java development kit
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# getting spark package
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz

# unzipping
!tar xf spark-3.2.0-bin-hadoop3.2.tgz

# Installing findspark
!pip install -q findspark

In [None]:
# Point Java and Spark discovery at the installs from the setup cell.
import os

# JDK directory installed by apt-get in the setup cell.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# The Spark archive was extracted into the current working directory.
# Store SPARK_HOME as an absolute path so it stays valid even if the
# working directory changes before findspark.init() reads it.
os.environ["SPARK_HOME"] = os.path.abspath("spark-3.2.0-bin-hadoop3.2")

In [None]:
# importing findspark, used to locate the extracted Spark install so that
# `pyspark` can be imported in the next cell
import findspark

# initializing spark: called without arguments, so it presumably picks up the
# SPARK_HOME set in the previous cell (per findspark's docs) — confirm if changed
findspark.init()

In [None]:
# Spark SQL entry point; the RDD API used throughout comes from its SparkContext.
from pyspark.sql import SparkSession

# Run Spark locally on all available cores ("local[*]"), reusing an already
# active session in this kernel if there is one.
session_builder = SparkSession.builder.master("local[*]")
spark = session_builder.getOrCreate()

# Handle to the underlying SparkContext, referred to as `sc` by later cells.
sc = spark.sparkContext

# Show the SparkContext as this cell's output.
sc

In [None]:
# Mount Google Drive so the assignment text files are reachable as local paths.
from google.colab import drive

MOUNT_POINT = '/content/gdrive'
drive.mount(MOUNT_POINT)

Mounted at /content/gdrive


In [None]:
# Folder on the mounted Drive holding stopwords.txt and the *.txt.txt documents to index.
# Keep the trailing slash: the stopwords cell builds its path by plain string concatenation.
path = '/content/gdrive/MyDrive/data/assignment_1_textfiles/'

In [None]:
# Read stopwords.txt from the data folder as a Spark RDD of text lines;
# it is collected into a Python list in the next cell.
stopwords = sc.textFile(path + 'stopwords.txt')

In [None]:
# Pull every stopword back to the driver as a plain Python list,
# which the indexing and search functions take as an argument.
stopwords_list = list(stopwords.collect())

In [None]:
# Build an inverted index with Spark, using lambda expressions for the RDD steps.
def Index_lambda(path, stopwords_list):
    """Build an inverted index over every ``*.txt.txt`` file in ``path``.

    Each matching file is read with ``sc.textFile``. Its lines are lower-cased
    and split on whitespace, stopwords are dropped, and the remaining words
    are counted per file.

    Args:
        path: Directory containing the text files.
        stopwords_list: Iterable of words to exclude. Words are compared
            after lower-casing the document text; the stopwords are used as given.

    Returns:
        dict mapping each word to a list of ``(file_number, count)`` tuples.
        ``file_number`` is the file name with the ``.txt.txt`` suffix removed.
        Postings are ordered by file name.

    Note:
        Relies on the notebook-level SparkContext ``sc``.
    """
    suffix = '.txt.txt'

    # A set gives O(1) membership tests inside the Spark filter; the plain
    # list would be scanned in full for every word.
    stopword_set = frozenset(stopwords_list)

    all_words = {}

    # os.listdir returns entries in arbitrary order. Sorting makes the posting
    # order (and therefore tie order in search results) reproducible.
    for file_name in sorted(os.listdir(path)):
        if not file_name.endswith(suffix):
            continue
        file_number = file_name[:-len(suffix)]

        file_content = sc.textFile(os.path.join(path, file_name))

        # lower-case and split each line into words, then drop stopwords
        words = file_content.flatMap(lambda line: line.lower().split())
        words = words.filter(lambda word: word not in stopword_set)

        # pair each word with 1 and total per word on the driver
        counts = words.map(lambda word: (word, 1)).countByKey()

        # append this file's posting to each word's list
        for word, count in counts.items():
            all_words.setdefault(word, []).append((file_number, count))

    return all_words

In [None]:
# Index every document in the data folder using the lambda-based implementation.
dictionary_lambda = Index_lambda(path=path, stopwords_list=stopwords_list)

In [None]:
# Display the lambda-built index: word -> [(file_number, count), ...]
dictionary_lambda

In [None]:
# Build the same inverted index as Index_lambda, but with named functions instead of lambdas.
def Index_without_lambda(path, stopwords_list):
    """Build an inverted index over every ``*.txt.txt`` file in ``path``.

    Same result as ``Index_lambda``; the RDD transformations use named inner
    functions rather than lambda expressions.

    Args:
        path: Directory containing the text files.
        stopwords_list: Iterable of words to exclude. Words are compared
            after lower-casing the document text; the stopwords are used as given.

    Returns:
        dict mapping each word to a list of ``(file_number, count)`` tuples.
        ``file_number`` is the file name with the ``.txt.txt`` suffix removed.
        Postings are ordered by file name.

    Note:
        Relies on the notebook-level SparkContext ``sc``.
    """
    suffix = '.txt.txt'

    # A set gives O(1) membership tests inside the Spark filter.
    stopword_set = frozenset(stopwords_list)

    # These helpers do not depend on the current file, so define them once
    # instead of redefining them on every loop iteration.
    def lower_split(line):
        # splitting the line into lower-case words
        return line.lower().split()

    def filter_words(word):
        # keeping only words that are not stopwords
        return word not in stopword_set

    def mapping(word):
        # pairing each word with a count of 1
        return (word, 1)

    all_words = {}

    # os.listdir returns entries in arbitrary order. Sorting makes the posting
    # order (and therefore tie order in search results) reproducible.
    for file_name in sorted(os.listdir(path)):
        if not file_name.endswith(suffix):
            continue
        file_number = file_name[:-len(suffix)]

        file_content = sc.textFile(os.path.join(path, file_name))

        words = file_content.flatMap(lower_split).filter(filter_words)

        # total per word on the driver
        counts = words.map(mapping).countByKey()

        # append this file's posting to each word's list
        for word, count in counts.items():
            all_words.setdefault(word, []).append((file_number, count))

    return all_words

In [None]:
# Index every document again, this time with the named-function implementation.
dictionary_without_lambda = Index_without_lambda(path=path, stopwords_list=stopwords_list)

In [None]:
# Display the index built with named functions; it should match dictionary_lambda.
dictionary_without_lambda

In [None]:
# Ask the user for a free-text query (e.g. "many years").
prompt = "Enter your query:"
query = input(prompt)

Enter your query:many years


In [None]:
# Score documents for a free-text query against an inverted index.
def search(query, index, stopwords_list):
    """Rank documents by the summed frequency of the query's terms.

    The query is split on whitespace, lower-cased, and stripped of stopwords.
    For every remaining term found in ``index``, each posting's frequency is
    added to that document's score. A term repeated in the query is counted
    once per occurrence.

    Args:
        query: Raw query string.
        index: dict mapping word -> list of (doc_id, frequency) tuples, as
            built by Index_lambda / Index_without_lambda.
        stopwords_list: Iterable of words to ignore.

    Returns:
        ``{query: [(doc_id, score), ...]}``, sorted by descending score.
        Ties keep the order in which documents were first scored.
    """
    # A query is only a few words. Plain Python avoids a Spark
    # parallelize/collect round trip and the dependency on a global context.
    stopword_set = frozenset(stopwords_list)
    terms = [word.lower() for word in query.split()]
    terms = [term for term in terms if term not in stopword_set]

    # accumulate per-document scores (avoid shadowing the builtin `id`)
    doc_scores = {}
    for term in terms:
        for doc_id, frequency in index.get(term, ()):
            doc_scores[doc_id] = doc_scores.get(doc_id, 0) + frequency

    # keep documents with a positive score and rank highest first;
    # sorted() is stable, so equal scores retain first-seen order
    result = sorted(
        ((doc_id, score) for doc_id, score in doc_scores.items() if score > 0),
        key=lambda item: item[1],
        reverse=True,
    )
    return {query: result}

In [None]:
# Run the query against the lambda-built index and display the ranking.
search(query, index=dictionary_lambda, stopwords_list=stopwords_list)

{'many years': [('16', 2),
  ('08', 2),
  ('14', 2),
  ('18', 2),
  ('12', 1),
  ('05', 1),
  ('07', 1),
  ('09', 1),
  ('15', 1)]}

In [None]:
# Run the same query against the named-function index; the ranking should match.
search(query, index=dictionary_without_lambda, stopwords_list=stopwords_list)

{'many years': [('16', 2),
  ('08', 2),
  ('14', 2),
  ('18', 2),
  ('12', 1),
  ('05', 1),
  ('07', 1),
  ('09', 1),
  ('15', 1)]}

In [None]:
# Three sample (x, y) points distributed as a Spark RDD.
sample_points = [(1, 2), (3, 4), (0, 0)]
coordinates = sc.parallelize(sample_points)

In [None]:
# Bring the points back to the driver to display them.
coordinates.collect()

[(1, 2), (3, 4), (0, 0)]

In [None]:
def is_within_bounded_box(point, half_width=1):
    """Return True if ``point`` lies in the square box centred at the origin.

    The box is axis-aligned, boundary included: |x| <= half_width and
    |y| <= half_width.

    Args:
        point: An ``(x, y)`` pair of numbers.
        half_width: Distance from the origin to each edge of the box. The
            default of 1 gives the box [-1, 1] x [-1, 1], matching the
            original hard-coded bound.
    """
    x, y = point
    return abs(x) <= half_width and abs(y) <= half_width

In [None]:
# Keep only the points accepted by is_within_bounded_box (a lazy transformation;
# it runs when an action such as count() is called).
points_within_box = coordinates.filter(is_within_bounded_box)

In [None]:
# Count the points that passed the bounding-box filter.
count_within_box = points_within_box.count()

In [None]:
# Report how many sample points fell inside the box.
message = "Number of points within the bounded box:"
print(message, count_within_box)

Number of points within the bounded box: 1
