Before you turn this problem in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE` or "YOUR ANSWER HERE", as well as your name and collaborators below:

In [None]:
NAME = "GUILLEM MATA VALLIGNY"
COLLABORATORS = ""

---

# ![Logotip de Spark](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png)

# Activitat BATCH

## Sistema de fitxers HDFS i extracció de coneixement de fonts de dades heterogènies mitjançant RDDs

En aquesta pràctica començarem amb una breu introducció a HDFS (Hadoop Distributed File System), per entendre com s'emmagatzema i distribueix la informació. Després, ens endinsarem en Spark RDDs i  Spark SQL per processar grans volums de dades de manera eficient. Per finalitzar, treballarem amb dades relacionals i la seva gestió en entorns distribuïts.

### Puntuació de l'activitat:
- **Exercici 1**: Gestió i anàlisi de dades en HDFS *(0.5 punts)*
- **Exercici 2**: Manipulació de RDDs en PySpark *(1.25 punts)*
- **Exercici 3**: Anàlisi de Dades de Tweets en PySpark *(1.25 punts)*
- **Exercici 4**: Optimització de Càlculs amb Persistència *(0.25 punts)*
- **Exercici 5**: Anàlisi de Tweets mitjançant DataFrames i consultes SQL *(2 punts)*
- **Exercici 6**: Anàlisi de Tweets Geolocalitzats *(1.5 punts)*
- **Exercici 7**: Anàlisi del Patró d'Activitat Horària a Twitter *(1 punts)*
- **Exercici 8**: Anàlisi de la Relació entre Tweets i Diputats per Província *(0.75 punts)*
- **Exercici 9**: Anàlisi d'Interaccions de Retweets i Graus d'Usuari *(0.75 punts)*
- **Exercici 10**: Distribució del Grau de Sortida en una Xarxa de Retweets *(0.75 punts)*

# **HDFS (Hadoop Distributed File System)**

<img src="https://hadoop.apache.org/docs/r1.2.1/images/hadoop-logo.jpg">

**HDFS (Hadoop Distributed File System)** és una part essencial de l'ecosistema Big Data d'Apache Hadoop. HDFS està dissenyat per emmagatzemar i gestionar grans volums de dades distribuïdes en diversos nodes d'un clúster, proporcionant alta tolerància a fallades i escalabilitat. En aquest primer exercici, interactuarem amb HDFS mitjançant la línia de comandes dins de l'entorn de **JupyterLab**, el que ens permetrà familiaritzar-nos amb les operacions bàsiques d'aquest sistema de fitxers distribuït.

Per començar, és necessari obrir un terminal des de **JupyterLab**. Un cop obert, podem enviar comandes al sistema de fitxers HDFS, que són molt similars a les comandes de bash en entorns Linux. Algunes de les comandes d'HDFS que executarem començaran amb `hdfs dfs`, seguides de l'operació que desitgem realitzar. Per exemple, si volem llistar els fitxers i directoris en el directori arrel d'HDFS, utilitzarem la comanda ls de la següent manera:

In [1]:
!hdfs dfs -ls /

Found 10 items
drwxr-xr-x   - asolerib   supergroup          0 2020-01-16 22:12 /CFCC
drwxrwxr-x   - egilbl     students            0 2024-10-01 17:24 /aula_B0.476
drwxr-xr-x   - asolerib   supergroup          0 2019-10-28 11:09 /aula_B2.585
drwxr-xr-x   - asolerib   supergroup          0 2019-09-25 22:10 /aula_M2.858
drwxr-xr-x   - hbase      hbase               0 2024-10-22 12:25 /hbase
drwxr-xr-x   - aperezgari supergroup          0 2021-08-05 12:58 /home
drwxr-xr-x   - joant      supergroup          0 2023-12-10 20:28 /path
drwxrwxr-x   - solr       solr                0 2019-07-23 15:49 /solr
drwxrwxrwt   - hdfs       supergroup          0 2024-10-20 10:04 /tmp
drwxrwxr-x   - hdfs       supergroup          0 2024-10-22 12:47 /user


És important que totes les comandes s'executin correctament en l'entorn **JupyterLab** per obtenir els resultats desitjats.

Per consultar la documentació completa de les comandes disponibles en HDFS, pots accedir a la guia oficial en el següent enllaç: [HDFS Command Guide](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html)

Al llarg d'aquest exercici, utilitzarem algunes de les comandes més comunes d'HDFS per realitzar operacions com la creació de directoris, la càrrega i descàrrega d'arxius, i la gestió de permisos, entre altres. A mesura que avancem, et familiaritzaràs amb l'estructura d'HDFS i com aprofitar les seves funcionalitats en entorns Big Data.

### **Exercici 1**: Gestió i anàlisi de dades en HDFS (*0.5 punts*)

En aquest exercici, treballaràs amb un fitxer de dades de vendes anomenat `ventas_globales.txt`, emmagatzemat a la següent ruta `/aula_M2.858/data/ventas_globales.txt`. El teu objectiu és realitzar una sèrie de tasques d'anàlisi, gestió i verificació d'integritat de les dades. A continuació es detallen les tasques a realitzar.

- Necessites obtenir informació sobre la mida del fitxer `ventas_globales.txt`, i també hauràs de realitzar una anàlisi preliminar del contingut del fitxer sense descarregar-lo completament.

- A continuació, hauràs de descarregar una còpia del fitxer `ventas_globales.txt` al teu sistema local. És important verificar que el fitxer en HDFS no presenta problemes d'integritat ni blocs corruptes. Assegura't de comprovar l'estat del fitxer. Un cop hagis verificat que el fitxer està correcte i no presenta problemes d'integritat, hauràs de tornar a pujar-lo dins de la carpeta `procesado` que has de crear a la ruta `/user/[el_teu_usuari]/`.

In [13]:
#  obtenir informació sobre la mida del fitxer
!hadoop fs -du -s /aula_M2.858/data/ventas_globales.txt
print("\n") 

#  realitzar una anàlisi preliminar del contingut del fitxer 
!hadoop fs -text /aula_M2.858/data/ventas_globales.txt | head -n 20
print("\n") 

#  descarregar una còpia del fitxer ventas_globales.txt al teu sistema local
!hadoop fs -get /aula_M2.858/data/ventas_globales.txt .

#verificar que el fitxer en HDFS no presenta problemes d'integritat ni blocs corruptes
!hdfs fsck /aula_M2.858/data/ventas_globales.txt


# tornar a pujar-lo dins de la carpeta 'procesado'
!hadoop fs -put ventas_globales.txt /user/gmatav/procesado



1330  3990  /aula_M2.858/data/ventas_globales.txt


ID_Venta;Producto;Cantidad;Precio
1;Smartphone;5;300.00
2;Laptop;3;800.00
3;Tablet;2;250.00
4;Auriculares;7;50.00
5;Reloj inteligente;1;150.00
6;Televisor;4;600.00
7;Cámara;8;400.00
8;Altavoz Bluetooth;6;75.00
9;Monitor;2;200.00
10;Impresora;10;120.00
11;Smartphone;4;300.00
12;Laptop;8;800.00
13;Tablet;6;250.00
14;Auriculares;3;50.00
15;Reloj inteligente;2;150.00
16;Televisor;5;600.00
17;Cámara;7;400.00
18;Altavoz Bluetooth;1;75.00
19;Monitor;3;200.00


get: `ventas_globales.txt': File exists
Connecting to namenode via http://eimtcld.uoc.edu:9870/fsck?ugi=gmatav&path=%2Faula_M2.858%2Fdata%2Fventas_globales.txt
FSCK started by gmatav (auth:SIMPLE) from /213.73.35.119 for path /aula_M2.858/data/ventas_globales.txt at Wed Oct 23 19:53:53 CEST 2024

Status: HEALTHY
 Number of data-nodes:	3
 Number of racks:		1
 Total dirs:			0
 Total symlinks:		0

Replicated Blocks:
 Total size:	1330 B
 Total files:	1
 Total blocks (validated):	1 (avg. bl

# **Apache Spark RDDs (Resilient Distributed Datasets)**

En el marc del processament de grans volums de dades amb Apache Spark, els RDDs, o Resilient Distributed Datasets, juguen un paper fonamental. Un RDD és una col·lecció d'elements que es distribueixen a través d'un clúster de nodes i sobre la qual es poden aplicar operacions que s'executen en paral·lel.

Recordem les seves característiques:

- Immutabilitat: Un cop es crea un RDD, no es pot modificar. En lloc d'això, qualsevol operació que modifiqui les dades generarà un nou RDD.

- Distribució: Els RDDs estan repartits entre els diferents nodes del clúster, permetent un processament paral·lel eficient.

- Tolerància a Fallades: Els RDDs són resistents a fallades. En cas que un node falli, Spark pot reconstruir les dades perdudes a partir de les dades originals i les operacions realitzades.

Aquesta estructura permet un processament eficient i escalable de dades, cosa que és essencial per treballar amb grans volums d'informació en entorns de clúster.

A continuació es mostra el codi que heu d'executar per configurar el vostre entorn de Spark.

> Com a referència a tots els mètodes que es requereixen per implementar aquesta pràctica podeu consultar:
> * [API Python de Spark](https://archive.apache.org/dist/spark/docs/2.4.0/api/python/index.html)

### Configuració de l'entorn python + spark

In [1]:
import findspark
findspark.init()

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.setMaster("local[1]")

<pyspark.conf.SparkConf at 0x7fc1c60ceb38>

In [2]:
# Introduïu el nom de l'app ActivitatRDDs_ seguit del vostre nom d'usuari
conf.setAppName("ActivitatRDDs_gmatav")
sc = SparkContext.getOrCreate(conf=conf)
sc.version

'2.4.0-cdh6.2.0'

### **Exercici 2**: Manipulació de RDDs en PySpark (*1.25 punts*)

En aquest exercici, et proporcionem dues llistes de números en les quals realitzaràs diverses operacions sobre elles utilitzant RDDs en PySpark. La solució i l'enfocament queden al teu criteri.
Context:

Tens dos conjunts de números:
- Conjunt 1: Números de l'1 al 20.
- Conjunt 2: Números del 10 al 30.

Descripció:

- Has de crear RDDs a partir d'aquests conjunts de números.

- Has de transformar el primer RDD per associar cada número amb el seu quadrat. Després, filtra les parelles en què el quadrat és superior a 50. A aquest RDD filtrat l'has d'anomenar `filtrados_rdd`.

- Agrupa les dades filtrades en funció de si els números originals són parells o senars. Després, calcula la suma dels quadrats per a cada grup. A aquest RDD de la suma dels quadrats, l'has d'anomenar `suma_cuadrados_rdd`.

- Realitza una unió i intersecció entre els dos RDDs inicials dels conjunts de llistes, hauràs d'anomenar-los `union_rdd` i `interseccion_rdd` respectivament.

- Imprimeix els resultats de cadascuna de les operacions realitzades utilitzant el mètode collect().

In [3]:
# Conjunts de números
conjunt1 = list(range(1, 21))
conjunt2 = list(range(10, 31))

#  crear RDDs a partir d'aquests conjunts de números.
rdd1 = sc.parallelize(conjunt1)
rdd2 = sc.parallelize(conjunt2)

# transformar el primer RDD per associar cada número amb el seu quadrat.
quadrats_rdd = rdd1.map(lambda x: (x, x ** 2))

# Després, es filtra les parelles en què el quadrat és superior a 50
filtrados_rdd = quadrats_rdd.filter(lambda x: x[1] > 50)

# Agrupar les dades filtrades en funció de si els números originals són parells o senars
parells = filtrados_rdd.filter(lambda x: (x[0]%2 != 0))
senars = filtrados_rdd.filter(lambda x: (x[0]%2 == 0))     

# calcula la suma dels quadrats per a cada grup
suma_senars = senars.map(lambda x: x[1]).sum()
suma_parells = parells.map(lambda x: x[1]).sum()

# RDD de la suma dels quadrats,
suma_cuadrados_rdd = sc.parallelize([("parells", suma_parells), ("senars", suma_senars)])

                    
# Realitza una unió i intersecció entre els dos RDDs inicials dels conjunts de llistes
union_rdd = rdd1.union(rdd2)
interseccion_rdd = rdd1.intersection(rdd2)


# Imprimeix els resultats de cadascuna de les operacions realitzades utilitzant el mètode collect().
print("Associar cada número amb el seu quadrat:", quadrats_rdd.collect(), "\n")
print("Filtrar les parelles amb el quadrat superior a 50:", filtrados_rdd.collect(), "\n")
print("Agrupar les dades filtrades si els números originals són parells:", parells.collect(), "\n")
print("Agrupar les dades filtrades si els números originals són senars:", senars.collect(), "\n")
print("La suma dels quadrats del grup dels senars:", suma_senars, "\n")
print("La suma dels quadrats del grup dels parells:", suma_parells, "\n")
print("RDD de la suma dels quadrats:", suma_cuadrados_rdd.collect(), "\n")
print("Unió entre els dos RDD inicials:", union_rdd.collect(), "\n")
print("Intersecció entre els dos RDD inicials:", interseccion_rdd.collect(), "\n")

Associar cada número amb el seu quadrat: [(1, 1), (2, 4), (3, 9), (4, 16), (5, 25), (6, 36), (7, 49), (8, 64), (9, 81), (10, 100), (11, 121), (12, 144), (13, 169), (14, 196), (15, 225), (16, 256), (17, 289), (18, 324), (19, 361), (20, 400)] 

Filtrar les parelles amb el quadrat superior a 50: [(8, 64), (9, 81), (10, 100), (11, 121), (12, 144), (13, 169), (14, 196), (15, 225), (16, 256), (17, 289), (18, 324), (19, 361), (20, 400)] 

Agrupar les dades filtrades si els números originals són parells: [(9, 81), (11, 121), (13, 169), (15, 225), (17, 289), (19, 361)] 

Agrupar les dades filtrades si els números originals són senars: [(8, 64), (10, 100), (12, 144), (14, 196), (16, 256), (18, 324), (20, 400)] 

La suma dels quadrats del grup dels senars: 1484 

La suma dels quadrats del grup dels parells: 1246 

RDD de la suma dels quadrats: [('parells', 1246), ('senars', 1484)] 

Unió entre els dos RDD inicials: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 10, 11, 12

In [None]:
# DO NOT USE THIS CELL

### **Exercici 3**: Anàlisi de Dades de Tweets en PySpark (*1.25 punts*)

En aquest exercici, treballaràs amb un fitxer JSON anomenat `tweets_sample.json` que es troba a la ruta `/aula_M2.858/data/tweets_sample.json`. Aquest fitxer conté dades de tweets i mètriques relacionades. Hauràs d'utilitzar PySpark per realitzar una anàlisi de les dades. L'estructura del fitxer JSON inclou informació com el nombre de retweets, likes, seguidors, i més. No obstant això, per a aquest exercici, et centraràs en processar i analitzar el contingut textual dels tweets.

- Carrega el fitxer JSON en un RDD utilitzant el mètode `textFile()`. Examina l'estructura de les dades per identificar com extreure el contingut rellevant.

- Extreu el camp tweets de cadascun dels tweets. Defineix i aplica una funció per netejar el text. Aquesta funció ha d'eliminar la puntuació, convertir el text a minúscules i assegurar que hi hagi un sol espai entre les paraules.

- Divideix el text en paraules i filtra les paraules per quedar-te amb aquelles que tinguin més de 3 caràcters. Després, realitza un recompte de paraules diferents que has d'anomenar `palabras_distintas_rdd`.

- Finalment, troba les 5 paraules més freqüents que contenen la lletra 'z', a aquest RDD l'has d'anomenar `top_palabras_con_z`.

- Imprimeix els resultats de cadascuna de les operacions realitzades.

In [11]:
import json
import re

# Carregar el fitzer JSON en un RDD
text_path = "/aula_M2.858/data/tweets_sample.json"
RDD_tweets = sc.textFile(text_path)

# Examinar l'estructura de les dades i mostra per pantalla
print("Es mostra el contingut de les dades:","\n")
for line in RDD_tweets.take(10):
    print (line)
    
# Extreu el camp tweets de cadascun dels tweets
tweets = RDD_tweets.map(lambda x: json.loads(x)['tweet'])

# Defineix i aplica una funció per netejar el text
def netejar(x):
    punc = '¡!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
    lowercased_str = x.lower()
    for ch in punc:
        lowercased_str = lowercased_str.replace(ch, '')
    cleansed = re.sub(r'\s+', ' ', lowercased_str).strip()
    
    return cleansed

# Aplicar la funció de neteja al camp 'tweet'
tweets_nets = tweets.map(netejar)
print("\n")
print("Tweets extrets i netejats:", tweets_nets, "\n")
for tweet in tweets_nets.take(10):  # Mostra les primeres 10 tweets
    print(tweet)
    
#Divideix el text en paraules 
paraules_rdd = tweets_nets.flatMap(lambda x: x.split())
print("\n")
print("Text dividit en paraules:", paraules_rdd, "\n")
for paraula in paraules_rdd.take(10):  # Mostra les primeres 10 paraules
    print(paraula)
    
# filtra les paraules per quedar-te amb aquelles que tinguin més de 3 caràcters
mes_de_tres = paraules_rdd.filter(lambda paraula: len(paraula) > 3)

#  recompte de paraules diferents
palabras_distintas_rdd = mes_de_tres.distinct().count()
print("\n")
print("Recompte de paraules diferents que tinguin més de 3 caràcters:", palabras_distintas_rdd, "\n")

# les 5 paraules més freqüents que contenen la lletra 'z'
top_palabras_con_z =  paraules_rdd.filter(lambda paraula: 'z' in paraula).map(lambda paraula: (paraula, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: -x[1]).take(5)
print("5 paraules més freqüents que contenen la lletra 'z':", top_palabras_con_z)





Es mostra el contingut de les dades: 

{"tweet_id": 1, "user": "usuario1", "followers": 150, "retweets": 5, "likes": 10, "tweet": "¡Hola mundo! Este es un tweet de prueba para ver cómo funciona. #prueba #mundo"}
{"tweet_id": 2, "user": "usuario2", "followers": 300, "retweets": 2, "likes": 7, "tweet": "Los datos son el nuevo petróleo. Analiza, visualiza y actúa. #data #análisis"}
{"tweet_id": 3, "user": "usuario3", "followers": 500, "retweets": 15, "likes": 20, "tweet": "Un día productivo en la oficina. ¿Alguna vez has tenido un día así? #productividad"}
{"tweet_id": 4, "user": "usuario4", "followers": 250, "retweets": 10, "likes": 5, "tweet": "¿Sabías que Python es uno de los lenguajes de programación más utilizados? #Python #programación"}
{"tweet_id": 5, "user": "usuario5", "followers": 100, "retweets": 1, "likes": 3, "tweet": "La programación puede ser divertida y emocionante. ¡No te rindas! #programación"}
{"tweet_id": 6, "user": "usuario6", "followers": 400, "retweets": 0, "likes"

In [None]:
# DO NOT USE THIS CELL

### **Exercici 4**: Optimització de Càlculs amb Persistència (*0.25 punts*)

Per reduir els temps d'execució en Spark, és fonamental utilitzar la persistència d'un RDD mitjançant el mètode `persist()`. Aquesta tècnica és particularment útil quan es realitzen múltiples operacions repetides sobre un mateix RDD.

Quan persisteixes un RDD, Spark emmagatzema les dades en memòria (o en disc, depenent del nivell de persistència, per veure més sobre els nivells de persistència aneu a la web [Persistència Spark](https://archive.apache.org/dist/spark/docs/2.4.0/rdd-programming-guide.html#rdd-persistence)) per evitar recomputacions cada vegada que es necessita realitzar una acció sobre el RDD. Això significa que cada node del clúster guarda en la seva memòria les particions del RDD que ha processat, permetent que les següents operacions sobre el RDD siguin molt més ràpides.

**Mesura de Rendiment**

Per mesurar la millora en els temps d'execució, podem utilitzar la funció màgica `%%time` en un entorn Jupyter Notebook, que permet observar:

- Wall clock time: Temps total real que porta executar una tasca, incloent la CPU, el temps d'entrada/sortida (I/O), i les possibles comunicacions entre nodes en el clúster.

- CPU time: Temps efectiu en què la CPU està ocupada executant la tasca, excloent altres latències com la d'entrada/sortida.

En aquest exercici, s'explorarà l'ús de la persistència en RDDs (Resilient Distributed Datasets) utilitzant PySpark. L'objectiu és observar com la persistència afecta el rendiment de les operacions de transformació i acció sobre els RDDs.

- Crea un RDD a partir d'una llista de números que va de l'1 al 10.000.

- Filtra el RDD per obtenir només els números majors a 5.000 i emmagatzema aquest resultat en un nou RDD.

- Aplica una transformació per duplicar els valors del RDD filtrat i guarda'l en un nou RDD.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

- Utilitza el mètode collect() per recuperar i mostrar els números majors a 5.000 i els seus dobles, i mesura el temps que triga en executar-se aquesta operació utilitzant la funció màgica `%%time`.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

- Aplica la persistència sobre el RDD de nombres majors a 5.000 per a que ele seu contingut es mantingui en memòria entre les operacions.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

- Torna a executar el mètode collect() com abans. Compara aquest temps amb el temps de la primera execució. (Pots executar-lo diverses vegades i veure què passa amb el temps de processament.)

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

- Elimina la persistència de l'RDD utilitzant `unpersist()` per alliberar recursos i atura la sessió de Spark al final de l'exercici amb `sc.stop()`.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

- Al finalitzar l'exercici, analitza i comenta els resultats obtinguts, explicant com la persistència va afectar el rendiment dels teus càlculs.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

# **Apache Spark Dataframes**

En aquesta part de la pràctica introduirem els elements que ofereix Spark per treballar amb estructures de dades. Veurem des d'estructures molt simples fins a estructures complexes, on els camps poden al seu torn tenir camps niats. En concret utilitzarem dades de Twitter capturades en el context de les eleccions generals a Espanya del 28 d'abril de 2019.

### Configuració de l'entorn

In [None]:
import findspark
findspark.init()

In [None]:
import re
import os
import pandas as pd
from matplotlib import pyplot as plt
from math import floor
from pyspark import SparkConf, SparkContext, SQLContext, HiveContext
from pyspark.sql import Row

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

## Introducció a dataframes estructurats i operacions sobre ells

Com ja s'ha mencionat, en els següents exercicis utilitzarem dades de Twitter que vam recollir durant les eleccions generals a Espanya del 28 d'abril de 2019. Com veurem, els tweets tenen una estructura interna força complexa que hem simplificat una mica en aquesta pràctica.

El primer que aprendrem és com importar aquest tipus de dades al nostre entorn. Un dels tipus d'arxius més comuns per guardar aquest format d'informació és [l'estructura JSON](https://en.wikipedia.org/wiki/JSON). Aquesta estructura permet guardar informació en un text pla de diferents objectes seguint una estructura de diccionari on cada camp té assignat una clau i un valor. L'estructura pot ser niada, és a dir, que una clau pot tenir com a valor una altra estructura de tipus diccionari.

Spark SQL permet llegir dades de molts formats diferents. Se us demana que [llegiu el fitxer JSON](https://archive.apache.org/dist/spark/docs/2.4.0/sql-data-sources-json.html) de la ruta ```/aula_M2.858/data/tweets28a_sample.json```. Aquest arxiu conté una petita mostra, un 0.1% de la base de dades completa (en un següent apartat veurem com realitzar aquest mostreig). En aquesta ocasió no se us demana especificar l'estructura del dataframe ja que la funció de lectura la inferirà automàticament.

**Exemple de lectura (Omplir amb el corresponent per a la lectura de l'arxiu json)**:


```Python
sqlContext = SQLContext(sc)
tweets_sample = sqlContext.read.<FILL IN>

print("Loaded dataset contains %d tweets" % tweets_sample.count())
```

Per mostrar l'estructura del dataset que acabem de carregar, podeu obtenir la informació sobre com està estructurat el DataTable utilitzant el mètode ```printSchema()```. Heu de familiaritzar-vos amb aquesta estructura ja que serà la que utilitzarem durant els propers exercicis. Recordeu també que no tots els tweets tenen tots els camps, com per exemple la ubicació (camp ```place```). Quan això passa el camp passa a ser ```NULL```. Podeu veure més informació sobre aquest tipus de dades en [aquest enllaç](https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/tweet-object).

Ara heu d'introduir l'exemple de lectura amb el `<FILL IN>` omplert segons correspongui per a la lectura de l'arxiu JSON. I, a continuació, mostrareu l'estructura del dataset utilitzant `printSchema()`.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

### Consultes sobre dataframes complexos

A continuació veurem com realitzar consultes sobre el dataset dels tweets. Utilitzarem [sentències *SQL*](https://www.w3schools.com/sql/default.asp) (com les utilitzades en la majoria de bases de dades relacionals).

El primer que s'ha de fer és registrar el dataframe de tweets com una taula de SQL. Per a això utilitzarem [sqlContext.registerDataFrameAsTable()](https://archive.apache.org/dist/spark/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.SQLContext.registerDataFrameAsTable). Per executar comandes sql només heu d'utilitzar el mètode sql() de l'objecte context, en aquest cas `sqlContext`.

#### Consultes a través del pipeline
Les taules de Spark SQL ofereixen un altre mecanisme per aplicar les transformacions i obtenir resultats similars als que s'obtindrien aplicant una consulta SQL. Per exemple, utilitzant el següent pipeline obtindrem el text de tots els tweets en espanyol:

```
tweets_sample.where("lang == 'es'").select("text")
```

Què és equivalent a la següent sentència SQL:

```
SELECT text
FROM tweets_sample
WHERE lang == 'es'
```

Podeu consultar l'[API de spark SQL](https://archive.apache.org/dist/spark/docs/2.4.0/api/python/pyspark.sql.html) per trobar més informació sobre com utilitzar les diferents transformacions en taules.

### **Exercici 5**: Anàlisi de Tweets mitjançant DataFrames i consultes SQL (*2 punts*)

Anteriorment ja has realitzat la lectura del conjunt `tweets28a_sample.json` en format JSON. Ara hauràs d'assegurar-te de registrar el DataFrame com una taula SQL anomenada `tweets_sample`.

***Nota:*** A causa que és possible que executis aquestes línies de codi diverses vegades, prendrem la precaució d'executar la comanda SQL per eliminar taules abans que les creïs, ja que pot existir la possibilitat que ja existeixin.

`sqlContext.sql("DROP TABLE IF EXISTS tweets_sample")`

A continuació, es demana crear una taula i registrar-la amb el nom ```users_agg``` amb [la informació agregada](https://www.w3schools.com/sql/sql_groupby.asp) dels usuaris que tinguin definit el seu idioma (```user.lang```) com a espanyol (```es```). En concret es demana que la taula contingui les següents columnes:
- **screen_name:** nom de l'usuari
- **friends_count:** nombre màxim (veure nota) de persones a les quals segueix
- **tweets:** nombre de tweets realitzats
- **followers_count:** nombre màxim (veure nota) de persones que segueixen l'usuari.

L'ordre en el qual s'han de mostrar els registres és ordre descendent d'acord amb el nombre de tweets.

***Nota:*** És important que tinguis en compte que el nom de *friends* i *followers* pot diferir al llarg de l'adquisició de dades. En aquest cas utilitzarem la funció d'agregació `MAX` sobre cadascun d'aquests camps per evitar segmentar l'usuari en diverses instàncies.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

A continuació, recorrerem al [JOIN de taules](https://www.w3schools.com/sql/sql_join.asp) per combinar la informació entre taules. Has de combinar la taula `users_agg` i la taula `tweets_sample` utilitzant un `INNER JOIN` per obtenir una nova taula amb el nom `retweeted` amb la següent informació:
- ***screen_name:*** nom d'usuari
- ***friends_count:*** nombre màxim de persones a les quals segueix
- ***followers_count:*** nombre màxim de persones que segueixen l'usuari.
- ***tweets:*** nombre de tweets realitzats per l'usuari.
- ***retweeted:*** nombre de retweets obtinguts per l'usuari.
- ***ratio_tweet_retweeted:*** ràtio de retweets per nombre de tweets publicats $\frac{retweets}{tweets}$

La taula resultant `retweeted` ha d'estar ordenada de manera descendent segons el valor de la columna `ratio_tweet_retweeted`.

Per últim, utilitzant queries a través de pipeline, has de crear una taula `user_retweets` a partir de la taula `tweets_sample`, utilitzant transformacions que contingui dues columnes:
- ***screen_name:*** nom d'usuari
- ***retweeted:*** nombre de retweets

Ordena la taula en ordre descendent utilitzant el valor de la columna ```retweeted```.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

## Bases de dades HIVE i operacions complexes

Fins ara hem estat treballant amb una petita mostra dels tweets generats (el 0.1%). En aquesta part de l'activitat veurem com treballar i tractar amb el dataset complet. Per això utilitzarem tant transformacions sobre taules com operacions sobre RDD quan sigui necessari.

És important tenir en compte que moltes vegades les dades amb les quals treballem s'utilitzaran en diversos projectes. En lloc de manejar directament els arxius, és més eficient i organitzat recórrer a una base de dades per gestionar la informació. En l'ecosistema de Hadoop, una de les bases de dades més utilitzades és [Apache Hive](https://hive.apache.org/). No obstant això, és crucial entendre que Hive no és una base de dades convencional. En realitat, funciona com un metastore que mapatge arxius en el sistema de fitxers distribuït de Hadoop (HDFS).

Això significa que Hive no emmagatzema les dades en el seu propi format de base de dades, sinó que actua com una interfície que permet als usuaris executar consultes SQL sobre les dades emmagatzemades en HDFS. Això proporciona una manera eficient d'accedir i manipular grans volums de dades distribuïdes sense necessitat de moure-les o convertir-les a un format tradicional de base de dades.

La manera d'accedir a aquesta base de dades és creant un context Hive de manera molt similar a com declarem un context SQL. Primer declararem una variable ```hiveContext``` instanciant-la com un objecte de la classe ```HiveContext```. Acte seguit comprovarem quantes taules estan registrades en aquest context.

**Esquema**
```Python
hiveContext = <FILL IN>
hiveContext.tables().show()
```

Executa aquest esquema amb el FILL IN omplert.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

Observa que ara mateix tenim diverses taules registrades en aquest context. Algunes d'elles no temporals i dues temporals, les que hem registrat prèviament. Per tant, sqlContext i hiveContext estan connectats (és la mateixa sessió).

### Més enllà de les transformacions SQL

Algunes vegades necessitarem obtenir resultats que precisen operacions que van més enllà del que podem aconseguir (còmodament) utilitzant el llenguatge SQL. En aquesta part de la pràctica veurem com passar d'una taula a un RDD, per fer operacions complexes, i després tornar a passar a una taula.

Ara ve la part interessant. Una taula pot convertir-se en un RDD a través de l'atribut ```.rdd```. Aquest atribut guarda la informació de la taula en una llista on cada element és un [objecte del tipus ```Row```](https://archive.apache.org/dist/spark/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.Row). Els objectes pertanyents a aquesta classe poden veure's com diccionaris on la informació de les diferents columnes queda reflectida en forma d'atribut. Per exemple, imagineu que tenim una taula amb dues columnes, nom i cognom, si utilitzem l'atribut ```.rdd``` d'aquesta taula obtindrem una llista amb objectes del tipus row on cada objecte té dos atributs: nom i cognom. Per accedir als atributs només heu d'utilitzar la sintaxi *punt* de Python, per exemple, ```row.nom``` o ```row.cognom```.

### **Exercici 6**: Anàlisi de Tweets Geolocalitzats (*1.5 punts*)

Donada la taula de tweets `tweets28a_sample25`, has de crear una variable `tweets` amb `hiveContext` utilitzant el mètode `table()`. Utilitzant una sentència SQL, es requereix extreure informació sobre els tweets que contenen dades geolocalitzades (és a dir, aquells on el camp `place` no és nul) i determinar quants tweets s'han generat des de cada lloc. Els resultats han de ser presentats en ordre descendent per la quantitat de tweets.

**Esquema sentència sql**
```Python
tweets_place = hiveContext.sql(<FILL IN>)
```
A continuació, crea una taula anomenada `tweets_place` que contingui dues columnes:

- ***name:*** nom del lloc des d'on s'ha generat el tweet.
- ***tweets:*** nombre total de tweets realitzats en aquest lloc.

Finalment, mostra els 10 llocs amb major nombre de tweets a la taula resultant.

Addicionalment, crea una taula anomenada `tweets_geo` que contingui únicament els tweets que tenen informació de geolocalització, i assegura't que inclogui el nom del lloc. A partir d'aquesta taula, crea un objecte ```tweets_place_rdd``` que contingui una llista de tuples amb la informació ```(name, tweets)``` sobre el nom del lloc i el nombre de tweets generats des d'allà.

Un cop generat aquest RDD, crearem una taula que anomenaràs també `tweets_place`. El primer pas és generar per cada tupla un objecte Row que contingui un atribut ```name``` i un atribut ```tweets```. Ara només heu d'aplicar el mètode ```toDF()``` per generar una taula. Ordeneu les files d'aquesta taula pel nombre de tweets en ordre descendent.

L'exercici s'ha de realitzar combinant tant sentències SQL com RDD en Apache Spark.

In [None]:
# Creació de la variable tweets amd hiveContext


# Extreure informació sobre els tweets on el camp place no és null


# Determinar quants tweets s'han generat des de cada lloc

# Presentar els resultats en ordre descendent per la quantitat de tweets




In [None]:
# DO NOT USE THIS CELL

## Mostreig

En moltes ocasions, abans de llançar processos costosos, és una pràctica habitual tractar amb un petit conjunt de les dades per investigar algunes propietats o simplement per depurar els nostres algoritmes, a aquesta tasca se la coneix com a mostreig. En aquesta part de la pràctica veurem els dos principals mètodes de mostreig i com utilitzar-los.

### Nota:
Per produir un gràfic de barres utilitzant [Pandas](https://pandas.pydata.org/) on es mostri la informació que acabeu de generar. Primer transformeu la taula a pandas utilitzant el mètode `toPandas()`. Ploteu la taula resultant utilitzant [la funcionalitat gràfica de pandas.](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.plot.bar.html)

### Homogeni

El primer mostreig que veurem és [l'homogeni](https://en.wikipedia.org/wiki/Simple_random_sample). Aquest mostreig es basa en simplement escollir una fracció de la població seleccionant aleatòriament elements d'aquesta.

Primer de tot realitzarem un mostreig homogeni de l'1% dels tweets generats en període electoral sense reemplaçament. Guardeu en una variable ```tweets_sample``` aquest mostreig utilitzant el mètode ```sample``` descrit a l'[API de pyspark SQL](https://archive.apache.org/dist/spark/docs/2.4.0/api/python/pyspark.sql.html). La llavor que utilitzareu per inicialitzar el generador aleatori és 42.

**Esquema**
```Python
seed = 42
fraction = 0.01

tweets_sample = tweets.<FILL IN>

print("Number of tweets sampled: {0}".format(tweets_sample.count()))
```


### **Exercici 7**: Anàlisi del Patró d'Activitat Horària a Twitter (*1 punts*)
A partir d'una mostra de l'1% dels tweets disponibles, es desitja analitzar el patró d'ús diari de Twitter, prestant especial atenció a l'activitat horària. L'objectiu és calcular i visualitzar la mitjana de tweets generats en cada hora del dia. Per a això s'ha de crear una taula ```tweets_timestamp``` amb la informació:
- ***created_at***: timestamp de quan es va publicar el tweet.
- ***hour***: a quina hora del dia correspon.
- ***day***: Data en format MM-dd-YY

La taula ha d'estar en ordre ascendent segons la columna `created_at`

**Pista**: Per crear les columnes "hour" i "day" a la teva taula tweets_timestamp, pots utilitzar withColumn(). La funció ```hour``` et servirà per extreure l'hora del timestamp i la funció ```date_format``` et permetrà generar la data.

A continuació, utilitza la mostra de tweets per extreure l'hora i data de publicació, i a partir d'aquesta informació, determina quants tweets es generen per hora, has de crear una taula `tweets_hour_day` a partir d'aquesta informació.

Finalment, només ens queda fer una agregació per hora per aconseguir la mitjana de tweets per hora. Heu de generar una taula ```tweets_hour``` amb la informació:
- ***hour:*** Hora
- ***tweets:*** Mitjana de tweets realitzats

Recordeu que estem treballant amb un sample de l'1% per tant heu de corregir la columna ```tweets``` perquè reflecteixi la mitjana que hauríem d'esperar en el conjunt complet de tweets. La taula ha d'estar ordenada en ordre ascendent d'hora.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

### Estratificat

En moltes ocasions el mostreig homogeni no és adequat ja que per la pròpia estructura de les dades determinats segments poden estar sobre-representades. Aquest és el cas que observem en els tweets on les grans àrees urbanes estan sobrerepresentades si ho comparem amb el volum de població. En aquesta activitat veurem com aplicar aquesta tècnica al dataset de tweets, per obtenir un mostreig que respecti la proporció de diputats per província.

A Espanya, el procés electoral assigna un volum de diputats a cada província que depèn de la població i d'un percentatge mínim assignat per llei. En el context Hive que hem creat prèviament (```hiveContext```) podem trobar una taula (```province_28a```) que conté informació sobre les circumscripcions electorals. Carregueu aquesta taula en una variable amb nom ```province```.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

province.limit(20).show()

In [None]:
assert province.count() == 52, "Incorrect answer"

Per fer un mostreig estratificat, primer hem de determinar la fracció que volem assignar a cada categoria. En aquest cas, volem una fracció que faci que la ràtio tweets per diputat sigui igual per a totes les capitals de província. Hem de tenir en compte que la precisió de la geolocalització a Twitter és normalment a nivell de ciutat. Per això, per evitar incrementar la complexitat de l'exercici, utilitzarem els tweets en capitals de província com a proxy dels tweets en tota la província.

### **Exercici 8**: Anàlisi de la Relació entre Tweets i Diputats per Província (*0.75 punts*)

El primer que heu de fer és crear una taula ```info_tweets_province``` que ha de contenir:
- ***capital:*** nom de la capital de província.
- ***tweets:*** nombre de tweets geolocalitzats en cada capital.
- ***diputats:*** diputats assignats a la província.
- ***ratio_tweets_diputat:*** nombre de tweets per diputat.

Heu d'ordenar la llista per ```ratio_tweets_diputado``` en ordre ascendent.

***Nota:*** Podeu realitzar aquest exercici de moltes maneres, probablement la més fàcil és utilitzar la taula ```tweets_place``` que heu generat en l'exercici 5. Recordeu com utilitzar el ```join()```.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

# VARIABLES DONADES (Las farem servir després)
output = info_tweets_province.first()
maximum_ratio = floor(output.ratio_tweets_diputado * 100) / 100

In [None]:
# DO NOT USE THIS CELL

A continuació, necessitem un diccionari amb el nom ```ratios``` on cada capital de província és una clau i el seu valor associat és la fracció de tweets que anem a mostrejar. En aquest cas, el que volem és que la ràtio de tweets per cada diputat sigui similar per a cada capital de província.

Com que volem que el mostreig sigui el més gran possible i no volem que cap capital estigui infrarepresentada, la ràtio de tweets per diputat serà el valor més petit que podeu observar a la taula ```info_tweets_province```, que correspon a 11.66 tweets per diputat a Teruel. Teniu aquest valor guardat a la variable ```maximum_ratio```.

*Nota:* El mètode ```collectAsMap()``` transforma un PairRDD en un diccionari.

Finalment, genera una taula ```geo_tweets``` amb tots els tweets geolocalitzats. Ara ja estem en disposició de fer el mostreig estratificat per població. Per a això podeu utilitzar el mètode ```sampleBy()```. Utilitzeu 42 com a llavor del generador pseudoaleatori.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

## Introducció a les dades relacionals

El fet de treballar amb una base de dades que conté informació generada en una xarxa social ens permet introduir el concepte de dades relacionals. Podem definir dades relacionals com aquelles en què existeixen relacions entre les entitats que constitueixen la base de dades. Si aquestes relacions són binàries, relacions 1 a 1, podem representar les relacions com un graf compost per un conjunt de vèrtexs $\mathcal{V}$ i un conjunt d'arestes $\mathcal{E}$ que els relacionen.

En el cas de grafs que emergeixen de manera orgànica, aquest tipus d'estructura va més enllà dels grafs regulars que segurament coneixeu. Aquest tipus d'estructures es coneixen com a [xarxes complexes](https://ca.wikipedia.org/wiki/Xarxa_complexa). L'estudi de l'estructura i dinàmiques d'aquest tipus de xarxes ha contribuït a importants resultats en camps tan dispars com la física, la sociologia, l'ecologia o la medicina.

![complex_network](https://images.squarespace-cdn.com/content/5150aec6e4b0e340ec52710a/1364574727391-XVOFAB9P6GHKTDAH6QTA/lastfm_800_graph_white.png?content-type=image%2Fpng)

En aquesta última part de la pràctica treballarem amb aquest tipus de dades. En concret modelarem una de les possibles relacions presents en el dataset, la xarxa de retweets.

#### Construcció de l'edgelist

El primer que se us demana és que genereu la xarxa. Hi ha diverses maneres de representar una xarxa complexa, per exemple, si estiguéssiu interessats a treballar-hi des del punt de vista teòric, la manera més habitual de representar-les és utilitzant una [matriu d'adjacència](https://ca.wikipedia.org/wiki/Matriu_d%27adjacència). En aquesta pràctica ens centrarem en l'aspecte computacional, una de les maneres més eficients (computacionalment parlant) de representar una xarxa és mitjançant la seva [*edge list*](https://en.wikipedia.org/wiki/Edge_list), una taula que especifica la relació a parelles entre les entitats.

Les relacions poden ser bidireccionals o direccionals i tenir algun pes assignat o no (weighted or unweighted). En el cas que ens ocupa, estem parlant d'una xarxa dirigida, un usuari retuiteja a un altre, i podem pensar-la tenint en compte quantes vegades això ha passat.

#### Centralitat de grau

Un dels descriptors més comuns en l'anàlisi de xarxes és el grau. El grau quantifica quantes arestes estan connectades a cada vèrtex~s~. En el cas de xarxes dirigides com la que acabem de crear aquest descriptor està descompost en el:
- **in degree**: quantes arestes apunten al node
- **out degree**: quantes arestes surten del node

Si fas un rànquing d'aquests valors obtindràs una mesura de centralitat, la [centralitat de grau](https://en.wikipedia.org/wiki/Centrality#Degree_centrality), de cadascun dels nodes.

### **Exercici 9**: Anàlisi d'Interaccions de Retweets i Graus d'Usuari (*0.75 punts*)

A partir d'una mostra homogènia de l'1% dels tweets, amb la llavor 42 per garantir la reproductibilitat, realitza una anàlisi de les interaccions de retweets entre usuaris a la xarxa social.

**Esquema**
```Python
seed = 42
sample = tweets.<FILL IN>
```
Crea una taula ```edgelist``` amb la següent informació:
- ***src:*** usuari que retuiteja
- ***dst:*** usuari que és retuitejat
- ***weight:*** nombre de vegades que un usuari retuiteja a un altre.

Filtra el resultat perquè contingui només les relacions amb un weight igual o superior a dos.

A continuació, genera una taula `outDegree` amb la informació:
- ***screen_name:*** nom de l'usuari.
- ***outDegree:*** out degree del node.

Ordena la taula per out degree en ordre descendent.

Se us demana ara que genereu una taula `inDegree` amb la informació:
- ***screen_name:*** nom de l'usuari.
- ***inDegree:*** in degree del node.

Ordena la taula per in degree en ordre descendent.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL

### **Exercici 10**: Distribució del Grau de Sortida en una Xarxa de Retweets (*0.75 punts*)

A partir d'una mostra de l'1% dels tweets, amb una llavor de 42 per assegurar la reproductibilitat, realitza una anàlisi bàsica de la xarxa de retweets. El teu objectiu és calcular i mostrar la distribució de graus dels usuaris en la xarxa de retweets.

Per a això, segueix aquests passos:

- Crea una taula de Edgelist: Defineix una taula `edgelist` que contingui les relacions de retweet entre usuaris, on cada fila representa un retweet realitzat d'un usuari a un altre.

- Calcula el Grau de Sortida (Out-Degree): Determina quants retweets ha realitzat cada usuari (és a dir, el nombre d'usuaris als quals cada usuari ha retuitejat). Anomena aquesta variable `outDegree`.

- Obté la Distribució de Grau de Sortida: Crea una taula `outDegree_distribution` que mostri quants usuaris tenen un determinat nombre de retweets realitzats. Ordena els resultats pel grau de sortida.

In [None]:
# YOUR CODE HERE
raise NotImplementedError()

In [None]:
# DO NOT USE THIS CELL