# Analysing IRA tweets

In [1]:
import pandas as pd
import numpy as np
import scipy as sp
import seaborn as sns
import re
import matplotlib.pyplot as plt
import glob

import findspark
findspark.init()

from pyspark import SparkContext

from pyspark.sql import *
from pyspark.sql.functions import to_timestamp
from pyspark.mllib.stat import Statistics
from pyspark.sql.functions import explode

%matplotlib inline

# Reuse the active Spark session if one exists, otherwise start a new one.
# Every `spark.read` / `spark.sql` call in this notebook goes through it.
spark = SparkSession.builder.getOrCreate()

In [2]:
# Directory holding the CSV exports. The trailing slash matters: file names
# are appended to it by plain string concatenation.
DATA_DIR = "data_new/"

## 1. Getting to know our main dataset

### Reading the data

In [3]:
def _load_csv(file_name):
    """Read one CSV (first line = header) from DATA_DIR into a Spark DataFrame.

    No schema inference is requested, so the numeric and date columns are
    cast explicitly in later cells.
    """
    return spark.read.format("csv").option("header", "true").load(DATA_DIR + file_name)


# The dataset is split across four CSV files.
tweets_text_df = _load_csv('iran_troll_tweet_text.csv')
tweets_stats_df = _load_csv('iran_troll_tweet_stats.csv')
tweets_meta_df = _load_csv('iran_troll_tweet_metadata.csv')
tweets_user_df = _load_csv('iran_troll_user.csv')

### tweets_text

In [4]:
# Shape as (rows, columns), followed by a preview of the first ten rows.
text_shape = (tweets_text_df.count(), len(tweets_text_df.columns))
print(text_shape)
tweets_text_df.show(10)

(1122936, 3)
+------------------+--------------+--------------------+
|           tweetid|tweet_language|          tweet_text|
+------------------+--------------+--------------------+
|533622371429543936|            fr|@bellisarobz Ces ...|
|527205814906654721|            en|@ParkerLampe An i...|
|545166827350134784|            en|@hadeelhmaidi @wo...|
|538045437316321280|            fr|@MartinYannis l'a...|
|530053681668841472|            fr|@courrierinter Le...|
|479670430911836160|            en|@irfhabib why bok...|
|526450009382719488|            en|@KhushbuCNN ISIS ...|
|525593430731157504|            en|@placesbrands Tur...|
|617751516947030016|            fr|@fentychuck L’Uni...|
|612565807911030784|            fr|@PringlesNico pou...|
+------------------+--------------+--------------------+
only showing top 10 rows



An unknown *tweet_language* is encoded either as 'und' or as null. We harmonize this column by replacing every null with 'und'.

In [5]:
# Replace null languages with 'und' so that "unknown" has a single encoding.
tweets_text_df = tweets_text_df.fillna('und', subset=['tweet_language'])

### tweets_stats

In [6]:
# Shape as (rows, columns); the column types are displayed as the cell output.
stats_shape = (tweets_stats_df.count(), len(tweets_stats_df.columns))
print(stats_shape)
tweets_stats_df.dtypes

(1122936, 17)


[('tweetid', 'string'),
 ('userid', 'string'),
 ('tweet_time', 'string'),
 ('in_reply_to_tweetid', 'string'),
 ('in_reply_to_userid', 'string'),
 ('quoted_tweet_tweetid', 'string'),
 ('is_retweet', 'string'),
 ('retweet_userid', 'string'),
 ('retweet_tweetid', 'string'),
 ('quote_count', 'string'),
 ('reply_count', 'string'),
 ('like_count', 'string'),
 ('retweet_count', 'string'),
 ('hashtags', 'string'),
 ('urls', 'string'),
 ('user_mentions', 'string'),
 ('poll_choices', 'string')]

We first parse *tweet_time* into a timestamp for ease of use, and cast the count columns to integers. We also register the main dataframe as a temporary SQL view so that we can run SQL queries against it.

In [7]:
# Parse the tweet time string into a proper timestamp column.
tweets_stats_df = tweets_stats_df.withColumn('tweet_time', to_timestamp(tweets_stats_df['tweet_time']))

# The engagement counters were read as strings; cast each one to int.
for count_col in ('quote_count', 'reply_count', 'like_count', 'retweet_count'):
    tweets_stats_df = tweets_stats_df.withColumn(count_col, tweets_stats_df[count_col].cast('int'))

# Expose the typed frame to spark.sql() under the name used by later queries.
tweets_stats_df.createOrReplaceTempView("tweets_stats_sql")

We can now start splitting the data into smaller dataframes and remove the useless columns for each of those:
* **retweets_df** contains all the posts that are retweets.
* **replies_df** contains all the posts that are replies to other tweets.
* **normal_tweets_df** contains all the other ('normal') posts.

**NB:** some tweets have a value for *in_reply_to_userid* while their *in_reply_to_tweetid* is null (however the inverse never happens). Those are either replies to deleted tweets, or mentions of other users that were treated as replies. We decided to consider them as normal tweets.

In [8]:
# RETWEETS: every row flagged as a retweet.
retweets_df = spark.sql("SELECT * FROM tweets_stats_sql WHERE is_retweet=True")

# To see why the columns below were judged uninformative for retweets,
# uncomment the next two lines and inspect the distinct values per column.
#for col in retweets_df.columns:
#    retweets_df.select(col).distinct().show()

retweet_unused_cols = [
    'in_reply_to_tweetid', 'in_reply_to_userid', 'is_retweet',
    'quote_count', 'reply_count', 'like_count', 'retweet_count',
    'poll_choices',
]
retweets_df = retweets_df.drop(*retweet_unused_cols)

retweets_shape = (retweets_df.count(), len(retweets_df.columns))
print(retweets_shape)

(232337, 9)


In [9]:
# REPLIES: non-retweets that reference the tweet they reply to.
replies_query = "SELECT * FROM tweets_stats_sql WHERE is_retweet=False AND in_reply_to_tweetid IS NOT NULL"
replies_df = spark.sql(replies_query)

# The retweet-related columns are dropped for this subset.
reply_unused_cols = ['retweet_tweetid', 'retweet_userid', 'is_retweet']
replies_df = replies_df.drop(*reply_unused_cols)

replies_shape = (replies_df.count(), len(replies_df.columns))
print(replies_shape)

(339350, 14)


In [10]:
# NORMAL: non-retweets with no in_reply_to_tweetid. Rows that only carry an
# in_reply_to_userid also land here, so that column is kept.
normal_query = "SELECT * FROM tweets_stats_sql WHERE is_retweet=False AND in_reply_to_tweetid IS NULL"
normal_tweets_df = spark.sql(normal_query)

normal_unused_cols = ['in_reply_to_tweetid', 'retweet_tweetid', 'retweet_userid', 'is_retweet']
normal_tweets_df = normal_tweets_df.drop(*normal_unused_cols)

normal_shape = (normal_tweets_df.count(), len(normal_tweets_df.columns))
print(normal_shape)

(551249, 13)


We verify that the row counts match, i.e. that we did not duplicate or drop any rows by accident.

In [12]:
# The three subsets must partition the full table: every row is a retweet,
# a reply or a normal tweet, and none is counted twice.
total_rows = tweets_stats_df.count()
split_rows = retweets_df.count() + normal_tweets_df.count() + replies_df.count()
print(total_rows, split_rows)

# Fail loudly instead of relying on someone eyeballing the two numbers.
# This matters when the notebook is re-run on a different or larger dataset.
assert total_rows == split_rows, (
    "retweets/replies/normal split does not cover tweets_stats_df exactly: "
    f"{total_rows} rows in total vs {split_rows} across the subsets"
)

1122936 1122936


### tweets_meta

In [13]:
# Shape as (rows, columns), followed by a preview of the first ten rows.
meta_shape = (tweets_meta_df.count(), len(tweets_meta_df.columns))
print(meta_shape)
tweets_meta_df.show(10)

(1122936, 6)
+------------------+--------------+---------------+--------+---------+------------------+
|           tweetid|follower_count|following_count|latitude|longitude| tweet_client_name|
+------------------+--------------+---------------+--------+---------+------------------+
|533622371429543936|          8012|           1450|    null|     null|Twitter Web Client|
|527205814906654721|          8012|           1450|    null|     null|Twitter Web Client|
|545166827350134784|          8012|           1450|    null|     null|Twitter Web Client|
|538045437316321280|          8012|           1450|    null|     null|Twitter Web Client|
|530053681668841472|          8012|           1450|    null|     null|Twitter Web Client|
|479670430911836160|          8012|           1450|    null|     null|Twitter Web Client|
|526450009382719488|          8012|           1450|    null|     null|Twitter Web Client|
|525593430731157504|          8012|           1450|    null|     null|Twitter Web Clien

In [14]:
# Register the metadata frame as a temporary view so it can be queried
# through spark.sql() under the name "tweets_meta_sql".
tweets_meta_df.createOrReplaceTempView("tweets_meta_sql")

It appears that the number of rows with a non-null *latitude*/*longitude* combination is very small compared to the size of the dataset. Furthermore, several of them are repeated. We thus consider these two columns rather useless and prefer dropping them.

In [15]:
# Rows that actually carry a latitude.
temp = spark.sql("SELECT * FROM tweets_meta_sql WHERE latitude IS NOT NULL")

# Printed as: total rows, geotagged rows, distinct (latitude, longitude) pairs.
n_meta_rows = tweets_meta_df.count()
n_geo_rows = temp.count()
n_geo_distinct = temp.select('latitude', 'longitude').distinct().count()
print(n_meta_rows, n_geo_rows, n_geo_distinct)

# Drop both coordinates and refresh the view so later SQL sees the new schema.
# NOTE: this makes the cell non-re-runnable without reloading the data, since
# the query above needs the latitude column that is removed here.
tweets_meta_df = tweets_meta_df.drop('latitude', 'longitude')
tweets_meta_df.createOrReplaceTempView("tweets_meta_sql")

1122936 32 20


In [16]:
# tweet_client_name may be usable as a discriminating feature: count how many
# tweets were posted from the 'Twitter Web Client'.
web_client_query = "SELECT * FROM tweets_meta_sql WHERE tweet_client_name='Twitter Web Client'"
temp = spark.sql(web_client_query)

# Printed as: total rows, rows posted from the web client.
print(tweets_meta_df.count(), temp.count())

1122936 699348


## TODO: Check that it makes sense with the bigger dataset

In [17]:
# Shape of the metadata frame after the latitude/longitude columns were dropped.
meta_shape = (tweets_meta_df.count(), len(tweets_meta_df.columns))
print(meta_shape)

(1122936, 4)


### tweets_user

In [18]:
# Shape as (rows, columns), then the column names. The row preview is left
# disabled.
user_shape = (tweets_user_df.count(), len(tweets_user_df.columns))
print(user_shape)
print(tweets_user_df.columns)
#tweets_user_df.show(10)

(660, 11)
['userid', 'user_display_name', 'user_screen_name', 'user_reported_location', 'user_profile_description', 'user_profile_url', 'account_creation_date', 'account_language', 'follower_count', 'following_count', 'last_tweet_at']


We first convert the date and integer columns of the dataframe to their proper types. This also takes care of the malformed values in those columns (such as a language code ('en') appearing in *last_tweet_at*).

In [19]:
# Date-like columns: parse the strings into timestamps.
for date_col in ('account_creation_date', 'last_tweet_at'):
    tweets_user_df = tweets_user_df.withColumn(date_col, to_timestamp(tweets_user_df[date_col]))

# Audience counters: cast the strings to int.
for count_col in ('follower_count', 'following_count'):
    tweets_user_df = tweets_user_df.withColumn(count_col, tweets_user_df[count_col].cast('int'))

# Expose the typed frame to spark.sql() as "tweets_user_sql".
tweets_user_df.createOrReplaceTempView("tweets_user_sql")

There also appear to be some wrong encodings in *account_language*.
## TODO: treat that with the bigger dataset

In [20]:
# Flag every account_language longer than two characters for manual
# inspection; the listing is shown below.
long_language_query = "SELECT userid, account_language FROM tweets_user_sql WHERE LENGTH(account_language)>2"
temp = spark.sql(long_language_query)

# Printed as: total users, users with a suspicious account_language.
print(tweets_user_df.count(), temp.count())
temp.show()

660 8
+--------------------+--------------------+
|              userid|    account_language|
+--------------------+--------------------+
|4c05300dee83b32e7...|               en-gb|
|59889941a89633194...|               en-gb|
|76e8b0a247db6b3d2...|               en-gb|
|84dd8c4f6d42a7a78...|               en-gb|
|943154a86aa64a498...| islami bilgilər ...|
|bac526884ab0d54de...|          2017-03-02|
|bc1f64b72afcf37d0...|          2017-02-04|
|cc524f9726984adac...|               en-gb|
+--------------------+--------------------+



In [21]:
# Re-register the user frame as the "tweets_user_sql" view. tweets_user_df has
# not been reassigned since the view was last registered (at the end of the
# type-conversion cell), so this call is currently redundant but harmless.
tweets_user_df.createOrReplaceTempView("tweets_user_sql")

We then split this dataframe into two:
* **anonymized_user_df** contains all the users that are anonymized.
* **exposed_user_df** contains all the other users.

This allows us to drop two columns for the anonymized users, which are a majority.

In [22]:
# Anonymized accounts are those whose display name equals their userid.
# Both queries use the null-safe equality operator `<=>`. With plain `=`, a
# NULL user_display_name makes the predicate NULL in both queries, so the row
# would silently disappear from both subsets.
anonymized_user_df = spark.sql("SELECT * FROM tweets_user_sql WHERE userid <=> user_display_name")
exposed_user_df = spark.sql("SELECT * FROM tweets_user_sql WHERE NOT (userid <=> user_display_name)")

# For anonymized users the two name columns are dropped.
anonymized_user_df = anonymized_user_df.drop('user_display_name', 'user_screen_name')

# Printed as: total users, anonymized users, exposed users.
n_users = tweets_user_df.count()
n_anonymized = anonymized_user_df.count()
n_exposed = exposed_user_df.count()
print(n_users, n_anonymized, n_exposed)

# The two subsets must partition the user table exactly.
assert n_users == n_anonymized + n_exposed, (
    f"user split lost or duplicated rows: {n_users} != {n_anonymized} + {n_exposed}"
)

660 618 42
