# Toy Search Engine in Spark
Create a toy search engine in Spark, using the map and reduce functions you have written in a Google Colab notebook. Use the `map`, `flatMap`, `groupByKey` and `reduceByKey` functions to achieve it. Your code should perform the following:

0. Punctuation removal
1. Lemmatization
2. Stemming
3. Count 



In [None]:
#get Java
!apt-get install openjdk-8-jdk-headless -qq 


#get Spark
!wget https://mirrors.estointernet.in/apache/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz 



#extract Spark
!tar xf spark-3.0.1-bin-hadoop2.7.tgz



#get FindSpark for notebook
!pip install -q findspark

In [None]:
# Point JAVA_HOME and SPARK_HOME at the Java 8 install and the Spark directory.
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop2.7",
})

In [None]:
# Let findspark locate the Spark installation.
# This has to run before the pyspark imports below.
import findspark
findspark.init()


# import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession


# Spark configuration object; no settings are overridden here.
conf = SparkConf()

# Create (or reuse) the SparkContext and the SparkSession used by later cells.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [None]:
#get 20newbytes
!wget http://qwone.com/~jason/20Newsgroups/20news-18828.tar.gz

#extract 20newsbytes
!tar xzf 20news-18828.tar.gz

#consolidate
!cat 20news-18828/*/* > DataFile.txt



In [None]:
# Read every file under the newsgroup directories as a single record.
# wholeTextFiles yields (file path, file content) pairs.

rddWhole = spark.sparkContext.wholeTextFiles("/content/20news-18828/*")
# Preview a few file paths only: collect() would pull the content of every
# document in the corpus into the driver and dump it into the cell output.
rddWhole.keys().take(5)

# 1. Punctuation removal


In [None]:
#function for removing punctuation

import re,string

def remove_punc(x):
  r"""Replace every punctuation character in ``x`` with a single space.

  Any character that is not a letter, digit or whitespace counts as
  punctuation. The underscore is matched explicitly because ``\w`` treats
  it as a word character, so ``[^\w\s]`` alone would leave it in the text.

  Args:
    x: Text to clean.

  Returns:
    ``x`` with each punctuation character replaced by one space; whitespace
    (including newlines) is left as it is.
  """
  res = re.sub(r'[^\w\s]|_', ' ', x)
  return res

# Clean the text of every document; mapValues leaves the file-path keys untouched.
one_RDD=rddWhole.mapValues(remove_punc)
# Show the first ten (file path, cleaned text) pairs.
one_RDD.take(10)

[('file:/content/20news-18828/rec.sport.hockey/54507',
  'From  etxonss ufsa ericsson se  Staffan Axelsson \nSubject  WC 93  Scores and standings  April 25\n\n\n 1993 World Championships in Germany \n                                     \n\n  Group A standings  Munich             Group B standings  Dortmund \n                                                                    \n\n             GP  W T L  GF GA      P                  GP  W T L  GF GA      P\n\n  Canada      5  5 0 0  31  4  27 10    Czech republic 5  4 1 0  17  4  13  9\n  Sweden      5  3 0 2  17 14   3  6    Germany        5  4 0 1  20 12   8  8\n  Russia      5  2 1 2  15 12   3  5    USA            5  2 2 1  14 10   4  6\n  Switzerland 5  2 0 3  11 14   3  4    Finland        5  2 1 2   7  7   0  5\n                                                                             \n  Italy       4  1 1 2   7 19  12  3    Norway         4  0 0 4   1 13  12  0\n  Austria     4  0 0 4   3 21  18  0    France         4  0 0 


# 2. Stemming


In [None]:
#function for stemming
from nltk.stem.porter import PorterStemmer
porter_stemmer = PorterStemmer()
def stemmming(x):
  """Stem every whitespace-separated word of ``x`` with the Porter stemmer.

  Args:
    x: Text of one document.

  Returns:
    The stemmed words, each preceded by a single space (so a non-empty
    result starts with a space); an empty string when ``x`` has no words.
  """
  # Build the stemmer once per call instead of once per word.
  stemmer = PorterStemmer()
  # join() avoids the quadratic cost of growing a string with "+" in a loop.
  return "".join(" " + stemmer.stem(w) for w in x.split())


# Stem the text of every document.
# NOTE: one_RDD is rebound to a transformation of itself, so re-running this
# cell applies the stemmer to text that has already been stemmed.
one_RDD=one_RDD.mapValues(stemmming)
# Show the first ten (file path, stemmed text) pairs.
one_RDD.take(10)

[('file:/content/20news-18828/rec.sport.hockey/54507',
  ' from etxonss ufsa ericsson se staffan axelsson subject WC 93 score and stand april 25 1993 world championship in germani group A stand munich group B stand dortmund GP W T L GF GA P GP W T L GF GA P canada 5 5 0 0 31 4 27 10 czech republ 5 4 1 0 17 4 13 9 sweden 5 3 0 2 17 14 3 6 germani 5 4 0 1 20 12 8 8 russia 5 2 1 2 15 12 3 5 usa 5 2 2 1 14 10 4 6 switzerland 5 2 0 3 11 14 3 4 finland 5 2 1 2 7 7 0 5 itali 4 1 1 2 7 19 12 3 norway 4 0 0 4 1 13 12 0 austria 4 0 0 4 3 21 18 0 franc 4 0 0 4 6 19 13 0 april 18 itali russia 2 2 norway germani 0 6 sweden austria 1 0 usa czech republ 1 1 april 19 canada switzerland 2 0 russia austria 4 2 finland franc 2 0 april 20 sweden canada 1 4 czech republ germani 5 0 switzerland itali 0 1 finland usa 1 1 april 21 germani franc 5 3 itali sweden 2 6 czech republ norway 2 0 april 22 switzerland russia 0 6 usa franc 6 1 austria canada 0 11 norway finland 0 2 april 23 switzerland austria 5 1 germ


# 3. Lemmatization


In [None]:
# Lemmatization setup: fetch the NLTK resources and build a lemmatizer.
import nltk
# NOTE(review): no visible cell uses the punkt tokenizer (words are split with
# str.split) - confirm whether this download is still needed.
nltk.download('punkt')
# WordNet data for WordNetLemmatizer.
nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
# Module-level instance; the `lemmatization` function defined in this cell
# assigns its own local variable with the same name.
wordnet_lemmatizer = WordNetLemmatizer()

#function for lemmatization
def lemmatization(x):
  """Lemmatize every whitespace-separated word of ``x`` with WordNet.

  Args:
    x: Text of one document.

  Returns:
    The lemmatized words, each preceded by a single space (so a non-empty
    result starts with a space); an empty string when ``x`` has no words.
  """
  # Build the lemmatizer once per call instead of once per word.
  lemmatizer = WordNetLemmatizer()
  # join() avoids the quadratic cost of growing a string with "+" in a loop.
  return "".join(" " + lemmatizer.lemmatize(w) for w in x.split())


# Lemmatize the (already stemmed) text of every document.
# NOTE: one_RDD is rebound to a transformation of itself, so re-running this
# cell lemmatizes its own previous result.
# NOTE(review): lemmatizing after stemming may have little effect, since the
# lemmatizer receives stems rather than the original words - confirm the order.
one_RDD=one_RDD.mapValues(lemmatization)
# Show the first ten (file path, lemmatized text) pairs.
one_RDD.take(10)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


[('file:/content/20news-18828/rec.sport.hockey/54507',
  ' from etxonss ufsa ericsson se staffan axelsson subject WC 93 score and stand april 25 1993 world championship in germani group A stand munich group B stand dortmund GP W T L GF GA P GP W T L GF GA P canada 5 5 0 0 31 4 27 10 czech republ 5 4 1 0 17 4 13 9 sweden 5 3 0 2 17 14 3 6 germani 5 4 0 1 20 12 8 8 russia 5 2 1 2 15 12 3 5 usa 5 2 2 1 14 10 4 6 switzerland 5 2 0 3 11 14 3 4 finland 5 2 1 2 7 7 0 5 itali 4 1 1 2 7 19 12 3 norway 4 0 0 4 1 13 12 0 austria 4 0 0 4 3 21 18 0 franc 4 0 0 4 6 19 13 0 april 18 itali russia 2 2 norway germani 0 6 sweden austria 1 0 usa czech republ 1 1 april 19 canada switzerland 2 0 russia austria 4 2 finland franc 2 0 april 20 sweden canada 1 4 czech republ germani 5 0 switzerland itali 0 1 finland usa 1 1 april 21 germani franc 5 3 itali sweden 2 6 czech republ norway 2 0 april 22 switzerland russia 0 6 usa franc 6 1 austria canada 0 11 norway finland 0 2 april 23 switzerland austria 5 1 germ

In [None]:
# Split each document into words: flatMapValues emits one (file path, word)
# pair per whitespace-separated token.
final_RDD=one_RDD.flatMapValues(lambda x: x.split())
# Show the first ten (file path, word) pairs.
final_RDD.take(10)

[('file:/content/20news-18828/rec.sport.hockey/54507', 'from'),
 ('file:/content/20news-18828/rec.sport.hockey/54507', 'etxonss'),
 ('file:/content/20news-18828/rec.sport.hockey/54507', 'ufsa'),
 ('file:/content/20news-18828/rec.sport.hockey/54507', 'ericsson'),
 ('file:/content/20news-18828/rec.sport.hockey/54507', 'se'),
 ('file:/content/20news-18828/rec.sport.hockey/54507', 'staffan'),
 ('file:/content/20news-18828/rec.sport.hockey/54507', 'axelsson'),
 ('file:/content/20news-18828/rec.sport.hockey/54507', 'subject'),
 ('file:/content/20news-18828/rec.sport.hockey/54507', 'WC'),
 ('file:/content/20news-18828/rec.sport.hockey/54507', '93')]

# Searching for a user-supplied word

In [None]:
search_word=input("enter a word : ")
# Run the query through the same pipeline as the documents so that it is
# compared in its indexed form (e.g. "championships" -> "championship").
# A query with several words yields several terms; an empty or
# punctuation-only query yields no terms and therefore no results.
search_terms=set(lemmatization(stemmming(remove_punc(search_word))).split())
# Keep only the tokens that are exactly one of the query terms. A substring
# test would also match unrelated tokens that merely contain the query, and
# an empty query would match every token.
filtered_RDD=final_RDD.filter(lambda x: x[1] in search_terms)
# Count the occurrences of each (file path, token) pair.
wordcount_data=filtered_RDD.map(lambda x: (x,1)).reduceByKey(lambda x, y: (x+y))
# Sort in descending order of count.
sorted_data=wordcount_data.sortBy(lambda x:x[1],False)
# The three (file path, token) pairs with the highest count.
sorted_data.take(3)

enter a word : subject


[(('file:/content/20news-18828/sci.electronics/53569', 'subject'), 50),
 (('file:/content/20news-18828/comp.windows.x/66422', 'subject'), 40),
 (('file:/content/20news-18828/comp.windows.x/67882', 'subject'), 37)]