El día de la semana con mayor número de Tweets de tu cuenta personal. Para ello deberás descargar de tu cuenta tu historial de Tweets, siguiendo las instrucciones siguientes: https://help.twitter.com/es/managing-your-account/how-to-download-your-twitter-archive 
La descarga la habitarán pasadas 24 horas por motivos de seguridad.
Debes buscar el archivo llamado tweet.js, que tiene formato JSON.

Instalar pyspark


In [30]:
# !pip install pyspark

**Creamos el SparkSession**

Una SparkSession es el objeto principal o la base a partir de la cual cuelga toda la funcionalidad de Apache Spark. Es similar al SparkContext de los RDD, pero en este caso, para trabajar con SparkSQL, los DataFrame y DataSet 


In [31]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local")\
.appName("Colab")\
.config('spark.ui.port', '4050')\
.getOrCreate()

Leemos los datos del fichero Json, hay que recordar que los ficheros de datos JSON de twitter actualmente son multilinea

Mostramos el esquema que tiene el dataframe obtenido de leer el archivo

In [32]:
all_data = spark.read.option("multiline", "true").json("ejemploTweets.js")

all_data.printSchema()

root
 |-- tweet: struct (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- display_text_range: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- entities: struct (nullable = true)
 |    |    |-- hashtags: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- text: string (nullable = true)
 |    |    |-- media: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 

Tomamos dos campos, uno de ellos en el que aparece la fecha de creación del Tweet y otro por ejemplo el que presenta el texto completo. 'created_at' 'full_text'

In [33]:
data = all_data.select("tweet.created_at", "tweet.full_text")

Para trabajar mostramos la colección completa del dataset por filas (Row)  


In [34]:
data.collect()

[Row(created_at='Sun Nov 25 15:54:43 +0000 2012', full_text='Tweet 1'),
 Row(created_at='Mon Nov 05 23:58:38 +0000 2012', full_text='Tweet 2'),
 Row(created_at='Sat Oct 06 15:12:10 +0000 2012', full_text='Tweet 3'),
 Row(created_at='Wed Aug 29 15:27:42 +0000 2012', full_text='Tweet 4'),
 Row(created_at='Fri Aug 03 16:18:13 +0000 2012', full_text='Tweet 5'),
 Row(created_at='Sun Jul 22 21:31:54 +0000 2012', full_text='Tweet 6'),
 Row(created_at='Sat Jul 21 21:36:20 +0000 2012', full_text='Tweet 7'),
 Row(created_at='Sun Jul 15 15:36:26 +0000 2012', full_text='Tweet 8'),
 Row(created_at='Sun Jul 08 21:24:32 +0000 2012', full_text='Tweet 9'),
 Row(created_at='Mon Jul 02 07:11:46 +0000 2012', full_text='Tweet 10'),
 Row(created_at='Sun Jun 17 20:02:56 +0000 2012', full_text='Tweet 11'),
 Row(created_at='Wed Jun 06 22:02:21 +0000 2012', full_text='Tweet 12'),
 Row(created_at='Sun Jun 03 13:25:18 +0000 2012', full_text='Tweet 13'),
 Row(created_at='Sun Apr 29 16:11:30 +0000 2012', full_text=

Hacemos una prueba mostrando de cada fila sólamente el primer campo con la fecha de creación del Tweet y los tres primeros caracteres correspondientes al día de la semana.

In [35]:
tweets = data.collect()

for row in tweets:

    print("{0} - {1}".format(" ".join(row["created_at"].split(" ")[1:4]), row["created_at"].split(" ")[0]))

Nov 25 15:54:43 - Sun
Nov 05 23:58:38 - Mon
Oct 06 15:12:10 - Sat
Aug 29 15:27:42 - Wed
Aug 03 16:18:13 - Fri
Jul 22 21:31:54 - Sun
Jul 21 21:36:20 - Sat
Jul 15 15:36:26 - Sun
Jul 08 21:24:32 - Sun
Jul 02 07:11:46 - Mon
Jun 17 20:02:56 - Sun
Jun 06 22:02:21 - Wed
Jun 03 13:25:18 - Sun
Apr 29 16:11:30 - Sun
Apr 28 13:42:36 - Sat
Apr 28 08:19:44 - Sat
Mar 17 13:40:15 - Sat
Mar 16 21:46:15 - Fri
May 01 17:28:01 - Tue
Jul 22 21:19:02 - Sat
Jul 22 17:53:46 - Sat
Jul 22 08:04:28 - Sat


Generamos un nuevo dataframe con los tweets reducidos (en este caso vamos a tomar solo los tres primeros caracteres de la primera columna con la fecha de creación, de forma similar a como lo hicimos en el bloque anterior, y el segundo campo sin modificaciones)

In [36]:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType

def firstChars(date):
    
    return " ".join(date.split(" ")[1:4])
    

reduced_tweets = udf(firstChars, StringType())

all_data_2 = data.withColumn("created_at", reduced_tweets("created_at"))

new_df = all_data_2.select("created_at", "full_text")

new_df.show()

+---------------+---------+
|     created_at|full_text|
+---------------+---------+
|Nov 25 15:54:43|  Tweet 1|
|Nov 05 23:58:38|  Tweet 2|
|Oct 06 15:12:10|  Tweet 3|
|Aug 29 15:27:42|  Tweet 4|
|Aug 03 16:18:13|  Tweet 5|
|Jul 22 21:31:54|  Tweet 6|
|Jul 21 21:36:20|  Tweet 7|
|Jul 15 15:36:26|  Tweet 8|
|Jul 08 21:24:32|  Tweet 9|
|Jul 02 07:11:46| Tweet 10|
|Jun 17 20:02:56| Tweet 11|
|Jun 06 22:02:21| Tweet 12|
|Jun 03 13:25:18| Tweet 13|
|Apr 29 16:11:30| Tweet 14|
|Apr 28 13:42:36| Tweet 15|
|Apr 28 08:19:44| Tweet 16|
|Mar 17 13:40:15| Tweet 17|
|Mar 16 21:46:15| Tweet 18|
|May 01 17:28:01| Tweet 19|
|Jul 22 21:19:02| Tweet 20|
+---------------+---------+
only showing top 20 rows



Creamos un nuevo dataframe a partir del anterior renombrando la primera columna por "dias_semana"

In [37]:
new_df = new_df.withColumnRenamed("created_at", "dias_semana")

new_df.show()

+---------------+---------+
|    dias_semana|full_text|
+---------------+---------+
|Nov 25 15:54:43|  Tweet 1|
|Nov 05 23:58:38|  Tweet 2|
|Oct 06 15:12:10|  Tweet 3|
|Aug 29 15:27:42|  Tweet 4|
|Aug 03 16:18:13|  Tweet 5|
|Jul 22 21:31:54|  Tweet 6|
|Jul 21 21:36:20|  Tweet 7|
|Jul 15 15:36:26|  Tweet 8|
|Jul 08 21:24:32|  Tweet 9|
|Jul 02 07:11:46| Tweet 10|
|Jun 17 20:02:56| Tweet 11|
|Jun 06 22:02:21| Tweet 12|
|Jun 03 13:25:18| Tweet 13|
|Apr 29 16:11:30| Tweet 14|
|Apr 28 13:42:36| Tweet 15|
|Apr 28 08:19:44| Tweet 16|
|Mar 17 13:40:15| Tweet 17|
|Mar 16 21:46:15| Tweet 18|
|May 01 17:28:01| Tweet 19|
|Jul 22 21:19:02| Tweet 20|
+---------------+---------+
only showing top 20 rows



Realizamos el conteo total de tweets que hemos realizado

In [38]:
new_df.count()

22

Obtenemos un nuevo dataframe con los tweets agrupados por día de la semana y ordenados de forma descendente

In [39]:
grouped_new_df = new_df.groupBy(["dias_semana"])

grouped_new_df = grouped_new_df.count()

Tomamos el primer elemento del dataframe ordenado y lo mostramos

In [40]:
grouped_new_df.sort("dias_semana").show(1)

+---------------+-----+
|    dias_semana|count|
+---------------+-----+
|Apr 28 08:19:44|    1|
+---------------+-----+
only showing top 1 row



Mostrar un dataframe con todos los datos, que muestre un array de hashtags, el texto completo y el número de likes que tiene cada uno. Que se muestre en vertical y sin truncar el texto.

In [45]:
new_df_2 = all_data.select("tweet.entities.hashtags", "tweet.full_text", "tweet.favorite_count")

new_df_2.show(truncate=False, vertical=True)

-RECORD 0---------------------------------------------------------------------------------------------------------
 hashtags       | []                                                                                              
 full_text      | Tweet 1                                                                                         
 favorite_count | 0                                                                                               
-RECORD 1---------------------------------------------------------------------------------------------------------
 hashtags       | []                                                                                              
 full_text      | Tweet 2                                                                                         
 favorite_count | 0                                                                                               
-RECORD 2-----------------------------------------------------------------------

Obtener a partir del dataframe anterior los tweets ordenados de forma descendente de los que tienen más *likes* a los que menos, mostrando solo el texto del tweet y el número de likes de forma vertical y sin truncar el texto

In [58]:
new_df_2.sort(new_df_2.favorite_count.desc()).select("full_text", "favorite_count").show(truncate=False, vertical=True)

-RECORD 0------------------
 full_text      | Tweet 19 
 favorite_count | 6        
-RECORD 1------------------
 full_text      | Tweet 20 
 favorite_count | 1        
-RECORD 2------------------
 full_text      | Tweet 10 
 favorite_count | 1        
-RECORD 3------------------
 full_text      | Tweet 21 
 favorite_count | 1        
-RECORD 4------------------
 full_text      | Tweet 1  
 favorite_count | 0        
-RECORD 5------------------
 full_text      | Tweet 4  
 favorite_count | 0        
-RECORD 6------------------
 full_text      | Tweet 2  
 favorite_count | 0        
-RECORD 7------------------
 full_text      | Tweet 5  
 favorite_count | 0        
-RECORD 8------------------
 full_text      | Tweet 6  
 favorite_count | 0        
-RECORD 9------------------
 full_text      | Tweet 8  
 favorite_count | 0        
-RECORD 10-----------------
 full_text      | Tweet 3  
 favorite_count | 0        
-RECORD 11-----------------
 full_text      | Tweet 13 
 favorite_count | 0 

Obtener el número máximo de likes que ha obtenido un tweet

In [73]:
new_df_2.select(max("favorite_count")).show()

+-------------------+
|max(favorite_count)|
+-------------------+
|                  6|
+-------------------+



Obtener el total de likes obteidos por el usuario

In [61]:
new_df_2.select(sum("favorite_count")).show()

+-------------------+
|sum(favorite_count)|
+-------------------+
|                9.0|
+-------------------+



Filtrar y obtener sólo los tweets que tengan el máximo de likes, mostrando el texto completo y el número de likes, y comprobar que coincide con el valor obtenido en los bloques anteriores.

In [65]:
from pyspark.sql.functions import *

max_tweets = all_data.filter(all_data.tweet.favorite_count >= 6).select("tweet.full_text", "tweet.favorite_count").show()



+---------+--------------+
|full_text|favorite_count|
+---------+--------------+
| Tweet 19|             6|
+---------+--------------+



Pasar el Dataframe a un RDD, y mostrar los datos eliminando de la cadena desde la posición 15 hasta la penúltima

In [42]:
from pyspark.context import SparkContext



