In [1]:
import pandas as pd
import numpy as np
import findspark
import glob
import os
import re
import nltk
import pickle
import spacy
# nltk.download('stopwords')
# nltk.download('punkt')
# nltk.download('wordnet')

# Inverted Index

Solo es necesario ejecutar este capítulo si aún no se ha creado el inverted index. Se asume que Spark 2.4.5 está instalado.

## Inicialización de SPARK

In [20]:
import findspark
# Location of the local Spark installation (adjust to your environment).
localizacion_spark = '/opt/spark-2.4.5'
# findspark must run before importing pyspark so the package can be located.
findspark.init(localizacion_spark)

import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Number of local cores: 4; RAM the Spark driver is allowed to use: 7 GB.
spark_configurations = (
    SparkConf()
    .setMaster('local[4]')
    .setAppName('Tarea_1')
    .set("spark.driver.memory", "7g")
)

# getOrCreate() keeps this cell re-runnable: calling the SparkContext
# constructor a second time in the same kernel raises because only one
# context may be active. Note that if a context already exists it is reused
# and the configuration above is NOT re-applied to it.
sc = pyspark.SparkContext.getOrCreate(conf=spark_configurations)

# The session reuses the SparkContext obtained above.
spark = (
    SparkSession
    .builder
    .master('local[4]')
    .appName("Tarea_1")
    .getOrCreate()
)

## Creación del índice

In [21]:
def documentReaderSpark(data_path, sparkContext):
    """
    Reads the documents into a Spark RDD. Each file found in ``data_path``
    becomes one record of the RDD.

    The document id is taken from the ``publicId`` attribute of the
    ``<public .../>`` tag (e.g. ``d012`` becomes ``12``) and the text from the
    CDATA section of the ``<raw>`` tag. Line breaks are removed and
    non-breaking spaces are replaced by regular spaces before the text is
    extracted.

    :param data_path: path of the folder where all the documents are located
    :param sparkContext: initialized SparkContext used to read the files
    :return: RDD of tuples (document id as int, raw text of the document)
    :raises AttributeError: raised by Spark at evaluation time when a file
    does not contain the expected ``<public>`` or ``<raw>`` tags
    """
    # Raw strings keep the regex escapes (\[ and \]) from being interpreted
    # as (invalid) Python string escapes.
    public_id_pattern = r'<public publicId="(.*?)" uri="(.*?)" />'
    raw_text_pattern = r'<raw><!\[CDATA\[(.*?)\]\]></raw>'

    # Use the context received as parameter instead of relying on a global.
    documents = sparkContext\
        .wholeTextFiles(data_path,
                        minPartitions=None,
                        use_unicode=True)\
        .map(lambda s: (re.search(public_id_pattern, s[1]).group(1),
                        # '.' does not match line breaks, so they are dropped
                        # before the CDATA body is extracted in the next step.
                        s[1].replace("\n", "")
                            .replace("\xa0", " ")))\
        .map(lambda s: (int(s[0].replace('d', '')),
                        re.search(raw_text_pattern, s[1]).group(1)))
    return documents

In [22]:
def tokenizationSpark(documents_rdd, use_spacy=False):
    """
    Tokenizes the documents, removes stop words and lemmatizes the remaining
    tokens. No case folding is applied: tokens keep their original casing.

    With NLTK (default) only alphanumeric tokens are kept, English stop words
    are dropped and the WordNet lemmatizer is applied. With spaCy the lemma of
    every token is taken and lemmas flagged as stop word or punctuation in the
    model vocabulary are dropped.

    :param documents_rdd: RDD of tuples (document id, text of the document)
    :param use_spacy: Boolean used to specify the use of the package Spacy. By
    default uses nltk
    :return: RDD of tuples ((term, document id), 1), one per kept occurrence
    of a term in a document
    """
    if use_spacy:
        nlp_spacy_en = spacy.load('en_core_web_sm')
        term_doc_pairs = documents_rdd\
            .map(lambda s: (s[0], nlp_spacy_en(s[1])))\
            .flatMap(lambda s: [(token.lemma_, s[0]) for token in s[1]
                                if not nlp_spacy_en.vocab[token.lemma_].is_stop
                                and not nlp_spacy_en.vocab[token.lemma_].is_punct])
    else:
        nltk_stop_words_en = set(nltk.corpus.stopwords.words("english"))
        wordnet_lemmatizer = nltk.stem.WordNetLemmatizer()

        term_doc_pairs = documents_rdd\
            .map(lambda s: (s[0], [word for word in nltk.word_tokenize(s[1])
                                   if word.isalnum()]))\
            .flatMap(lambda s: [(token, s[0]) for token in s[1]
                                if token not in nltk_stop_words_en])\
            .map(lambda s: (wordnet_lemmatizer.lemmatize(s[0]), s[1]))

    # Every occurrence counts as 1 so a later reduceByKey can add them up.
    return term_doc_pairs.map(lambda t: ((t[0], t[1]), 1))


In [23]:
def makeInvertedIndexSpark(tokenized_documents_rdd,
                           output_path=os.path.join('docs', 'inverted_index_lemma.pkl')):
    """
    Creates the inverted index and saves it in pickle format.

    The resulting dictionary has the shape
    ``{term: {'freq': n_docs, 'posting': [[doc_id, term_frequency], ...]}}``
    where ``freq`` is the number of documents that contain the term (document
    frequency), each posting entry holds how many times the term appears in
    that document, and postings are sorted ascending by document id. Terms are
    inserted in ascending order.

    :param tokenized_documents_rdd: RDD of tuples ((term, document id), 1)
    :param output_path: path of the pickle file where the index is saved
    :return: Dictionary of the inverted index
    """
    term_postings = tokenized_documents_rdd\
        .reduceByKey(lambda a, b: a + b)\
        .map(lambda s: (s[0][0], [[s[0][1], s[1]]]))\
        .reduceByKey(lambda a, b: sorted(a + b))\
        .sortBy(lambda s: s[0])\
        .collect()

    respuesta = {term: {'freq': len(posting), 'posting': posting}
                 for term, posting in term_postings}

    # Make sure the destination folder exists before writing the file.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_path, 'wb') as handle:
        pickle.dump(respuesta, handle, protocol=pickle.HIGHEST_PROTOCOL)
    return respuesta


In [24]:
# Build the inverted index from the raw documents.
documents_path = os.path.join('docs', 'docs-raw-texts')
try:
    documents = documentReaderSpark(documents_path, sc)
    tokenized_docs = tokenizationSpark(documents)
    inverted_index = makeInvertedIndexSpark(tokenized_docs)
finally:
    # Stop the Spark context even when a step fails, so its resources are
    # released instead of staying allocated in the kernel.
    sc.stop()

# Binary Search using Inverted Index

In [7]:
def lectura_inverted_index(path):
    """
    Loads an inverted index previously stored with pickle.

    NOTE: unpickling can execute arbitrary code, so only load index files
    produced by a trusted source.

    :param path: location of the pickle file that contains the inverted index
    :return: the object stored in the file (the inverted index dictionary)
    """
    with open(path, 'rb') as pickle_file:
        return pickle.load(pickle_file)
def obtener_total_documentos(inverted_index):
    """
    Calculates the number of documents used in the inverted index, taken as
    the highest document id found in any posting list. This equals the number
    of documents only if ids are consecutive and start at 1 (assumption, not
    checked here).

    :param inverted_index: Dictionary of the inverted index
    :return: Highest document id in the index, or 0 when the index has no
    postings at all
    """
    # A single flat generator avoids building intermediate lists, and
    # default=0 covers an empty index or terms with empty posting lists.
    return max(
        (id_doc[0]
         for entry in inverted_index.values()
         for id_doc in entry['posting']),
        default=0,
    )

In [9]:
def merge_and(term1_docs, freq1, term2_docs, freq2):
    """
    Intersects two posting lists with the classic 'merge' algorithm: both
    lists are walked in parallel and only ids present in both are kept.

    :param term1_docs: Ascending list of document ids of the first term
    :param freq1: How many entries of term1_docs are taken into account
    :param term2_docs: Ascending list of document ids of the second term
    :param freq2: How many entries of term2_docs are taken into account
    :return: Ascending list with the document ids found in both lists
    """
    common = []
    left = right = 0
    while left < freq1 and right < freq2:
        doc_a = term1_docs[left]
        doc_b = term2_docs[right]
        if doc_a == doc_b:
            # Shared document: keep it and advance on both sides.
            common.append(doc_a)
            left += 1
            right += 1
            continue
        # Advance the side that is currently behind.
        if doc_a < doc_b:
            left += 1
        else:
            right += 1
    return common
def merge_or(term1_docs, freq1, term2_docs, freq2):
    """
    Joins two posting lists with a variant of the 'merge' algorithm: both
    lists are walked in parallel, the smaller id is emitted at every step and
    ids present in both lists are emitted only once.

    :param term1_docs: Ascending list of document ids of the first term
    :param freq1: How many entries of term1_docs are taken into account
    :param term2_docs: Ascending list of document ids of the second term
    :param freq2: How many entries of term2_docs are taken into account
    :return: Ascending list, without duplicates, of the ids found in either list
    """
    union = []
    left = right = 0
    while left < freq1 and right < freq2:
        doc_a = term1_docs[left]
        doc_b = term2_docs[right]
        if doc_a == doc_b:
            # Present in both lists: emit once, advance on both sides.
            union.append(doc_a)
            left += 1
            right += 1
            continue
        if doc_a < doc_b:
            union.append(doc_a)
            left += 1
        else:
            union.append(doc_b)
            right += 1
    # At most one of the lists still has pending entries; copy them over.
    for position in range(left, freq1):
        union.append(term1_docs[position])
    for position in range(right, freq2):
        union.append(term2_docs[position])
    return union

def merge_not(term_docs, freq, num_documentos):
    """
    Builds the complement of a posting list with a variant of the 'merge'
    algorithm: the candidate ids 1..num_documentos are walked in parallel with
    the posting list and every candidate missing from the list is kept.

    :param term_docs: Ascending list of document ids of the term
    :param freq: How many entries of term_docs are taken into account
    :param num_documentos: Number of documents used to build the inverted index
    :return: Ascending list of the ids in 1..num_documentos not found in term_docs
    """
    missing = []
    position = 0
    candidate = 1
    while position < freq and candidate <= num_documentos:
        current = term_docs[position]
        if current == candidate:
            # The candidate contains the term: skip it.
            position += 1
            candidate += 1
        elif current < candidate:
            # Entry already behind the candidate (e.g. a repeated id).
            position += 1
        else:
            missing.append(candidate)
            candidate += 1
    # Candidates beyond the last posting entry never contain the term.
    while candidate <= num_documentos:
        missing.append(candidate)
        candidate += 1
    return missing

In [28]:
def leer_query(path):
    """
    Loads the file of a query line by line.

    :param path: Location of the query file
    :return: List with the lines of the file, line endings included
    """
    with open(path) as query_file:
        return list(query_file)
def leer_queries(queries_dir=os.path.join('docs', 'queries-raw-texts')):
    """
    Creates a dictionary with the terms of each query. Every ``*.naf`` file in
    ``queries_dir`` is one query; its text goes through the same NLTK steps
    used for the inverted index: tokenization, keeping only alphanumeric
    tokens, English stop word removal and WordNet lemmatization.

    :param queries_dir: folder that contains the query files
    :return: Dictionary where each key is the id of a query (``publicId``
    attribute of the file) and each value is the list of terms of the query
    :raises AttributeError: when a file does not contain the expected
    ``<public>`` or ``<raw>`` tags
    """
    queries = {}
    archivos_queries = glob.glob(os.path.join(queries_dir, '*.naf'))
    nltk_stop_words_en = set(nltk.corpus.stopwords.words("english"))
    wordnet_lemmatizer = nltk.stem.WordNetLemmatizer()
    for archivo in archivos_queries:
        # Collapse the file into a single line: '.' in the patterns below does
        # not match line breaks.
        query_content = ' '.join(leer_query(archivo)).replace('\n', '')
        id_query = re.search(r'<public publicId="(.*?)" uri="(.*?)"', query_content).group(1)
        # Raw string so that \[ and \] reach the regex engine untouched.
        text_query = re.search(r'<raw><!\[CDATA\[(.*?)\]\]></raw>', query_content).group(1)
        queries[id_query] = [wordnet_lemmatizer.lemmatize(token)
                             for token in nltk.word_tokenize(text_query)
                             if token.isalnum() and token not in nltk_stop_words_en]
    return queries

In [29]:
def conjunction_queries(inverted_index, queries):
    """
    Calculates the conjunctive (AND) binary query for each record in the
    dictionary of queries: a document is returned only if it contains every
    term of the query. A term missing from the index, or a query without
    terms, yields an empty result.

    :param inverted_index: Dictionary of the inverted index
    :param queries: Dictionary of the queries (query id -> list of terms)
    :return: Dictionary, ordered by query id, with the matching documents of
    each query as a comma separated string of ids formatted like ``d001``
    """
    respuesta = {}
    for query in sorted(queries):
        respuesta_query = None  # None until the first term has been processed
        for term in queries[query]:
            entry = inverted_index.get(term)
            # A term absent from the index matches no document.
            term_docs = [] if entry is None else [item[0] for item in entry['posting']]
            if respuesta_query is None:
                respuesta_query = term_docs
            else:
                respuesta_query = merge_and(respuesta_query, len(respuesta_query),
                                            term_docs, len(term_docs))
            if not respuesta_query:
                # The intersection is already empty; more terms cannot change it.
                break
        respuesta[query] = ','.join(f"d{doc:03d}" for doc in (respuesta_query or []))
    return respuesta
def disjunction_queries(inverted_index, queries):
    """
    Calculates the disjunctive (OR) binary query for each record in the
    dictionary of queries: a document is returned if it contains at least one
    term of the query. Terms missing from the index contribute no documents
    and a query without terms yields an empty result.

    :param inverted_index: Dictionary of the inverted index
    :param queries: Dictionary of the queries (query id -> list of terms)
    :return: Dictionary, ordered by query id, with the matching documents of
    each query as a comma separated string of ids formatted like ``d001``
    """
    respuesta = {}
    for query in sorted(queries):
        # Starting from an empty list, the union with the first term is that
        # term's own posting list.
        respuesta_query = []
        for term in queries[query]:
            entry = inverted_index.get(term)
            # A term absent from the index adds no document.
            term_docs = [] if entry is None else [item[0] for item in entry['posting']]
            respuesta_query = merge_or(respuesta_query, len(respuesta_query),
                                       term_docs, len(term_docs))
        respuesta[query] = ','.join(f"d{doc:03d}" for doc in respuesta_query)
    return respuesta
def guardar_file(path, dict_resultado):
    """
    Writes a dictionary to disk as a tab separated file, one line per entry
    with the key in the first column and the value in the second one.

    :param path: Location of the file to be written
    :param dict_resultado: Dictionary to be saved
    :return: None
    """
    with open(path, "w") as output:
        output.writelines(f'{clave}\t{valor}\n'
                          for clave, valor in dict_resultado.items())

In [30]:
# Load the inverted index stored on disk.
ii = lectura_inverted_index(os.path.join('docs', 'inverted_index_lemma.pkl'))
total_documentos = obtener_total_documentos(ii)

# Read and preprocess the queries.
queries = leer_queries()

# Evaluate every query as a binary OR and as a binary AND.
disj = disjunction_queries(ii, queries)
conj = conjunction_queries(ii, queries)

# Save both result sets as TSV files.
for nombre_archivo, resultados in (('BSII-AND-queries_results.tsv', conj),
                                   ('BSII-OR-queries_results.tsv', disj)):
    guardar_file(os.path.join('docs', nombre_archivo), resultados)