In [1]:
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from functools import reduce
from collections import Counter
import boto3
import ssl
import json
from ipyparallel import Client
import pandas as pd

In [2]:
# Connect to the ipyparallel cluster and to S3.
# NOTE(review): S3Connection() is built without explicit credentials; presumably
# boto resolves them from the environment / boto config — confirm, and never
# hardcode access keys in this notebook.
rc = Client()
conn = S3Connection()

In [3]:
# Open the S3 bucket that the Kinesis Firehose stream delivers tweets into.
bucket = conn.get_bucket('chaufirehosebucket')
# get_all_keys() returns a single page of the listing (at most 1000 keys), so a
# larger bucket would be silently truncated. bucket.list() follows the paging
# markers and yields every key; materialise it so it can be scattered.
keys = list(bucket.list())

In [4]:
# DirectView over all engines known to the client.
dview = rc[:]
# Partition `keys` across the engines; each engine receives its share under the
# name `keys`. The call is non-blocking and returns an AsyncResult.
dview.scatter('keys', keys)

<AsyncResult: scatter>

In [5]:
%%px

import boto
import json
from collections import Counter
from boto.s3.connection import S3Connection

# This %%px cell executes on the engines, not in the client kernel, so each
# engine opens its own S3 connection and bucket handle.
conn = S3Connection()
bucket = conn.get_bucket('chaufirehosebucket')

def count_hashtags(keys, bucket=bucket, first_only=True):
    '''
    Count hashtags in tweets stored as newline-delimited JSON under S3 keys.

    input:
        keys: iterable of boto S3 keys; each object's contents are fetched with
              get_contents_as_string() and split on newlines, one tweet JSON per line
        bucket: not used by this function; kept so existing callers that pass it
              keep working
        first_only: if True (default, the original behaviour) count only the first
              hashtag of each tweet; if False count every hashtag in the tweet

    returns: collections.Counter mapping hashtag text -> number of occurrences

    Skipped silently: lines that are not UTF-8 JSON objects (blank lines, partial
    records), deletion notices ('delete' key) and tweets without 'entities'.
    '''
    hashtag_count = Counter()

    for key in keys:
        raw = key.get_contents_as_string()
        # The contents may be bytes (Python 3) or str (Python 2); split with a
        # separator of the same type, since bytes.split('\n') raises TypeError.
        newline = b'\n' if isinstance(raw, bytes) else '\n'

        for line in raw.split(newline):
            try:
                text = line.decode('UTF-8') if isinstance(line, bytes) else line
                tweet_json = json.loads(text)
            # UnicodeDecodeError and JSON decode errors are both ValueError subclasses
            except ValueError:
                continue

            # Only JSON objects can be tweets; also skip deletion notices and
            # records that carry no entities section.
            if (not isinstance(tweet_json, dict)
                    or 'delete' in tweet_json
                    or 'entities' not in tweet_json):
                continue

            # A missing 'hashtags' entry is treated like an empty list instead of
            # aborting the whole engine with a KeyError.
            hashtags = tweet_json['entities'].get('hashtags') or []
            if first_only:
                hashtags = hashtags[:1]
            hashtag_count.update(tag['text'] for tag in hashtags)

    return hashtag_count

# Run the hashtag counter on each engine over that engine's scattered share of
# `keys`; the resulting Counter stays on the engine as `hashtagcount`.
hashtagcount = count_hashtags(keys, bucket=bucket)

In [6]:
# Collect each engine's `hashtagcount` Counter back into the client, from however
# many engines dview (rc[:]) covers.
# NOTE(review): dview appears to be non-blocking (scatter printed an AsyncResult),
# so this is presumably an AsyncResult that resolves when iterated — confirm.
hash_counts = dview.gather('hashtagcount')

In [7]:
# Merge the per-engine Counters into one overall tally and show the top hashtags.
TOP_N = 10

top_hashtags = Counter()
for engine_counts in hash_counts:
    # In-place merge; unlike reduce(x + y) this also works when hash_counts is empty.
    top_hashtags.update(engine_counts)

top_n = top_hashtags.most_common(TOP_N)
# Size the 1-based rank index from the actual result: with fewer than TOP_N
# distinct hashtags a fixed range(1, 11) makes the DataFrame constructor fail.
pd.DataFrame(top_n, columns=['hashtag', 'count'], index=range(1, len(top_n) + 1))

Unnamed: 0,hashtag,count
1,KCAPinoyStar,11410
2,توق_الاعصار_تويت_هوست,8715
3,KCAPersonalidadeBR,6971
4,モンスト,6260
5,KCAEstrellaLatina,5896
6,ALDUBxDTBYSinJie,5311
7,Lovatics,4179
8,izmirescort,3350
9,المباحث_وفولورز_الفلانتين,3293
10,sanremo2017,3218
