In [6]:
import csv
import gzip
from json import dumps
from time import sleep

# --- Kafka target ---
# Name of the Kafka topic for the tweet stream.
topic = 'tweets'
# Broker address list handed to the producer as its bootstrap servers.
BS_SERVERS = ['localhost:9092']

# --- Throttling ---
# Number of tweets per batch; the stream pauses after each batch.
bulk_size = 1_000
# Length of the pause between batches, in seconds.
sleep_time = 5
# Upper bound on how many lines of the dump are streamed.
stream_limit = 100_000_000

In [7]:
import random
from datetime import datetime, timedelta

def rand_date(days_back: int = 30) -> str:
  """Return a random recent calendar date formatted as ``YYYY-MM-DD``.

  The date is picked with ``random.randint`` from the inclusive window
  ``[today - days_back, today]``, where "today" is ``datetime.today()``.

  Args:
    days_back (int): Size of the look-back window in days. Defaults to 30,
      which is the window the notebook originally hard-coded.

  Returns:
    str: The chosen date rendered with ``strftime("%Y-%m-%d")``.

  Raises:
    ValueError: If ``days_back`` is negative.
  """
  if days_back < 0:
    raise ValueError(f"days_back must be non-negative, got {days_back}")

  # Oldest date that may be returned.
  window_start = datetime.today() - timedelta(days=days_back)
  # Offset 0 yields the oldest date, offset `days_back` yields today.
  offset_days = random.randint(0, days_back)
  return (window_start + timedelta(days=offset_days)).strftime("%Y-%m-%d")

In [8]:
## functions
def get_tweet(idx, line) -> "dict | str":
  """ ### Converts one CSV line of the tweet dump into a tweet dictionary.

  The line is parsed with the `csv` module, so a quoted field may contain
  commas (the tweet text is no longer cut at its first comma) and the CSV
  quote characters and trailing newline are not part of the returned values.
  Apostrophes in the text are kept as they appear in the line.

  Only 'id', 'user' and 'text' come from the line (columns 1, 4 and 5);
  'date' is produced by `rand_date()` and 'retweets' is a random integer
  from 0 to 9.

  #### Args:
    `idx` (int): The index of the line, used only in the error message.
    `line` (str): The line of text representing a tweet.

  #### Returns:
    `dict`: A dictionary representing the tweet with the keys 'id', 'date', 'user', 'text', and 'retweets'.
    `str`: The empty string `''` when the line cannot be parsed or has fewer
      than six columns. Callers should skip this falsy value.
  """
  try:
    # A single line is wrapped in a list because csv.reader expects an
    # iterable of lines.
    fields = next(csv.reader([line]))
    tweet_id, user, text = fields[1], fields[4], fields[5]
  except (csv.Error, IndexError):
    # Malformed, empty, non-text or too-short line: report it and hand back
    # the falsy sentinel instead of aborting the whole stream.
    print(f"error at {idx}")
    return ''

  return {
    "id": tweet_id,
    "date": rand_date(),
    "user": user,
    "text": text,
    "retweets": int(random.random() * 10)
  }

In [9]:
## setting up the producer
from kafka import KafkaProducer

def DUMP(x):
  """Serialize `x` to a JSON string and encode that string as UTF-8 bytes."""
  return dumps(x).encode('utf-8')

# Producer pointed at the configured bootstrap servers; every value passed to
# `send` goes through DUMP before it is handed to Kafka.
producer = KafkaProducer(bootstrap_servers=BS_SERVERS, value_serializer=DUMP)

In [10]:
## sending stream of tweets
# Stream the gzipped tweet dump to Kafka, pausing `sleep_time` seconds after
# every `bulk_size` messages and stopping once `stream_limit` have been sent.
sent = 0  # messages actually handed to the producer
with gzip.open('data/training.1600000.processed.noemoticon.csv.gz','rt',encoding='latin-1') as f:
  for i, line in enumerate(f):
    # Check the cap before sending so no more than `stream_limit` go out.
    if sent >= stream_limit:
      break
    tweet = get_tweet(i, line)
    # get_tweet returns a falsy value for lines it could not parse; do not
    # publish those as empty messages.
    if not tweet:
      continue
    producer.send(topic, value=tweet)
    sent += 1
    if sent % bulk_size == 0:
      print(f"streamed {sent} {topic}")
      sleep(sleep_time)

# send() only queues the record; block until everything queued is delivered.
producer.flush()

streamed 1000 tweets
streamed 2000 tweets
streamed 3000 tweets
streamed 4000 tweets
