# <span style="color:#E8846F">Dictionary Inverted Index</span> <a name="id6"></a>
   Position: Junior Data Engineer at Glofox  
   Assignment: Build an inverted index  
   Candidate: Jacky Barraza  

This notebook is a key part of the work Sam requested from Jon Snow. The goal of the overall project is a user-friendly way to search documents by keyword.

This notebook presents a program that builds the inverted index of a set of documents. Because Sam told Jon about the huge collection at Castle Black, Jon Snow designed a system that transfers the documents to HDFS, so that the distributed file system can handle the data volume and speed up the search.

The result of this process is saved as a document in an output directory in HDFS.

To do a better job, Jon Snow has some ideas for improving the process. They are not developed in this notebook but are recorded here to track possible improvements.
    
*The code could also be run as a streaming job using Kafka or NiFi. Once the connection is set up in a notebook running Spark Streaming, the process would pick up new files added to the HDFS path (the new main repository of documents). The new data would be analyzed and **the dictionary of the inverted index** updated. The proposed output destination is a database such as MongoDB, a bucket, or wherever the data lake is located.*
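
The incremental update described above can be illustrated without any streaming infrastructure. The following is a minimal plain-Python sketch, not the notebook's Spark code: the function name `update_index` and the sample sentences are hypothetical, and it assumes the same tokenization rule used later in the notebook (split on non-letters, keep words longer than 2 characters, lowercase).

```python
import re
from collections import defaultdict


def update_index(index, doc_id, text):
    """Add the words of one new document to an existing inverted index, in place."""
    for word in re.split(r'[^a-zA-Z]+', text):
        if len(word) > 2:
            index[word.lower()].add(doc_id)
    return index


# Inverted index: word -> set of document ids (toy data, not from the dataset).
index = defaultdict(set)
update_index(index, '0', 'The raven flew north')

# A new document arrives later; only the entries for its own words change.
update_index(index, '1', 'A raven at the Wall')

print(sorted(index['raven']))  # ['0', '1']
print(sorted(index['wall']))   # ['1']
```

In a streaming setup, each newly arrived file would presumably go through the same kind of merge, so the existing documents would not need to be re-read.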


### Libraries

In [1]:
import os
import pyspark
from pyspark.sql.types import *
import pyspark.sql.functions as func
from pyspark.sql.functions import collect_list
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords
stop = stopwords.words('english')


[nltk_data] Downloading package stopwords to /home/ubuntu/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


### Creating the Spark Context

In [2]:
spark = pyspark.sql.SparkSession.builder\
    .appName('inverted_dictionary')\
    .getOrCreate()


### Loading documents 

In [3]:
def extractFilePath(path):
    '''
    Return the file name without its directory or extension.
    '''
    base = os.path.basename(path)
    return os.path.splitext(base)[0]

# UDF that keeps only the file name; with no return type given, the result is a string column

udfFileName = func.udf(lambda f: extractFilePath(f))
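
To see what the helper returns outside Spark, it can be called directly on a path string. This is a small sketch; the sample paths below are hypothetical and only illustrate the shape of the value that `input_file_name()` would pass in.

```python
import os


def extractFilePath(path):
    """Return the file name without its directory or extension."""
    base = os.path.basename(path)
    return os.path.splitext(base)[0]


# Hypothetical paths, for illustration only.
print(extractFilePath('hdfs://localhost:9000/some/input/dir/12.txt'))  # 12
print(extractFilePath('/some/input/dir/0'))                            # 0
```

Only the last extension is removed, so a name such as `archive.tar.gz` would become `archive.tar`.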

In [4]:
# Read every file under the input path and add a 'book' column holding the name of the source file
book_lines = spark.read.text('hdfs://localhost:9000/Users/jackyb/PycharmProjects/4_Inverted_Index_dataset/data/input/dataset_test')\
    .withColumn('book', udfFileName(func.input_file_name()))
book_lines.show()

+--------------------+----+
|               value|book|
+--------------------+----+
|Project Gutenberg...|   0|
|This is our 3rd e...|   0|
|                    |   0|
|                    |   0|
|Copyright laws ar...|   0|
|the copyright law...|   0|
|                    |   0|
|Please take a loo...|   0|
|We encourage you ...|   0|
|electronic path o...|   0|
|                    |   0|
|                    |   0|
|**Welcome To The ...|   0|
|                    |   0|
|**Etexts Readable...|   0|
|                    |   0|
|*These Etexts Pre...|   0|
|                    |   0|
|Information on co...|   0|
|further informati...|   0|
+--------------------+----+
only showing top 20 rows



In [5]:
# Build a DataFrame of (book, word) pairs.
# split() returns an array of tokens per line; explode() expands it into one row per token,
# similar to flatMap on an RDD.
# The length filter keeps only words longer than 2 characters, which drops some of the stopwords.

book_words = book_lines\
    .select('book', func.explode(func.split('value', '[^a-zA-Z]+')).alias('word'))\
    .where((func.length('word') > 2))\
    .select('book', func.trim(func.lower(func.col('word'))).alias('word'))

book_words.show(10) 

+----+-----------+
|book|       word|
+----+-----------+
|   0|    project|
|   0|  gutenberg|
|   0|      etext|
|   0|shakespeare|
|   0|      first|
|   0|      folio|
|   0|      plays|
|   0|       this|
|   0|        our|
|   0|    edition|
+----+-----------+
only showing top 10 rows
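
The same split, filter and lowercase steps can be tried on a single line in plain Python. This is a sketch under stated assumptions: the function name `tokenize`, the sample line, and the tiny `STOP_WORDS` set are made up for illustration (it is not the NLTK list loaded earlier). It shows that words such as "this" and "our" pass the length filter, as in the output above, and how an explicit stopword set could remove them.

```python
import re

# Tiny illustrative stopword set (an assumption, not the NLTK list).
STOP_WORDS = {'this', 'our', 'the', 'and'}


def tokenize(line, stop_words=frozenset()):
    """Split on non-letters, keep words longer than 2 characters, lowercase."""
    words = re.split(r'[^a-zA-Z]+', line)
    words = [w.lower() for w in words if len(w) > 2]
    return [w for w in words if w not in stop_words]


line = 'This is our 3rd edition'  # made-up sample line

print(tokenize(line))              # ['this', 'our', 'edition']
print(tokenize(line, STOP_WORDS))  # ['edition']
```

Note that "3rd" is split into "rd" by the non-letter pattern and then dropped by the length filter.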



In [6]:
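# Group by word and collect the sorted list of all books in which the word was found.
# The book ids are strings, so sort_array orders them lexicographically (e.g. 11 before 6).
# Note: the name `dict` shadows Python's built-in dict for the rest of the notebook.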

dict = book_words.distinct().groupby('word')\
       .agg(func.sort_array(collect_list('book')).alias('book'))

In [7]:
dict.show(15)

+--------------+--------------------+
|          word|                book|
+--------------+--------------------+
|        abazai|                 [2]|
|    abruptness|     [11, 15, 43, 6]|
|  accumulation|[0, 1, 15, 18, 2,...|
|       acheron|              [0, 3]|
|       acidity|                 [6]|
|      affixing|                [18]|
|alimentiveness|                [31]|
|     ammonites|            [23, 34]|
|      antennae|        [18, 28, 34]|
|     antiphony|                [41]|
|     apathaton|                 [0]|
| apprehensions|[0, 1, 13, 15, 26...|
|arctopithecine|                [18]|
|       argueil|                [26]|
|     arguments|[0, 1, 10, 11, 12...|
+--------------+--------------------+
only showing top 15 rows
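
The grouping step, and the ordering visible in rows such as `[11, 15, 43, 6]`, can be reproduced in plain Python. This is a minimal sketch rather than the Spark implementation: the pairs are taken from the rows shown above, with one duplicate added to show the effect of `distinct()`. Sorting with `key=int` is one possible way to get a numeric order, assuming every book name is a number.

```python
from collections import defaultdict

# (word, book) pairs; book ids are strings, as in the DataFrame.
pairs = [
    ('abruptness', '11'), ('abruptness', '6'), ('abruptness', '43'),
    ('abruptness', '15'), ('abruptness', '6'), ('acidity', '6'),
]

# A set per word removes duplicate (word, book) pairs, like distinct().
postings = defaultdict(set)
for word, book in pairs:
    postings[word].add(book)

# String sort, matching the output above.
lexicographic = {w: sorted(b) for w, b in postings.items()}
# Numeric sort, assuming all ids are integers.
numeric = {w: sorted(b, key=int) for w, b in postings.items()}

print(lexicographic['abruptness'])  # ['11', '15', '43', '6']
print(numeric['abruptness'])        # ['6', '11', '15', '43']
```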



In [8]:
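# Add an index column: each word is unique, so row_number() over the words in ascending
# alphabetical order gives every word its own index.
# The window has no partitionBy, so Spark moves all rows to a single partition to number them.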

w = Window().orderBy("word")

df = dict.select(row_number().over(w).alias("word_idx"), col("*"))

In [9]:
df.show()

+--------+-----------+--------------------+
|word_idx|       word|                book|
+--------+-----------+--------------------+
|       1|     aachen|                [21]|
|       2|        aah|            [14, 41]|
|       3|     aahmes|                 [8]|
|       4|   aanaware|                [27]|
|       5|     aaraaf|            [30, 34]|
|       6|   aarenias|                [27]|
|       7|      aaron|[0, 1, 12, 19, 20...|
|       8|     aarons|                 [0]|
|       9|        aba|                 [9]|
|      10|      aback|[11, 12, 13, 15, ...|
|      11|     abacus|                [23]|
|      12|    abaddon|                 [9]|
|      13|      abaft|             [4, 43]|
|      14|      abana|             [2, 44]|
|      15|    abandon|[0, 1, 11, 12, 15...|
|      16|  abandoned|[0, 1, 11, 12, 15...|
|      17| abandoning|[1, 10, 11, 17, 2...|
|      18|abandonment|[12, 2, 21, 22, 2...|
|      19|   abandons|                 [1]|
|      20|     abaout|          

In [10]:
# Keep only the fields needed, in a new DataFrame.
# show() returns None, so it is called separately from the assignment.
inv_dictionary = df.select('word_idx', 'book')
inv_dictionary.show(10)


+--------+--------------------+
|word_idx|                book|
+--------+--------------------+
|       1|                [21]|
|       2|            [14, 41]|
|       3|                 [8]|
|       4|                [27]|
|       5|            [30, 34]|
|       6|                [27]|
|       7|[0, 1, 12, 19, 20...|
|       8|                 [0]|
|       9|                 [9]|
|      10|[11, 12, 13, 15, ...|
+--------+--------------------+
only showing top 10 rows
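
Because the final table keeps only `word_idx` and `book`, a keyword search also needs a mapping from each word to its index. The following plain-Python sketch shows one way such a lookup might work. The `vocabulary` mapping and the `search` function are assumptions that are not part of the notebook. The posting lists are copied from rows shown earlier, but the indices are renumbered for this toy subset and do not match the real `word_idx` values.

```python
# word -> books, copied from rows shown in the outputs above.
postings = {
    'ammonites': ['23', '34'],
    'antennae': ['18', '28', '34'],
    'aaraaf': ['30', '34'],
}

# word -> word_idx, numbered from 1 in ascending alphabetical order.
vocabulary = {word: idx for idx, word in enumerate(sorted(postings), start=1)}
# word_idx -> books, the shape of the final table.
inv_dictionary = {vocabulary[w]: books for w, books in postings.items()}


def search(keywords):
    """Return the books that contain every keyword (empty set if any is unknown)."""
    result = None
    for keyword in keywords:
        idx = vocabulary.get(keyword.lower())
        if idx is None:
            return set()
        books = set(inv_dictionary[idx])
        result = books if result is None else result & books
    return result or set()


print(search(['ammonites', 'antennae']))  # {'34'}
print(search(['aaraaf', 'missing']))      # set()
```

Intersecting the posting lists returns only the books that contain all keywords; a union would instead return books containing any of them.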

