## 1. Configuración de Spark

In [7]:
import os
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import functions as f
from pyspark.sql.types import StringType
from datetime import datetime
import pandas as pd

os.environ[
    "PYSPARK_SUBMIT_ARGS"
] = "--packages org.apache.hadoop:hadoop-aws:3.2.2,io.delta:delta-core_2.12:1.1.0  pyspark-shell "
config = {
    "spark.jars.packages":"org.apache.hadoop:hadoop-aws:3.2.2",
    "spark.kubernetes.namespace": "spark",
    "spark.kubernetes.container.image": "cronosnull/abd-spark-base:202301",
    "spark.executor.instances": "4",
    "spark.executor.memory": "1g",
    "spark.executor.cores": "1",
    "spark.driver.port":"38889",
    "spark.driver.blockManager.port":"7777",
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.driver.host":"172.24.99.30",
    "spark.kubernetes.executor.request.cores":"500m",
    "spark.driver.memory":"4g",
    "spark.hadoop.fs.s3a.endpoint": "http://172.24.99.18:9000",
    "spark.hadoop.fs.s3a.access.key": os.environ.get('MINIO_USERNAME', "--"),
    "spark.hadoop.fs.s3a.secret.key": os.environ.get('MINIO_PASSWORD',"--"),
    "spark.hadoop.fs.s3a.path.style.access": True,
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    "spark.kubernetes.local.dirs.tmpfs":True,

}
def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster("k8s://https://172.24.99.68:16443")
    for key, value in config.items():
        conf.set(key, value)    
    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()

In [2]:
# Get or Create Spark session
spark = get_spark_session("grupo03-news", SparkConf())

In [3]:
!spark-submit --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 1.8.0_322
Branch 
Compiled by user  on 2022-03-26T09:34:47Z
Revision 
Url 
Type --help for more information.


In [4]:
spark

## 2. Archivo de Stopwords 

In [5]:
# Loading stop words
#stopwords_df = spark.read.text("data/userdata/grupo-03/stopwords/spanish")

In [5]:
# Loading stop words
import requests
response = requests.get('https://github.com/bsgarciac/taller1-bigdata/blob/master/data/spanish.txt')
stopwords = response.json()['payload']['blob']['rawLines']
data = [(string,) for string in stopwords] # fixing schema

In [6]:
# Adding more Stopwords
data.append(("tras", ))
data.append(("año", ))
data.append(("años", ))
data.append(("según", ))
data.append(("así", ))
data.append(("ademas", ))
data.append(("hace", ))
data.append(("ser", ))
data.append(("pide", ))
data.append(("parte", ))
data.append(("puede", ))
data.append(("ser", ))
data.append(("sido", ))
data.append(("después", ))
data.append(("además", ))
data.append(("si", ))
# Stop words to DF
stopwords_df = spark.createDataFrame(data, ["value"])

In [7]:
# Stopwords dataframe to String
stopwords_grouped_df = stopwords_df.groupBy().agg(
            f.concat_ws(" ", f.collect_set("value"))
        )
stopwords_str = stopwords_grouped_df.collect()[0][0]

## 3. Archivos de Noticias

In [8]:
# Loading news Data schema
news_df = spark.read.json("s3a://noticias2016/individual_files/news_0000001.json.gz")
schema = news_df.schema

In [9]:
# Loading news data
start = datetime.now()
news_df = spark.read.json("s3a://noticias2016/individual_files/*.json.gz", schema=schema)
end = datetime.now()
print("Execution Time:", end - start)

Execution Time: 0:04:49.346136


In [13]:
# Showing a sample for news DF
news_df.show(1, vertical=True)

-RECORD 0------------------------------
 author         |                      
 crawled        | 2016-10-20T23:44:... 
 entities       | {[], [], []}         
 external_links | []                   
 highlightText  |                      
 highlightTitle |                      
 language       | spanish              
 locations      | []                   
 ord_in_thread  | 0                    
 organizations  | []                   
 persons        | []                   
 published      | 2016-10-20T14:36:... 
 text           | MundoDeportivo.co... 
 thread         | {ES, 2564, http:/... 
 title          | Los rumores de fi... 
 url            | http://www.mundod... 
 uuid           | 96eb6858709dc9ffb... 
only showing top 1 row



In [10]:
# Adding stopwords to News dataframe
news_df = news_df.withColumn("stopwords", f.lit(stopwords_str))
news_df = news_df.withColumn("stopwords", f.split("stopwords", ' '))
news_df.select('stopwords')

DataFrame[stopwords: array<string>]

## 4. Top 10 palabras más frecuentes en los títulos

In [43]:
# Formatting title
news_df = news_df.withColumn("title_standarized", f.lower('title'))
news_df = news_df.withColumn("title_standarized", f.regexp_replace('title_standarized', "-|\t|\n|\.|,|\|", ""))

In [44]:
# Tokenizing words in news title
news_df = news_df.withColumn("words_title", f.split('title_standarized', ' '))
# Removing stopwords from words in title
news_df = news_df.withColumn("words_title_filtered", f.array_except("words_title", "stopwords"))

In [45]:
# Words frequency in Title
title_count_df = news_df.withColumn('word', f.explode('words_title_filtered')) \
    .groupBy('word').count() \
    .sort('count', ascending=False) 

In [17]:
# Showing title count df
title_count_df.show()

+-----------+-----+
|       word|count|
+-----------+-----+
|           |58020|
|     diesel|10706|
|     madrid| 9201|
|ecodiarioes| 9048|
|      nuevo| 7202|
|      color| 6564|
|       2016| 6469|
|   gobierno| 5991|
|        dos| 5699|
|     españa| 5558|
|        4x4| 5485|
|       psoe| 5187|
|   millones| 5146|
|  barcelona| 4796|
|      trump| 4257|
|    octubre| 4202|
|  univision| 3983|
|      nueva| 3791|
|         20| 3758|
|   gasolina| 3649|
+-----------+-----+
only showing top 20 rows



In [46]:
start = datetime.now()
title_count_df.write.parquet("s3a://user-data/grupo-03/title_count")
end = datetime.now()
print("Execution Time:", end - start)

Execution Time: 0:05:52.349614


## 5. Top 10 palabras más frecuentes en el contenido de las noticias

In [11]:
# Making all words in title lower case
news_df = news_df.withColumn("text_standarized", f.lower('text'))
news_df = news_df.withColumn("text_standarized", f.regexp_replace('text_standarized', "-|\t|\n|\.|,|\|", ""))

In [12]:
# Tokenizing words in news text
news_df = news_df.withColumn("words_text", f.split('text_standarized', ' '))
# Removing stopwords from words in text
news_df = news_df.withColumn("words_text_filtered", f.array_except("words_text", "stopwords"))

In [13]:
# Words frequency in Text
text_count_df = news_df.withColumn('word', f.explode('words_text_filtered')) \
    .groupBy('word').count() \
    .sort('count', ascending=False).limit(100)

In [15]:
start = datetime.now()
text_count_df.show()
end = datetime.now()
print("Execution Time:", end - start)

+----------+------+
|      word| count|
+----------+------+
|          |262205|
|       dos|108303|
|      2016| 99768|
|   octubre| 90702|
|      tres| 67132|
|       vez| 66328|
|    pasado| 65812|
|  gobierno| 63240|
|     ahora| 62809|
|       hoy| 61018|
|   primera| 60776|
|  personas| 60454|
|       día| 59645|
|presidente| 59282|
|  nacional| 58598|
|      gran| 57621|
|      país| 56919|
|      cada| 56219|
|     hacer| 55774|
|      dijo| 55686|
+----------+------+
only showing top 20 rows

Execution Time: 0:18:18.295750


In [14]:
start = datetime.now()
text_count_df.write.parquet("s3a://user-data/grupo-03/news/text_count")
end = datetime.now()
print("Execution Time:", end - start)

Execution Time: 0:17:28.459716


## 6. Finalizando sesión de Spark

In [15]:
spark.stop()

## 7. Análisis Comparativo

### 7.1 Primera Prueba: 1 Nodo Time Out

Inicialmente se intentó ejecutar este procedimiento con la siguiente configuración:

In [8]:
first_conf = {
    "spark.executor.instances": "1",
    "spark.executor.memory": "1g",
    "spark.executor.cores": "1",
    "spark.driver.memory":"1g",

}
config = config | first_conf
config_df = pd.DataFrame.from_dict(config, orient="index")

In [9]:
config_df

Unnamed: 0,0
spark.jars.packages,org.apache.hadoop:hadoop-aws:3.2.2
spark.kubernetes.namespace,spark
spark.kubernetes.container.image,cronosnull/abd-spark-base:202301
spark.executor.instances,1
spark.executor.memory,1g
spark.executor.cores,1
spark.driver.port,38889
spark.driver.blockManager.port,7777
spark.driver.bindAddress,0.0.0.0
spark.driver.host,172.24.99.30


Esto resultó en el siguiente error **OutOfMemoryError** (java.lang.OutOfMemoryError: GC overhead limit exceeded) El cual hace referencía a que el Driver se quedó sin recursos. Por lo cual en la siguiente prueba se intentó aumentar esta capacidad

### Segunda Prueba:  1 Nodo

In [11]:
second_config = {
    "spark.executor.instances": "1",
    "spark.executor.memory": "1g",
    "spark.executor.cores": "1",
    "spark.driver.memory":"4g",

}
config = config | second_config
config_df = pd.DataFrame.from_dict(config, orient="index")

In [12]:
config_df

Unnamed: 0,0
spark.jars.packages,org.apache.hadoop:hadoop-aws:3.2.2
spark.kubernetes.namespace,spark
spark.kubernetes.container.image,cronosnull/abd-spark-base:202301
spark.executor.instances,1
spark.executor.memory,1g
spark.executor.cores,1
spark.driver.port,38889
spark.driver.blockManager.port,7777
spark.driver.bindAddress,0.0.0.0
spark.driver.host,172.24.99.30


Con esta configuración se observó que el proceso se terminaba de ejecutar satisfactoríamente. Sin embargo, el tiempo de ejecución fue grande. Este experimento se realizó en la semana 7, por lo tanto, es de esperar que no tantos grupos estaban usando la infraestructura. Por lo tanto podemos asumir que esta demora se debe ahora a que **solo se está usando un Nodo**. Esto no nos permite aprovechar al máximo las ventajas de ejecutar procedimientos de Big Data usando computación distribuida.

### Tercera Prueba: 4 Nodos

In [15]:
third_config = {
    "spark.executor.instances": "4",
    "spark.executor.memory": "1g",
    "spark.executor.cores": "1",
    "spark.driver.memory":"4g",

}
config = config | third_config
config_df = pd.DataFrame.from_dict(config, orient="index")

In [16]:
config_df

Unnamed: 0,0
spark.jars.packages,org.apache.hadoop:hadoop-aws:3.2.2
spark.kubernetes.namespace,spark
spark.kubernetes.container.image,cronosnull/abd-spark-base:202301
spark.executor.instances,4
spark.executor.memory,1g
spark.executor.cores,1
spark.driver.port,38889
spark.driver.blockManager.port,7777
spark.driver.bindAddress,0.0.0.0
spark.driver.host,172.24.99.30


**Rendimiento:**

Esta configuración (con la que se realizó el ejercicio), presenta una gran mejoría en terminos de eficiencia. Además la implementación de los **Schema** en la lectura de los datos permite tener aún más velocidad. Esto se hizo debido a que los archivos parquet presentan SchemaOnWrite, por lo tanto ya cuentan con una estructura definida al momento de ser guardarlos. El problema es que Spark no sabe si este esquema será el mismo para todos los archivos. Para el caso de las Noticias sí es el caso, por lo tanto podemos aumentar la velocidad leyendo primero un archivo parquet, para extraer su estructura, y luego aplicar esta estructura en la lectura de los demás.

Finalmente se decidió usar solo 4 instancías para no acaparar más infraestructura de la necesaria, y no afectar otros grupos. Por esta misma razón siempre se corrío el comando spark.stop para mitigar este problema.

**Facilidad de uso**

En terminos de usabilidad, sin importar la cantidad de nodos, trabajar en Spark con grandes cantidades de datos puede ser desafiante. Esto se debe porque en el momento de desarrollar es normal presentar errores en el código, los cuales requieren una iteración entre Programar -> Probar -> Arreglar errores. Si se prueba este código con todos los datos desde un inicio, tendremos que esperar una gran cantidad de tiempo para ver los errores que tengamos. 

Una alternativa es probar nuestro código con una pequeña parte de los datos, esto con el fín de encontrar errores más fácilmente. Ya cuando tengamos una versión lista de nuestro código podemos probarlo ya con la cantidad total de los datos. Sin embargo hay que tener en cuenta que nuestro código debe ser eficiente en terminos de complejidad computacional, podemos crear diversos algoritmos para solucionar un mismo problema, pero no todos serán óptimos cuando la cantidad de datos es muy grande. Por lo tanto puede ocurrir que tengamos un código final que funcione con una porción de los datos, pero al probarlo con todos los datos nos de TimeOut.