In [1]:
import json
from time import sleep
from bs4 import BeautifulSoup
from kafka import KafkaConsumer, KafkaProducer


def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(key, encoding='utf-8')
        value_bytes = bytes(value, encoding='utf-8')
        
        producer_instance.send(topic_name, key=key_bytes.lower(), value=value_bytes.lower())
        producer_instance.flush()
        print('Tweet published successfully.')
    except Exception as ex:
        print('Exception in publishing tweet')
        print(str(ex))


def connect_kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=['localhost:9092'], api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    finally:
        return _producer


def parse(tweet):
    title = str(tweet)
    rec = {}
    try:
        rec = {'tweet': title}

    except Exception as ex:
        print('Exception while parsing')
        print(str(ex))
    finally:
        return json.dumps(rec)


    
    

if __name__ == '__main__':
    print('===> Parsing Tweet Objects ...')
    print()
    parsed_records = []
    topic_name = 'raw_tweets'    
    parsed_topic_name = 'parsed_tweet'

    consumer = KafkaConsumer(topic_name, auto_offset_reset='earliest',
                             bootstrap_servers=['localhost:9092'], api_version=(0, 10), consumer_timeout_ms=1000)
    for msg in consumer:
        tweet = msg.value
        result = parse(tweet)
        print(tweet)
        parsed_records.append(result)

    consumer.close()
    sleep(5)

    if len(parsed_records) > 0:
        print()
        print('Publishing records...')
        print()
        producer = connect_kafka_producer()
        for rec in parsed_records:
            publish_message(producer, parsed_topic_name, 'parsed', rec)

===> Parsing Tweet Objects ...
b'Ontario to pay for meningitis vaccine in September - CTV News http://ow.ly/agm1'
b'Ontario to pay for meningitis vaccine in September http://tinyurl.com/lt3m36'
b'Ontario to pay for meningitis vaccine in September http://tinyurl.com/kp4c9e'
b'@Tazdevil2k9 do you no about the meningitis glass test'
b'@MJPhillips Pretty sure a bad Flu has meningitis like symptoms. And yes, it is contagious.'
b'well, we ruled out lepto, meningitis , giardia, west nile, swine, e. choli... Zax is finally symptom-free after 48 hrs, so bacteria it was!'
b'@metalfingers meningitis is more likely :)'
b'@charchaos ah right, i am gonna ask at the doctors see what they say the only injections i have had is the tb one and the meningitis one'
b'@perpetualspiral The Canadian definition of #fibro fits me to a T! I had Viral Meningitis caused by west nile virus in 2003. Vry sick since.'
b'Sending a email to my boss bitching about the fact that if I had been a nurse I would have known no