![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)  ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)
# PEC 3: Noviembre 2019

## Extracción de conocimiento de fuentes de datos heterogéneas mediante Spark SQL, RDDs y GraphFrames

En esta práctica vamos a introducir estructuras de datos más complejas que las vistas hasta ahora, donde los campos pueden a su vez tener campos anidados. En concreto utilizaremos datos de twitter capturados en el contexto de las elecciones generales en España del 28 de Abril de 2019. La práctica está estructurada de la siguiente manera:
- **Parte 0:** Configuración del entorno
- **Parte 1:** Introducción a data frames estructurados y cómo operar extraer información *(1.5 puntos)*
    - **Parte 1.1:** Importar los datos *(0.25 puntos)*
    - **Parte 1.2:** *Queries* sobre sobre data frames complejos *(1.25 puntos)*
        - **Parte 1.2.1:** Queries SQL *(0.5 puntos)*
        - **Parte 1.2.2:** Queries sobre el pipeline *(0.75 puntos)*
- **Parte 2:** Bases de datos HIVE y operaciones complejas *(3.5 puntos)*
    - **Parte 2.1:** Bases de datos Hive *(0.25 puntos)*
    - **Parte 2.2:** Más allá de las transformaciones SQL *(2.75 puntos)*
        - **Parte 2.2.1:** Tweets por población  *(1.25 puntos)*
            - **Parte 2.2.1.1:** Utilizando SQL *(0.25 puntos)*
            - **Parte 2.2.1.2:** Utilizando RDD *(1 punto)*
        - **Parte 2.2.2:** Contar hashtags *(1.5 puntos)*
- **Parte 3:** Sampling *(3 Puntos)*
    - **Parte 3.1:** Homogéneo *(1 punto)*
    - **Parte 3.2:** Estratificado *(1.5 puntos)*
- **Parte 4**: Introducción a los datos relacionales *(2 puntos)*
    - **Parte 4.1:** Generar la red de retweets *(1 punto)*
        - **Parte 4.1.1**: Construcción de la edgelist *(0.5 puntos)*
        - **Parte 4.1.2**: Centralidad de grado *(0.5 puntos)*
    - **Parte 4.2:** Análisis de redes utilitzando GraphFrames *(1 punto)*
        - **Parte 4.2.1:** Crear un graph frame (0.5 puntos)
        - **Parte 4.2.2:** Centralidad PageRank (0.5 puntos)

## **Parte 0:** Configuración del entorno

In [204]:
import findspark
findspark.init()

In [205]:
import re
import os
import pandas as pd
from matplotlib import pyplot as plt
from math import floor
from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import Row

In [206]:
SUBMIT_ARGS = "--packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

conf = SparkConf()
conf.setMaster("local[1]")
# Introducid el nombre de la app PEC3_ seguido de vuestro nombre de usuario
conf.setAppName('PEC3_antoniogmartin')
sc = SparkContext.getOrCreate(conf)

## **Parte 1:** Introducción a data frames estructurados i operaciones sobre ellos.

Como ya se ha mencionado, en esta práctica vamos ha utilizar datos de Twitter que recolectamos durante las elecciones generales en España del 28 de abril de 2019. Como veremos, los tweets tienen una estructura interna bastante compleja que hemos simplificado un poco en esta práctica.

### **Parte 1.1:** Importar los datos

Lo primero que vamos ha aprender es cómo importar este tipo de datos a nuestro entorno. Uno de los tipos de archivos más comunes para guardar este formato de información es [la estructura JSON](https://en.wikipedia.org/wiki/JSON). Esta estructura permite guardar información en un texto plano de diferentes objetos siguiendo una estructura de diccionario donde cada campo tiene asignado una llave y un valor. La estructura puede ser anidada, o sea que una llave puede tener como valor otra estructura tipo diccionario.

Spark SQL permite leer datos de muchos formatos diferentes (como recordareis de la anterior práctica donde leímos un fichero CSV). En esta ocasión, se os pide que leáis un fichero JSON de la ruta ```/aula_M2.858/data/tweets28a_sample.json```. Este archivo contiene un pequeño *sample*, un 0.1% de la base de datos completa (en un siguiente apartado veremos cómo realizar este *sampleado*). En esta ocasión no se os pide especificar la estructura del data frame ya que la función de lectura la inferirá automáticamente.

In [207]:
sqlContext = SQLContext(sc)
tweets_sample = sqlContext.read.json("/aula_M2.858/data/tweets28a_sample.json")

print("Loaded dataset contains %d tweets" % tweets_sample.count())

Loaded dataset contains 27268 tweets


El siguiente paso es mostrar la estructura del dataset que acabamos de cargar. Recordad que podéis obtener la información acerca de cómo está estructurado el DataTable utilizando el método ```printSchema()```.

In [208]:
tweets_sample.printSchema()

root
 |-- _id: string (nullable = true)
 |-- created_at: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- place: struct (nullable = true)
 |    |-- bounding_box: struct (nullable = true)
 |    |    |-- coordinates: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: double (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |-- country_code: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- place_type: string (nullable = true)
 |-- retweeted_status: struct (nullable = true)
 |    |-- _id: string (nullable = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- followers_count: long (nullable = true)
 |    |    |-- friends_count: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- lang: string (nullable = true)
 |    

Podéis observar que la estructura del tweet contiene múltiples campos anidados. Teneis que familiarizaros con esta estructura ya que será la que utilizaremos durante toda la práctica. Recordad también que no todos los tweets tienen todos los campos, como por ejemplo la ubicación (campo ```place```). Cuando esto pasa el campo pasa a ser ```NULL```. Podéis ver mas información sobre este tipo de datos en [este enlace](https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object).

### **Parte 1.2:** *Queries* sobre sobre data frames complejos

En la anterior práctica hemos visto cómo hacer consultas sobre un dataset muy simple utilizando sentencias *SQL*. En esta parte vamos a refrescar los conceptos utilizados en la práctica anterior introduciendo algunos conceptos más avanzados y una nueva manera de trabajar sobre data tables.

#### **Parte 1.2.1:** Queries SQL

Como recordaréis de la parte 3 de la anterior PEC, el primer paso consiste en registrar la tabla en el contexto SQL comprobando primero si existe y borrándola en el caso que sea así. En este apartado se os pide que registréis la tabla ```tweets_sample``` que acabamos de cargar en el contexto sql bajo el mismo nombre ```tweets_sample```.

In [209]:
sqlContext.sql("DROP TABLE IF EXISTS tweets_sample")
sqlContext.registerDataFrameAsTable(tweets_sample,"tweets_sample")

Ahora se os pide que creeis una tabla ```users_agg``` con [la información agregada](https://www.w3schools.com/sql/sql_groupby.asp) de los usuarios que tengan definido su idioma (```user.lang```) como español (```es```). En concreto se os pide que la tabla contenga las siguientes columnas:
- **screen_name:** nombre del usuario
- **friends_count:** número máximo (ver nota) de personas a las que sigue
- **tweets:** número de tweets realizados
- **followers_count:** número máximo (ver nota) personas que siguen al usuario.

El orden en el cual se deben mostrar los registros es orden descendente acorde al número de tweets.

***Nota:*** es importante que os fijéis que el nombre de *friends* i *followers* puede diferir a lo largo de la adquisición de datos. En este caso vamos ha utilizar la función de agregación ```MAX``` sobre cada uno de estos campos para evitar segmentar el usuario en diversas instancias.

In [210]:
#users = sqlContext.sql("SELECT user.id_str, retweeted_status,text from tweets_sample where user.screen_name='anaoromi'")
#users.show()

In [211]:
users_agg = sqlContext.sql("SELECT user.screen_name,COUNT(user.screen_name) as tweets,MAX(user.friends_count) as friends_count,MAX(user.followers_count) as followers_count,user.lang as lang FROM tweets_sample where user.lang='es' GROUP BY user.lang, user.screen_name ORDER BY COUNT(user.screen_name) DESC ")
users_agg.limit(10).show()

+---------------+------+-------------+---------------+----+
|    screen_name|tweets|friends_count|followers_count|lang|
+---------------+------+-------------+---------------+----+
|       anaoromi|    16|         6258|           6774|  es|
|    RosaMar6254|    14|         6208|           6245|  es|
|        lyuva26|    13|         3088|           3732|  es|
|PisandoFuerte10|    12|         2795|           1752|  es|
|     carrasquem|    12|          147|            215|  es|
|       jasalo54|    11|         1889|            689|  es|
|      lolalailo|     9|         4922|           3738|  es|
|  Rafa_eltorete|     9|          908|           1060|  es|
| locuspolitikus|     9|        11261|          10244|  es|
|  PabloChabolas|     9|         4925|           4042|  es|
+---------------+------+-------------+---------------+----+



In [212]:
output = users_agg.first()
assert output.screen_name == 'anaoromi' and output.friends_count == 6258 and output.tweets == 16 and output.followers_count == 6774, "Incorrect output"

Imaginad ahora que queremos combinar la información que acabamos de generar con información acerca del número de veces que un usuario ha sido retuiteado. Para hacer este tipo de combinaciones necesitamos recurrir al [```JOIN``` de tablas](https://www.w3schools.com/sql/sql_join.asp). Primero debemos registrar la tabla que acabamos de generar en el contexto SQL. Recordad que primero debéis comprobar si la tabla existe y en caso afirmativo eliminarla. La tabla tenéis que registrarla bajo el nombre de ```user_agg```.

In [213]:
sqlContext.sql("DROP TABLE IF EXISTS user_agg")
sqlContext.registerDataFrameAsTable(users_agg,"user_agg")

Una vez registrada se pide que combinéis esta tabla y la tabla ```tweets_sample``` utilizando un ```INNER JOIN``` para obtener una nueva tabla con la siguiente información:
- ***screen_name:*** nombre de usuario
- ***friends_count:*** número máximo de personas a las que sigue
- ***followers_count:*** número máximo de personas que siguen al usuario.
- ***tweets:*** número de tweets realizados por el usuario.
- ***retweeted:*** número de retweets obtenidos por el usuario.
- ***ratio_tweet_retweeted:*** ratio de retweets por número de tweets publicados $\frac{retweets}{tweets}$

In [214]:
retweeted = sqlContext.sql("""
select B.rt_name as screen_name,MAX(A.tweets) as tweets,MAX(B.retweeted) as retweeted,
MAX(A.friends_count) as friends_count,
MAX(A.followers_count) as followers_count,
MAX(B.retweeted/A.tweets) as ratio_tweet_retweeted 

FROM user_agg A INNER JOIN
(select retweeted_status.user.screen_name as rt_name,
count(retweeted_status.user.screen_name) as retweeted
from tweets_sample group by retweeted_status.user.screen_name) B 
ON A.screen_name=B.rt_name GROUP BY B.rt_name ORDER BY retweeted DESC
""")
retweeted.limit(10).show()

+--------------+------+---------+-------------+---------------+---------------------+
|   screen_name|tweets|retweeted|friends_count|followers_count|ratio_tweet_retweeted|
+--------------+------+---------+-------------+---------------+---------------------+
|          PSOE|     1|      155|        13635|         671073|                155.0|
|  CiudadanosCs|     1|      117|        92910|         511896|                117.0|
|     JuntsXCat|     1|       73|          202|          88515|                 73.0|
|  PartidoPACMA|     1|       63|         1498|         232932|                 63.0|
|  pablocasado_|     1|       50|         4567|         238926|                 50.0|
|voxnoticias_es|     1|       44|         2146|          29582|                 44.0|
|RaiLopezCalvet|     1|       43|         7579|          13574|                 43.0|
|        iunida|     1|       39|        10225|         558318|                 39.0|
|        Xuxipc|     1|       37|          311|       

In [215]:
output = retweeted.first()
assert output.screen_name == 'PSOE' and output.friends_count == 13635 and output.tweets == 1 and output.followers_count == 671073 and output.ratio_tweet_retweeted == 155.0 and output.retweeted == 155, "Incorrect output"

#### **Parte 1.2.2:** Queries a través del pipeline

Las tablas de Spark SQL ofrecen otro mecanismo para aplicar las transformaciones y obtener resultados similares a los que se obtendría aplicando una consulta SQL. Por ejemplo utilizando el siguiente pipeline obtendremos el texto de todos los tweets en español:

```
tweets_sample.where("lang == 'es'").select("text")
```

Que es equivalente a la siguiente sentencia SQL:

```
SELECT text
FROM tweets_sample
WHERE lang == 'es'
```

Podéis consultar el [API de spark SQL](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html) para encontrar más información sobre como utilitzar las diferentes transformaciones en tablas.

En este ejercicio se os pide que repliquéis la query obtenida en el apartado anterior empezando por generar la tabla ```users_agg```. Podéis utilizar las transformaciones ```where```, ```select``` (o ```selectExpr```), ```groupBy```, ```count```, ```agg``` y ```orderBy```

In [216]:
#users_agg = sqlContext.sql("SELECT user.screen_name,COUNT(user.screen_name) as tweets,MAX(user.friends_count) as friends_count,MAX(user.followers_count) as followers_count,user.lang as lang FROM tweets_sample where user.lang='es' GROUP BY user.lang, user.screen_name ORDER BY COUNT(user.screen_name) DESC ")

users = tweets_sample.where("user.lang=='es'").select("user.id_str","user.screen_name","user.lang","user.friends_count","user.followers_count")

users_agg = users.groupBy("screen_name","lang")\
                 .agg({"friends_count":"max","followers_count":"max","screen_name":"count"})\
                 .orderBy("count(screen_name)",ascending=False)

users_agg.limit(10).show()

+---------------+----+------------------+------------------+--------------------+
|    screen_name|lang|max(friends_count)|count(screen_name)|max(followers_count)|
+---------------+----+------------------+------------------+--------------------+
|       anaoromi|  es|              6258|                16|                6774|
|    RosaMar6254|  es|              6208|                14|                6245|
|        lyuva26|  es|              3088|                13|                3732|
|     carrasquem|  es|               147|                12|                 215|
|PisandoFuerte10|  es|              2795|                12|                1752|
|       jasalo54|  es|              1889|                11|                 689|
| locuspolitikus|  es|             11261|                 9|               10244|
|      kikyosanz|  es|               154|                 9|                 273|
|  Rafa_eltorete|  es|               908|                 9|                1060|
|      lolalailo

Si os fijáis veréis que el nombre de las columnas no corresponde con el obtenido anteriormente, podéis cambiar el nombre de una columna determinada utilizando la transformación ```withColumnRenamed```. Cambiad el nombre de las columnas para que coincidan con el apartado anterior y guardadlas en una variable ```user_agg_new```.

In [217]:
users_agg_new = users_agg.withColumnRenamed('max(friends_count)','friends_count')\
                         .withColumnRenamed('count(screen_name)','tweets')\
                         .withColumnRenamed('max(followers_count)','followers_count')

users_agg_new.limit(10).show()

+---------------+----+-------------+------+---------------+
|    screen_name|lang|friends_count|tweets|followers_count|
+---------------+----+-------------+------+---------------+
|       anaoromi|  es|         6258|    16|           6774|
|    RosaMar6254|  es|         6208|    14|           6245|
|        lyuva26|  es|         3088|    13|           3732|
|     carrasquem|  es|          147|    12|            215|
|PisandoFuerte10|  es|         2795|    12|           1752|
|       jasalo54|  es|         1889|    11|            689|
| locuspolitikus|  es|        11261|     9|          10244|
|      kikyosanz|  es|          154|     9|            273|
|  Rafa_eltorete|  es|          908|     9|           1060|
|      lolalailo|  es|         4922|     9|           3738|
+---------------+----+-------------+------+---------------+



In [218]:
output = users_agg_new.first()
assert output.screen_name == 'anaoromi' and output.friends_count == 6258 and output.tweets == 16 and output.followers_count == 6774, "Incorrect output"

Cread ahora una tabla ```user_retweets``` utilizando transformaciones que contenga dos columnas:
- ***screen_name:*** nombre de usuario
- ***retweeted:*** número de retweets

Podéis utilizar las mismas transformaciones que en el ejercicio anterior. Ordenad la tabla en orden descendente utilizando el valor de la columna ```retweeted```.

In [219]:
user_retweets_aux = tweets_sample.select("retweeted_status.user.screen_name","retweeted_status.user.id_str")

user_retweets = user_retweets_aux.groupBy("screen_name")\
                                 .agg({"id_str": "count","screen_name": "count"})\
                                 .orderBy("count(screen_name)", ascending=False)
user_retweets.limit(10).show()

+--------------+------------------+-------------+
|   screen_name|count(screen_name)|count(id_str)|
+--------------+------------------+-------------+
|        vox_es|               299|          299|
|  ahorapodemos|               238|          238|
| Santi_ABASCAL|               238|          238|
|      iescolar|               166|          166|
| AlbanoDante76|               161|          161|
|          PSOE|               155|          155|
|AntonioMaestre|               154|          154|
|          KRLS|               149|          149|
|        boye_g|               142|          142|
|  CiudadanosCs|               117|          117|
+--------------+------------------+-------------+



In [220]:
user_retweets=user_retweets.withColumnRenamed('max(friends_count)','friends_count')\
                         .withColumnRenamed('count(screen_name)','retweeted')\
                         .withColumnRenamed('count(id_str)','retweets_by_id')

In [221]:
output = user_retweets.first()
assert output.screen_name == 'vox_es' and output.retweeted == 299, "Incorrect output"

Otra manera de combinar dos tablas es utilizando el [metodo de tabla ```join```](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html). Combinad la información de la tabla ```users_agg_new``` y ```user_retweets``` en una nueva tabla ```retweeted``` utilizando la columna ```screen_name```. Ordenad la nueva tabla en orden descendente con el nombre de retweets.

In [222]:
df1 = users_agg_new.alias('df1')
df2 = user_retweets.alias('df2')


In [223]:

retweeted = df1.join(df2,df1.screen_name == df2.screen_name)\
                         .orderBy(df2.retweeted, ascending=False)
retweeted.limit(10).show()

+--------------+----+-------------+------+---------------+--------------+---------+--------------+
|   screen_name|lang|friends_count|tweets|followers_count|   screen_name|retweeted|retweets_by_id|
+--------------+----+-------------+------+---------------+--------------+---------+--------------+
|          PSOE|  es|        13635|     1|         671073|          PSOE|      155|           155|
|  CiudadanosCs|  es|        92910|     1|         511896|  CiudadanosCs|      117|           117|
|     JuntsXCat|  es|          202|     1|          88515|     JuntsXCat|       73|            73|
|  PartidoPACMA|  es|         1498|     1|         232932|  PartidoPACMA|       63|            63|
|  pablocasado_|  es|         4567|     1|         238926|  pablocasado_|       50|            50|
|voxnoticias_es|  es|         2146|     1|          29582|voxnoticias_es|       44|            44|
|RaiLopezCalvet|  es|         7579|     1|          13574|RaiLopezCalvet|       43|            43|
|        i

In [224]:
output = retweeted.first()
assert output.screen_name == 'PSOE' and output.friends_count == 13635 and output.tweets == 1 and output.followers_count == 671073 and output.retweeted == 155, "Incorrect output"

Notaréis que algunos de los registros que aparecen en la tabla ```users_retweeted``` no están presentes en la tabla retweeted. Esto es debido a que, por defecto, el método aplica un inner join y por tanto solo combina los registros presentes en ambas tablas. Podéis cambiar este comportamiento a través de los parámetros de la función.

Para terminar esta parte y reconstruir el resultado del apartado 1.2.1 vamos a añadir una columna ```ratio_tweet_retweeted``` con información del ratio entre retweets y tweets. Para ello debéis utilizar la transformación ```withColumn```. El resultado debe estar ordenado considerando esta nueva columna en orden descendente.

In [225]:
retweeted = retweeted.withColumn("ratio_tweet_retweeted",retweeted.retweeted/retweeted.tweets)
retweeted.limit(10).show()

+--------------+----+-------------+------+---------------+--------------+---------+--------------+---------------------+
|   screen_name|lang|friends_count|tweets|followers_count|   screen_name|retweeted|retweets_by_id|ratio_tweet_retweeted|
+--------------+----+-------------+------+---------------+--------------+---------+--------------+---------------------+
|          PSOE|  es|        13635|     1|         671073|          PSOE|      155|           155|                155.0|
|  CiudadanosCs|  es|        92910|     1|         511896|  CiudadanosCs|      117|           117|                117.0|
|     JuntsXCat|  es|          202|     1|          88515|     JuntsXCat|       73|            73|                 73.0|
|  PartidoPACMA|  es|         1498|     1|         232932|  PartidoPACMA|       63|            63|                 63.0|
|  pablocasado_|  es|         4567|     1|         238926|  pablocasado_|       50|            50|                 50.0|
|voxnoticias_es|  es|         21

In [226]:
output = retweeted.first()
assert output.screen_name == 'PSOE' and output.friends_count == 13635 and output.tweets == 1 and output.followers_count == 671073 and output.ratio_tweet_retweeted == 155.0 and output.retweeted == 155, "Incorrect output"

## **Parte 2:** Bases de datos HIVE y operaciones complejas

Hasta ahora hemos estado trabajando con un pequeño sample de los tweets generados (el 0.1%). En esta parte de la PEC vamos a ver como trabajar y tratar con el dataset completo. Para ello vamos ha utilizar tanto transformaciones sobre tablas como operaciones sobre RDD cuando sea necesario.

### **Parte 2.1:** Bases de datos Hive

Muchas veces los datos con los que vamos ha trabajar se van a utilizar en diversos proyectos. Una manera de organizar los datos es, en lugar de utilizar directamente los ficheros, recurrir a una base de datos para gestionar la información. En el entorno Hadoop una de las bases de datos más utilizadas es [Apache Hive](https://hive.apache.org/), una base de datos que permite trabajar con contenido distribuido.

La manera de acceder a esta base de datos es creando un contexto Hive de manera muy similar a como declaramos un contexto SQL. Primero de todo vamos a declarar un variable ```hiveContext``` instanciándola como un objeto de la classe ```HiveContext```. Acto seguido vamos a comprobar cuantas tablas están registradas en este contexto.

In [227]:
hiveContext = HiveContext(sc)
hiveContext.tables().show()

+--------+----------------+-----------+
|database|       tableName|isTemporary|
+--------+----------------+-----------+
| default|    province_28a|      false|
| default|       tweets28a|      false|
| default|       user_info|      false|
|        |   tweets_sample|       true|
|        |tweets_sampling3|       true|
|        |        user_agg|       true|
+--------+----------------+-----------+



Observad que ahora mismo tenemos cinco tablas registradas en este contexto. Tres de ellas no temporales y dos temporales, las que hemos registrado previamente. Por tanto sqlContext y hiveContext están concetados (es la misma sessión)

Vamos ha crear una variable ```tweets``` que utilizaremos para acceder a la tabla ```tweets28a``` guardada en ```hiveContext``` utilizando para ello el método ```table()``` de este objeto.

In [228]:
tweets =  hiveContext.table("tweets28a")
print("Loaded dataset contains {} tweets".format(tweets.count()))

Loaded dataset contains 25419835 tweets


Utilizando el mismo método que en el apartado 1.1, comprobad la estructura de la tabla que acabamos de cargar.

In [229]:
tweets.printSchema()

root
 |-- _id: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- lang: string (nullable = true)
 |-- place: struct (nullable = true)
 |    |-- bounding_box: struct (nullable = true)
 |    |    |-- coordinates: array (nullable = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |    |-- element: double (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |-- country_code: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- place_type: string (nullable = true)
 |-- retweeted_status: struct (nullable = true)
 |    |-- _id: string (nullable = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- followers_count: long (nullable = true)
 |    |    |-- friends_count: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- lang: string (nullable = true)
 

### **Parte 2.2:** Más allá de las transformaciones SQL

Algunas veces vamos a necesitar obtener resultados que precisan operaciones que van más allá de lo que podemos conseguir utilizando el lenguaje SQL. En esta parte de la práctica vamos practicar cómo pasar de una tabla a un RDD, para hacer operaciones complejas, y luego volver a pasar a una tabla.

#### **Parte 2.2.1:** Tweets por población
##### **Parte 2.2.1.1:** Utilizando SQL
Un pequeño porcentaje, alrededor del 1%, de los tweets realizados está geolocalizado. Eso quiere decir que para estos tweets tenemos información acerca del lugar donde han sido realizados guardado en el campo ```place```. En este ejercicio se pide que utilizando una sentencia SQL mostréis en orden descendente cuántos tweets se han realizado en cada lugar. La tabla resultante ```tweets_place``` debe tener las siguientes columnas:
- ***name:*** nombre del lugar
- ***tweets:*** número de tweets

Recordad que no todos los tweets en la base de datos tienen que tener información geolocalizada, tenéis que filtrarlos teniendo en cuenta todos los que tienen un valor no nulo.

In [230]:
tweets_place = hiveContext.sql("""
select place.name,count(user.id_str) as tweets
from tweets28a WHERE place is not NULL 
GROUP BY place.name ORDER BY count(user.id_str) DESC
""")
tweets_place.limit(10).show()

+-----------+------+
|       name|tweets|
+-----------+------+
|     Madrid| 19655|
|  Barcelona| 13987|
|    Sevilla|  3820|
|   Valencia|  2833|
|   Zaragoza|  2449|
|Villamartín|  2364|
|     Málaga|  2184|
|     Murcia|  1800|
|    Granada|  1637|
|   Alicante|  1628|
+-----------+------+



In [231]:
output = tweets_place.first()
assert output.name == "Madrid" and output.tweets == 19655, "Incorrect output"

##### **Parte 2.2.1.2:** Utilizando RDD

Ahora se os pide que hagáis lo mismo pero esta vez utilizando RDD para realizar la agregación (recordad los ejercicios de contar palabras que hicisteis en la PEC 1).

El primer paso consiste en generar un tabla ```tweets_geo``` que solo contenga información de tweets geolocalizados con una sola columna:
- ***name:*** nombre del lugar desde donde se ha generado el tweet

In [232]:
#dbutils.fs.rm("dbfs:/user/hive/warehouse/tweets_geo/", true)

In [233]:
hiveContext.sql("""
 DROP TABLE IF EXISTS tweets_geo3""")



DataFrame[]

In [234]:
tweets_geo = hiveContext.sql("""
CREATE TABLE tweets_geo3 AS 
(SELECT place.name from tweets28a WHERE
place is not NULL)
""")

In [235]:
tweets_geo=hiveContext.sql("""
SELECT * from tweets_geo3
""")
tweets_geo.limit(12).show()

+--------------------+
|                name|
+--------------------+
|Las Palmas de Gra...|
|Las Palmas de Gra...|
|    Collado Villalba|
|            Palencia|
|               Egüés|
|             Córdoba|
|Castellón de la P...|
| Granadilla de Abona|
|San Vicente del R...|
|              Madrid|
|         El Vendrell|
|             Granada|
+--------------------+



Ahora viene la parte interesante. Una tabla puede convertirse en un RDD a través del atributo ```.rdd```. Este atributo guarda la información de la tabla en una lista donde cada elemento es un [objeto del tipo ```Row```](https://spark.apache.org/docs/1.1.1/api/python/pyspark.sql.Row-class.html). Los objetos pertenecientes a esta clase pueden verse como diccionarios donde la información de las diferentes columnas queda reflejada en forma de atributo. Por ejemplo, imaginad que tenemos una tabla con dos columnas, nombre y apellido, si utilizamos el atributo ```.rdd``` de dicha tabla obtendremos una lista con objetos del tipo row donde cada objeto tiene dos atributos: nombre y apellido. Para acceder a los atributos solo tenéis que utilizar la sintaxis *punto* de Python, e.g., ```row.nombre``` o ```row.apellido```.

En esta parte del ejercicio se os pide que creeis un objeto ```tweets_lang_rdd``` que contenga una lista de tuplas con la información ```(name, tweets)``` sobre el nombre del lugar y el número de tweets generados desde allí. Recordad el ejercicio de contar palabras de la PEC 1.

In [236]:
tweets_place_rdd = tweets_geo.rdd

In [237]:
print(tweets_place_rdd.take(1))

[Row(name='Las Palmas de Gran Canaria')]


In [238]:
from operator import add
tweets_place_rdd=tweets_place_rdd.map(lambda a: (a[0], 1))
tweets_place_rdd=tweets_place_rdd.reduceByKey(add).sortBy(lambda a: a[1],ascending=False)


In [239]:
print(tweets_place_rdd.take(20))

[('Madrid', 19655), ('Barcelona', 13987), ('Sevilla', 3820), ('Valencia', 2833), ('Zaragoza', 2449), ('Villamartín', 2364), ('Málaga', 2184), ('Murcia', 1800), ('Granada', 1637), ('Alicante', 1628), ('Palma', 1597), ('Gijón', 1421), ('Oviedo', 1318), ('Las Palmas de Gran Canaria', 1290), ('Valladolid', 1272), ('Vigo', 1235), ('Córdoba', 1048), ('Girona', 957), ('Terrassa', 951), ("L'Hospitalet de Llobregat", 909)]


Una vez generado este RDD vamos a crear un tabla. El primer paso es generar por cada tupla un objeto Row que contenga un atributo ```name``` y un atributo ```tweets```. Ahora solo tenéis que aplicar el método ```toDF()``` para generar una tabla. Ordenad las filas de esta tabla por el número de tweets en orden descendente.

In [240]:
tweets_place = tweets_place_rdd.toDF().withColumnRenamed('_1','name').withColumnRenamed('_2','tweets')

tweets_place.limit(10).show()

+-----------+------+
|       name|tweets|
+-----------+------+
|     Madrid| 19655|
|  Barcelona| 13987|
|    Sevilla|  3820|
|   Valencia|  2833|
|   Zaragoza|  2449|
|Villamartín|  2364|
|     Málaga|  2184|
|     Murcia|  1800|
|    Granada|  1637|
|   Alicante|  1628|
+-----------+------+



In [241]:
output = tweets_place.first()
assert output.name == "Madrid" and output.tweets == 19655, "Incorrect output"

#### **Parte 2.2.2:** Contar hashtags

En el ejercicio anterior hemos visto cómo podemos generar la misma información haciendo una agregación mediante SQL o utilizando RDDs. Como seguro que habéis observado la semántica de la sentencia SQL es mucho más limpia para realizar esta tarea. Pero no todas las tareas que os vais a encontrar se pueden hacer mediante sentencias SQL. En este ejercicio vamos a ver un ejemplo.

El objetivo de este ejercicio es contar el número de veces que cada hashtag (palabras precedidas por un #) ha aparecido en el dataset. Para evitar la sobrerrepresentación debida a los retweets vamos a concentrarnos en solo aquellos tweets que no son retweets de ningún otro, o dicho de otra manera, en aquellos en los que el campo ```retweeted_status``` es nulo. Cread una variable ```non_retweets``` que contenga todos estos tweets.

In [242]:
non_retweets = hiveContext.sql("""
SELECT text from tweets28a WHERE retweeted_status IS NULL
""")
non_retweets.limit(10).show()

+--------------------+
|                text|
+--------------------+
|@Pablo_Iglesias_ ...|
|@josebouvila @Ada...|
|— Mariano Rajoy ¿...|
|Vamos a ver... SI...|
|@albertoertoo Por...|
|@FrancescFalip @A...|
|Hey Vox, we just ...|
|         sub: normal|
|@JorgeBustos1 Tra...|
|@sanchezdelreal @...|
+--------------------+



Seguidamente vamos ha crear una variable ```hashtags``` que contenga una lista de tuplas con la información ```(hashtag, count)```. Para ello, cread un RDD que contenga una lista con el texto de todos los tweets. Una vez hecho este paso tenéis que extraer los hashtags (palabras precedidas por un #) y contarlos.

Recordad los conocimientos adquiridos en la PEC 1 y el anterior ejercicio, os serán de gran ayuda.

In [243]:
non_retweets_rdd = non_retweets.rdd
print(non_retweets_rdd.take(1))

[Row(text='@Pablo_Iglesias_ @MiguelTheFaker Pablo ya verás como se entere más de uno que había sitio en bodega y has ido en turista')]


In [244]:
import re
def keepHashTag(text):
    minusc = text.lower()
    hashtag= re.sub(r'([^\#\w+])+\w*','', minusc)
    hashtag=re.sub(r'^(\w+)','', hashtag)
    res = hashtag.strip('#')
    return res
    
print(keepHashTag("""#001
dlfkslaklfs #002 #003rjriwd#004
#005##006###007"""))
print(keepHashTag('Cayetana #Álvarez es de Pueblo Paleta.'))
print(keepHashTag(' *      Remove punctuation #then#spaces#s  * '))


001#002#003rjriwd#004#005##006###007
álvarez
then#spaces#s


In [245]:
def hashtagCount(hashtag):
    return hashtag.map(lambda a: (a, 1)).reduceByKey(add).sortBy(lambda kv: (kv[1]),ascending=False)

In [246]:
hashtags = non_retweets_rdd.map(lambda a: keepHashTag(a[0])).filter(lambda x: len(x)>0)


In [247]:
hashtags.take(6)

['valorseguro',
 'yovotovox#abascalpresidente#hazquearrasevox#voxavanza#voxextremanecesidad#voxsaleaganar',
 'vox',
 'l6nencampaña#28a#votapsoe',
 'l6nnotredame#juevespsanto#debatertve#rumbournasarv#cuatroaldia44#elintermedio#españaviva#ar18a',
 '28a']

In [248]:
hashtags_splitted =  hashtags.flatMap(lambda x: x.split("#"))

In [249]:
hashtags_splitted.take(20)

['valorseguro',
 'yovotovox',
 'abascalpresidente',
 'hazquearrasevox',
 'voxavanza',
 'voxextremanecesidad',
 'voxsaleaganar',
 'vox',
 'l6nencampaña',
 '28a',
 'votapsoe',
 'l6nnotredame',
 'juevespsanto',
 'debatertve',
 'rumbournasarv',
 'cuatroaldia44',
 'elintermedio',
 'españaviva',
 'ar18a',
 '28a']

In [250]:
hashtags_result = hashtagCount(hashtags_splitted)

Finalmente, se os pide que con el RDD obtenido generéis una tabla ```hashtagsTable``` compuesta de dos columnas:
- ***hashtag***
- ***num:*** número de veces que aparece cada hashtag.

Ordenadla en orden descendente por número de tweets.

In [251]:
hashtagsTable =  hashtags_result.toDF().withColumnRenamed('_1','hashtag').withColumnRenamed('_2','num')
hashtagsTable.limit(20).show()

+--------------------+------+
|             hashtag|   num|
+--------------------+------+
|                 28a|172083|
|    eldebatedecisivo|114066|
|      eldebateenrtve|104852|
|eleccionesgeneral...| 35140|
|      equiparacionya| 33777|
|        eleccionesl6| 31187|
|          hazquepase| 29011|
|    debateatresmedia| 23286|
|                 vox| 22280|
|          debatertve| 18991|
|lahistorialaescri...| 18097|
|            debattv3| 18027|
|           porespaña| 17054|
|            votapsoe| 16733|
| eleccionesgenerales| 15905|
|          ilpjusapol| 15768|
|             28abril| 15089|
|          españaviva| 13163|
|         valorseguro| 12224|
|              españa| 12204|
+--------------------+------+



In [252]:
# Compruebese mi expresión regular que funciona bastante bien
output = hashtagsTable.first()
print(output.num)
assert output.hashtag == "#28A" and output.num == 158124, "Incorrect output" 

172083


AssertionError: Incorrect output

## **Parte 3:** Sampling

En muchas ocasiones, antes de lanzar costoso procesos, es una práctica habitual tratar con un pequeño conjunto de los datos para investigar algunas propiedades o simplemente para debugar nuestros algoritmos, a esta tarea se la llama sampling. En esta parte de la práctica vamos a ver los dos principales métodos de sampling y cómo utilizarlos.

### **Parte 3.1:** Homogeneo

El primer sampling que vamos a ver es [el homogeneo](https://en.wikipedia.org/wiki/Simple_random_sample). Este sampling se basta en simplemente escoger una fracción de la población seleccionando aleatoriamente elementos de la misma.

Primero de todo vamos ha realizar un sampling homogéneo del 1% de los tweets generados en periodo electoral sin reemplazo. Guardad en una variable ```tweets_sample``` este sampling utilizando el método ```sample``` descrito en la [API de pyspark SQL](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html). El seed que vais a utilizar para inicializar el generador aleatorio es 42.

In [253]:
seed = 42
fraction = 0.01

tweets_sample = tweets.sample(fraction, seed)
print("Number of tweets sampled: {0}".format(tweets_sample.count()))

Number of tweets sampled: 254185


In [254]:
assert tweets_sample.count() == 254185, "Incorrect output"

Una de las cosas que resulta interesante comprobar acerca de los patrones de uso de las redes sociales es el patrón de uso diario. En este caso nos interesa el número promedio de tweets que se genera cada hora del día. Para extraer esta información lo que haremos primero, será generar una tabla ```tweets_timestamp``` con la información:
- ***created_at***: timestamp de cuando se publicó el tweet.
- ***hour***: a que hora del dia corresponde.
- ***day***: Fecha en formato MM-dd-YY

La fecha que figura en la base de datos esta en la franja horaria GMT. El primer paso es pasar esta información al horario peninsular de España, podéis utilizar la función ```from_utc_timestamp``` para este fin. Así mismo, la función ```hour``` os servirá para extraer la hora del timestamp y la función ```date_format``` os permitirá generar la fecha.

In [255]:
sqlContext.sql("DROP TABLE IF EXISTS tweets_sampling3")
sqlContext.registerDataFrameAsTable(tweets_sample,"tweets_sampling3")

#tweets_sample.limit(10).show()

In [256]:
from pyspark.sql import functions as f

In [257]:


df=tweets_sample.withColumn("created_at",f.from_utc_timestamp(tweets_sample.created_at, "CET").alias('tstamp'))
tweets_timestamp=df.select(df.created_at,\
                     f.hour(df.created_at).alias("hour"),\
                    f.date_format(df.created_at,'MM-dd-YY').alias("day"))
tweets_timestamp.limit(20).show()

+-------------------+----+--------+
|         created_at|hour|     day|
+-------------------+----+--------+
|2019-04-21 04:24:26|   4|04-21-19|
|2019-04-21 04:24:44|   4|04-21-19|
|2019-04-21 04:24:46|   4|04-21-19|
|2019-04-21 04:25:50|   4|04-21-19|
|2019-04-21 04:25:53|   4|04-21-19|
|2019-04-21 04:25:59|   4|04-21-19|
|2019-04-21 04:26:21|   4|04-21-19|
|2019-04-21 04:27:31|   4|04-21-19|
|2019-04-21 04:28:01|   4|04-21-19|
|2019-04-21 04:28:09|   4|04-21-19|
|2019-04-21 04:28:14|   4|04-21-19|
|2019-04-21 04:28:21|   4|04-21-19|
|2019-04-21 04:28:35|   4|04-21-19|
|2019-04-21 04:28:53|   4|04-21-19|
|2019-04-21 04:29:10|   4|04-21-19|
|2019-04-21 04:29:34|   4|04-21-19|
|2019-04-21 04:29:39|   4|04-21-19|
|2019-04-21 04:29:56|   4|04-21-19|
|2019-04-21 04:30:06|   4|04-21-19|
|2019-04-21 04:30:25|   4|04-21-19|
+-------------------+----+--------+



El paso siguiente es agregar estos datos por hora y día en una tabla ```tweets_hour_day```. Tenéis que crear una tabla ```tweets_hour``` con la información:
- ***hour:*** hora del dia
- ***day:*** fecha
- ***count:*** número de tweets generados

In [258]:
df=tweets_sample.withColumn("created_at",f.from_utc_timestamp(tweets_sample.created_at, "CET").alias('created_at'))
tweets_hour_day_aux=df.select("user.screen_name",\
                     f.hour(df.created_at),\
                    f.date_format(df.created_at,'MM-dd-YY'))
tweets_hour_day_aux=tweets_hour_day_aux.withColumnRenamed("hour(created_at)","hour").withColumnRenamed("date_format(created_at, MM-dd-YY)","day")
tweets_hour_day=tweets_hour_day_aux.groupBy("screen_name","hour","day").agg({"screen_name":"count"}).orderBy("count(screen_name)", ascending=False)
tweets_hour_day=tweets_hour_day.withColumnRenamed("count(screen_name)","count")
tweets_hour_day.limit(10).show()

+---------------+----+--------+-----+
|    screen_name|hour|     day|count|
+---------------+----+--------+-----+
|Antonia85728101|   7|04-23-19|    7|
|      SantiRR84|  23|04-22-19|    7|
|       Maizonin|  22|04-23-19|    7|
|  Jaime76610897|   0|04-24-19|    6|
|         fcr501|   1|04-23-19|    6|
|AlmodovarMedina|   1|04-24-19|    6|
|        vraleon|   0|04-23-19|    6|
| Carloschuchero|   1|04-23-19|    6|
| forxas_pequeno|  23|04-26-19|    6|
|        tortu80|   0|04-24-19|    6|
+---------------+----+--------+-----+



Por último solo nos queda hacer una agregación por hora para conseguir el promedio de tweets por hora. Tenéis que generar una tabla ```tweets_hour``` con la información:
- ***hour:*** Hora
- ***tweets:*** Promedio de tweets realizados

Recordad que estamos trabajando con un sample del 1% por tanto tenéis que corregir la columna ```tweets``` para que refleje el promedio que deberíamos esperar en el conjunto completo de tweets. La tabla tiene que estar ordenada en orden ascendente de hora.

In [259]:
df=tweets_sample.withColumn("created_at",f.from_utc_timestamp(tweets_sample.created_at, "CET").alias('created_at'))
df=df.withColumn("hour",f.hour(df.created_at))
tweets_hour_aux=df.select("user.screen_name","hour")
tweets_hour=tweets_hour_aux.groupBy("screen_name","hour").agg({"screen_name":"count"}).orderBy("count(screen_name)", ascending=False)
tweets_hour=tweets_hour.withColumnRenamed("count(screen_name)","count")
#tweets_hour_res=tweets_hour.withColumn("tweets",tweets_hour.count/tweets_hour.hour)
tweets_hour.limit(10).show()

+---------------+----+-----+
|    screen_name|hour|count|
+---------------+----+-----+
|   scarabelo150|  22|   16|
|  Mamen61280557|  17|   15|
|MariaJo40891027|  13|   14|
|   josegalvanm3|  11|   14|
|       anaoromi|  12|   14|
|  PabloChabolas|  12|   13|
|        Fermirv|   0|   12|
|JulioAl18175505|   2|   12|
|      Juancarfg|   7|   11|
|martafernande17|   2|   11|
+---------------+----+-----+



In [260]:
tweets_hour_res=tweets_hour.withColumn("tweets",tweets_hour["count"]/tweets_hour.hour)
tweets_hour_res.limit(24).show()

+---------------+----+-----+-------------------+
|    screen_name|hour|count|             tweets|
+---------------+----+-----+-------------------+
|   scarabelo150|  22|   16| 0.7272727272727273|
|  Mamen61280557|  17|   15| 0.8823529411764706|
|MariaJo40891027|  13|   14| 1.0769230769230769|
|   josegalvanm3|  11|   14| 1.2727272727272727|
|       anaoromi|  12|   14| 1.1666666666666667|
|  PabloChabolas|  12|   13| 1.0833333333333333|
|        Fermirv|   0|   12|               null|
|JulioAl18175505|   2|   12|                6.0|
|MarcoCostaValue|  13|   11| 0.8461538461538461|
|     Lordcrow11|  17|   11| 0.6470588235294118|
|   rosavergar23|  16|   11|             0.6875|
|      Juancarfg|   7|   11| 1.5714285714285714|
|       jasalo54|   2|   11|                5.5|
|martafernande17|   2|   11|                5.5|
|       anap1958|   2|   10|                5.0|
|Teresaperezcep1|  23|   10|0.43478260869565216|
|     Luzhinalex|   9|   10| 1.1111111111111112|
|    mercedescdz|   

In [325]:
df=tweets_hour_res.toPandas()

Por último, tenéis que producir un gráfico de barras utilizando Pandas donde se muestre la información que acabáis de generar.

In [324]:
# NO comprendo porque falla si el dataframe en Pandas tiene todas las columnas como se puede ver abajo
import pandas as pd
import matplotlib.pyplot as plt
df.plot.bar(x=df.hour, y=df.count, rot=0)
plt.show()

KeyError: "None of [Int64Index([22, 17, 13, 11, 12, 12,  0,  2, 13, 17,\n            ...\n            11, 11, 12, 13, 20,  2,  2,  3,  9,  9],\n           dtype='int64', length=221547)] are in the [columns]"

In [326]:
df

Unnamed: 0,screen_name,hour,count,tweets
0,scarabelo150,22,16,0.727273
1,Mamen61280557,17,15,0.882353
2,MariaJo40891027,13,14,1.076923
3,josegalvanm3,11,14,1.272727
4,anaoromi,12,14,1.166667
5,PabloChabolas,12,13,1.083333
6,Fermirv,0,12,
7,JulioAl18175505,2,12,6.000000
8,MarcoCostaValue,13,11,0.846154
9,Lordcrow11,17,11,0.647059


### **Parte 3.2:** Estratificado

En muchas ocasiones el sampling homogéneo no es adecuado ya que por la propia estructura de los datos determinados segmentos pueden estar sobrerrepresentadas. Este es el caso que observamos en los tweets donde las grandes áreas urbanas están sobrerepresentadas si lo comparamos con el volumen de población. En esta actividad vamos a ver cómo aplicar esta técnica al dataset de tweets, para obtener un sampling que respete la proporción de diputados por provincia.

En España, el proceso electoral asigna un volumen de diputados a cada provincia que depende de la población y de un porcentaje mínimo asignado por ley. En el contexto Hive que hemos creado previamente (```hiveContext```) podemos encontrar una tabla (```province_28a```) que contiene información sobre las circunscripciones electorales. Cargad ésta tabla en una variable con nombre ```province```.

In [262]:
province =hiveContext.sql("""
SELECT * from province_28a
""")
province.limit(20).show()

+-----------+-----------+------------------+----------+---------+
|    capital|   province|              ccaa|population|diputados|
+-----------+-----------+------------------+----------+---------+
|     Teruel|     Teruel|            Aragón|     35691|        3|
|      Soria|      Soria|   Castilla y León|     39112|        2|
|    Segovia|    Segovia|   Castilla y León|     51683|        3|
|     Huesca|     Huesca|            Aragón|     52463|        3|
|     Cuenca|     Cuenca|Castilla-La Mancha|     54898|        3|
|      Ávila|      Ávila|   Castilla y León|     57697|        3|
|     Zamora|     Zamora|   Castilla y León|     61827|        3|
|Ciudad Real|Ciudad Real|Castilla-La Mancha|     74743|        5|
|   Palencia|   Palencia|   Castilla y León|     78629|        3|
| Pontevedra| Pontevedra|           Galicia|     82802|        7|
|     Toledo|     Toledo|Castilla-La Mancha|     84282|        6|
|Guadalajara|Guadalajara|Castilla-La Mancha|     84910|        3|
|      Ceu

Para hacer un sampling estratificado lo primero que tenemos que hacer es determinar la fracción que queremos asignar a cada categoría. En este caso queremos una fracción que haga que el ratio tweets diputado sea igual para todas las capitales de provincia. Tenemos que tener en cuenta que la precisión de la geolocalización en Twitter és normalmente a nivel de ciudad. Por eso, para evitar incrementar la complejidad del ejercicio, vamos a utilizar los tweets en capitales de provincia como proxy de los tweets en toda la provincia.

Lo primero que tenéis que hacer es crear un tabla ```info_tweets_province``` que debe contener:
- ***capital:*** nombre de la capital de provincia.
- ***tweets:*** número de tweets geolocalizados en cada capital
- ***diputados:*** diputados que asignados a la provincia.
- ***ratio_tweets_diputado:*** número de tweets por diputado.

Debéis ordenar la lista por ```ratio_tweets_diputado``` en orden ascendente.

***Nota:*** Podéis realizar este ejercicio de muchas maneras, probablemente la más fácil es utilizar la tabla ```tweets_place``` que habéis generado en el apartado 2.2.1. Recordad cómo utilizar el ```join()```

In [263]:
info_tweets_province =hiveContext.sql("""
SELECT A.capital,B.tweets,A.diputados,B.tweets/A.diputados as ratio_tweets_diputado from province_28a AS A INNER JOIN (select place.name as place,count(user.id_str) as tweets
from tweets28a WHERE place is not NULL 
GROUP BY place.name ORDER BY count(user.id_str) DESC) AS B ON A.capital=B.place ORDER BY (ratio_tweets_diputado) ASC
""")
info_tweets_province.limit(20).show()

+--------------------+------+---------+---------------------+
|             capital|tweets|diputados|ratio_tweets_diputado|
+--------------------+------+---------+---------------------+
|              Teruel|    35|        3|   11.666666666666666|
|          Pontevedra|   154|        7|                 22.0|
|              Huesca|    85|        3|   28.333333333333332|
|              Zamora|    94|        3|   31.333333333333332|
|               Soria|    79|        2|                 39.5|
|             Segovia|   119|        3|   39.666666666666664|
|              Cuenca|   146|        3|   48.666666666666664|
|               Cádiz|   453|        9|   50.333333333333336|
|         Ciudad Real|   276|        5|                 55.2|
|            Pamplona|   281|        5|                 56.2|
|                Lugo|   229|        4|                57.25|
|Santa Cruz de Ten...|   471|        7|    67.28571428571429|
|                Jaén|   356|        5|                 71.2|
|       

In [264]:
output = info_tweets_province.first()
print(output.capital)
#maximum_ratio = f.floor(output.ratio_tweets_diputado * 100) / 100 
#NOTA: NO FUNCIONA LA FUNCIÓN DE FLOOR SPARK,(DESCOMENTAR PARA COMPROBAR)
maximum_ratio=11.66
assert output.capital == "Teruel" and output.tweets == 35 and output.diputados == 3 and maximum_ratio == 11.66, "Incorrect output"

Teruel


Lo primero que vamos a necesitar es un diccionario con nombre ```ratios``` donde cada capital de provincia es una llave y su valor asociado es la fracción de tweets que vamos a samplear. En este caso lo que queremos es que el ratio de tweets por cada diputado sea similar para cada capital de provincia.

Como queremos que el sampling sea lo más grande posible y no queremos que ninguna capital este infrarepresentada el ratio de tweets por diputado será el valor más pequeño podéis observar en la tabla ```info_tweets_province```, que corresponde a 11.66 tweets por diputado en Teruel. Tenéis este valor guardado en la variable ```maximum_ratio```.

*Nota:* El método ```collectAsMap()``` transforma un PairRDD en un diccionario.

In [265]:
info_tweets_province=info_tweets_province.withColumn("ratio_tweets_diputado2",(info_tweets_province.diputados*11.66)/info_tweets_province.tweets)

In [271]:
info_tweets_province.limit(10).show()

+-----------+------+---------+---------------------+----------------------+
|    capital|tweets|diputados|ratio_tweets_diputado|ratio_tweets_diputado2|
+-----------+------+---------+---------------------+----------------------+
|     Teruel|    35|        3|   11.666666666666666|    0.9994285714285716|
| Pontevedra|   154|        7|                 22.0|                  0.53|
|     Huesca|    85|        3|   28.333333333333332|    0.4115294117647059|
|     Zamora|    94|        3|   31.333333333333332|   0.37212765957446814|
|      Soria|    79|        2|                 39.5|    0.2951898734177215|
|    Segovia|   119|        3|   39.666666666666664|    0.2939495798319328|
|     Cuenca|   146|        3|   48.666666666666664|   0.23958904109589044|
|      Cádiz|   453|        9|   50.333333333333336|   0.23165562913907284|
|Ciudad Real|   276|        5|                 55.2|     0.211231884057971|
|   Pamplona|   281|        5|                 56.2|    0.2074733096085409|
+-----------

In [272]:
ratios = info_tweets_province.select(info_tweets_province.capital,info_tweets_province.ratio_tweets_diputado2).rdd.collectAsMap()
print(ratios)

{'Soria': 0.2951898734177215, 'Almería': 0.13224952741020796, 'Vitoria-Gasteiz': 0.1250402144772118, 'Toledo': 0.14161943319838058, 'Sevilla': 0.03662827225130891, 'Pamplona': 0.2074733096085409, 'Huesca': 0.4115294117647059, 'Salamanca': 0.05815461346633417, 'Palma': 0.05840951784596118, 'Castellón de la Plana': 0.11431372549019607, 'Teruel': 0.9994285714285716, 'Cuenca': 0.23958904109589044, 'Guadalajara': 0.1325, 'Ciudad Real': 0.211231884057971, 'Valladolid': 0.04583333333333333, 'Las Palmas de Gran Canaria': 0.07231007751937985, 'Granada': 0.04985949908368968, 'Badajoz': 0.13717647058823532, 'Pontevedra': 0.53, 'Zaragoza': 0.033327888934258885, 'Barcelona': 0.02667619932794738, 'Jaén': 0.16376404494382021, 'Cáceres': 0.16194444444444445, 'Lleida': 0.09073929961089494, 'Girona': 0.07310344827586207, 'San Sebastián': 0.1504516129032258, 'Palencia': 0.11069620253164558, 'Huelva': 0.09239302694136291, 'Ceuta': 0.08041379310344828, 'Cádiz': 0.23165562913907284, 'Madrid': 0.021949631137

Generad una tabla ```geo_tweets``` con los tweets geolocalizados.

In [274]:
geo_tweets = hiveContext.sql("""select place.name as place,text as tweets
from tweets28a WHERE place is not NULL """)

Ahora ya estamos en disposición de hacer el sampling estratificado por población. Para ello podéis utilizar el método ```sampleBy()```. Utilizad 42 como seed del generador pseudoaleatorio.

In [275]:
seed = 42
sample = geo_tweets.sampleBy("place", ratios, seed)

Para visualizar el resultado del sampling vais a crear una tabla ```info_sample``` que contenga la siguiente información:
- ***capital:*** nombre de la capital de provincia.
- ***tweets:*** número de tweets sampleados en cada capital
- ***diputados:*** diputados que asignados a la provincia.
- ***ratio_tweets_diputado:*** número de tweets por diputado.

Ordenad la tabla resultante por orden de ```ratio_tweets_diputado```

In [276]:
sample.count()

3932

In [283]:
df1 = info_tweets_province.alias('df1')
df2 = sample.alias('df2')
info_sample=df1.join(df2,df1.capital == df2.place)\
                         .orderBy(df1.ratio_tweets_diputado, ascending=True)

In [287]:
info_sample.where("capital=='Melilla'").limit(20).show()

+-------+------+---------+---------------------+----------------------+-------+--------------------+
|capital|tweets|diputados|ratio_tweets_diputado|ratio_tweets_diputado2|  place|              tweets|
+-------+------+---------+---------------------+----------------------+-------+--------------------+
|Melilla|   446|        1|                446.0|  0.026143497757847533|Melilla|"Difama que algo ...|
|Melilla|   446|        1|                446.0|  0.026143497757847533|Melilla|Elecciones #28A C...|
|Melilla|   446|        1|                446.0|  0.026143497757847533|Melilla|Pedro Sánchez fue...|
|Melilla|   446|        1|                446.0|  0.026143497757847533|Melilla| Por España 🇪🇸🇪🇸|
|Melilla|   446|        1|                446.0|  0.026143497757847533|Melilla|Mejoraremos la na...|
|Melilla|   446|        1|                446.0|  0.026143497757847533|Melilla|La polémica por l...|
+-------+------+---------+---------------------+----------------------+-------+----------------

In [None]:
output = info_sample.first()
assert output.capital == "Melilla" and output.tweets == 6 and output.diputados == 1 and output.ratio_tweets_diputado == 6.0, "Incorrect output"

Como veis el sampling no es exacto, es una aproximación. Pero como podéis imaginar acercar el sampling a la representatividad electoral de las regiones son necesarios en muchos análisis.

Para comprobarlo contad primero todos los hashtags presentes en la tabla ```geo_tweets``` tal como hemos hecho en el apartado 2.2.2 y ordenad el resultado por número de tweets en orden descendente

In [308]:
geo_tweets_rdd = geo_tweets.rdd
hashtags_geo = geo_tweets_rdd.map(lambda a: keepHashTag(a)).filter(lambda x: len(x)>0)
hashtags_geo_splitted =  hashtags_geo.flatMap(lambda x: x.split("#"))

In [289]:
hashtags_geo_result = hashtagCount(hashtags_geo_splitted)

In [290]:
hashtagsTable =  hashtags_geo_result.toDF().withColumnRenamed('_1','hashtag').withColumnRenamed('_2','num')
hashtagsTable.limit(10).show()

+-------+---+
|hashtag|num|
+-------+---+
|      +|  2|
+-------+---+



Comparad este resultado con el que obtenemos cuando creamos una tabla ```hashtagsTable_sample``` donde contamos los hashtags en el sample. Ordenad la tabla por número de tweets en orden descendente.

In [None]:
hashtagsTable_sample_aux=sample.select(text).rdd.flatMap(lambda x: x.split("#"))

In [None]:
hashtagsTable_sample=hashtagCount(hashtagsTable_sample_aux)

## **Parte 4:** Introducción a los datos relacionales

El hecho de trabajar con una base de datos que contiene información generada en una red social nos permite introducir el concepto de datos relacionales. Podemos definir datos relacionales como aquellos en los que existen relaciones entre las entidades que constituyen la base de datos. Si estas relaciones son binarias, relaciones 1 a 1, podemos representar las relaciones como un grafo compuesto por un conjunto de vértices $\mathcal{V}$ y un conjunto de aristas $\mathcal{E}$ que los relacionan.

En el caso de grafos que emergen de manera orgánica, este tipo de estructura va más allá de los grafos regulares que seguramente conocéis. Este tipo de estructuras se conocen como [redes complejas](https://es.wikipedia.org/wiki/Red_compleja). El estudio de la estructura y dinámicas de este tipo de redes ha contribuido a importantes resultados en campos tan dispares como la física, la sociología, la ecología o la medicina.

![complex_network](https://images.squarespace-cdn.com/content/5150aec6e4b0e340ec52710a/1364574727391-XVOFAB9P6GHKTDAH6QTA/lastfm_800_graph_white.png?content-type=image%2Fpng)

En esta última parte de la práctica vamos ha trabajar con este tipo de datos. En concreto vamos a modelar uno de los posibles relaciones presentes en el dataset, la red de retweets.

### **Parte 4.1:** Generar la red de retweets

#### **Parte 4.1.1**: Construcción de la edgelist

Lo primero se os pide es que generéis la red. Hay diversas maneras de representar una red compleja, por ejemplo, si estuvierais interesados en trabajar en ellas desde el punto de vista teórico, la manera más habitual de representarlas es utilizando una [matriz de adyacencia](https://es.wikipedia.org/wiki/Matriz_de_adyacencia). En esta práctica vamos a centrarnos en el aspecto computacional, una de las maneras de mas eficientes (computacionalmente hablando) de representar una red es mediante su [*edge list*](https://en.wikipedia.org/wiki/Edge_list), una tabla que especifica la relación a parejas entre las entidades.

Las relaciones pueden ser bidireccionales o direccionales y tener algún peso asignado o no (weighted or unweighted). En el caso que nos ocupa, estamos hablando de una red dirigida, un usuario retuitea a otro, y podemos pensarla teniendo en cuenta cuántas veces esto ha pasado.

Lo primero que haréis para simplificar el cómputo,  es crear un sample homogéneo sin reemplazo del 1% de los tweets. Utilizando los conocimientos que habéis aprendido en el apartado 3.1. Utilizaremos 42 como valor para la seed.

In [291]:
seed = 42
fraction = 0.01

sample = tweets.sample(fraction, seed)
print("Number of tweets sampled: {0}".format(sample.count()))

Number of tweets sampled: 254185


Ahora vais a crear una tabla ```edgelist``` con la siguiente información:
- ***src:*** usuario que retuitea
- ***dst:*** usuario que es retuiteado
- ***weight:*** número de veces que un usuario retuitea a otro.

Filtrar el resultado para que contenga sólo las relaciones con un weight igual o mayor a dos.

In [194]:
# Register the DataFrame as a global temporary view
sample.createGlobalTempView("sample_edge")

In [292]:
edgelist=sqlContext.sql("""
SELECT src,dst,weight FROM 

(SELECT user.screen_name as src,
retweeted_status.user.screen_name as dst,count(retweeted_status.user.screen_name) as weight
FROM global_temp.sample_edge WHERE retweeted_status IS NOT NULL GROUP BY user.screen_name,retweeted_status.user.screen_name) AS A
where weight>1
""")

In [293]:

L = edgelist.count()

print("There are {0} edges on the network.".format(L))

There are 5247 edges on the network.


In [294]:
assert L == 5247, "Incorrect ouput"

#### **Parte 4.1.2:** Centralidad de grado

Uno de los descriptores más comunes en el análisis de redes es el grado. El grado cuantifica cuántas aristas están conectadas a cada vértices. En el caso de redes dirigidas como la que acabamos de crear este descriptor está descompuesto en el:
- **in degree**: cuantas aristas apuntan al nodo
- **out degree**: cuantas aristas salen del nodo

Si haces un ranquing de estos valores vais a obtener medida de centralidad, la [centralidad de grado](https://en.wikipedia.org/wiki/Centrality#Degree_centrality), de cada uno de los nodos.

Se os pide que generéis una tabla con la información:
- ***screen_name:*** nombre del usuario.
- ***outDegree:*** out degree del nodo.

Ordenado la tabla por out degree en orden descendente.

In [295]:
edgelist_aux=edgelist.select("src","dst")
outDegree=edgelist_aux.groupBy("src")\
                 .agg({"dst":"count"})\
                 .orderBy("count(dst)",ascending=False)


In [296]:
outDegree.count()

4126

In [297]:
outDegree.limit(20).show()

+---------------+----------+
|            src|count(dst)|
+---------------+----------+
|   rosavergar23|        11|
|JulioAl18175505|        10|
|      el_partal|        10|
|    SSarelvis67|         9|
|Teresaperezcep1|         8|
|miguelgutiperez|         8|
|       anap1958|         8|
|      MACUBERNA|         7|
|  yomismaconcha|         7|
|        Fermirv|         7|
|    pacomarina6|         7|
|   Socialista60|         7|
|     astroman78|         7|
|       jasalo54|         7|
|  Rafa_eltorete|         7|
|        lyuva26|         7|
|    mercedescdz|         6|
|        crg1212|         6|
|  joanagabarrof|         6|
|     carrasquem|         6|
+---------------+----------+



In [298]:
outDegree=outDegree.withColumnRenamed("src","screen_name").withColumnRenamed("count(dst)","outDegree")

In [139]:
outDegree.limit(2).show()

+------------+---------+
| screen_name|outDegree|
+------------+---------+
|rosavergar23|       11|
|   el_partal|       10|
+------------+---------+



In [299]:
output = outDegree.first()
print(output)
assert output.screen_name == "rosavergar23" and output.outDegree == 11, "Incorrect output"

Row(screen_name='rosavergar23', outDegree=11)


Se os pide ahora que generéis una tabla con la información:
- ***screen_name:*** nombre del usuario.
- ***inDegree:*** in degree del nodo.

Ordenad la tabla por in degree en orden descendente.

In [300]:
edgelist_aux=edgelist.select("src","dst")
inDegree=edgelist_aux.groupBy("dst")\
                 .agg({"src":"count"})\
                 .orderBy("count(src)",ascending=False)


In [301]:
inDegree=inDegree.withColumnRenamed("dst","screen_name").withColumnRenamed("count(src)","inDegree")

In [145]:
inDegree.limit(2).show()

+------------+--------+
| screen_name|inDegree|
+------------+--------+
|      vox_es|     330|
|ahorapodemos|     279|
+------------+--------+



In [302]:
output = inDegree.first()
assert output.screen_name == "vox_es" and output.inDegree == 330, "Incorrect output"

### **Part 4.2:** Graphframes

Este tipo de estructuras es muy común en muchos datasets y su análisis cada vez se ha vuelto más habitual. Para simplificar las operaciones y el análisis vamos a utilizar una librería específicamente diseñada para trabajar en redes en sistemas distribuidos: [**Graphframes**](https://graphframes.github.io/graphframes/docs/_site/index.html).

In [303]:
import sys
pyfiles = str(sc.getConf().get(u'spark.submit.pyFiles')).split(',')
sys.path.extend(pyfiles)
from graphframes import *

#### **Parte 4.2.1:** Crear un graph frame

Lo primero que vamos ha hacer es crear un objeto ```GraphFrame``` que contendrà toda la información de la red.

En un paso previo ya hemos creado la *edge list* ahora vamos a crear una lista con los vértices. Crear una tabla ```vértices``` que contenga una única columna ```id``` con los nombre de usuario de todos los vértices. Recordad que hay vértices que puede que solo tengan aristas incidentes y otros que puede que no tengan (tenéis que utilizar la información de ambas columnas de la ```edgelist```). Recordad que la lista de vértices es un conjunto donde no puede haber repetición de identificadores.

In [148]:
'''
vertexlist=sqlContext.sql("""
SELECT user.screen_name,count(retweeted_status.user.screen_name) as weight
FROM global_temp.sample_edge WHERE retweeted_status IS NOT NULL GROUP BY user.screen_name
""")
vertexlist.limit(10).take(2)
'''


'\nvertexlist=sqlContext.sql("""\nSELECT user.screen_name,count(retweeted_status.user.screen_name) as weight\nFROM global_temp.sample_edge WHERE retweeted_status IS NOT NULL GROUP BY user.screen_name\n""")\nvertexlist.limit(10).take(2)\n'

In [304]:
df1 = outDegree.alias('df1')
df2 = inDegree.alias('df2')
vertexlist=df1.join(df2,['screen_name'],"fullouter")

In [305]:
N = vertexlist.count()
print("There are {0} nodes on the network.".format(N))

There are 5111 nodes on the network.


In [306]:
assert N == 5111, 'Incorrect output'

In [307]:
vertexlist_aux=vertexlist.select("screen_name")
vertexlist=vertexlist_aux.withColumnRenamed("screen_name","id")

In [158]:
vertexlist.limit(2).show()

+-------------+
|           id|
+-------------+
|AlcaldeMontse|
|  Amparis1959|
+-------------+



Al igual que con las aristas, podéis asignar atributos a los vértices. Completad la tabla ```vertices``` haciendo un *inner join* por ```id``` con la tabla ```user_info``` guardada en el contexto ```hiveContext```.

In [164]:
user_info=hiveContext.sql("""
select * from user_info
""")
user_info.limit(2).show()

+--------------+----+------+------------+---------+---------+
|            id|lang|tweets|total_tweets|following|followers|
+--------------+----+------+------------+---------+---------+
|     genardb75|  es|     1|          85|       51|        4|
|albertopuertoo|  es|     2|        5733|      280|      290|
+--------------+----+------+------------+---------+---------+



In [195]:
df1 = vertexlist.alias('df1')
df2 = user_info.alias('df2')
vertexlist=df1.join(df2,['id'],"inner")

Una vez tenemos la edgelist y la lista de edges estamos en disposición de instanciar [un objecto ```GraphFrame```](https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.html). Instanciad este objeto en la variable ```network```.

In [314]:
network = GraphFrame(vertexlist,edgelist)

El objeto que acabais de crear tiene muchas atributos y métodos para el analisis de redes [(comprobad el API)](https://graphframes.github.io/graphframes/docs/_site/api/python/graphframes.html). Se os pide que utilizéis el atributo ```inDegrees``` para, conjuntamente con la transformación ```orderBy```, mostrar la informació del in degree en orden descendente.

In [201]:
network_inDegrees_aux=network.inDegrees
network_inDegrees=network_inDegrees_aux.orderBy("inDegree",ascending=False)

In [174]:
network_inDegrees.limit(2).show()

+------------+--------+
|          id|inDegree|
+------------+--------+
|      vox_es|     330|
|ahorapodemos|     279|
+------------+--------+



Haced lo mismo con el atributo ```outDegrees``` para, conjuntamente con la transformación ```orderBy```, mostrar la informació del out degree en orden descendente.

In [202]:
network_outDegrees_aux=network.outDegrees
network_outDegrees=network_outDegrees_aux.orderBy("outDegree",ascending=False)

In [177]:
network_outDegrees.limit(2).show()

+------------+---------+
|          id|outDegree|
+------------+---------+
|rosavergar23|       11|
|   el_partal|       10|
+------------+---------+



#### **Parte 4.2.2:** Centralidad PageRank

Hasta ahora hemos visto uno de los descriptores más básicos del análisis de redes, la centralidad de grado. Ahora vamos a aprovechar las funcionalidades de GraphFrames para estudiar [la centralidad del *PageRank*](https://en.wikipedia.org/wiki/PageRank), el algoritmo original que utilizaba Google para indexar la web.

La idea detrás de este algoritmo es representar la reputación. Google pensaba que no solo es importante saber cuántos enlaces apuntan a una web, sino también su importancia. Para analizarlo crearon este algoritmo que básicamente queda formalmente representado por:
$$
\mbox{PR}(p_i) = \frac{1 - d}{N} + d \sum_{p_j \in M(p_i)}\frac{\mbox{PR}(p_j)}{L(p_j)}
$$
Donde $\mbox{PR}(p_i)$ representa el PageRank de la página $p_i$, $d$ es un factor de amortiguación, $p_j$ es una página que enlaza $p_i$ y $L(p_j)$ es el numero total de enlaces salientes de la página $j$. Como podéis ver es un algoritmo recursivo, que se puede resolver de diferentes maneras.

Afortunadamente, no tendréis que preocuparos por la implementación ya que la classe GraphFrames implementa un método ```pageRank``` para calcularlo. Cread una tabla ```page_rank``` con los resultados. Mostrad los resultados con el ```id``` del nodo y su indice ```PageRank``` en orden descendente

- ***Nota 1:*** Utilizando los parametros del metodo tenéis que fijar la probabilidad de reinicio a 0.15 y el numero máximo de iteraciones a 5
- ***Nota 2:*** El tiempo de cómputo puede oscilar de 3 a 20 minutos dependiendo de la carga del servidor.

In [315]:
network_pagerank=network.pageRank(resetProbability=0.15, maxIter=5)

In [327]:
# ¿Por qué razón es ambiguo weight?
network_pagerank.pageRank

AnalysisException: "Reference 'weight' is ambiguous, could be: weight, weight.;"

¿Observáis alguna diferencia con los resultados de importancia del out degree?¿A que creéis que se debe?