# R_9 Twitter Pipeline

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col


# Build (or reuse) the Spark session that runs every Spark job in this notebook
session_builder = SparkSession.builder.appName("COMP90024_A2_EDA")
session_builder = session_builder.config("spark.sql.repl.eagerEval.enabled", True)
session_builder = session_builder.config("spark.sql.parquet.cacheMetadata", "true")
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/02 13:32:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load the original (raw) Twitter JSON file into a Spark DataFrame
raw_twitter_path = '../data/raw/BigTwitterFile/twitter-huge.json'
spark_json = spark.read.json(raw_twitter_path)

                                                                                

### Extract Key Columns

In [3]:
# Fields of the original dataframe that the rest of the pipeline relies on
key_fields = [
    'doc._id',
    'doc.data.created_at',
    'doc.includes',
    'doc.data.lang',
    'doc.data.text',
    'doc.data.author_id',
]
df_selected_columns = spark_json.select(*key_fields)

### Filter out tweets without geolocation and non-English tweets

In [5]:
# drop rows with no geolocation, i.e. rows whose 'includes' value is null
column_name = 'includes'
has_geolocation = col(column_name).isNotNull()
df_drop_no_geo = df_selected_columns.where(has_geolocation)

df_drop_no_geo.show(5)

                                                                                

+-------------------+--------------------+--------------------+----+--------------------+------------------+
|                _id|          created_at|            includes|lang|                text|         author_id|
+-------------------+--------------------+--------------------+----+--------------------+------------------+
|1491734461951909890|2022-02-10T11:23:...|{"places":[{"full...|  hi|@AshramGzb @Ashra...|858950980989140993|
|1491734528779763719|2022-02-10T11:23:...|{"places":[{"full...|  hi|@AshramGzb @naren...|858950980989140993|
|1491567527322808321|2022-02-10T00:19:...|{"places":[{"full...|  en|My life is hittin...|          45472006|
|1491693811663515654|2022-02-10T08:41:...|{"places":[{"full...|  en|@TobyRayEnglish @...|952342256823943168|
|1491674087378219009|2022-02-10T07:23:...|{"places":[{"full...|  en|@JadeArchaeobot @...|          25033901|
+-------------------+--------------------+--------------------+----+--------------------+------------------+
only showing top 5 

In [7]:
# drop non-English rows: keep only tweets whose 'lang' equals 'en'
column_name_2 = 'lang'
string_value = 'en'
is_english = col(column_name_2) == string_value
df_drop_no_geo_eng = df_drop_no_geo.where(is_english)

df_drop_no_geo_eng.show(1)

+-------------------+--------------------+--------------------+----+--------------------+---------+
|                _id|          created_at|            includes|lang|                text|author_id|
+-------------------+--------------------+--------------------+----+--------------------+---------+
|1491567527322808321|2022-02-10T00:19:...|{"places":[{"full...|  en|My life is hittin...| 45472006|
+-------------------+--------------------+--------------------+----+--------------------+---------+
only showing top 1 row



### Flag Tweets Containing Voting Keywords

In [1]:
import pandas as pd
# Load the keyword list used to flag tweets as political
words_df = pd.read_excel('../data/raw/Keywords/Voting Keyword.xlsx')

# Blank spreadsheet cells are read as NaN; drop them and coerce the rest to str
# so that every keyword handed to the Spark substring match is a real string.
words = words_df['keywords'].dropna().astype(str).tolist()

In [9]:
# add column for political text
from functools import reduce
from pyspark.sql.functions import when, col, instr

column_name = 'text'
new_column_name = 'is_political'

# One boolean predicate per keyword: true when the keyword occurs in the tweet text
keyword_hits = [instr(col(column_name), kw) > 0 for kw in words]

# OR the per-keyword predicates together into a single "any keyword present" condition
condition = reduce(lambda acc, hit: acc | hit, keyword_hits)

# 1 when at least one keyword is present, 0 otherwise
political_flag = when(condition, 1).otherwise(0)
df_drop_no_geo_eng_withPolCol = df_drop_no_geo_eng.withColumn(new_column_name, political_flag)

In [11]:
# change to date
from pyspark.sql.functions import substring, to_date

# The first 10 characters of 'created_at' hold the calendar date (e.g. 2022-02-10)
created_date_text = substring("created_at", 1, 10)
df_drop_no_geo_eng_withPolCol_date = df_drop_no_geo_eng_withPolCol.withColumn(
    "date", to_date(created_date_text)
)

df_drop_no_geo_eng_withPolCol_date.show(5)

+-------------------+--------------------+--------------------+----+--------------------+-------------------+------------+----------+
|                _id|          created_at|            includes|lang|                text|          author_id|is_political|      date|
+-------------------+--------------------+--------------------+----+--------------------+-------------------+------------+----------+
|1491567527322808321|2022-02-10T00:19:...|{"places":[{"full...|  en|My life is hittin...|           45472006|           0|2022-02-10|
|1491693811663515654|2022-02-10T08:41:...|{"places":[{"full...|  en|@TobyRayEnglish @...| 952342256823943168|           0|2022-02-10|
|1491674087378219009|2022-02-10T07:23:...|{"places":[{"full...|  en|@JadeArchaeobot @...|           25033901|           0|2022-02-10|
|1491721359587627008|2022-02-10T10:31:...|{"places":[{"full...|  en|@JadeArchaeobot @...|         3103790508|           0|2022-02-10|
|1491595343838220291|2022-02-10T02:10:...|{"places":[{"full...

In [12]:
from pyspark.sql.functions import regexp_extract

# extract the bounding box coordinates: the first run of four comma-separated
# decimal numbers found in 'includes' is stored as a single string in 'coord'
bbox_pattern = r"(-?\d+\.\d+,-?\d+\.\d+,-?\d+\.\d+,-?\d+\.\d+)"
bbox_text = regexp_extract("includes", bbox_pattern, 1)
df_drop_no_geo_eng_withPolCol_date_coord = df_drop_no_geo_eng_withPolCol_date.withColumn("coord", bbox_text)


In [13]:
from pyspark.sql.functions import split

# split the bounding box string into separate float columns x1, y1, x2, y2
# (taken in the order the values appear in 'coord')
bbox_parts = split("coord", ",")
for position, corner_name in enumerate(("x1", "y1", "x2", "y2")):
    df_drop_no_geo_eng_withPolCol_date_coord = df_drop_no_geo_eng_withPolCol_date_coord.withColumn(
        corner_name, bbox_parts.getItem(position).cast('float')
    )

In [14]:
from pyspark.sql.functions import mean

# centre of the bounding box: midpoint of the two x values and of the two y values
df_drop_no_geo_eng_withPolCol_date_coord = (
    df_drop_no_geo_eng_withPolCol_date_coord
    .withColumn("x_cent", (col("x1") + col("x2")) / 2)
    .withColumn("y_cent", (col("y1") + col("y2")) / 2)
)

In [16]:
# keep only the columns needed downstream, then export them as parquet
export_columns = ['_id', 'date', 'x_cent', 'y_cent', 'text', 'author_id', 'is_political']
df_drop_no_geo_eng_withPolCol_date_coord = df_drop_no_geo_eng_withPolCol_date_coord.select(*export_columns)

# overwrite any previous export at the same location
parquet_writer = df_drop_no_geo_eng_withPolCol_date_coord.write.format("parquet").mode("overwrite")
parquet_writer.save("../data/curated/Twitter/twitter.parquet")

# Sentiment Analysis for text

In [26]:
import pandas as pd

# Read the curated tweets written by the Spark stage back into pandas
curated_twitter_path = '../data/curated/Twitter/twitter.parquet'
df = pd.read_parquet(curated_twitter_path)
df

Unnamed: 0,_id,date,x_cent,y_cent,text,author_id,is_political
0,1491567527322808321,2022-02-10,150.520142,-23.339474,My life is hitting a big change soon. Keen to ...,45472006,0
1,1491693811663515654,2022-02-10,153.369354,-27.954222,@TobyRayEnglish @MikeDel21893959 @aSinister @b...,952342256823943168,0
2,1491674087378219009,2022-02-10,151.926880,-27.573589,@JadeArchaeobot @HarvardGSAS oh ewwww,25033901,0
3,1491721359587627008,2022-02-10,115.928314,-32.150101,@JadeArchaeobot @HarvardGSAS I'm so sorry you ...,3103790508,0
4,1491595343838220291,2022-02-10,150.931976,-33.848244,@BehnamAkhavan @EngAustralia @Eng_IT_Sydney @S...,1249497357944672257,0
...,...,...,...,...,...,...,...
2517272,1557517139002925056,2022-08-10,144.954147,-37.824257,Day two has kicked off #EduTECHAU!\n\nMeet our...,1468040114781655040,0
2517273,1557448918329266176,2022-08-10,152.993195,-27.382143,@bluboy43 @TaikaWaititi People will still like...,20742804,0
2517274,1557499571642593280,2022-08-10,150.931976,-33.848244,@AMCELL @puck_fair What’s happening here?,1010068200,0
2517275,1557502623947030528,2022-08-10,150.931976,-33.848244,@AMCELL @puck_fair That’s sad.,1010068200,0


In [38]:
# add sentiment analysis
from nltk.sentiment import SentimentIntensityAnalyzer
import numpy as np

# Shared analyzer instance, created on first use by get_sentiment
_sentiment_analyzer = None


def get_sentiment(x):
    """Helper to extract sentiment scores from a piece of text.

    Args:
        x: the text to score; passed unchanged to
            SentimentIntensityAnalyzer.polarity_scores.

    Returns:
        A 3-tuple ``(neg, pos, compound)`` taken from the 'neg', 'pos' and
        'compound' entries of the analyzer's output.
    """
    global _sentiment_analyzer

    # Building the analyzer does not depend on the input, so build it once and
    # reuse it instead of constructing a new one for every tweet.
    if _sentiment_analyzer is None:
        _sentiment_analyzer = SentimentIntensityAnalyzer()

    sia_out = _sentiment_analyzer.polarity_scores(x)

    return sia_out['neg'], sia_out['pos'], sia_out['compound']

In [39]:
# get_sentiment returns exactly three values (neg, pos, compound), so exactly three
# columns are filled. Assigning them to four column names raises
# "ValueError: Columns must be same length as key"; the extra "sentiment" name had
# no matching value and must be derived separately if it is needed downstream.
score_columns = ["neg_score", "pos_score", "compound_score"]

# Score every tweet once and build a single frame from the resulting tuples,
# rather than constructing one pd.Series per row.
sentiment_scores = pd.DataFrame(
    df["text"].map(get_sentiment).tolist(),
    index=df.index,
    columns=score_columns,
)
df[score_columns] = sentiment_scores

df.to_parquet('../data/curated/twitter_sentiment.parquet')