## Make Sure Contexts are Loaded

In [44]:
# Evaluate the bare name so the notebook displays the Spark context object;
# a NameError here would mean `sc` was never defined in this kernel.
sc

<pyspark.context.SparkContext at 0x7fa30003e550>

In [45]:
# Evaluate the bare name so the notebook displays the SQL context object;
# a NameError here would mean `sqlContext` was never defined in this kernel.
sqlContext

<pyspark.sql.context.HiveContext at 0x7fa2e4c68790>

## Load the Captured Streaming Data in Python

The tweets.json file has 243,554 tweets and takes 1.2 GiB of space. It was collected from Twitter's filter stream by searching for "trump,donaldtrump,clinton,hillaryclinton,sanders,berniesanders" over roughly two hours around noon MDT. Let's build a dictionary with languages as keys and the number of tweets in each language as values. We'll see how long this takes in old-fashioned Python.

In [3]:
import time
import simplejson as json

filename = '/home/anaconda/md0/data/2016_potus/stream/tweets.json'
langs = {}

start_time = time.time()

f_p = open(filename,'r')
for line in f_p:
    tweet = json.loads(line)
    if 'lang' in tweet:
        if tweet['lang'] in langs:
            langs[tweet['lang']] += 1
        else:
            langs[tweet['lang']] = 1
    
elapsed_time = time.time() - start_time
print "%02f seconds" % elapsed_time
# Pretty print langs as JSON
print "%s" % json.dumps(langs, indent=4)

21.767186 seconds
{
    "el": 13,
    "en": 226229,
    "zh": 5,
    "vi": 14,
    "is": 3,
    "it": 258,
    "iw": 5,
    "eu": 1,
    "cy": 23,
    "ar": 48,
    "in": 181,
    "cs": 25,
    "et": 48,
    "es": 4200,
    "ru": 40,
    "nl": 223,
    "pt": 447,
    "no": 28,
    "lo": 1,
    "tr": 255,
    "lt": 8,
    "lv": 4,
    "tl": 223,
    "th": 2,
    "ro": 112,
    "pl": 630,
    "ta": 1,
    "fr": 1409,
    "de": 340,
    "ht": 84,
    "da": 129,
    "fa": 3,
    "hi": 21,
    "fi": 9,
    "hu": 20,
    "ja": 102,
    "sr": 1,
    "ko": 5,
    "sv": 89,
    "und": 8301,
    "sl": 5
}


## Load Data in SparkSQL

We're going to look at the same computation in SparkSQL.

In [5]:
import time

filename = '/home/anaconda/md0/data/2016_potus/stream/tweets.json'

start_time = time.time()

# Form a Spark dataframe and register a temp table
sdf = sqlContext.read.json(filename)
sdf.registerTempTable('tweets')

query = "select lang, count(*) as num from tweets group by lang order by num desc"

elapsed_time = time.time() - start_time

print "%02f seconds" % elapsed_time
pdf = sqlContext.sql(query).toPandas()
pdf

6.526120 seconds


Unnamed: 0,lang,num
0,en,226229
1,und,8301
2,es,4200
3,fr,1409
4,pl,630
5,pt,447
6,de,340
7,it,258
8,tr,255
9,tl,223


In [6]:
# Print the schema of the `sdf` dataframe as an indented tree.
sdf.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true

In [43]:
# Time the whole round trip: executing the SQL and collecting the result
# into a pandas DataFrame.
start_time = time.time()

# Three nested selects against `tweets`:
#   sq    - per user.time_zone, counts tweets whose lower-cased text contains
#           each candidate's first or last name. A tweet naming several
#           candidates is counted once under each of them.
#   sq2   - adds `mentions` (the sum of the three counts) and, per candidate,
#           a dense_rank of the time zones ordered by that candidate's count,
#           highest first.
#   outer - keeps time zones whose rank is below 30 for at least one
#           candidate, ordered by total mentions descending.
query = """
select
    sq2.time_zone as time_zone,
    sq2.mentions as mentions,
    sq2.clinton_rank as clinton_rank,
    sq2.trump_rank as trump_rank,
    sq2.sanders_rank as sanders_rank
from (
    select
        sq.time_zone as time_zone,
        sq.clinton + sq.trump + sq.sanders as mentions,
        dense_rank() over (order by sq.clinton desc) as clinton_rank,
        dense_rank() over (order by sq.trump desc) as trump_rank,
        dense_rank() over (order by sq.sanders desc) as sanders_rank
    from (
        select
            user.time_zone as time_zone,
            sum(case
                when lower(text) like '%clinton%' or lower(text) like '%hillary%'
                    then 1
                    else 0
                end) as clinton,
            sum(case
                when lower(text) like '%trump%' or lower(text) like '%donald%'
                    then 1
                    else 0
                end) as trump,
            sum(case
                when lower(text) like '%sanders%' or lower(text) like '%bernie%'
                    then 1
                    else 0
                end) as sanders
        from tweets
        group by user.time_zone
    ) sq
) sq2
where
    sq2.clinton_rank < 30
    or sq2.trump_rank < 30
    or sq2.sanders_rank < 30
order by sq2.mentions desc
"""
# Run the query and pull the full result set back as a pandas DataFrame.
pdf = sqlContext.sql(query).toPandas()

elapsed_time = time.time() - start_time
# "%02f" only sets a minimum field width of 2; the value still prints with
# the default six decimal places.
print "%02f seconds" % elapsed_time

# Last expression of the cell: rendered by the notebook as a table.
pdf

3.718285 seconds


Unnamed: 0,time_zone,mentions,clinton_rank,trump_rank,sanders_rank
0,,112310,1,1,1
1,Eastern Time (US & Canada),39572,2,2,3
2,Pacific Time (US & Canada),39096,3,3,2
3,Central Time (US & Canada),21007,4,4,4
4,Atlantic Time (Canada),5842,5,5,5
5,Quito,4924,6,6,7
6,Mountain Time (US & Canada),4452,7,7,6
7,Arizona,3472,8,8,8
8,London,2968,9,9,9
9,Hawaii,1532,11,11,10
