# PEC 3: Trending Topics

## 1. Trending Topics Básico

Vamos a aprovechar lo visto sobre Structured Streaming para implementar una versión muy sencilla de "trending topics".

En el directorio `data/tweets` existen una serie de ficheros CSV. Cada línea de los CSV corresponde a un tweet.

Lo que tienes que hacer es un "job" de Spark Structured Streaming que vaya procesando los ficheros uno a uno, y mantenga una tabla en memoria con los 20 términos más comentados.

Vamos a considerar que los tweets nos llegan en el orden correcto, es decir, no es necesario tener en cuenta el "timestamp" de los datos.

**Ten en cuenta que hay que resolver este ejercicio usando Structured Streaming**, no se trata de hacer un job Spark estático.

El resultado de los cálculos sobre el stream debes almacenarlos en una tabla en memoria `trending`.

Por lo tanto, la siguiente query debe ser la que finalmente muestre el resultado (que irá cambiando con el tiempo, según se van procesando nuevos ficheros):

```python
spark.sql("select * from trending").show()
```


In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql.functions import explode, split
#spark = pyspark.sql.SparkSession.builder.getOrCreate()

sc = pyspark.SparkContext(appName='app15')
spark = pyspark.SQLContext(sc)

In [2]:
static = spark.read.csv("data/tweets",sep=";",header=True)

In [3]:
static.printSchema()

root
 |-- username: string (nullable = true)
 |-- date: string (nullable = true)
 |-- retweets: string (nullable = true)
 |-- favorites: string (nullable = true)
 |-- text: string (nullable = true)
 |-- geo: string (nullable = true)
 |-- mentions: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- id: string (nullable = true)
 |-- permalink: string (nullable = true)



In [4]:
static.take(1)

[Row(username='librarian2277', date='2017-10-14 00:54', retweets='0', favorites='0', text='How to write blog headlines that drive search traffic https:// searchenginewatch.com/2017/10/07/how -to-write-blog-headlines-that-drive-search-traffic/ …', geo=None, mentions=None, hashtags=None, id='919063962876104704', permalink='https://twitter.com/librarian2277/status/919063962876104704')]

In [5]:
lines = spark.readStream.schema(static.schema).option("maxFilesPerTrigger", 1)\
                 .csv("data/tweets")

In [9]:
words = lines.select(
   explode(
       split(lines.text, " ")
   ).alias("word")
)

In [11]:
from pyspark.sql.functions import col,lower
    
words = words.withColumn("word",lower(col("word")))

In [13]:
#activityCounts = streaming.groupBy("mentions").count()
wordCounts = words.groupBy("word").count().orderBy("count",ascending=False)

In [14]:
activityQuery = wordCounts.writeStream.queryName("trending")\
                .format("memory")\
                .outputMode("complete").start()

In [15]:
spark.streams.active

[<pyspark.sql.streaming.StreamingQuery at 0x794f2680b8>]

In [53]:
spark.sql("select * from trending").show(100)

+--------------------+-----+
|                word|count|
+--------------------+-----+
|                    |  152|
|                 and|   19|
|              search|   15|
|                 for|   13|
|                 the|   13|
|                   i|   12|
|                   a|   12|
|            https://|   11|
|             speech.|   11|
|            feminism|   11|
|                  to|   11|
|                  my|    9|
|                poor|    6|
|             http://|    6|
|                  of|    6|
|                this|    6|
|                root|    5|
|                  it|    5|
|               canal|    5|
|                 you|    4|
|                  is|    4|
|                what|    4|
|                 all|    4|
|                know|    4|
|              #hacks|    3|
|                 who|    3|
|               don't|    3|
|do";;;;"917692622...|    3|
|          #multiples|    3|
|                  in|    3|
|                they|    3|
|             

In [None]:
activityQuery.close()

## 2. Eliminando "stopwords"
Ahora el objetivo es sacar de nuevo los trending topics, pero evitando incluir "stopwords". Para ello debes utilizar los datos en `data/stopwords`.

Las "stopwords" son palabras comunes que no aportan mucho significado.

Debes realizar el mismo cálculo de antes sobre el flujo de tweets (de nuevo procesando los ficheros uno a uno), pero esta vez debes evitar que aparezcan stopwords como trending topics. (Ten en cuenta que no deben aparecer stopwords en ninguna combinación de mayúsculas/minúsculas)

In [12]:
txt = open("data/stopwords/stopwords.txt")

In [13]:
stopwords = []
for fila in txt:
    stopwords.append(fila.lower().strip())

In [22]:
words2 = lines.select(
    explode(
       split(lines.text, " ")
   ).alias("word")
)

In [23]:
words2 = words.withColumn("word",lower(col("word")))

In [24]:
wordCounts2 = words2.groupBy("word").count().orderBy("count",ascending=False)

In [25]:
from pyspark.sql.functions import col
wordCounts3 = wordCounts2.filter(col("word").isin(stopwords)==False)

In [26]:
activityQuery2 = wordCounts3.writeStream.queryName("trending2").format("memory").outputMode("complete").start()

In [149]:
spark.sql("select * from trending2  limit 20").show()

+--------------------+-----+
|                word|count|
+--------------------+-----+
|                    |  152|
|              search|   15|
|            https://|   11|
|             speech.|   11|
|            feminism|   11|
|                poor|    6|
|             http://|    6|
|                root|    5|
|               canal|    5|
|              #hacks|    3|
|               don't|    3|
|do";;;;"917692622...|    3|
|          #multiples|    3|
|               can't|    3|
|                   …|    3|
|           #children|    3|
|          #parenting|    3|
|                   &|    3|
|              afford|    3|
|             stomach|    2|
+--------------------+-----+



In [None]:
activityQuery2.stop()

## 3. Evitar signos de puntuación
Ya hemos eliminado las stopwords, ahora sólo nos queda evitar que en nuestros trending topics aparezcan palabras que consisten sólo en signos de puntuación.
¿Cómo lo harías?

In [88]:
import string
from pyspark.sql.functions import regexp_replace
from pyspark.sql.functions import col, lower

In [89]:
words = lines.select(
    explode(
       split(lines.text, " ")
   ).alias("word")
)

In [90]:
words = words.withColumn("word",lower(col("word")))

In [91]:
words3 = words.withColumn("word",regexp_replace(col("word"), '[^a-z]', ''))

In [92]:
wordCounts3 = words3.groupBy("word").count().orderBy("count",ascending=False)

In [93]:
wordCounts3 = wordCounts3.filter(col("word").isin(stopwords)==False)

In [94]:
activityQuery3 = wordCounts3.writeStream.queryName("trending7").format("memory").outputMode("complete").start()

In [99]:
spark.sql("select * from trending7 limit 20").show()

+--------------------+-----+
|                word|count|
+--------------------+-----+
|                    |  184|
|              search|   16|
|                 cnn|   11|
|bbchttpstwitterco...|   11|
|            feminism|   11|
|              speech|   11|
|               https|   11|
|                poor|    7|
|                http|    6|
|                root|    5|
|               canal|    5|
|dohttpstwittercom...|    3|
|           multiples|    3|
|           parenting|    3|
|            children|    3|
|            implants|    3|
|              dental|    3|
|              afford|    3|
|                dont|    3|
|               hacks|    3|
+--------------------+-----+



In [84]:
activityQuery3.stop()