# Spark SQL: twitter trends
Cayetano Rodríguez Medina

Primero hay que importar el contexto de Spark y configurarlo. Le ponemos un nombre a la aplicación y especificamos cuál es el master de Spark al que nos queremos conectar.

In [2]:
from pyspark import SparkContext
from pyspark import SparkConf
conf = SparkConf().setAppName("trends").setMaster("spark://tfmspark.ddns.net:7077")
sc = SparkContext.getOrCreate(conf=conf)

Añadimos algunos import que hacen falta. Especialmente los de tweepy para poder extraer los datos de Twitter.

In [4]:
import os
import sys
from tweepy import API
from tweepy import OAuthHandler

Autenticación en la API de Twitter

In [5]:
# Variables that contains the user credentials to access Twitter API 
ACCESS_TOKEN = 'XXXXXXXXXXXXXXXX'
ACCESS_SECRET = 'XXXXXXXXXXXXXXXX'
CONSUMER_KEY = 'XXXXXXXXXXXXXX'
CONSUMER_SECRET = 'XXXXXXXXXXXXXXXXX'

In [6]:
auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

In [7]:
client = API(auth)

Obtenemos todas las posibles tendencias, es decir, los países de donde podemos obtener los temas más comentados actualmente.

In [8]:
trends = sc.parallelize(client.trends_available())

In [9]:
trends.collect()

[{'country': '',
  'countryCode': None,
  'name': 'Worldwide',
  'parentid': 0,
  'placeType': {'code': 19, 'name': 'Supername'},
  'url': 'http://where.yahooapis.com/v1/place/1',
  'woeid': 1},
 {'country': 'Canada',
  'countryCode': 'CA',
  'name': 'Winnipeg',
  'parentid': 23424775,
  'placeType': {'code': 7, 'name': 'Town'},
  'url': 'http://where.yahooapis.com/v1/place/2972',
  'woeid': 2972},
 {'country': 'Canada',
  'countryCode': 'CA',
  'name': 'Ottawa',
  'parentid': 23424775,
  'placeType': {'code': 7, 'name': 'Town'},
  'url': 'http://where.yahooapis.com/v1/place/3369',
  'woeid': 3369},
 {'country': 'Canada',
  'countryCode': 'CA',
  'name': 'Quebec',
  'parentid': 23424775,
  'placeType': {'code': 7, 'name': 'Town'},
  'url': 'http://where.yahooapis.com/v1/place/3444',
  'woeid': 3444},
 {'country': 'Canada',
  'countryCode': 'CA',
  'name': 'Montreal',
  'parentid': 23424775,
  'placeType': {'code': 7, 'name': 'Town'},
  'url': 'http://where.yahooapis.com/v1/place/3534',

Podemos intuir que para cada país existe un código que podremos usar para pedir las tendencias en dicho país, pero es un poco tedioso tener que buscar un país en concreto (ej.- España).

Vamos a usar Spark SQL para solucionar esto.

In [10]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL trends").getOrCreate()

Creamos un DataFrame a partir del RDD anterior, que contiene el JSON con todos los países.

In [11]:
df = spark.read.json(trends)

Podemos observar la misma salida anterior pero en forma de tabla.

In [12]:
df.show()

+--------------------+--------------+-----------+-----------+--------+---------+--------------------+-----+
|     _corrupt_record|       country|countryCode|       name|parentid|placeType|                 url|woeid|
+--------------------+--------------+-----------+-----------+--------+---------+--------------------+-----+
|{'parentid': 0, '...|          null|       null|       null|    null|     null|                null| null|
|                null|        Canada|         CA|   Winnipeg|23424775| [7,Town]|http://where.yaho...| 2972|
|                null|        Canada|         CA|     Ottawa|23424775| [7,Town]|http://where.yaho...| 3369|
|                null|        Canada|         CA|     Quebec|23424775| [7,Town]|http://where.yaho...| 3444|
|                null|        Canada|         CA|   Montreal|23424775| [7,Town]|http://where.yaho...| 3534|
|                null|        Canada|         CA|    Toronto|23424775| [7,Town]|http://where.yaho...| 4118|
|                null|      

Con Spark SQL podemos inferir el esquema de la tabla de datos.

In [13]:
df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- country: string (nullable = true)
 |-- countryCode: string (nullable = true)
 |-- name: string (nullable = true)
 |-- parentid: long (nullable = true)
 |-- placeType: struct (nullable = true)
 |    |-- code: long (nullable = true)
 |    |-- name: string (nullable = true)
 |-- url: string (nullable = true)
 |-- woeid: long (nullable = true)



Podemos ejecutar filtros y acciones sobre la tabla como si se tratara de una base de datos relacional. Por ejemplo, podemos filtrar por nombre e identificador, de todas las localizaciones cuyo país sea España. Así podremos ver las tendencias actuales en España.

In [14]:
df.select("name", "woeid").filter(df['country']=='Spain').show()

+----------+--------+
|      name|   woeid|
+----------+--------+
| Barcelona|  753692|
|    Bilbao|  754542|
|Las Palmas|  764814|
|    Madrid|  766273|
|    Malaga|  766356|
|    Murcia|  768026|
|     Palma|  769293|
|   Seville|  774508|
|  Valencia|  776688|
|  Zaragoza|  779063|
|     Spain|23424950|
+----------+--------+



Observamos que hay varias provincias españolas para las que podemos pedir las tendencias, o bien, en el país completo.

En primer lugar visualizaremos las tendencias en todo el país.

In [15]:
spain_trends = client.trends_place(23424950)

Anteriormente creamos el DataFrame a partir de un RDD directamente. Ahora vamos a demostrar que es posible también hacerlo a través de un fichero. Para ello, almacenaremos el objeto JSON con las tendencias en un fichero.

In [28]:
import json
with open('spain_trends.json', 'w') as outfile:
    json.dump(spain_trends[0]['trends'], outfile)

Creamos el DataFrame a partir del fichero .json.

In [97]:
df_spain = spark.read.json('spain_trends.json')

Ahora vemos el contenido en forma de tabla.

In [98]:
df_spain.show()

+-----------------+----------------+--------------------+------------+--------------------+
|             name|promoted_content|               query|tweet_volume|                 url|
+-----------------+----------------+--------------------+------------+--------------------+
|  #HayQueEcharlos|            null|   %23HayQueEcharlos|       44849|http://twitter.co...|
|     #VivaLaVida1|            null|      %23VivaLaVida1|        null|http://twitter.co...|
|         Dumoulin|            null|            Dumoulin|       22874|http://twitter.co...|
|           Mc Men|            null|        %22Mc+Men%22|        null|http://twitter.co...|
|     Rubén Castro|            null|%22Rub%C3%A9n+Cas...|        null|http://twitter.co...|
|  #TJCamisetaGuay|            null|   %23TJCamisetaGuay|        null|http://twitter.co...|
|         Hamburgo|            null|            Hamburgo|        null|http://twitter.co...|
|      Xabi Alonso|            null|   %22Xabi+Alonso%22|       38118|http://twi

E inferimos el esquema.

In [99]:
df_spain.printSchema()

root
 |-- name: string (nullable = true)
 |-- promoted_content: string (nullable = true)
 |-- query: string (nullable = true)
 |-- tweet_volume: long (nullable = true)
 |-- url: string (nullable = true)



Finalmente, filtramos únicamente la columna que contiene el nombre de las tendencias, y nos quedamos con las 10 primeras.

In [108]:
df_spain.select('name').show(10)

+-----------------+
|             name|
+-----------------+
|  #HayQueEcharlos|
|     #VivaLaVida1|
|         Dumoulin|
|           Mc Men|
|     Rubén Castro|
|  #TJCamisetaGuay|
|         Hamburgo|
|      Xabi Alonso|
|#CuatroAlAbordaje|
|   #LigaIberdrola|
+-----------------+
only showing top 10 rows



Podemos comparar las tendencias españolas con las tendencias mundiales (identificador = 1)

In [109]:
world_trends = client.trends_place(1)

In [111]:
with open('world_trends.json', 'w') as outfile:
    json.dump(world_trends[0]['trends'], outfile)

In [112]:
df_world = spark.read.json('world_trends.json')

In [113]:
df_world.select('name').show(10)

+----------------+
|            name|
+----------------+
|      #بنت_ترامب|
|#お前らガチ泣きしたシーン晒せよ|
|     #Bundesliga|
|       #lovelive|
|#NamoreAlguémQue|
|        Millwall|
|        Dumoulin|
|        Dortmund|
|       NOT TODAY|
|     Enes Kanter|
+----------------+
only showing top 10 rows



Podemos observar que aparecen tendencias en distintos idiomas (inglés, árabe, japonés...). Si comparamos ambos resultados, vemos que hay una tendencia en común, pero que el resto son distintas. Se trata de "Dumoulin", un ciclista que ha ganado la etapa 14 del Giro de Italia.