# **HDFS (Hadoop Distributed File System)**

<img src="https://hadoop.apache.org/docs/r1.2.1/images/hadoop-logo.jpg">

**HDFS (Hadoop Distributed File System)** es una parte esencial del ecosistema Big Data de Apache Hadoop. HDFS está diseñado para almacenar y gestionar grandes volúmenes de datos distribuidos en varios nodos de un cluster, proporcionando alta tolerancia a fallos y escalabilidad. En este primer ejercicio, vamos a interactuar con HDFS mediante la línea de comandos dentro del entorno de **JupyterLab**, lo que nos permitirá familiarizarnos con las operaciones básicas de este sistema de archivos distribuido.

Para comenzar, es necesario abrir un terminal desde **JupyterLab**. Una vez abierto, podemos enviar comandos al sistema de archivos HDFS, que son muy similares a los comandos de bash en entornos Linux. Algunos de los comandos de HDFS que ejecutaremos comenzarán con `hdfs dfs`, seguidos de la operación que deseemos realizar. Por ejemplo, si queremos listar los archivos y directorios en el directorio raíz de HDFS, usaremos el comando ls de la siguiente manera:

# ![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)

# Actividad BATCH

## Sistema de ficheros HDFS y extracción de conocimiento de fuentes de datos heterogéneas mediante RDDs

En esta práctica comenzaremos con una breve introducción a HDFS (Hadoop Distributed File System), para entender cómo se almacena y distribuye la información. Luego, nos meteremos de lleno con Spark RDDs y Spark SQL para procesar grandes volúmenes de datos de forma eficiente. Para finalizar, trabajaremos con datos relacionales y su manejo en entornos distribuidos.

### Puntuación de la actividad:
- **Ejercicio 1**: Gestión y análisis de datos en HDFS *(0.5 puntos)*
- **Ejercicio 2**: Manipulación de RDDs en PySpark *(1.25 puntos)*
- **Ejercicio 3**: Análisis de Datos de Tweets en PySpark *(1.25 puntos)*
- **Ejercicio 4**: Optimización de Cálculos con Persistencia *(0.25 puntos)*
- **Ejercicio 5**: Análisis de Tweets mediante DataFrames y consultas SQL *(2 puntos)*
- **Ejercicio 6**: Análisis de Tweets Geolocalizados *(1.5 puntos)*
- **Ejercicio 7**: Análisis del Patrón de Actividad Horaria en Twitter *(1 puntos)*
- **Ejercicio 8**: Análisis de la Relación entre Tweets y Diputados por Provincia *(0.75 puntos)*
- **Ejercicio 9**: Análisis de Interacciones de Retweets y Grados de Usuario *(0.75 puntos)*
- **Ejercicio 10**: Distribución del Grado de Salida en una Red de Retweets *(0.75 puntos)*

In [None]:
!hdfs dfs -ls /

Es importante que todos los comandos se ejecuten correctamente en el entorno **JupyterLab** para obtener los resultados deseados.

Para consultar la documentación completa de los comandos disponibles en HDFS, puedes acceder a la guía oficial en el siguiente enlace: [HDFS Command Guide](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html)

A lo largo de este ejercicio, utilizaremos algunos de los comandos más comunes de HDFS para realizar operaciones como la creación de directorios, la carga y descarga de archivos, y la gestión de permisos, entre otros. A medida que avanzamos, te familiarizarás con la estructura de HDFS y cómo aprovechar sus funcionalidades en entornos Big Data.

### **Ejercicio 1**: Gestión y análisis de datos en HDFS (*0.5 puntos*)

En este ejercicio trabajarás con un conjunto de registros de consumo eléctrico almacenados en el archivo `consumo_hogar_2024.csv`, disponible en la ruta `/aula_M2.858/data/consumo_hogar_2024.csv`.

Tu tarea consiste en realizar una exploración completa del archivo directamente en HDFS, verificando su tamaño, permisos, propietario, factor de replicación y número de bloques. Deberás además comprobar que el archivo no presenta errores de integridad ni bloques dañados.

Sin descargar el archivo completamente, analiza su estructura y verifica que los datos sean legibles (por ejemplo, revisando encabezados y algunas filas de muestra).

Una vez confirmes que el archivo está correcto, crea dentro de tu espacio personal en HDFS una carpeta llamada `procesado` en la ruta `/user/[tu_usuario]/` y reorganiza allí el archivo aplicando un nombre que indique que ha sido validado, por ejemplo `consumo_hogar_2024_validado.csv`.

Finalmente, genera un pequeño informe de verificación (en texto plano) que resuma la información principal del archivo (tamaño, bloques, factor de replicación, propietario y fecha del proceso) y guárdalo en la misma carpeta procesado.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

# **Apache Spark RDDs (Resilient Distributed Datasets)**

En el marco del procesamiento de grandes volúmenes de datos con Apache Spark, los RDDs, o Resilient Distributed Datasets, juegan un papel fundamental. Un RDD es una colección de elementos que se distribuyen a través de un clúster de nodos y sobre la cual se pueden aplicar operaciones que se ejecutan en paralelo.

Recordemos sus características:

- Inmutabilidad: Una vez que se crea un RDD, no se puede modificar. En lugar de eso, cualquier operación que modifique los datos generará un nuevo RDD.

- Distribución: Los RDDs están repartidos entre los diferentes nodos del clúster, permitiendo un procesamiento paralelo eficiente.

- Tolerancia a Fallos: Los RDDs son resistentes a fallos. En caso de que un nodo falle, Spark puede reconstruir los datos perdidos a partir de los datos originales y las operaciones realizadas.

Esta estructura permite un procesamiento eficiente y escalable de datos, lo que es esencial para trabajar con grandes volúmenes de información en entornos de clúster.

A continuación se muestra el código que debéis ejecutar para configurar vuestro entorno de Spark.

> Como referencia a todos métodos que se requieren para implementar esta práctica podéis consultar:
> * [API Python de Spark](https://archive.apache.org/dist/spark/docs/2.4.0/api/python/index.html)

### Configuración del entorno python + spark


In [None]:
import findspark
import os

SPARK_HOME_PATH = "/usr/bigtop/current/spark-client/" 
os.environ['SPARK_HOME'] = SPARK_HOME_PATH
findspark.init(SPARK_HOME_PATH)

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ActividadRDDs_usuario") \
    .master("local[*]") \
    .getOrCreate()

print(spark.sparkContext.appName)
print(spark.version)

sc=spark.sparkContext

### **Ejercicio 2**: Manipulación de RDDs en PySpark (*1.25 puntos*)

En este ejercicio, te proporcionamos dos listas de números en las que realizarás diversas operaciones sobre ellas utilizando RDDs en PySpark. La solución y el enfoque quedan a tu criterio.
Contexto:

Tienes dos listas de números que representan datos de sensores:
- **Sensor A**: Números del 1 al 25.
- **Sensor B**: Números del 15 al 35.

Debes crear RDDs a partir de las listas de números de cada sensor. Una vez hecho esto, para el **Sensor A**, transforma cada número en una tupla `(número, número al cubo)`. Y filtra solo aquellos números cuyo cubo sea **múltiplo de 7** y **mayor que 50**. El RDD resultante se almacenará en una variable llamada `rdd_a_filtrado`. Finalmente, agrupa los números filtrados según si son **pares, impares o múltiplos de 5** (un número puede pertenecer a más de un grupo), y guarda este resultado en `rdd_a_grupos`.

Volviendo a los RDDs iniciales, calcula la intersección entre los RDDs de **Sensor A y Sensor B**, guárdalo en `rdd_interseccion` y calcula la diferencia de **Sensor B menos Sensor A**, que se guardará en `rdd_diferencia`. A continuación, realiza una unión de ambos RDDs, eliminando los valores duplicados y ordénala de mayor a menor, guardando el resultado en `rdd_union`.

- Imprime los resultados de cada una de las operaciones realizadas utilizando el método `collect()`.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

### **Ejercicio 3**: Análisis de Datos de Tweets en PySpark (*1.25 puntos*)

En este ejercicio, trabajarás con un archivo JSON llamado `tweets_sample.json` que se encuentra en la ruta `/aula_M2.858/data/tweets_sample.json`. Este archivo contiene datos de tweets y métricas relacionadas. Deberás utilizar PySpark para realizar un análisis de los datos. La estructura del archivo JSON incluye información como el número de retweets, likes, seguidores, y más. Sin embargo, para este ejercicio, te enfocarás en procesar y analizar el contenido textual de los tweets.

- Carga el archivo JSON en un RDD utilizando el método `textFile()`. Examina la estructura de los datos para identificar cómo extraer el contenido relevante.

- Extrae el campo tweets de cada uno de los tweets. Define y aplica una función para limpiar el texto. Esta función debe eliminar la puntuación, convertir el texto a minúsculas y asegurar que haya un solo espacio entre las palabras.

- Divide el texto en palabras y filtra las palabras para quedarte con aquellas que tengan menos de 7 caracteres. Después, realiza un conteo de palabras distintas, guárdalo en la variable `palabras_distintas_rdd`.

- Por último, encuentra las 5 palabras más frecuentes que terminen en vocal. Guárdalo en la variable `top_5_palabras`.

- Imprime los resultados de cada una de las operaciones realizadas.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

### **Ejercicio 4**: Optimización de Cálculos con Persistencia (*0.25 puntos*)

Para reducir los tiempos de ejecución en Spark, es fundamental utilizar la persistencia de un RDD mediante el método `persist()`. Esta técnica es particularmente útil cuando se realizan múltiples operaciones repetidas sobre un mismo RDD.

Cuando persistes un RDD, Spark almacena los datos en memoria (o en disco, dependiendo del nivel de persistencia, para ver mas sobre los niveles de persistencia ir a la web [Persistencia Spark](https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence)) para evitar recomputaciones cada vez que se necesita realizar una acción sobre el RDD. Esto significa que cada nodo del clúster guarda en su memoria las particiones del RDD que ha procesado, permitiendo que las siguientes operaciones sobre el RDD sean mucho más rápidas.

**Medición de Rendimiento**

Para medir la mejora en los tiempos de ejecución, podemos utilizar la función mágica `%%time` en un entorno Jupyter Notebook, que permite observar:

- Wall clock time: Tiempo total real que lleva ejecutar una tarea, incluyendo la CPU, el tiempo de entrada/salida (I/O), y las posibles comunicaciones entre nodos en el clúster.

- CPU time: Tiempo efectivo en que la CPU está ocupada ejecutando la tarea, excluyendo otras latencias como la de entrada/salida.

En este ejercicio, se explorará el uso de la persistencia en RDDs (Resilient Distributed Datasets) utilizando PySpark. El objetivo es observar cómo la persistencia afecta al rendimiento de las operaciones de transformación y acción sobre los RDDs.

- Crea un RDD a partir de una lista de números que va del 1 al 10.000.

- Filtra el RDD para obtener solo los números mayores a 5.000 y almacena este resultado en un nuevo RDD.

- Aplica una transformación para duplicar los valores del RDD filtrado y guárdalo en un nuevo RDD.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

- Utiliza el método collect() para recuperar y mostrar los números mayores a 5.000 y sus dobles, y mide el tiempo que tarda en ejecutarse esta operación utilizando la función mágica `%%time`.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

- Aplica la persistencia sobre el RDD de números mayores a 5.000 para que su contenido se mantenga en memoria entre las operaciones.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

- Vuelve a ejecutar el método collect() como antes. Compara este tiempo con el tiempo de la primera ejecución. (Puedes ejecutarlo varias veces y ver que ocurre con el tiempo de procesamiento.)

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

- Deshazte de la persistencia del RDD utilizando unpersist() para liberar recursos y detén la sesión de Spark al final del ejercicio con sc.stop().

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

Al terminar el ejercicio, analiza y comenta los resultados obtenidos, explicando cómo la persistencia afectó el rendimiento de tus cálculos.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

# **Apache Spark Dataframes**

En esta parte de la práctica vamos a introducir los elementos que ofrece Spark para trabajar con estructuras de datos. Veremos desde estructuras muy simples hasta estructuras complejas, donde los campos pueden a su vez tener campos anidados. En concreto utilizaremos datos de Twitter capturados en el contexto de las elecciones generales en España del 28 de abril de 2019

### Configuración del entorno

In [None]:
import findspark
import os

SPARK_HOME_PATH = "/usr/bigtop/current/spark-client/" 
os.environ['SPARK_HOME'] = SPARK_HOME_PATH
findspark.init(SPARK_HOME_PATH)

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ActividadSparkSQL_usuario") \
    .master("local[1]") \
    .enableHiveSupport() \
    .getOrCreate()
#    .config("spark.hadoop.hive.execution.engine", "mr") \
#    .enableHiveSupport() \

print(spark.sparkContext.appName)
print(spark.version)

sc=spark.sparkContext

In [None]:
import re
import os
import pandas as pd
from matplotlib import pyplot as plt
from math import floor
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import Row

In [None]:
SUBMIT_ARGS = "--jars /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/jars/graphframes_graphframes-0.7.0-spark2.4-s_2.11.jar pyspark-shell"

os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

## Introducción a dataframes estructurados y operaciones sobre ellos

Como ya se ha mencionado, en los siguientes ejercicis vamos a utilizar datos de Twitter que recolectamos durante las elecciones generales en España del 28 de abril de 2019. Como veremos, los tweets tienen una estructura interna bastante compleja que hemos simplificado un poco en esta práctica.

Lo primero que vamos a aprender es cómo importar este tipo de datos a nuestro entorno. Uno de los tipos de archivos más comunes para guardar este formato de información es [la estructura JSON](https://en.wikipedia.org/wiki/JSON). Esta estructura permite guardar información en un texto plano de diferentes objetos siguiendo una estructura de diccionario donde cada campo tiene asignado una clave y un valor. La estructura puede ser anidada, o sea que una clave puede tener como valor otra estructura de tipo diccionario.

Spark SQL permite leer datos de muchos formatos diferentes. Se os pide que [leáis el fichero JSON](https://spark.apache.org/docs/2.4.0/sql-data-sources-json.html) de la ruta ```/aula_M2.858/data/tweets28a_sample.json```. Este archivo contiene un pequeño *sample*, un 0.1% de la base de datos completa (en un siguiente apartado veremos cómo realizar este *sampleado*). En esta ocasión no se os pide especificar la estructura del dataframe ya que la función de lectura la inferirá automáticamente.

**Ejemplo de lectura (Rellenar con lo correspondiente para la lectura del archivo json)**:

```Python
tweets_sample = spark.read.json(<FILL IN>)

print("Loaded dataset contains %d tweets" % tweets_sample.count())
```

Para mostrar la estructura del dataset que acabamos de cargar, podéis obtener la información acerca de cómo está estructurado el DataTable utilizando el método ```printSchema()```. Tenéis que familiarizaros con esta estructura ya que será la que utilizaremos durante los próximos ejercicios. Recordad también que no todos los tweets tienen todos los campos, como por ejemplo la ubicación (campo ```place```). Cuando esto pasa el campo pasa a ser ```NULL```. Podéis ver más información sobre este tipo de datos en [este enlace](https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object).

Ahora debéis introducir el ejemplo de lectura con el `<FILL IN>` relleno según corresponda para la lectura del archivo JSON. Y, a continuación, mostraréis la estructura del dataset utilizando `printSchema()`.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
assert tweets_sample.count() == 27268, "Incorrect answer"

### Queries sobre dataframes complejos

A continuación vamos a ver como realizar consultas sobre el dataset de los tweets. Vamos a utilizar [sentencias *SQL*](https://www.w3schools.com/sql/default.asp) (como las utilizadas en la mayoría de bases de datos relacionales).

Lo primero que se debe hacer es registrar el dataframe de tweets como una tabla de SQL. Para ello utilizaremos [sqlContext.registerDataFrameAsTable()](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.SQLContext.registerDataFrameAsTable). Para ejecutar comandos sql solo teneis que utilizar el metodo sql() del objecto contexto, en este caso `sqlContext`.

#### Queries a través del pipeline

Las tablas de Spark SQL ofrecen otro mecanismo para aplicar las transformaciones y obtener resultados similares a los que se obtendría aplicando una consulta SQL. Por ejemplo, utilizando el siguiente pipeline obtendremos el texto de todos los tweets en español:

```
tweets_sample.where("lang == 'es'").select("text")
```

Que es equivalente a la siguiente sentencia SQL:

```
SELECT text
FROM tweets_sample
WHERE lang == 'es'
```

Podéis consultar el [API de spark SQL](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html) para encontrar más información sobre como utilizar las diferentes transformaciones en tablas.

### **Ejercicio 5**: Análisis de Tweets mediante DataFrames y consultas SQL (*2 puntos*)

Anteriormente ya has realizado la lectura del conjunto `tweets28a_sample.json` en formato JSON. Ahora deberás asegúrate de registrar el DataFrame como una tabla SQL llamada `tweets_sample`.

***Nota:*** Debido a que es posible que ejecutes estas líneas de codigo várias veces, vamos a tomar la precaución de ejecutar el comando sql para eliminar tablas antes de que las crees, ya que puede existir la posibilidad de que ya existan.

`spark.sql("DROP TABLE IF EXISTS tweets_sample")`

A continuación, se pide crear una tabla y registrarla con el nombre ```users_agg``` con [la información agregada](https://www.w3schools.com/sql/sql_groupby.asp) de los usuarios que tengan definido su idioma (```user.lang```) como español (```es```). En concreto se pide que la tabla contenga las siguientes columnas:
- **screen_name:** nombre del usuario
- **friends_count:** número máximo (ver nota) de personas a las que sigue
- **tweets:** número de tweets realizados
- **followers_count:** número máximo (ver nota) personas que siguen al usuario.

El orden en el cual se deben mostrar los registros es orden descendente acorde al número de tweets.

***Nota:*** Es importante que te fijes que el nombre de *friends* y *followers* puede diferir a lo largo de la adquisición de datos. En este caso vamos a utilizar la función de agregación `MAX` sobre cada uno de estos campos para evitar segmentar el usuario en diversas instancias.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

A continuación recurriremos al [JOIN de tablas](https://www.w3schools.com/sql/sql_join.asp) para combinar la información entre tablas. Debes combinar la tabla `users_agg` y la tabla `tweets_sample` utilizando un `INNER JOIN` para obtener una nueva tabla con el nombre retweeted con la siguiente información:
- ***screen_name:*** nombre de usuario
- ***friends_count:*** número máximo de personas a las que sigue
- ***followers_count:*** número máximo de personas que siguen al usuario.
- ***tweets:*** número de tweets realizados por el usuario.
- ***retweeted:*** número de retweets obtenidos por el usuario.
- ***ratio_tweet_retweeted:*** ratio de retweets por número de tweets publicados $\frac{retweets}{tweets}$

La tabla resultante tiene que estar ordenada de manera descendente según el valor de la columna `ratio_tweet_retweeted`.

Por último, utilizando queries a través de pipeline, debes crear una tabla `user_retweets` a partir de la tabla `tweets_sample`, utilizando transformaciones que contenga dos columnas:
- ***screen_name:*** nombre de usuario
- ***retweeted:*** número de retweets

Ordena la tabla en orden descendente utilizando el valor de la columna ```retweeted```.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

## Bases de datos HIVE y operaciones complejas

Hasta ahora hemos estado trabajando con un pequeño sample de los tweets generados (el 0.1%). En esta parte de la actividad vamos a ver como trabajar y tratar con el dataset completo. Para ello vamos a utilizar tanto transformaciones sobre tablas como operaciones sobre RDD cuando sea necesario.

Es importante tener en cuenta que muchas veces los datos con los que trabajamos se utilizarán en diversos proyectos. En lugar de manejar directamente los archivos, es más eficiente y organizado recurrir a una base de datos para gestionar la información. En el ecosistema de Hadoop, una de las bases de datos más utilizadas es [Apache Hive](https://hive.apache.org/). Sin embargo, es crucial entender que Hive no es una base de datos convencional. En realidad, funciona como un metastore que mapea archivos en el sistema de archivos distribuido de Hadoop (HDFS).

Esto significa que Hive no almacena los datos en su propio formato de base de datos, sino que actúa como una interfaz que permite a los usuarios ejecutar consultas SQL sobre los datos almacenados en HDFS. Esto proporciona una forma eficiente de acceder y manipular grandes volúmenes de datos distribuidos sin necesidad de moverlos o convertirlos a un formato tradicional de base de datos.

La manera de acceder a esta base de datos es tal y como se muestra en el siguiente código (debéis ejecutarlo).

In [None]:
# 1. Obtener la lista de objetos 'Table'
tables_list = spark.catalog.listTables()

# 2. Convertir la lista de objetos en una lista de tuplas (Nombre, BD, Temporal)
# Utilizamos una comprensión de lista (list comprehension)
data_for_df = [(t.name, t.database, t.isTemporary) for t in tables_list]

# 3. Definir el esquema manualmente para evitar cualquier inferencia automática
from pyspark.sql.types import StructType, StructField, StringType, BooleanType

manual_schema = StructType([
    StructField("Nombre_Tabla", StringType(), True),
    StructField("Base_Datos", StringType(), True),
    StructField("Es_Temporal", BooleanType(), True)
])

# 4. Crear el DataFrame con los datos y el esquema definido
tables_df = spark.createDataFrame(data_for_df, schema=manual_schema)

# 5. Mostrar el resultado
tables_df.show()

### Más allá de las transformaciones SQL

Algunas veces vamos a necesitar obtener resultados que precisan operaciones que van más allá de lo que podemos conseguir (cómodamente) utilizando el lenguaje SQL. En esta parte de la práctica vamos a practicar cómo pasar de una tabla a un RDD, para hacer operaciones complejas, y luego volver a pasar a una tabla.

Ahora viene la parte interesante. Una tabla puede convertirse en un RDD a través del atributo ```.rdd```. Este atributo guarda la información de la tabla en una lista donde cada elemento es un [objeto del tipo ```Row```](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Row). Los objetos pertenecientes a esta clase pueden verse como diccionarios donde la información de las diferentes columnas queda reflejada en forma de atributo. Por ejemplo, imaginad que tenemos una tabla con dos columnas, nombre y apellido, si utilizamos el atributo ```.rdd``` de dicha tabla obtendremos una lista con objetos del tipo row donde cada objeto tiene dos atributos: nombre y apellido. Para acceder a los atributos solo tenéis que utilizar la sintaxis *punto* de Python, e.g., ```row.nombre``` o ```row.apellido```.

### **Ejercicio 6**: Análisis de Tweets Geolocalizados (*1.5 puntos*)

Dada la tabla de tweets `tweets28a_sample25`, debes crear una variable `tweets` utilizando el objeto `spark` y el método `table()`. Utilizando una sentencia SQL, se requiere extraer información sobre los tweets que contienen datos geolocalizados (es decir, aquellos donde el campo `place` no es nulo) y determinar cuántos tweets se han generado desde cada lugar. Los resultados deben ser presentados en orden descendente por la cantidad de tweets.

**Esquema sentencia sql**
```Python
tweets_place = spark.sql(<FILL IN>)
```

A continuación, crea una tabla llamada `tweets_place` que contenga dos columnas:

- ***name:*** nombre del lugar desde donde se ha generado el tweet.
- ***tweets:*** número total de tweets realizados en dicho lugar.

Finalmente, muestra los 10 lugares con mayor número de tweets en la tabla resultante.

Adicionalmente, crea una tabla llamada `tweets_geo` que contenga únicamente los tweets que tienen información de geolocalización, y asegúrate de que incluya el nombre del lugar. A partir de esta tabla, crea un objeto ```tweets_place_rdd``` que contenga una lista de tuplas con la información ```(name, tweets)``` sobre el nombre del lugar y el número de tweets generados desde allí.

Una vez generado este RDD vamos a crear una tabla. El primer paso es generar por cada tupla un objeto Row que contenga un atributo ```name``` y un atributo ```tweets```. Ahora solo tenéis que aplicar el método ```toDF()``` para generar una tabla. Ordenad las filas de esta tabla por el número de tweets en orden descendente.

El ejercicio deberá realizarse combinando tanto sentencias SQL como RDD en Apache Spark.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

## Sampling

En muchas ocasiones, antes de lanzar costosos procesos, es una práctica habitual tratar con un pequeño conjunto de los datos para investigar algunas propiedades o simplemente para depurar nuestros algoritmos, a esta tarea se la llama sampling. En esta parte de la práctica vamos a ver los dos principales métodos de sampling y cómo utilizarlos.

### Nota:
Para producir un gráfico de barras utilizando [Pandas](https://pandas.pydata.org/) donde se muestre la información que acabáis de generar. Primero transformad la tabla a pandas utilizando el método `toPandas()`. Plotead la tabla resultante utilizando [la funcionalidad gráfica de pandas.](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.plot.bar.html)

### Homogéneo

El primer sampling que vamos a ver es [el homogeneo](https://en.wikipedia.org/wiki/Simple_random_sample). Este sampling se basta en simplemente escoger una fracción de la población seleccionando aleatoriamente elementos de esta.

Primero de todo vamos a realizar un sampling homogéneo del 1% de los tweets generados en periodo electoral sin reemplazo. Guardad en una variable ```tweets_sample``` este sampling utilizando el método ```sample``` descrito en la [API de pyspark SQL](https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html). El seed que vais a utilizar para inicializar el generador aleatorio es 42.

**Esquema**
```Python
seed = 42
fraction = 0.01

tweets_sample = tweets.<FILL IN>

print("Number of tweets sampled: {0}".format(tweets_sample.count()))
```

### **Ejercicio 7**: Análisis del Patrón de Actividad Horaria en Twitter (*1 puntos*)
A partir de una muestra del 1% de los tweets disponibles, se desea analizar el patrón de uso diario de Twitter, prestando especial atención a la actividad horaria. El objetivo es calcular y visualizar el promedio de tweets generados en cada hora del día. Para ello se debe crear una tabla ```tweets_timestamp``` con la información:
- ***created_at***: timestamp de cuando se publicó el tweet.
- ***hour***: a que hora del dia corresponde.
- ***day***: Fecha en formato MM-dd-YY

La tabla tiene que estar en orden ascendente según la columna `created_at`

**Pista**: Para crear las columnas "hour" y "day" en tu tabla tweets_timestamp, puedes utilizar withColumn(). La función ```hour``` os servirá para extraer la hora del timestamp y la función ```date_format``` os permitirá generar la fecha.

A continuación, utiliza la muestra de tweets para extraer la hora y fecha de publicación, y a partir de esa información, determina cuántos tweets se generan por hora. Asegúrate de ajustar el promedio de tweets para reflejar lo que ocurriría en el conjunto completo de datos y presenta los resultados en un gráfico de barras que muestre la actividad horaria promedio.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

### Estratificado

En muchas ocasiones el sampling homogéneo no es adecuado ya que por la propia estructura de los datos determinados segmentos pueden estar sobre-representadas. Este es el caso que observamos en los tweets donde las grandes áreas urbanas están sobrerepresentadas si lo comparamos con el volumen de población. En esta actividad vamos a ver cómo aplicar esta técnica al dataset de tweets, para obtener un sampling que respete la proporción de diputados por provincia.

En España, el proceso electoral asigna un volumen de diputados a cada provincia que depende de la población y de un porcentaje mínimo asignado por ley. En el contexto Hive que hemos creado previamente (```spark```) podemos encontrar una tabla (```province_28a```) que contiene información sobre las circunscripciones electorales. Cargad ésta tabla en una variable con nombre ```province```.  Cargad esta tabla en una variable con nombre ```province```.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

province.limit(20).show()

In [None]:
assert province.count() == 52, "Incorrect answer"

Para hacer un sampling estratificado lo primero que tenemos que hacer es determinar la fracción que queremos asignar a cada categoría. En este caso queremos una fracción que haga que la ratio tweets diputado sea igual para todas las capitales de provincia. Debemos tener en cuenta que la precisión de la geolocalización en Twitter es normalmente a nivel de ciudad. Por eso, para evitar incrementar la complejidad del ejercicio, vamos a utilizar los tweets en capitales de provincia como proxy de los tweets en toda la provincia.

### **Ejercicio 8**: Análisis de la Relación entre Tweets y Diputados por Provincia (*0.75 puntos*)

Lo primero que tenéis que hacer es crear un tabla ```info_tweets_province``` que debe contener:
- ***capital:*** nombre de la capital de provincia.
- ***tweets:*** número de tweets geolocalizados en cada capital
- ***diputados:*** diputados que asignados a la provincia.
- ***ratio_tweets_diputado:*** número de tweets por diputado.

Debéis ordenar la lista por ```ratio_tweets_diputado``` en orden ascendente.

***Nota:*** Podéis realizar este ejercicio de muchas maneras, probablemente la más fácil es utilizar la tabla ```tweets_place``` que habéis generado en el ejercicio 5. Recordad cómo utilizar el ```join()```

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# VARIABLES DADAS
output = info_tweets_province.first()
maximum_ratio = floor(output.ratio_tweets_diputado * 100) / 100

In [None]:
# DO NOT USE THIS CELL

A continuación, vamos a necesitar es un diccionario con nombre ```ratios``` donde cada capital de provincia es una llave y su valor asociado es la fracción de tweets que vamos a samplear. En este caso lo que queremos es que la ratio de tweets por cada diputado sea similar para cada capital de provincia.

Como queremos que el sampling sea lo más grande posible y no queremos que ninguna capital este infrarepresentada el ratio de tweets por diputado será el valor más pequeño podéis observar en la tabla ```info_tweets_province```, que corresponde a 11.66 tweets por diputado en Teruel. Tenéis este valor guardado en la variable ```maximum_ratio```.

*Nota:* El método ```collectAsMap()``` transforma un PairRDD en un diccionario.

Por último, genera una tabla ```geo_tweets``` con todos los tweets geolocalizados. Ahora ya estamos en disposición de hacer el sampling estratificado por población. Para ello podéis utilizar el método ```sampleBy()```. Utilizad 42 como seed del generador pseudoaleatorio.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

## Introducción a los datos relacionales

El hecho de trabajar con una base de datos que contiene información generada en una red social nos permite introducir el concepto de datos relacionales. Podemos definir datos relacionales como aquellos en los que existen relaciones entre las entidades que constituyen la base de datos. Si estas relaciones son binarias, relaciones 1 a 1, podemos representar las relaciones como un grafo compuesto por un conjunto de vértices $\mathcal{V}$ y un conjunto de aristas $\mathcal{E}$ que los relacionan.

En el caso de grafos que emergen de manera orgánica, este tipo de estructura va más allá de los grafos regulares que seguramente conocéis. Este tipo de estructuras se conocen como [redes complejas](https://es.wikipedia.org/wiki/Red_compleja). El estudio de la estructura y dinámicas de este tipo de redes ha contribuido a importantes resultados en campos tan dispares como la física, la sociología, la ecología o la medicina.

![complex_network](https://images.squarespace-cdn.com/content/5150aec6e4b0e340ec52710a/1364574727391-XVOFAB9P6GHKTDAH6QTA/lastfm_800_graph_white.png?content-type=image%2Fpng)

En esta última parte de la práctica vamos a trabajar con este tipo de datos. En concreto vamos a modelar uno de las posibles relaciones presentes en el dataset, la red de retweets.

#### Construcción de la edgelist

Lo primero se os pide es que generéis la red. Hay diversas maneras de representar una red compleja, por ejemplo, si estuvierais interesados en trabajar en ellas desde el punto de vista teórico, la manera más habitual de representarlas es utilizando una [matriz de adyacencia](https://es.wikipedia.org/wiki/Matriz_de_adyacencia). En esta práctica vamos a centrarnos en el aspecto computacional, una de las maneras de más eficientes (computacionalmente hablando) de representar una red es mediante su [*edge list*](https://en.wikipedia.org/wiki/Edge_list), una tabla que especifica la relación a parejas entre las entidades.

Las relaciones pueden ser bidireccionales o direccionales y tener algún peso asignado o no (weighted or unweighted). En el caso que nos ocupa, estamos hablando de una red dirigida, un usuario retuitea a otro, y podemos pensarla teniendo en cuenta cuántas veces esto ha pasado.

#### Centralidad de grado

Uno de los descriptores más comunes en el análisis de redes es el grado. El grado cuantifica cuántas aristas están conectadas a cada vértice~s~. En el caso de redes dirigidas como la que acabamos de crear este descriptor está descompuesto en el:
- **in degree**: cuantas aristas apuntan al nodo
- **out degree**: cuantas aristas salen del nodo

Si haces un ranquing de estos valores vais a obtener una medida de centralidad, la [centralidad de grado](https://en.wikipedia.org/wiki/Centrality#Degree_centrality), de cada uno de los nodos.

### **Ejercicio 9**: Análisis de Interacciones de Retweets y Grados de Usuario (*0.75 puntos*)

A partir de una muestra homogénea del 1% de los tweets, con la semilla 42 para garantizar la reproducibilidad, realiza un análisis de las interacciones de retweets entre usuarios en la red social.

**Esquema**
```Python
seed = 42
sample = tweets.<FILL IN>
```
Crea una tabla ```edgelist``` con la siguiente información:
- ***src:*** usuario que retuitea
- ***dst:*** usuario que es retuiteado
- ***weight:*** número de veces que un usuario retuitea a otro.

Filtrar el resultado para que contenga sólo las relaciones con un weight igual o mayor a dos.

A continuación, genera una tabla `outDegree` con la información:
- ***screen_name:*** nombre del usuario.
- ***outDegree:*** out degree del nodo.

Ordenado la tabla por out degree en orden descendente.

Se os pide ahora que generéis una tabla `inDegree` con la información:
- ***screen_name:*** nombre del usuario.
- ***inDegree:*** in degree del nodo.

Ordenad la tabla por in degree en orden descendente.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

### **Ejercicio 10**: Distribución del Grado de Salida en una Red de Retweets (*0.75 puntos*)

A partir de una muestra del 1% de los tweets, con una semilla de 42 para asegurar la reproducibilidad, realiza un análisis básico de la red de retweets. Tu objetivo es calcular y mostrar la distribución de grados de los usuarios en la red de retweets.

Para ello, sigue estos pasos:

- Crea una tabla de Edgelist: Define una tabla que contenga las relaciones de retweet entre usuarios, donde cada fila representa un retweet realizado de un usuario a otro.

- Calcula el Grado de Salida (Out-Degree): Determina cuántos retweets ha realizado cada usuario (es decir, el número de usuarios a los que cada usuario ha retweeteado).

- Obtén la Distribución de Grado de Salida: Crea una tabla que muestre cuántos usuarios tienen un determinado número de retweets realizados. Ordena los resultados por el grado de salida.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL