In [1]:
import pyspark, pickle
from pyspark import SparkContext
from pyspark.sql.functions import countDistinct
from pyspark.storagelevel import StorageLevel
import pandas as pd
import numpy as np

In [2]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

## Check schema of dataframe made from tweets

In [4]:
df = spark.read.json('tweet_storage1/')
df.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true

## Function to read in tweet files, extract necessary data, and save

In [3]:
def extract_tweet_data(tweets_folder, save_as):
    """
    tweets_folder: a folder with .json files containing tweets
    save_as: filename to save processed tweets
    This function loads the tweets, extracts the necessary data, and saves a new parquet file with that data.
    """

    df = spark.read.json(tweets_folder)
    df.registerTempTable("df")

    # Extract tweets, nested retweets, and nested quoted tweets into separate flattened dataframes
    flat_tweets = spark.sql("""
        select id_str as tweet_id, coordinates.coordinates[0] as lon, coordinates.coordinates[1] as lat, created_at,
            in_reply_to_status_id_str as in_reply_to_tweet_id, in_reply_to_screen_name, source, user.description,
            user.followers_count, user.friends_count, user.id_str as user_id, user.screen_name as screen_name,
            user.location, user.name, user.statuses_count, user.time_zone, user.utc_offset, retweet_count, 
            retweeted_status.id_str as retweet_id, retweeted_status.user.screen_name as retweeted_screen_name,
            quoted_status_id_str as quote_id, quoted_status.user.screen_name as quoted_screen_name, entities.hashtags,
            entities.user_mentions,
                case when retweeted_status is not null then retweeted_status.text
                else text end as text

        from df
    """)

    flat_RTs = spark.sql("""
        select id_str as tweet_id, coordinates.coordinates[0] as lon, coordinates.coordinates[1] as lat, created_at,
            in_reply_to_status_id_str as in_reply_to_tweet_id, in_reply_to_screen_name, source, user.description,
            user.followers_count, user.friends_count, user.id_str as user_id, user.screen_name as screen_name,
            user.location, user.name, user.statuses_count, user.time_zone, user.utc_offset, retweet_count, 
            null as retweet_id, null as retweeted_screen_name, quoted_status_id_str as quote_id,
            quoted_status.user.screen_name as quoted_screen_name, entities.hashtags, entities.user_mentions, text
        from 
            (select retweeted_status.*
             from df
             where retweeted_status is not null
            ) as sub
    """)

    flat_QTs = spark.sql("""
        select id_str as tweet_id, coordinates.coordinates[0] as lon, coordinates.coordinates[1] as lat, created_at,
            in_reply_to_status_id_str as in_reply_to_tweet_id, in_reply_to_screen_name, source, user.description,
            user.followers_count, user.friends_count, user.id_str as user_id, user.screen_name as screen_name,
            user.location, user.name, user.statuses_count, user.time_zone, user.utc_offset, retweet_count, 
            null as retweet_id, null as retweeted_screen_name, quoted_status_id_str as quote_id,
            null as quoted_screen_name, entities.hashtags, entities.user_mentions, text
        from 
            (select quoted_status.*
             from df
             where quoted_status is not null
            ) as sub
    """)

    flat_tweets.registerTempTable('flat_tweets')
    flat_RTs.registerTempTable('flat_RTs')
    flat_QTs.registerTempTable('flat_QTs')
    
    # Combine the above and keep distinct tweet_ids
    undistinct_tweets = spark.sql("""
        select *
        from flat_tweets
        union all
        select *
        from flat_RTs
        union all
        select *
        from flat_QTs

    """)
    undistinct_tweets.registerTempTable('undistinct_tweets')
    
    distinct_tweets = spark.sql("""
        select *
        from
            (select *,
                row_number() over (partition by tweet_id order by created_at) as n_repeats
            from undistinct_tweets) as sub
        where n_repeats = 1
    """)
    distinct_tweets.persist(StorageLevel.MEMORY_AND_DISK)
    distinct_tweets.registerTempTable('distinct_tweets')
    
    distinct_tweets.write.parquet(save_as)
    print('Extracted and saved!')
    
    

## Extract and save data

In [7]:
extract_tweet_data('tweet_storage1/', 'tweets1.parquet')

Extracted and saved!


In [5]:
extract_tweet_data('tweet_storage2/', 'tweets2.parquet')

Extracted and saved!


## Load simplified data and union distinct tweet_ids

In [6]:
tweets1 = spark.read.parquet('tweets1.parquet')
tweets2 = spark.read.parquet('tweets2.parquet')

tweets1.registerTempTable('tweets1')
tweets2.registerTempTable('tweets2')

In [8]:
all_tweets = spark.sql("""
    select *
    from
        (select *,
            row_number() over (partition by tweet_id order by created_at) as n_repeats2
        from
            (select *
            from tweets1

            union all

            select *
            from tweets2) sub
        ) sub2
    where n_repeats2 = 1
""")
all_tweets.persist(StorageLevel.MEMORY_AND_DISK)
all_tweets.registerTempTable('all_tweets')

In [9]:
all_tweets.count()

1335009

## Do some checks on the data

In [10]:
# View some of the data
all_tweets.filter('lon is not null').select('tweet_id', 'screen_name', 'lon', 'lat', 'text').show()

+------------------+---------------+-------------+------------+--------------------+
|          tweet_id|    screen_name|          lon|         lat|                text|
+------------------+---------------+-------------+------------+--------------------+
|840539877162668034|  yveslavoieMTL|     -73.5865|     45.4772|Good morning Mooi...|
|837644997671403521|     kjnisamutt| -77.03733333| 38.92869444|Trees flowering 3...|
|842491721317457920|      Boston_CP|  -71.0552516|  42.3561185|Funding for clima...|
|839012242620203009|  trendinaliaCH|       8.4445|      46.813|#ClimateChange ju...|
|840441224553205761|     LegiScanMA|   -71.063701|   42.358424|H2700 [NEW] To in...|
|841310563883864066|TonSeewisahnAEC|   100.798135|   18.731498|@PrincePhilipDoE ...|
|841633766514741251| tmj_VAA_gensci|  -77.1124701|   38.882584|See our latest #A...|
|841746155671175169|  RealTMoneyNBA| -83.51975641| 35.70950542|Enough with the E...|
|842338009672380416|AndrewFaulkner9|    138.65189| -34.9147756|So

In [11]:
# Number of distinct users: 306,753
all_tweets.agg(countDistinct(all_tweets.user_id)).show()

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                 560768|
+-----------------------+



In [12]:
# Most retweeted users
spark.sql("""
    select screen_name, sum(retweet_count) as total_rts
    from all_tweets
    where retweet_id is null
    group by screen_name
    order by total_rts desc
""").show()

+---------------+---------+
|    screen_name|total_rts|
+---------------+---------+
|realDonaldTrump|  1124229|
|     SenSanders|   483268|
|        BillNye|   199603|
|    GeorgeTakei|   178596|
|   justinbieber|   157640|
|   memeprovider|   137662|
|    LeoDiCaprio|   127303|
|    DennysDiner|   119755|
|  BernieSanders|   118881|
|          POTUS|   118746|
|StephenSchlegel|   114245|
|      moodtrble|   105767|
|  GameOfThrones|   104459|
|BlackMarvelGirl|   103433|
|  followersell4|   103384|
| TheReal_JDavis|   102211|
|    NotAltWorld|   100704|
|      JOE_co_uk|    99287|
|            CNN|    89681|
|         maddow|    85244|
+---------------+---------+
only showing top 20 rows



In [13]:
# Users with most followers
spark.sql("""
    select screen_name, max(followers_count) as n_followers
    from all_tweets
    group by screen_name
    order by n_followers desc
""").show()

+---------------+-----------+
|    screen_name|n_followers|
+---------------+-----------+
|      katyperry|   96447094|
|   justinbieber|   92224531|
|    BarackObama|   85885212|
|        YouTube|   67034413|
|   TheEllenShow|   66010915|
|  britneyspears|   51236463|
|         cnnbrk|   46771359|
|    jimmyfallon|   44996522|
|       ddlovato|   41428873|
|        nytimes|   34501992|
|      BillGates|   33867519|
|            CNN|   32680520|
|           espn|   30821565|
|    BBCBreaking|   30347572|
|  NiallOfficial|   28778267|
|   narendramodi|   28016904|
|realDonaldTrump|   26649432|
|            NBA|   24458309|
|     EmmaWatson|   24084843|
|        pitbull|   23766408|
+---------------+-----------+
only showing top 20 rows



## Read a bunch of tweets from each of my search categories to make sure they are generally relevant

In [14]:
# 'climatechange' is all about climate change

spark.sql("""
    select screen_name, created_at, text
    from all_tweets
    where text like '%climatechange%'
""").take(50)

[Row(screen_name='pepita78', created_at='Thu Jan 05 06:15:05 +0000 2017', text='A website to predict how #climatechange could impact #FoodInsecurity #nonprofit #nptech  https://t.co/IPLb6k0Sf0'),
 Row(screen_name='Freightera', created_at='Wed Feb 01 00:14:08 +0000 2017', text='New #infographic just went live! The Green Future of Freight  https://t.co/0MUKwmc4eO #greenfreight #greentransport #climatechange &lt;3'),
 Row(screen_name='findingAJ', created_at='Thu Feb 23 21:11:34 +0000 2017', text="Everyone wants the ocean &amp; the garden view condos but if you don't acknowledge #climatechange there will be no view you rich rep bastards."),
 Row(screen_name='GrnFury', created_at='Sun Feb 26 23:29:29 +0000 2017', text='The walking dead in DC - why 45 will cause alarm bells sooner than HRC #climatechange #refugees #poverty https://t.co/JxsPJMxVbj'),
 Row(screen_name='WWF', created_at='Mon Feb 27 17:15:32 +0000 2017', text='Happy #InternationalPolarBearDay! Find out what you can do to help co

In [None]:
# '#climate' is all about climate change

spark.sql("""
    select screen_name, created_at, text
    from all_tweets
    where text like '%#climate %'
""").take(50)

In [None]:
# 'climate' and combinations of certain other words are all about climate change

spark.sql("""
    select screen_name, created_at, text
    from all_tweets
    where text like '%climate%' and text rlike 'science|scientists?|alarmists?|change|realists?|denial|deniers?' 
""").take(50)

In [None]:
# 'globalwarming' and 'global' or 'warming' are all about climate change

spark.sql("""
    select screen_name, created_at, text
    from all_tweets
    where (text like '%globalwarming%') or (text like '%global%' and text like '%warming%') 
""").take(50)

In [None]:
# 'agw' grabs lots of irrelevant stuff (mostly in Japanese...)
# I changed my API query to search specifically for '#agw', which appears to be much more effective

spark.sql("""
    select screen_name, created_at, text
    from all_tweets
    where text like '%#agw%'
""").take(50)

## Delete tweets that reference Pitbull's new album, "Climate Change"

In [16]:
delete_tweets = spark.sql("""
    select distinct tweet_id
    from 
        (select tweet_id
        from all_tweets
        where screen_name in ('pitbull', 'RCARecords', 'Genius') or
            retweeted_screen_name in ('pitbull', 'RCARecords', 'Genius') or
            quoted_screen_name in ('pitbull', 'RCARecords', 'Genius') or
            in_reply_to_screen_name in ('pitbull', 'RCARecords', 'Genius')

        union

        select tweet_id
        from
            (select tweet_id, explode(user_mentions) as user_mentions
            from all_tweets) sub
        where user_mentions.screen_name in ('pitbull', 'RCARecords', 'Genius')
        ) delete_tweets_sub
""")
delete_tweets.registerTempTable('delete_tweets')

In [None]:
all_tweets = spark.sql("""
    select a.*
    from all_tweets as a
    left join delete_tweets as d
    on a.tweet_id = d.tweet_id
    where d.tweet_id is null
""")
all_tweets.registerTempTable('all_tweets')

In [18]:
all_tweets.count()

1323677

## Remove bots

In [19]:
# View retweet/tweet ratios for users. 

spark.sql("""
select screen_name, n_rt, n_orig, n_rt/(n_rt + n_orig)*100 as rt_tw_ratio
from
    (select screen_name, 
        sum(case when type = 'rt' then n else 0 end) as n_rt,
        sum(case when type = 'orig' then n else 0 end) as n_orig
    from
        (select screen_name, type, count(*) as n
        from
            (select screen_name,
                case when retweet_id is null then 'orig'
                else 'rt' end as type
            from all_tweets) as sub
        group by screen_name, type) as sub2
    group by screen_name) as sub3
order by rt_tw_ratio desc, n_rt desc
""").show()

+---------------+----+------+-----------+
|    screen_name|n_rt|n_orig|rt_tw_ratio|
+---------------+----+------+-----------+
| TrumpvsScience|3092|     0|      100.0|
| NRGrenaissance|1993|     0|      100.0|
| EarthMineYours|1341|     0|      100.0|
|  BeingHelpless|1108|     0|      100.0|
|   JupiterHydro|1014|     0|      100.0|
|     CarbonHand| 406|     0|      100.0|
|    RussJensen5| 363|     0|      100.0|
|     energycoin| 342|     0|      100.0|
|   Abby_Retweet| 324|     0|      100.0|
|GrnConservatism| 299|     0|      100.0|
|        rjber15| 294|     0|      100.0|
|HinduismVersity| 242|     0|      100.0|
|    Antonis_Avg| 209|     0|      100.0|
|ClimateCong_Bot| 190|     0|      100.0|
|  coolgreenland| 188|     0|      100.0|
| krosenlund_com| 186|     0|      100.0|
|      dianez123| 179|     0|      100.0|
|        Dipudgo| 179|     0|      100.0|
|    jo_kasprzak| 167|     0|      100.0|
|  ollivier_yves| 166|     0|      100.0|
+---------------+----+------+-----

In [21]:
#Remove users that have retweet/tweet ratios greater than 97%, and that have more than 10 retweets.
# They are either bots or curated lists who don't engage in conversation.

bot_list = spark.sql("""
    select screen_name
    from
        (select screen_name, 
            sum(case when type = 'rt' then n else 0 end) as n_rt,
            sum(case when type = 'orig' then n else 0 end) as n_orig
        from
            (select screen_name, type, count(*) as n
            from
                (select screen_name,
                    case when retweet_id is null then 'orig'
                    else 'rt' end as type
                from all_tweets) as sub
            group by screen_name, type) as sub2
        group by screen_name
        having n_rt/(n_rt + n_orig)*100 > 97 and n_rt > 10) bots
""")
bot_list.registerTempTable('bot_list')
bot_list.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[screen_name: string]

In [22]:
bot_list.count()

5491

In [23]:
all_tweets = spark.sql("""
    select t.screen_name as screen_name, tweet_id, lon, lat, created_at, in_reply_to_tweet_id, in_reply_to_screen_name, source, description,
        followers_count, friends_count, user_id, location, name, statuses_count, time_zone, utc_offset, retweet_count,
        retweet_id, retweeted_screen_name, quote_id, quoted_screen_name, hashtags, user_mentions, text
    from all_tweets t
    left join bot_list b
    on b.screen_name = t.screen_name
    where b.screen_name is null
""")

In [24]:
all_tweets.count()

1185967

## Save tweets

In [25]:
all_tweets.write.parquet('tweets_all.parquet')

## Extract the messy nested objects (i.e. hashtags and user_mentions) from the tweets  
## Save as separate tables

In [26]:
hashtags = spark.sql("""
    select distinct tweet_id, lower(hashtags.text) as hashtag
    from
        (select tweet_id, explode(hashtags) as hashtags
        from all_tweets) as sub
""")

In [27]:
hashtags.show()

+------------------+--------------------+
|          tweet_id|             hashtag|
+------------------+--------------------+
|527628746803933185|                il12|
|704210962786586624|    blacklivesmatter|
|714906364452216832|       banplasticbag|
|714906364452216832|       globalwarming|
|714906364452216832|            goletete|
|723908456567177216|              nyspen|
|736061416839385089|       globalwarming|
|798538297106497541|environmentaljustice|
|808658270667751424|             climate|
|815214201228640256|                xkcd|
|816890746439692288|       climatechange|
|816890746439692288|      foodinsecurity|
|816890746439692288|           nonprofit|
|816890746439692288|              nptech|
|826584383796637696|         infographic|
|826584383796637696|        greenfreight|
|826584383796637696|      greentransport|
|826584383796637696|       climatechange|
|831981443916169218|       climatechange|
|831981443916169218|               trump|
+------------------+--------------

In [30]:
hashtags.write.parquet('hashtags_all.parquet')

In [28]:
mentions = spark.sql("""
    select distinct tweet_id, mentions.screen_name as mention_screen_name
    from
        (select tweet_id, explode(user_mentions) as mentions
        from all_tweets) as sub
""")

In [29]:
mentions.show()

+------------------+-------------------+
|          tweet_id|mention_screen_name|
+------------------+-------------------+
|737652900042821632|            YouTube|
|805367896901697536|    healthy_climate|
|808150313194627072|    realDonaldTrump|
|815214201228640256|             PopSci|
|831981443916169218|             SFGate|
|833009966386667520|            CUSP_uk|
|833866585660067840|        KaceyDeamer|
|833866585660067840|        LiveScience|
|836528979356045312|            BillNye|
|836827463263084544|             JKment|
|836827463263084544|         SenSanders|
|836827463263084544|     HillaryClinton|
|836841994735521792|    KenyaMetService|
|836845294272856064|      britneyspears|
|836846111964954626|    EuroGeosciences|
|836846111964954626|        guardianeco|
|836847819768086528|      RoseAnnDeMoro|
|836847819768086528|              Shell|
|836849424684974081|         SenSanders|
|836851436285747201|    CrimsonCloakPub|
+------------------+-------------------+
only showing top

In [31]:
mentions.write.parquet('mentions_all.parquet')

## Check on tweets that I might be missing  
### (I didn't end up doing anything with this.)

In [None]:
# Of my tweets, 18067 are replies to distinct statuses that I didn't download
tweet_ids_to_get = spark.sql("""
    select distinct tweet_id
    from
        (select in_reply_to_tweet_id as tweet_id
        from all_tweets
        except
        select tweet_id
        from all_tweets) as sub
""").collect()

tweet_ids_to_get = [str(x.tweet_id) for x in tweet_ids_to_get]

# use tweepy API.statuses_lookup with include_entities set to True
# http://docs.tweepy.org/en/v3.5.0/api.html

In [None]:
# with open('tweet_ids_to_get.pkl', 'wb') as picklefile:
#     pickle.dump(tweet_ids_to_get, picklefile)

## Notes on what some of the tweet fields mean

In [4]:
# Showing that if you retweet, your retweet_count shows the number of RTs of the original tweet
df.select('user.screen_name', 'text', 'retweet_count', 'retweeted_status', 'retweeted_status.user.screen_name', 
          'retweeted_status.text').limit(20).show()

+---------------+--------------------+-------------+--------------------+-------------+--------------------+
|    screen_name|                text|retweet_count|    retweeted_status|  screen_name|                text|
+---------------+--------------------+-------------+--------------------+-------------+--------------------+
|    LeahHelene3|RT @brianschatz: ...|          395|[null,null,Thu Ma...|  brianschatz|This is just nuts...|
|      nawillyms|RT @AllanMargolin...|           26|[null,null,Thu Ma...|AllanMargolin|#Climate Cartoon:...|
|   CentristView|What? https://t.c...|            0|                null|         null|                null|
|    gierke_matt|RT @ReutersUS: JU...|         2864|[null,null,Thu Ma...|    ReutersUS|JUST IN: EPA chie...|
| FogartyClimate|EPA chief Scott P...|            0|                null|         null|                null|
| CarloScagnelli|EPA chief Scott P...|            2|                null|         null|                null|
|      pauleliza|RT

In [6]:
# Showing, with the above table for reference, that quoted tweets don't count as RTs
df.filter('quoted_status is not null').select('user.id_str', 'user.screen_name', 'text',
                'quoted_status', 'quoted_status.user.screen_name', 'quoted_status.text').limit(20).show()

+------------------+---------------+--------------------+--------------------+---------------+--------------------+
|            id_str|    screen_name|                text|       quoted_status|    screen_name|                text|
+------------------+---------------+--------------------+--------------------+---------------+--------------------+
|          42255271|   moberhoffner|Looks like Scott ...|[null,null,Thu Ma...|           CNBC|EPA chief Scott P...|
|        4220206828|        SeanCLF|Shameful.  Even R...|[null,null,Thu Ma...|      RogueNASA|Oh. EPA chief Sco...|
|          15104467|      FogBelter|.@CNBC Pruitt is ...|[null,null,Thu Ma...|    keithboykin|Contradicting @NA...|
|          44425835|     _kaitlin_s|I feel so protect...|[null,null,Thu Ma...|      ReutersUS|JUST IN: EPA chie...|
|        2493496338|       bbschrei|Confirmation time...|[null,null,Thu Ma...|           CNBC|EPA chief Scott P...|
|          22041466|        Dlashof|If #PollutingPrui...|[null,null,Thu 

In [7]:
# Showing that even if your tweet is a quote, you can still get RTs on it
df.filter('quoted_status is not null and retweet_count > 0').select('user.screen_name', 'text',
            'quoted_status.user.screen_name', 'quoted_status.text', 'retweet_count').limit(20).show()

+---------------+--------------------+--------------+--------------------+-------------+
|    screen_name|                text|   screen_name|                text|retweet_count|
+---------------+--------------------+--------------+--------------------+-------------+
|      kimjiwxxn|she's the cause o...|    aesthanbin|hi, i'm back 👊🏻...|            1|
|      BillTufts|Great analysis of...|        GYFHAS|@BillTufts not ju...|            3|
|TheGospelOfAsif|And that's what h...|     ReutersUS|JUST IN: EPA chie...|            2|
|  DawnMMorrison|@SenWhitehouse Un...|    BraddJaffy|EPA chief Scott P...|            1|
|     brianroewe|Scott Pruitt does...|          CNBC|EPA chief Scott P...|            1|
|RepAnthonyBrown|Very concerning t...|          CNBC|EPA chief Scott P...|            3|
|   BrandonDusch|Every climate sci...|         KFILE|EPA chief Scott P...|            1|
|   SMacMillanMD|Global warming is...|       Reuters|JUST IN: EPA chie...|            3|
|   jonbernhardt|If glo