In [1]:
import os
import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import sagemaker
from sagemaker import get_execution_role
import sagemaker_pyspark

import boto3

# Resolve the SageMaker execution role through the package namespace and keep it
# in `role` (no cell visible below reads it).
role = sagemaker.get_execution_role()

In [2]:
# Jars shipped with sagemaker_pyspark; they are placed on the driver classpath below.
jars = sagemaker_pyspark.classpath_jars()
# Reuse the list fetched above instead of calling classpath_jars() a second time.
classpath = ":".join(jars)

# Local-mode session ("local[*]" master) with the jars on the driver classpath.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.driver.extraClassPath", classpath)
    .config("spark.executor.memory", "60g")
    .config("spark.driver.memory", "60g")
    # "0" removes the cap on the total size of results collected to the driver.
    .config("spark.driver.maxResultSize", "0")
    .getOrCreate()
)

# Resolve column/field names case-sensitively.
# NOTE(review): presumably needed because the tweet JSON has fields that differ
# only by case - confirm against the data.
spark.conf.set("spark.sql.caseSensitive", "true")

In [3]:
# Show the SparkSession object as the cell output.
spark

### Dataset original

In [4]:
# List at most 10 objects of the bucket under the prefix "3-tweets/2020/10/13/22/",
# printing each object's key (as "Archivo") and size as a table.
!aws s3api list-objects-v2 --bucket politicos-dataset-us-east-1 --prefix "3-tweets/2020/10/13/22/" --max-keys 10 --query 'Contents[].{Archivo: Key, Size: Size}' --output table

-----------------------------------------------------------------------------------------------------------------------------------------
[0m[0m|                                                             ListObjectsV2                                                             |[0m[0m
[0m[0m+----------------------------------------------------------------------------------------------------------------------------+----------+[0m[0m
[0m[0m|                                                           Archivo                                                          |  Size    |[0m[0m
[0m[0m+----------------------------------------------------------------------------------------------------------------------------+----------+[0m[0m
[0m[0m|  [1m[34m3-tweets/2020/10/13/22/build-dataset-tweetstream-UglIqrgbkwTD-3-2020-10-13-22-00-28-0c6fecce-5bba-439d-be4b-1446a64d26f4[0m  |  [1m[34m483909[0m  |[0m[0m
[0m[0m|  [1m[34m3-tweets/2020/10/13/22/build-dataset-tweetstre

In [5]:
# List at most 10 objects of the bucket under the prefix "4-augmented-tweets/2020/10/13/23/",
# printing each object's key (as "Archivo") and size as a table.
!aws s3api list-objects-v2 --bucket politicos-dataset-us-east-1 --prefix "4-augmented-tweets/2020/10/13/23/" --max-keys 10 --query 'Contents[].{Archivo: Key, Size: Size}' --output table

------------------------------------------------------------------------------------------------------------------------------------------------------------
[0m[0m|                                                                       ListObjectsV2                                                                      |[0m[0m
[0m[0m+-----------------------------------------------------------------------------------------------------------------------------------------------+----------+[0m[0m
[0m[0m|                                                                    Archivo                                                                    |  Size    |[0m[0m
[0m[0m+-----------------------------------------------------------------------------------------------------------------------------------------------+----------+[0m[0m
[0m[0m|  [1m[34m4-augmented-tweets/2020/10/13/23/build-dataset-augmentedtweetstream-Y9b0NIMbBwld-3-2020-10-13-23-00-21-f0a5fb9a-3c9d-4edb-88e8-a2eb

In [6]:
# S3 location of the dataset: region (used to build the S3A endpoint) and bucket name.
region = 'us-east-1'
bucket = 'politicos-dataset-us-east-1'

# Key prefixes inside the bucket. Note that the accounts prefix carries a
# trailing slash while the tweets prefix does not.
tweets_prefix = '4-augmented-tweets'
accounts_prefix = '2-valid-twitter-accounts/'

### Carga de los datos de Políticos

In [7]:
# Point the S3A connector at the regional S3 endpoint.
spark._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 's3.{}.amazonaws.com'.format(region))

# accounts_prefix may already carry slashes; strip them before formatting so the
# URI does not end up with a doubled slash ('.../2-valid-twitter-accounts//').
accounts_path = 's3a://{}/{}/'.format(bucket, accounts_prefix.strip('/'))

# Semicolon-delimited CSV with a header row; column types are inferred from the data.
valid_accounts = (
    spark.read.format("csv")
    .options(inferSchema=True, header=True, delimiter=";")
    .load(accounts_path)
)

# Print the inferred schema and the first record (vertical layout, no truncation).
valid_accounts.printSchema()
valid_accounts.show(n=1, truncate=False, vertical=True)

root
 |-- target_id: integer (nullable = true)
 |-- Cargo: string (nullable = true)
 |-- Nombre: string (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- Partido: string (nullable = true)
 |-- Coalicion: string (nullable = true)
 |-- newest_tweet: string (nullable = true)
 |-- oldest_tweet: string (nullable = true)
 |-- since_id: string (nullable = true)
 |-- max_id: string (nullable = true)

-RECORD 0----------------------------------
 target_id    | 1                          
 Cargo        | Presidente de la República 
 Nombre       | Sebastián Piñera Echenique 
 screen_name  | sebastianpinera            
 Partido      | RN                         
 Coalicion    | Chile Vamos                
 newest_tweet | null                       
 oldest_tweet | null                       
 since_id     | null                       
 max_id       | null                       
only showing top 1 row



### Carga de los Tweets

In [8]:
%%time
tweets = (spark.read.format("json")
                    .option("inferSchema",True) 
                    .load('s3a://{}/{}/2020/*/*/*/'.format(bucket, tweets_prefix)))

CPU times: user 9.54 ms, sys: 969 µs, total: 10.5 ms
Wall time: 1min


In [9]:
# Inspect the inferred schema, then print the first record in vertical layout
# without truncating long values.
tweets.printSchema()
tweets.show(n=1, truncate=False, vertical=True)

root
 |-- Cargo: string (nullable = true)
 |-- NLP_FEATURES: struct (nullable = true)
 |    |-- entities: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- BeginOffset: long (nullable = true)
 |    |    |    |-- EndOffset: long (nullable = true)
 |    |    |    |-- Score: double (nullable = true)
 |    |    |    |-- Text: string (nullable = true)
 |    |    |    |-- Type: string (nullable = true)
 |    |-- key_phrases: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- BeginOffset: long (nullable = true)
 |    |    |    |-- EndOffset: long (nullable = true)
 |    |    |    |-- Score: double (nullable = true)
 |    |    |    |-- Text: string (nullable = true)
 |    |-- sentiment: string (nullable = true)
 |    |-- sentiment_score: struct (nullable = true)
 |    |    |-- Mixed: double (nullable = true)
 |    |    |-- Negative: double (nullable = true)
 |    |    |-- Neutral: double (nullable = tr

In [10]:
# Report the number of rows loaded.
print("Count: {}".format(tweets.count()))

Count: 1173079


### Remover Duplicados

In [11]:
# Keep a single row per tweet id ('id_str'), then report how many rows remain.
tweets = tweets.dropDuplicates(subset=['id_str'])
print("Count: {}".format(tweets.count()))

Count: 1078000


### Removeremos estados de Cita (quoted) o Retweets

In [12]:
# A row is kept when both 'quoted_text' and 'retweeted_text' are empty strings,
# i.e. the status is neither a quote nor a retweet.
pure_tweets = tweets.where(
    (F.col('quoted_text') == '') & (F.col('retweeted_text') == '')
)
pure_tweets.count()

488541

In [13]:
# Sanity check: count ids per distinct 'quoted_text' value; after the filter
# only the empty string should show up.
pure_tweets.groupBy('quoted_text').agg({'id_str': 'count'}).toPandas()

Unnamed: 0,quoted_text,count(id_str)
0,,488541


In [14]:
# Sanity check: count ids per distinct 'retweeted_text' value; after the filter
# only the empty string should show up.
pure_tweets.groupBy('retweeted_text').agg({'id_str': 'count'}).toPandas()

Unnamed: 0,retweeted_text,count(id_str)
0,,488541


### Limpieza de columnas 

In [15]:
# Project the working columns; the three NLP_FEATURES sub-fields are selected by
# their dotted path and surface as top-level columns.
just_needed = pure_tweets.select(
    'created_at',
    'id_str',
    'hashtags',
    'screen_name',
    'target_id',
    'text',
    'NLP_FEATURES.entities',
    'NLP_FEATURES.key_phrases',
    'NLP_FEATURES.sentiment',
)

In [16]:
# Preview the first three projected rows.
just_needed.show(n=3)

+-------------------+-------------------+--------+---------------+---------+--------------------+--------------------+--------------------+---------+
|         created_at|             id_str|hashtags|    screen_name|target_id|                text|            entities|         key_phrases|sentiment|
+-------------------+-------------------+--------+---------------+---------+--------------------+--------------------+--------------------+---------+
|2018/05/25 14:35:01|1000022428632932352|      []|MirthArancibiaC|       84|https://t.co/tALH...|[[0, 23, 0.999001...|                  []|  NEUTRAL|
|2018/05/25 18:07:01|1000075779777204224|      []|rodrigodiputado|      242|https://t.co/C73v...|[[0, 23, 0.998825...|                  []|  NEUTRAL|
|2018/05/25 18:31:37|1000081972276981762|      []| camila_vallejo|      682|Las estudiantes d...|[[22, 24, 0.98871...|[[0, 15, 0.999974...|  NEUTRAL|
+-------------------+-------------------+--------+---------------+---------+--------------------+---

### Filtrado Por Fecha

In [17]:
# Parse the 'created_at' string (pattern yyyy/MM/dd HH:mm:ss) into a timestamp
# column named 'dt', and drop the original string column.
dated_tweets = just_needed.withColumn(
    'dt', F.to_timestamp(F.col('created_at'), 'yyyy/MM/dd HH:mm:ss')
).drop('created_at')
dated_tweets.show(n=3)

+-------------------+--------+---------------+---------+--------------------+--------------------+--------------------+---------+-------------------+
|             id_str|hashtags|    screen_name|target_id|                text|            entities|         key_phrases|sentiment|                 dt|
+-------------------+--------+---------------+---------+--------------------+--------------------+--------------------+---------+-------------------+
|1000022428632932352|      []|MirthArancibiaC|       84|https://t.co/tALH...|[[0, 23, 0.999001...|                  []|  NEUTRAL|2018-05-25 14:35:01|
|1000075779777204224|      []|rodrigodiputado|      242|https://t.co/C73v...|[[0, 23, 0.998825...|                  []|  NEUTRAL|2018-05-25 18:07:01|
|1000081972276981762|      []| camila_vallejo|      682|Las estudiantes d...|[[22, 24, 0.98871...|[[0, 15, 0.999974...|  NEUTRAL|2018-05-25 18:31:37|
+-------------------+--------+---------------+---------+--------------------+--------------------+--

In [18]:
import datetime

days = 500  # look-back window in days (recent relevance)

# Take the latest timestamp with a Spark aggregate instead of rdd.max(): the
# aggregate runs inside Spark, skips null 'dt' values, and avoids shipping every
# Row to Python, where comparing a null against a datetime raises TypeError.
maxima_fecha = dated_tweets.agg(F.max('dt')).first()[0]
if maxima_fecha is None:
    # No row carries a usable timestamp, so no cutoff can be derived.
    raise ValueError("dated_tweets has no non-null 'dt' value")

# Cutoff: `days` days before the most recent tweet.
since_date = maxima_fecha - datetime.timedelta(days=days)

print ('- Fecha del último tweet: {}'.format(maxima_fecha))
print ('- Fecha del primer tweet: {} ({} dias)'.format(since_date, days))

- Fecha del último tweet: 2020-10-13 22:15:00
- Fecha del primer tweet: 2019-06-01 22:15:00 (500 dias)


In [19]:
# Keep only tweets strictly newer than `since_date`, then count them.
tweets_SDF = dated_tweets.where(dated_tweets['dt'] > since_date)
tweets_SDF.count()

239892

### Join con la información de los políticos

In [20]:
# Party and coalition per account. 'screen_name' is exposed as 'sn' so the join
# key does not share a name with the tweets' own 'screen_name' column.
partidos = valid_accounts.select(
    'Partido',
    'Coalicion',
    F.col('screen_name').alias('sn'),
)
partidos.show(n=3)

+-------+-----------+---------------+
|Partido|  Coalicion|             sn|
+-------+-----------+---------------+
|     RN|Chile Vamos|sebastianpinera|
|EVOPOLI|Chile Vamos|        gblumel|
|     RN|Chile Vamos| TeodoroRiberaN|
+-------+-----------+---------------+
only showing top 3 rows



In [21]:
# Inner join of each tweet with its author's party/coalition on the screen name;
# the helper key 'sn' is dropped once the join is done.
tweets_politicos_SDF = tweets_SDF.join(
    partidos,
    on=tweets_SDF['screen_name'] == partidos['sn'],
    how='inner',
).drop('sn')
tweets_politicos_SDF.show(n=3)

+-------------------+--------------+---------------+---------+--------------------+--------------------+--------------------+---------+-------------------+-------+-------------+
|             id_str|      hashtags|    screen_name|target_id|                text|            entities|         key_phrases|sentiment|                 dt|Partido|    Coalicion|
+-------------------+--------------+---------------+---------+--------------------+--------------------+--------------------+---------+-------------------+-------+-------------+
|1135182089081577472|            []|     Raul_Soto1|      290|3."Consejo Técnic...|[[0, 1, 0.7970888...|[[0, 1, 0.9998706...| NEGATIVE|2019-06-02 13:51:15|     DC|Nueva Mayoría|
|1135183693323079680|[ProviRun2019]|  evelynmatthei|      631|Aquí las primeras...|[[9, 26, 0.977073...|[[5, 26, 0.999997...| POSITIVE|2019-06-02 13:57:37|    UDI|  Chile Vamos|
|1135197237108203520|            []|diputadonaranjo|      344|Pdte Piñera se ha...|[[0, 11, 0.868242...|[[0, 1

### Limpieza de los Links dentro del text

In [22]:
# Strip t.co short links from 'text' into a new 'text_clean' column.
# The pattern is a raw string: in a normal literal `\/`, `\.` and `\w` are
# invalid escape sequences that Python warns about. The pattern value itself is
# unchanged.
df = tweets_politicos_SDF.withColumn(
    'text_clean',
    F.regexp_replace('text', r'(https?:\/\/t\.co\/)([\w]*)', ''),
)

In [23]:
# Compare the original and cleaned text side by side for the first three rows
# (vertical layout, no truncation).
df.select('text', 'text_clean').show(n=3, truncate=False, vertical=True)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 text       | 3."Consejo Técnico" q efectivamente puede tener carácter público, es distinto al "Ente Estatal" para administrar el 4% adicional que pedimos en trabajo del @PDC_Chile y la oposición. Gob necesitaba q alguien se lo adjudicara y lo hicieron pasar por tal, @DiputadosDC piso palito 
 text_clean | 3."Consejo Técnico" q efectivamente puede tener carácter público, es distinto al "Ente Estatal" para administrar el 4% adicional que pedimos en trabajo del @PDC_Chile y la oposición. Gob necesitaba q alguien se lo adjudicara y lo hicieron pasar por tal, @DiputadosDC piso palito 
-RECORD 1-------------------------------------------------------------------------------------------------------------

In [24]:
# How many rows are left with an empty text once the links are removed?
df.where(F.col('text_clean') == '').count()

1851

### Tokenización para filtrado por largo de texto

In [25]:
from pyspark.ml.feature import Tokenizer

# Add a 'tokenized' column derived from 'text_clean', then preview two records
# (vertical layout, no truncation).
Tok = Tokenizer(inputCol="text_clean", outputCol="tokenized")
df_tok = Tok.transform(df)
df_tok.show(n=2, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 id_str      | 1135182089081577472                                                                                                                                                                                                                                                                                                                
 hashtags    | []                                                                                                                                                                                                                                                                                                                 

In [26]:
# Only keep tweets with more than 5 tokens, i.e. enough text to say something.
df_final = df_tok.where(F.size(F.col('tokenized')) > 5)

In [27]:
# Number of rows in df_final.
df_final.count()

170306

### Checkpoint

In [28]:
# Checkpoint: write df_final as Parquet to the relative path 'tweets_politicos',
# overwriting whatever is already there.
df_final.write.parquet('tweets_politicos', mode='overwrite')

In [29]:
# Reload the checkpoint so that df_final is backed by the Parquet files written above.
df_final = spark.read.format('parquet').load('tweets_politicos')

In [30]:
# Destination of the published dataset.
bucket = 'politicos-dataset-us-east-1'
region = 'us-east-1'
prefix = 'gpt2_fine_tune/tweets_politicos'

# Read the local Parquet checkpoint and copy it to s3a://<bucket>/<prefix>/parquet,
# overwriting any previous copy. Note that `tweets` is rebound here and no longer
# refers to the frame loaded from the JSON files.
tweets = spark.read.format('parquet').load('tweets_politicos')
tweets.write.parquet('s3a://{}/{}/parquet'.format(bucket, prefix), mode='overwrite')