In [47]:
import os
import boto3
import json
from faker import Faker
from geolite2 import geolite2
import random
from IPython.display import clear_output
import time
from datetime import datetime

In [55]:
# AWS region and name of the Kinesis Data Firehose delivery stream that receives the reviews.
REGION_NAME = 'us-west-2'
STREAM_NAME = 'review-stream'

# Only reviews whose 1-based position in the file listing lies within
# FILE_RANGE_LOW..FILE_RANGE_HIGH (both inclusive) are considered for sending.
# The default considers reviews #1 to #100.
# For testing, keep the range small so that only a few reviews are sent.
# To send all 50k reviews, set FILE_RANGE_LOW = 1 and FILE_RANGE_HIGH = 50000
FILE_RANGE_LOW = 1
FILE_RANGE_HIGH = 100

In [49]:
# Directory holding the review text files, one review per file.
path = 'aclImdb/train/unsup'
# os.listdir returns names in arbitrary order; sort them so that a given
# FILE_RANGE_LOW..FILE_RANGE_HIGH window selects the same reviews on every run.
files = sorted(os.listdir(path))
# Generator of fake data, used later to produce random IPv4 addresses.
faker = Faker()


In [50]:
# Load the candidate movie titles: the title is the last tab-separated field
# of every line in the titles file.
file = os.path.join('aclImdb', '1990-2015-movie-titles.txt')
with open(file, 'r') as fin:
    lines = fin.readlines()

# Strip only the line terminator instead of blindly dropping the last
# character: the final line may have no trailing newline (which would truncate
# its title) and CRLF files would otherwise keep a stray '\r'.
titles = [line.rstrip('\r\n').split('\t')[-1] for line in lines]
# Blank lines carry no title; drop them so an empty title can never be picked.
titles = [title for title in titles if title]
num_title = len(titles)

In [51]:
# Firehose client bound to the configured region; used to push records to the delivery stream.
client = boto3.client('firehose', region_name=REGION_NAME)

In [52]:
def deep_get(_dict, keys, default=None):
    """Look up a value in nested dictionaries by following a sequence of keys.

    Args:
        _dict: The outermost dictionary (any non-dict value yields ``default``
            as soon as a key has to be resolved against it).
        keys: Iterable of keys, applied from the outermost level inwards.
        default: Value returned when a key is missing or when an intermediate
            value is not a dictionary.

    Returns:
        The value found at the end of the key path, or ``default`` if the path
        cannot be followed completely. With an empty ``keys`` the input object
        itself is returned.
    """
    # Private sentinel so that a missing key is told apart from any stored
    # value. Without it a missing key would substitute `default` and keep
    # descending, so a dict-valued default would itself be searched.
    missing = object()
    for key in keys:
        if not isinstance(_dict, dict):
            return default
        _dict = _dict.get(key, missing)
        if _dict is missing:
            return default
    return _dict

In [56]:
%time
i = 0
text_list = []
reader = geolite2.reader()

for file in files:
    i += 1
    if i < FILE_RANGE_LOW or i > FILE_RANGE_HIGH: continue
    file = os.path.join(path, file)
    fsize = os.path.getsize(file)
    if fsize >= 5000:
        continue
    with open(file, 'r') as fin:
        text = fin.read()
        text = text.replace('<br /><br />', ' ')
    
    # randomly pick up a title that is less than 20 characters (longer name can not fit in small dashboard)
    title = ' ' * 30
    while len(title) > 20:
        j = random.randint(0, num_title - 1)
        title = titles[j]
    
    # generate ip
    info = {}
    while not info:
        ip = faker.ipv4()
        info = reader.get(ip)
    
    # generate geo info    
    continent_name = deep_get(info, ['continent', 'names', 'en'])
    country_iso_code = deep_get(info, ['country', 'iso_code'])
    region_name = info.get('subdivisions')[0]['names']['en'] if info.get('subdivisions') else None
    city_name = deep_get(info, ['city', 'names', 'en'])
    timezone = deep_get(info, ['location', 'time_zone'])
    location = {'lat': info['location']['latitude'], 'lon': info['location']['longitude']} if info.get('location')\
               else None

    geoip = {}
    if continent_name: geoip['continent_name'] = continent_name
    if country_iso_code: geoip['country_iso_code'] = country_iso_code
    if region_name: geoip['region_name'] = region_name
    if city_name: geoip['city_name'] = city_name
    if timezone: geoip['timezone'] = timezone
    if location: geoip['location'] = location
    
    # construct payload sent to kinesis firehose
    payload = {
        'text': text,
        'title': title,
        'ip_addr': ip,
        'geoip': geoip
    }
    
    clear_output(wait=True)
    print('processing review #{}'.format(i))
    
    # send payload to kinesis firehose
    client.put_record(DeliveryStreamName=STREAM_NAME, Record={'Data':json.dumps(payload)})
    
print('finished sending {} reviews'.format(FILE_RANGE_HIGH - FILE_RANGE_LOW + 1))


processing review #20
finished sending 49999 reviews
