# Práctica IBD

* Matías Mussini
* Antonio Reviejo
* Pablo Alonso
* Marcos Cedenilla

Arquitectura para el análisis y almacenado de artículos científicos a partir de DOIs, proporcionados en un archivo txt, mediante la herramienta semantic scholar se extraerá un json con los datos de cada artículo, se irá almacenando por batches en una base de datos documental, MongoDB, se opta por una carga en batches debido a la posibilidad de saturar la memoria principal si se opta por obtener la información de todos los documentos y luego se vuelcan a la base de datos, en el caso de volúmenes de dato tipo big data, de la misma forma tanto las consultas como la carga información en otra base de datos se hará siempre por batches, el tamaño del batch deberá ser el máximo permitido sin saturarse la memoria principal del ordenador, spark trabaja con dicha memoria por lo que el sistema establecido se aplica también en las queries spark.

## Creación de los archivos requeridos y carga de los artículos en las bases de datos

En primer lugar se crearán los archivos csv requeridos, junto con sus cabeceras, en adelante los archivos se modificarán para obtener el contenido deseado.

In [1]:
# Importanción de dependencias
import csv
import pymongo
import pandas as pd
import json

# Ajuste del batch size
batch_size = 10000


data_file = open('data/Authors.csv', 'w')
csv_writer = csv.writer(data_file)
header = ["Autor","Publicaciones"]
csv_writer.writerow(header)
data_file.close()

data_file = open('data/Documents.csv', 'w')
csv_writer = csv.writer(data_file) 
header = ["paperId","title","publicationDate" ]
csv_writer.writerow(header)
data_file.close()

data_file = open('data/Keywords.csv', 'w')
csv_writer = csv.writer(data_file) 
header = ["word", "frequency"]
csv_writer.writerow(header)
data_file.close()

Volcado de los datos según se obtienen en la base de datos MongoDB, de esta manera al trabajar en batches se podría usar programación asíncrona, dado que mientras se buscan los siguentes datos se podría ir volcando los anteriores en MongoDB, lo que mejoraría la eficiencia.

In [2]:
from semanticscholar import SemanticScholar

# Creación del cliente Mongo en el puerto correspondiente
myclient = pymongo.MongoClient("mongodb://mongo1")
mydb = myclient["mydatabase"]
# Nos movemos a la colección donde irán los documentos
mycol = mydb["publicaciones"]
# Contacto con la API de SemanticScholar
sch = SemanticScholar()

# Apertura del archivo
with open('data/dois.txt') as f:
    
    # Lectura de este, se podría hacer por batches si fuera necesario
    dois = f.readlines()
    # Lista auxiliar donde se guardará cada batch de documentos antes de volcarlos en Mongo
    j = []
    
    # Procesamos cada línea
    for doi in dois:
        doi = doi.strip()
        
        # Obtenemos la información de la API
        paper = sch.get_paper(str(doi))
        
        # Diccionario auxiliar donde se guardará la información de cada documento
        j1 = dict()
        
        # Resolvemos la ausencia de un campo en los documentos
        if paper.paperId:
            j1['paperId'] = paper.paperId
        else:
            j1['paperId'] = None
        if paper.title:
            j1['title'] = paper.title
        else:
            j1['title'] = None
        
        if paper.publicationDate:
            j1['publicationDate'] = paper.publicationDate
        else:
            j1['publicationDate'] = None

        if paper.year:
            j1['year'] =  paper.year
        else:
            j1['year'] = None   
        
        if paper.authors:
            # Procesamos autores de manera distinta, ya qe en caso de haberlos será una lista de documentos embedidos
            authors_ = []
            for i in paper.authors:
                authors_.append({'authorId' :i.authorId, 'name':i.name})
            j1['authors'] = authors_
        else:
            j1['authors'] = None

        if paper.abstract:
            j1['abstract'] = paper.abstract
        else:
            j1['abstract'] = None
        
        # Añadimos el documento procesado al batch
        j.append(j1)
        
        # Si obtenemos los suficientes, volcamos
        if len(j) == batch_size:
            mycol.insert_many(j)
            # Vaciamos el batch
            j = []

# Insertamos los documentos restantes
if len(j) != 0:
    mycol.insert_many(j)
    j = []
    
# Cerramos el cliente
myclient.close()


Para capturar las relaciones entre Autores y resolver distintas queries, optamos por volcar los datos en Neo4j, debido a su potencia al trabajar con relaciones y nodos. Para ello vamos extrayendo los datos de Mongo a Neo4j por batches para no saturar la memoria, otra opción sería crear un archivo intermedio csv, con el que luego volcar la información en neo4j a priori sería mas eficiente en tiempo aunque menos en memoria, si se opta por nuestra opción y de nuevo se hace de manera asíncrona es posible recortar esa distancia en cuanto a complejidad temporal.

* La parte más costosa a nivel de memoria de los datos es guardar el abstract o si se quisiera el texto completo, por lo tanto en puntos como este que esos datos no se extraen de mongo,
el batch_size puede ampliarse significativamente, puediendo igualar la cantidad de datos y por lo tanto ser exactamente igual a una carga total en memoria y posterior volcado a neo4j

Cabe destacar que este script siguiente es para la inicialización, si se quieren añadir datos a neo4j, se deberán obtener solo los añadidos a mongo y recibirlos de una query, de forma que no se recorra toda la base de datos mongo tratando de insertarlos, aunque con la sentencia merge no deberían existir duplicidades, este comentario se hace a nivel de eficiencia del sistema.


In [3]:
from neo4j import GraphDatabase

batch_size = 100000
# Volvemos a abrir un cliente Mongo y nos desplazamos a nuestra colección
myclient = pymongo.MongoClient("mongodb://mongo1")
mydb = myclient["mydatabase"]
mycol = mydb["publicaciones"]

# Variable auxiliar para la descarga de los datos de Mongo en batches
skip = 0

# Iniciamos un driver Neo4j, conectándonos al puerto correspondiente
uri = "bolt://neo4j"
user = "neo4j"
password = "password"
driver = GraphDatabase.driver(uri, auth=(user, password))

# Ciclo de control
while True:
    
    # Query que extrae los datos de MongoDB, preparados para su carga en Neo4j, se añade skip y limit para el uso de batches
    authorPaper = list(mycol.aggregate([
        { "$unwind": "$authors"},
        { "$project": {"_id":0,  "authors.authorId":1, "authors.name":1, "paperId":1} },
        { "$skip": skip},
        { "$limit": batch_size}
    ]))
    
    # Si se extraen documentos
    if len(authorPaper) > 0:
        
        # Se abre el driver neo4j para cargar el batch
        with driver.session() as session:
            with session.begin_transaction() as tx:
                
                # Query de carga de los datos
                query = '''
                UNWIND $data AS entry
                MERGE (a:Author {name: entry.authors.name, id: entry.authors.authorId})
                MERGE (b:Paper {title: entry.paperId})
                MERGE (a)-[:WROTE]->(b)
                '''
                tx.run(query, data=authorPaper)
    
    # Mientras que el batch no se más pequeño que el batch size y por lo tanto el limit no se ha satisfecho por ausencia de más datos, se continúa
    if len(authorPaper) < batch_size:
        break

    # Incrementamos el valor skip para cargar el siguiente batch, sin repetir información
    skip += batch_size

# Cerramos el cliente mongo    
myclient.close()

# Para las queries posteriores es importante reflejar las relaciones de colaboración entre autores, desde mongo solo podemos obtener las de cada autor con cada paper, por lo que las creamos en neo4j
query = """MATCH (a1:Author)-[:WROTE]->(p: Paper)<-[:WROTE]-(a2:Author) with a1, a2, count(distinct p) as w 
   MERGE (a1)-[:COLABORA {weight: w}]-(a2)"""

with driver.session() as session:
    result = session.run(query)
    result = list(result)
    
# Cerramos el driver neo4j
driver.close()

## Completando los archivos csv requeridos

A continuación se mostrarán las lineas de código usadas para completar los csv pedidos en el guión de la práctica.

### Datos estáticos

* De nuevo el abstract o texto no es necesario aquí por lo que podemos mantener un alto batch_size e idílicamente con buenos recursos un batch_size que permita cargar todos los datos, debido a que
  son unos pocos datos str que no ocupan gran cantidad de espacio, aún así pensando en una posible cantidad ingente de artículos del orden de $10^7$ dejamos el sistema de batches, por si se
  necesitase
  
De manera similar al comentario hecho en neo4j, estos scripts están pensados para inicialización y no actualización de los csv, se entienden que se ejecutarán una vez la base de datos esta comnpleta
de no ser así bastaría con obtener de nuevo con una query mongo los datos nuevos e insertarlos con el mismo método, en contraste con recuperar la base de datos completa como se hace aquí,
bastaría con una sentencía que filtrará por lo que es trivial.

#### Documentos

In [4]:
myclient = pymongo.MongoClient("mongodb://mongo1")
mydb = myclient["mydatabase"]
mycol = mydb["publicaciones"]

data_file = open('data/Documents.csv', 'a')
writer = csv.DictWriter(data_file, fieldnames=["paperId","title","publicationDate" ])



# Inicializamos el valor de skip
skip = 0

# Bucle de volcado en batches
while True:
    # Recuperamos un batch de documentos
    documents = list(mycol.aggregate([
        { "$project": {"_id":0, "paperId":1, "publicationDate":1, "title":1 }},
        { "$skip": skip},
        { "$limit": batch_size}
    ]))
 
    writer.writerows(documents)

    # Chequeamos si hay más documentos
    if len(documents) < batch_size:
        break

    # Incrementamos el valor skip para el siguiente batch
    skip += batch_size

# Cerramos el cliente mongo
myclient.close()
data_file.close()

#### Autores

In [5]:
myclient = pymongo.MongoClient("mongodb://mongo1")
mydb = myclient["mydatabase"]
mycol = mydb["publicaciones"]

data_file = open('data/Authors.csv', 'a')
writer = csv.DictWriter(data_file, fieldnames=["Autor","Publicaciones"])

skip = 0
while True:
    
    # Query para obtener los datos necesarios para el csv
    authors = mycol.aggregate([
        { "$unwind": "$authors"},
        { "$group": {"_id": "$authors.authorId", "Autor": {"$push":"$authors.name"}, "Publicaciones" : {"$sum":1}}},
        { "$project": {"_id":0, "Autor":1, "Publicaciones":1} },
        { "$unwind": "$Autor"},
        { "$skip": skip},
        { "$limit": batch_size}
    ])
    
    authors = list(authors)
    writer.writerows(authors)
        
    if len(authors) < batch_size:
        break
    
    skip += batch_size
        
myclient.close()
data_file.close()

### Datos dinámicos

Se ha optado por el uso de spark debido a su mayor eficiencia por el uso de memoria principal, lo cual queremos explotar al cargar la máxima cantidad de datos en esta para trabajar, su
compatibilidad con python, lenguaje al que estamos más habituados, la descarga de datos de mongo se hará de la misma manera que anteriormente.

In [6]:
from pyspark.sql.functions import length, count, udf, split, explode, col, transform, size
import pyspark.sql.functions
from pyspark.sql import SparkSession
import string
import unicodedata

# Función auxiliar para la creación de una sesión spark con los parámetros que necesitamos
def init_spark_session():
    spark = SparkSession.\
            builder.\
            appName("Cluster work").\
            master("spark://spark-master:7077").\
            config("spark.executor.memory", "512m").\
            getOrCreate()
    
    return spark

# Funciónes auxiliares para la limpieza del texto, eliminamos carácteres no deseados pero manteniendo los acentos
def es_letra_con_tilde(caracter):
    try:
        return 'WITH' in unicodedata.name(caracter)
    except ValueError:
        return False

def limpiar_texto(texto):
    caracteres_validos = string.ascii_letters + string.digits
    texto_limpiado = ''.join(c if c in caracteres_validos or es_letra_con_tilde(c) else '' for c in texto)
    return texto_limpiado.lower()

#### Keywords

In [7]:
def cuenta_palabras_spark(ruta: str, palabras: list, mycol):

    # Las entradas serán la ruta del csv donde se volcarán los datos, la lista de palabras a buscar y la colección mongo de donde se obtendrán los documentos
    
    # Creamos una sesión spark
    spark = init_spark_session()
    
    # Limpiamos las palabras
    palabras = list(map(limpiar_texto, palabras))
    
    # Abrimos el archivo
    data_file = open(ruta, 'a')
    csv_writer = csv.writer(data_file)
    
    # Creamos una versión spark de la función anterior para poder usarla en un dataFrame spark
    limpiar_texto_spark = udf(limpiar_texto)

    # Inicializamos el valor de skip
    skip = 0
    
    # Inicializamos a None el resultado final
    final_result = None
    
    # Bucle de obtención de datos
    while True:
        
        # Recuperamos un batch de documentos
        projection = { 'abstract': 1, "_id": 0}
        documents = list(mycol.find({}, projection).skip(skip).limit(batch_size))
        
        # No continuar si no se recuperan documentos
        if len(documents) == 0:
            break

        # Convertimos el batch en un spark dataFrame
        df = spark.createDataFrame(documents)

        # Creamos una linea para cada palabra
        df = df.select(explode(split(df.abstract, " ")).alias("word"))
        
        # Limpiamos el texto
        df_with_processed_column = df.withColumn("word", limpiar_texto_spark(df["word"]))
        
        # Filtramos cada linea, si se encuentra en la lista de palabras la mantenemos si no se borrará
        filtered_df = df_with_processed_column.filter(col("word").isin(palabras))

        # Contamos cada palabra
        batch_result = filtered_df.groupBy("word").count().alias("frequency")

        # Combinamos los resultados
        if final_result is None:
            final_result = batch_result
        else:
            # Unimos los dataFrames
            final_result = final_result.union(batch_result)
            # Sumamos los resultados
            final_result = final_result.groupBy("word").sum("frequency")
            # Renombramos la columna
            final_result = final_result.withColumnRenamed("sum(frequency)", "frequency")

        # Chequeamos si hay más documentos que obtener
        if len(documents) < batch_size:
            break

        # Incrementamos el valor skip para el siguiente batch
        skip += batch_size


 

    
    
    # Escribimos el resultado final
    csv_writer.writerows(final_result.collect())
    
    # Cerramos el archivo y la sesión
    spark.stop()
    data_file.close()

In [8]:
# EJEMPLO

myclient = pymongo.MongoClient("mongodb://mongo1")
mydb = myclient["mydatabase"]
mycol = mydb["publicaciones"]
cuenta_palabras_spark('data/Keywords.csv', ["the", "in"], mycol)
myclient.close()

/usr/local/lib/python3.9/dist-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/18 07:43:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

## Queries

Con la infraestructura creada se debe ser capaz de resolver las siguientes consultas.

### Consultas simples

#### Articles

Ya contamos con el json el cual tiene las obras y sus características. Todo esto guardado en Mongo. Por tanto, abriendo un nuevo cliente, obtenemos las obras de un autor ordenados por el número de autores que han participado en cada obra, dándole más imporancia a las obras en las que menos autores hayan participado. 

In [9]:
import pymongo



def articles(name, mycol):
    
    Articles = list(mycol.aggregate([
            { "$unwind": "$authors"},
            { '$group': { '_id': '$paperId', 'nombre': { "$push": '$authors.name'},  'titulo': { "$addToSet": '$title'}, 'n_autores': { '$sum': 1 } } },
            { "$unwind": "$nombre"},
            { "$match": {"nombre": name}},
            { "$project": {'_id': 0, 'titulo': 1, 'n_autores': 1}},
            { "$sort": {'n_autores': 1}}
        ]))

    return Articles
        
myclient = pymongo.MongoClient("mongodb://mongo1")
mydb = myclient["mydatabase"]
mycol = mydb["publicaciones"]
name = 'G. Niu'
result = articles(name, mycol)
myclient.close()

for i in result:
    print(i)

{'titulo': ['Overview of Big Data and Its Visualization'], 'n_autores': 2}


#### Texts

La consulta se hará mediante spark, siguiendo la estructura de la creación del archivo Keywords

In [18]:
# ADAPTAR TEXT A RECUPERAR LOS DATOS EN BATCH DESDE MONGO, LO HAGO YO, PONEROS MEJOR CON LAS OTRAS SUGERENCIAS QUE VAN ENTRE *** ***
from pyspark.sql.types import StringType, ArrayType

def limpiar_texto_array(array):
    return [limpiar_texto(d) for d in array]


def texts(palabra, mycol):
    
    spark = init_spark_session()
    
    limpiar_texto_spark = udf(limpiar_texto_array, ArrayType(StringType()))

    # Inicializamos el valor de skip
    skip = 0
    
    # Inicializamos a None el resultado final
    final_result = None
    
    # Bucle de obtención de datos
    while True:
        
        # Recuperamos un batch de documentos
        projection = { 'abstract': 1, "title": 1, "_id": 0}
        documents = list(mycol.find({}, projection).skip(skip).limit(batch_size))
        
        # No continuar si no se recuperan documentos
        if len(documents) == 0:
            break

        # Convertimos el batch en un spark dataFrame
        df = spark.createDataFrame(documents)

        df = df.select("title", split(df.abstract, " ").alias("word"))
        df = df.withColumn("word", limpiar_texto_spark(col('word')))
        df = df.withColumn('word_count', size(col('word')))
        batch_result = df.withColumn("frequency", size(pyspark.sql.functions.filter(col("word"), lambda x: x == palabra))/col("word_count")).select("title", "frequency")
        
        # Combinamos los resultados
        if final_result is None:
            final_result = batch_result
        else:
            # Unimos los dataFrames
            final_result = final_result.union(batch_result)

        # Chequeamos si hay más documentos que obtener
        if len(documents) < batch_size:
            break

        # Incrementamos el valor skip para el siguiente batch
        skip += batch_size
        
        

    final_result = final_result.sort(final_result.frequency.desc()).collect() # Para mayor legibilidad usar toPandas() aunque sería más costoso a nivel
                                                                              # de memoria
    return final_result


In [17]:
# EJEMPLO

myclient = pymongo.MongoClient("mongodb://mongo1")
mydb = myclient["mydatabase"]
mycol = mydb["publicaciones"]
palabra = "the"
print(texts(palabra, mycol))
myclient.close()

                                                                                

                                               title  frequency
0  Comparison between Expert Systems, Machine Lea...   0.090909
1         Overview of Big Data and Its Visualization   0.044586


### Consultas Complejas

#### Collaborators

Consulta realizada mediante neo4j debido a su potencia en el trabajo con relaciones. Debido a que la relación puede ser directa o indirecta. Por ello tendremos esto en cuenta a la hora de ordenar el listado que se devuelve, devolviendo primero los autores con los que se haya colaborado directamente, y dentro de ello, priorizando con quienes haya colaborado más veces.

In [23]:
def Colaborators(name, driver):
    
    name = '\'' + name + '\''
    # Query
    query = f'''
            MATCH (r: Author {{name : {name} }})-[c:COLABORA*..2]-(r2:Author)
            WHERE r <> r2
            WITH DISTINCT r2
            MATCH p = shortestPath((:Author {{name : {name} }})-[:COLABORA*..2]-(r2))
            RETURN r2.name, length(p), reduce(totalWeight = 0, rel in relationships(p) | totalWeight + rel.weight) AS shortestPathWeight
            ORDER BY length(p), shortestPathWeight
            '''
    with driver.session() as session:
        result = session.run(query)
        result = list(result)

    driver.close()
        
    return result
        
        
# Creamos el driver Neo4j
uri = "bolt://neo4j"
user = "neo4j"
password = "password"

driver = GraphDatabase.driver(uri, auth=(user, password))
name = 'Maad M. Mijwil'
result = Colaborators(name, driver)

for i in result:
    print("Autor:", i[0], "| Distancia:", i[1], "| Peso:",i[2])
    

Autor: Y. Filali | Distancia: 1 | Peso: 1
Autor: Karan Aggarwal | Distancia: 1 | Peso: 1
Autor: Humam Al-Shahwani | Distancia: 1 | Peso: 1
Autor: Dhamyaa Salim Mutar | Distancia: 1 | Peso: 1


#### Words

De nuevo consulta creada con spark, seguirá la misma estructura que las anteriores

In [14]:
def palabras_por_size(size: int, mycol):
        
    # Las entradas serán el tamaño requerido y la colección mongo
    
    # Filtrar para que el tamaño no pueda ser de más de 20 letras
    if size < 1 or size > 20:
        raise ValueError("size must be a integer between 1 and 20")
    
    # Iniciamos la sesión
    spark = init_spark_session()
    limpiar_texto_spark = udf(limpiar_texto)

    # Inicializamos el valor skip
    skip = 0

    final_result = None
    
    while True:
        projection = { 'abstract': 1, "_id": 0}
        # Recuperamos un batch de documentos, se podría recuperar solo el abstract
        documents = list(mycol.find({}, projection).skip(skip).limit(batch_size))
        
        if len(documents) == 0:
            break
        
        # Cramos el dataFrame en base al batch
        df = spark.createDataFrame(documents)
        
        # Creamos una linea por palabra
        df = df.select(explode(split(df.abstract, " ")).alias("word"))
        
        # Limpiamos el texto
        df_with_processed_column = df.withColumn("word", limpiar_texto_spark(df["word"]))
        
        # Filtramos por tamaño
        filtered_df = df_with_processed_column.filter(length("word")==size)

        # Contamos
        batch_result = filtered_df.count()
        



        # Combinamos los resultados
        if final_result is None:
            final_result = batch_result
        else:
            final_result =+ batch_result

        # Chequeamos si hay más documentos
        if len(documents) < batch_size:
            break

        # Incrementamos el skip para el siguiente batch
        skip += batch_size

    
    # Cerramos la sesión y printeamos los resultados
    spark.stop()
    return final_result

In [15]:
# EJEMPLO

myclient = pymongo.MongoClient("mongodb://mongo1")
mydb = myclient["mydatabase"]
mycol = mydb["publicaciones"]

size = 3
num = palabras_por_size(size, mycol)
myclient.close()

print(f'Existen: {num} de tamaño {size}')

55
