In [1]:
from pyspark.sql import SparkSession
from datetime import datetime

In [2]:
warehouse_location = '/user/war_tweets'

In [3]:
spark = SparkSession \
    .builder \
    .appName("App de Spark para Tweets") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .getOrCreate()

In [4]:
spark

In [17]:
!hdfs dfs -mkdir /user/war_tweets
!wget https://storage.googleapis.com/public-bucket-up-2022/war.txt
!hdfs dfs -copyFromLocal war.txt /user/war_tweets
!hdfs dfs -ls /user/war_tweets

--2022-04-18 17:09:16--  https://storage.googleapis.com/public-bucket-up-2022/war.txt
Resolving storage.googleapis.com (storage.googleapis.com)... 142.251.6.128, 142.250.128.128, 142.250.103.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|142.251.6.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 23663541134 (22G) [text/plain]
Saving to: ‘war.txt’


2022-04-18 17:12:24 (121 MB/s) - ‘war.txt’ saved [23663541134/23663541134]

Found 1 items
-rw-r--r--   2 root hadoop 23663541134 2022-04-18 17:15 /user/war_tweets/war.txt


In [18]:
path = warehouse_location + "/war.txt"
tweets = spark.read.json(path, lineSep="\n")

22/04/18 17:21:19 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


## Manejando acciones y transformaciones

In [6]:
tweets.columns

['_type',
 'card',
 'cashtags',
 'content',
 'conversationId',
 'coordinates',
 'date',
 'hashtags',
 'id',
 'inReplyToTweetId',
 'inReplyToUser',
 'lang',
 'likeCount',
 'media',
 'mentionedUsers',
 'outlinks',
 'place',
 'quoteCount',
 'quotedTweet',
 'renderedContent',
 'replyCount',
 'retweetCount',
 'retweetedTweet',
 'source',
 'sourceLabel',
 'sourceUrl',
 'tcooutlinks',
 'url',
 'user']

In [7]:
tweets.printSchema()
#este esquema esta inferido, si tenemos muchisimos datos, nosotros tenemos que definir el esquema para que las operaciones sean correctas.

root
 |-- _type: string (nullable = true)
 |-- card: struct (nullable = true)
 |    |-- _type: string (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- thumbnailUrl: string (nullable = true)
 |    |-- title: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- cashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- content: string (nullable = true)
 |-- conversationId: long (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- _type: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |-- date: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: long (nullable = true)
 |-- inReplyToTweetId: long (nullable = true)
 |-- inReplyToUser: struct (nullable = true)
 |    |-- _type: string (nullable = true)
 |    |-- created: string (nullable = true)
 |    |-- description: 

In [8]:
tweets.show()

                                                                                

+--------------------+--------------------+--------+--------------------+-------------------+-----------+--------------------+--------------------+-------------------+-------------------+--------------------+----+---------+--------------------+--------------------+--------------------+-----+----------+--------------------+--------------------+----------+------------+--------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|               _type|                card|cashtags|             content|     conversationId|coordinates|                date|            hashtags|                 id|   inReplyToTweetId|       inReplyToUser|lang|likeCount|               media|      mentionedUsers|            outlinks|place|quoteCount|         quotedTweet|     renderedContent|replyCount|retweetCount|retweetedTweet|              source|        sourceLabel|           sourceUrl|         tcooutlinks|                 url|  

In [9]:
#lazy evaluation
tweets.describe()

                                                                                

DataFrame[summary: string, _type: string, content: string, conversationId: string, date: string, id: string, inReplyToTweetId: string, lang: string, likeCount: string, quoteCount: string, renderedContent: string, replyCount: string, retweetCount: string, retweetedTweet: string, source: string, sourceLabel: string, sourceUrl: string, url: string]

In [10]:
#obteniendo los resultados
tweets.describe().show()



+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+------------------+-------------------+--------------------+------------------+------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|summary|               _type|             content|      conversationId|                date|                  id|    inReplyToTweetId|   lang|         likeCount|         quoteCount|     renderedContent|        replyCount|      retweetCount|retweetedTweet|              source|         sourceLabel|           sourceUrl|                 url|
+-------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------+------------------+-------------------+--------------------+------------------+------------------+--------------+--------------------+--------------------+--------------------

                                                                                

In [38]:
%%time
tweets.count()



CPU times: user 221 ms, sys: 62.3 ms, total: 284 ms
Wall time: 3min 13s


                                                                                

6798932

In [11]:
#se puede obtener un objeto "columna", que es como cuando indizamos dataframes, esto regresa una columna
tweets["content"]

Column<'content'>

In [13]:
tweets.select("content").show(10)

+--------------------+
|             content|
+--------------------+
|Africa's Airline ...|
|@aangelgarciaa90 ...|
|@stevebyrnelive D...|
|@Elsa_Guerra_R @u...|
|@MalarkeyIndeed @...|
|Slovak 💙💛 for U...|
|🇷🇺🇺🇦 | #Guerr...|
|@ANIABELLO_R Veng...|
|@Ali_Optimale @Ia...|
|@DavidSacks Accep...|
+--------------------+
only showing top 10 rows



In [24]:
#pero si se queren ver los resultados, entonces se SELECCIONA la columna, esto regresa un DataFrame
tweets.select("lang").show(10)

+----+
|lang|
+----+
|  en|
|  es|
|  en|
|  es|
|  en|
|  en|
|  es|
|  es|
|  en|
|  en|
+----+
only showing top 10 rows



In [22]:
tweets.filter(tweets.lang == "es").show(10)

[Stage 14:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------+--------------------+-------------------+-----------+--------------------+--------------------+-------------------+-------------------+--------------------+----+---------+--------------------+--------------------+--------------------+-----+----------+-----------+--------------------+----------+------------+--------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|               _type|                card|cashtags|             content|     conversationId|coordinates|                date|            hashtags|                 id|   inReplyToTweetId|       inReplyToUser|lang|likeCount|               media|      mentionedUsers|            outlinks|place|quoteCount|quotedTweet|     renderedContent|replyCount|retweetCount|retweetedTweet|              source|        sourceLabel|           sourceUrl|         tcooutlinks|                 url|                user

                                                                                

In [26]:
tweets.select("user")

DataFrame[user: struct<_type:string,created:string,description:string,descriptionUrls:array<struct<indices:array<bigint>,tcourl:string,text:string,url:string>>,displayname:string,favouritesCount:bigint,followersCount:bigint,friendsCount:bigint,id:bigint,label:struct<_type:string,badgeUrl:string,description:string,longDescription:string,url:string>,linkTcourl:string,linkUrl:string,listedCount:bigint,location:string,mediaCount:bigint,profileBannerUrl:string,profileImageUrl:string,protected:boolean,rawDescription:string,statusesCount:bigint,url:string,username:string,verified:boolean>]

In [28]:
tweets.select("user.created").show(10)

+--------------------+
|             created|
+--------------------+
|2011-09-17T00:58:...|
|2018-07-15T08:39:...|
|2010-08-20T03:45:...|
|2011-06-21T03:18:...|
|2009-09-23T15:17:...|
|2012-01-19T16:34:...|
|2016-07-17T08:56:...|
|2021-09-01T00:41:...|
|2015-03-11T13:41:...|
|2019-06-11T17:06:...|
+--------------------+
only showing top 10 rows



In [16]:
#seleccionando múltiples usuarios
tweets.select(["user.displayname", "user.description"]).show(10)

+--------------------+--------------------+
|         displayname|         description|
+--------------------+--------------------+
|               Skift|Breaking news, in...|
|       Kwondeu♥️💛💜|Portugal, Francia...|
|            Tralfraz|Still trying to f...|
|  Edgar Vega Suriaga|Universidad Andin...|
|     Derisive One 🏴|“The World is my ...|
|        Amy M. Wicks|Carbs, cocktails,...|
|  En Vivo - Noticias|Cubrimiento espec...|
|              Sergio|Amante de la natu...|
|Giedrius Cikanavi...|                    |
|       David Dickson|Space Resources p...|
+--------------------+--------------------+
only showing top 10 rows



In [17]:
#Existe entonces SPARK SQL, que puede manejar los dataframes como si fuera SQL
tweets.createOrReplaceTempView("war_tweets")

In [32]:
spark.sql("SELECT user.displayname, user.description FROM war_tweets").show(10)


+--------------------+--------------------+
|         displayname|         description|
+--------------------+--------------------+
|               Skift|Breaking news, in...|
|       Kwondeu♥️💛💜|Portugal, Francia...|
|            Tralfraz|Still trying to f...|
|  Edgar Vega Suriaga|Universidad Andin...|
|     Derisive One 🏴|“The World is my ...|
|        Amy M. Wicks|Carbs, cocktails,...|
|  En Vivo - Noticias|Cubrimiento espec...|
|              Sergio|Amante de la natu...|
|Giedrius Cikanavi...|                    |
|       David Dickson|Space Resources p...|
+--------------------+--------------------+
only showing top 10 rows



In [33]:
#ahora una con where
spark.sql("SELECT count(*) from war_tweets where retweetCount > 100").show()



+--------+
|count(1)|
+--------+
|   30147|
+--------+



                                                                                

In [30]:
tweets.filter(tweets.retweetCount > 100).count()

                                                                                

30147

In [34]:
#otra manera
tweets.filter("retweetCount > 100").count()

                                                                                

30147

In [None]:
tweets.groupBy("user.id").sum("retweetCount").show()

[Stage 25:>                                                         (0 + 1) / 1]

+----------+-----------------+
|        id|sum(retweetCount)|
+----------+-----------------+
|3859068195|               51|
|1872759138|                0|
|  21056056|                1|
|2247960367|                1|
|2440523790|                9|
| 469323640|                0|
| 271597395|               53|
|   6519522|              126|
| 419133954|               20|
|3372876645|                2|
| 472201774|                2|
|   8275012|                0|
| 636311847|                0|
| 192336675|                0|
| 150783792|                7|
|3430448104|                0|
|2874866717|               23|
|1286880216|                0|
|2560184623|                0|
| 858202051|                8|
+----------+-----------------+
only showing top 20 rows



                                                                                

In [37]:
%%time
from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import avg as _avg
from pyspark.sql.functions import count as _count
from pyspark.sql.functions import col 

tweets.groupby("user.id") \
    .agg(_count("user.id").alias("total_tweets"),
        _sum("retweetCount").alias("total_retweets"),
         _avg("retweetCount").alias("avg_retweets")) \
    .where( (col("avg_retweets") >= 100) & (col("total_tweets") > 30)).show()
         #(df.state  == "OH") & (df.gender  == "M")

[Stage 31:>                                                         (0 + 1) / 1]

+-------------------+------------+--------------+------------------+
|                 id|total_tweets|total_retweets|      avg_retweets|
+-------------------+------------+--------------+------------------+
| 946018506268700672|          55|         14064|255.70909090909092|
| 818893114979061761|          57|         21977|385.56140350877195|
|           14293310|         111|         11959|107.73873873873873|
|           17384585|          60|         10073|167.88333333333333|
|          314587185|          63|         12635|200.55555555555554|
|          459872442|          83|         18341|220.97590361445782|
|         2467139606|          34|          9308| 273.7647058823529|
|             742143|         144|         42473| 294.9513888888889|
|          153170209|          86|          8957|104.15116279069767|
|           14135350|          64|         17174|         268.34375|
|          613212190|          35|          7414|211.82857142857142|
|          105327432|         116|

                                                                                

In [40]:
spark.sql("""
            select * from 
            (
                select user.id, count(*) as total_tweets, sum(retweetCount) as total_retweets, avg(retweetCount) as avg_retweets
                from war_tweets
                group by user.id 
            ) as s
            where s.avg_retweets >= 100 and s.total_retweets > 30
            """).show()

[Stage 24:>                                                         (0 + 1) / 1]

+-------------------+------------+--------------+------------------+
|                 id|total_tweets|total_retweets|      avg_retweets|
+-------------------+------------+--------------+------------------+
|1399468386183172097|           1|           117|             117.0|
|1329493178416721920|           3|          1747| 582.3333333333334|
| 946018506268700672|          55|         14064|255.70909090909092|
|1486369615487852547|           1|           151|             151.0|
|1083551928821448710|           5|           957|             191.4|
|1238583088998830081|          21|         24048| 1145.142857142857|
|          277587469|          56|         14447|257.98214285714283|
|           22771961|          10|          6364|             636.4|
|         3028689160|           1|           118|             118.0|
|1088371662108676096|           5|          4963|             992.6|
|1430211561491206147|          30|          7238|241.26666666666668|
|           16193763|           3|

                                                                                

In [55]:
#fechas
from pyspark.sql.functions import (dayofmonth, hour,  
                                   dayofyear, month, year,
                                   weekofyear, 
                                   format_number,
                                   date_format, datediff, current_date)
tweets.select(['date', \
               dayofmonth(tweets['date']), \
               hour(tweets['date']), \
               month(tweets['date']), \
               year(tweets['date']), \
               weekofyear(tweets['date'])]).show()

+--------------------+----------------+----------+-----------+----------+----------------+
|                date|dayofmonth(date)|hour(date)|month(date)|year(date)|weekofyear(date)|
+--------------------+----------------+----------+-----------+----------+----------------+
|2022-03-27T23:59:...|              27|        23|          3|      2022|              12|
|2022-03-27T23:59:...|              27|        23|          3|      2022|              12|
|2022-03-27T23:59:...|              27|        23|          3|      2022|              12|
|2022-03-27T23:59:...|              27|        23|          3|      2022|              12|
|2022-03-27T23:59:...|              27|        23|          3|      2022|              12|
|2022-03-27T23:59:...|              27|        23|          3|      2022|              12|
|2022-03-27T23:59:...|              27|        23|          3|      2022|              12|
|2022-03-27T23:59:...|              27|        23|          3|      2022|              12|

In [61]:
#https://sparkbyexamples.com/pyspark/pyspark-sql-date-and-timestamp-functions/

tweets.select('user.created', current_date().alias("current_date"), datediff(current_date(), tweets["user.created"]).alias("difference")).show()

+--------------------+------------+----------+
|             created|current_date|difference|
+--------------------+------------+----------+
|2011-09-17T00:58:...|  2022-04-14|      3862|
|2018-07-15T08:39:...|  2022-04-14|      1369|
|2010-08-20T03:45:...|  2022-04-14|      4255|
|2011-06-21T03:18:...|  2022-04-14|      3950|
|2009-09-23T15:17:...|  2022-04-14|      4586|
|2012-01-19T16:34:...|  2022-04-14|      3738|
|2016-07-17T08:56:...|  2022-04-14|      2097|
|2021-09-01T00:41:...|  2022-04-14|       225|
|2015-03-11T13:41:...|  2022-04-14|      2591|
|2019-06-11T17:06:...|  2022-04-14|      1038|
|2014-09-16T03:53:...|  2022-04-14|      2767|
|2017-03-27T14:33:...|  2022-04-14|      1844|
|2019-12-11T07:36:...|  2022-04-14|       855|
|2009-07-01T23:59:...|  2022-04-14|      4670|
|2020-03-10T14:20:...|  2022-04-14|       765|
|2022-03-20T20:52:...|  2022-04-14|        25|
|2012-07-31T18:53:...|  2022-04-14|      3544|
|2014-11-22T16:30:...|  2022-04-14|      2700|
|2020-05-20T1

In [62]:
#en años
tweets.select('user.created', current_date().alias("current_date"), (datediff(current_date(), tweets["user.created"])/365.25).alias("difference")).show()

+--------------------+------------+-------------------+
|             created|current_date|         difference|
+--------------------+------------+-------------------+
|2011-09-17T00:58:...|  2022-04-14| 10.573579739904176|
|2018-07-15T08:39:...|  2022-04-14| 3.7481177275838466|
|2010-08-20T03:45:...|  2022-04-14| 11.649555099247092|
|2011-06-21T03:18:...|  2022-04-14|   10.8145106091718|
|2009-09-23T15:17:...|  2022-04-14| 12.555783709787816|
|2012-01-19T16:34:...|  2022-04-14| 10.234086242299794|
|2016-07-17T08:56:...|  2022-04-14| 5.7412731006160165|
|2021-09-01T00:41:...|  2022-04-14| 0.6160164271047228|
|2015-03-11T13:41:...|  2022-04-14|  7.093771389459275|
|2019-06-11T17:06:...|  2022-04-14|  2.841889117043121|
|2014-09-16T03:53:...|  2022-04-14|  7.575633127994524|
|2017-03-27T14:33:...|  2022-04-14|  5.048596851471594|
|2019-12-11T07:36:...|  2022-04-14| 2.3408624229979464|
|2009-07-01T23:59:...|  2022-04-14| 12.785763175906913|
|2020-03-10T14:20:...|  2022-04-14| 2.0944558521

In [None]:
#ejemplos
#ttps://sparkbyexamples.com/

#documentación SPARK
#ttps://spark.apache.org/docs/

## Tarea:

*    Obtener la fecha mínima y la fecha máxima disponible en el dataset
*    Obtener el usuario ('display name' requerido) más popular el día 27 de marzo
*    Obtener los hashtagas más populares, además de "Putin" "Ucrania" y "Ukraine" (**Bonus:** hacer un gráfico de barras con el top 10)
*    ¿Cuántos usuarios tienen tweets repetidos?

# **LA LINEA MÁS IMPORTANTE DE TODO EL CÓDIGO!!!!!!**

In [19]:
spark.stop()