In [1]:
import pandas as pd

In [2]:
# Load the three Olist tables, reading only the columns used below.
# Orders and items are indexed by order_id so they can be joined on the index.
df_orders = (
    pd.read_csv(
        'data/olist_orders_dataset.csv',
        usecols=['order_id', 'customer_id', 'order_status', 'order_approved_at'],
    )
    .set_index('order_id')
)
df_items = (
    pd.read_csv(
        'data/olist_order_items_dataset.csv',
        usecols=['order_id', 'product_id', 'price'],
    )
    .set_index('order_id')
)
# Reviews keep a default RangeIndex; they are grouped by date later.
df_reviews = pd.read_csv(
    'data/olist_order_reviews_dataset.csv',
    usecols=['order_id', 'review_id', 'review_score', 'review_comment_message', 'review_creation_date'],
)

In [3]:
# Keep delivered orders and attach their items (one row per item).
# Then drop rows with any missing field: a later cell splits
# `order_approved_at` as a string, so NaN there would fail.
df_orders = (
    df_orders[df_orders['order_status'] == 'delivered']
    .join(df_items)
    .dropna()
    # DataFrame.drop_duplicates ignores the index. order_id is moved into a
    # column so rows only count as duplicates within the same order. Note that
    # an identical item (same product and price) listed twice in one order
    # is still kept only once.
    .reset_index()
    .drop_duplicates()
    .set_index('order_id')
)
# Keep reviews with a comment message. dropna() with no subset also drops
# reviews missing any other loaded column, e.g. the creation date parsed later.
df_reviews = df_reviews.dropna()

In [4]:
# Row counts after cleaning (the orders frame holds one row per order item).
print("Number of orders: %d" % df_orders.shape[0])
print("Number of reviews: %d" % df_reviews.shape[0])

Number of orders: 100182
Number of reviews: 40977


In [5]:
import time
import datetime

def str2date(x):
    """Return the first whitespace-separated token of `x`.

    For timestamp strings such as '2017-10-02 10:56:33' this is the date part
    ('2017-10-02'), which the replay loop uses as its per-day key.
    """
    return x.split()[0]


df_orders['date'] = df_orders['order_approved_at'].map(str2date)
df_reviews['date'] = df_reviews['review_creation_date'].map(str2date)
# price is later joined into a '[SEP]'-delimited text payload, so it must be a string.
df_orders['price'] = df_orders['price'].map(str)
# Flatten multi-line comments into a single line so each one stays on one line
# in the payload. Each run of line breaks (e.g. '\r\n') becomes a single space;
# deleting them outright would glue adjacent words together.
df_reviews['review_comment_message'] = (
    df_reviews['review_comment_message'].str.replace(r'[\r\n]+', ' ', regex=True)
)

In [6]:
# Replay window: the first through the last day that has an order OR a review.
# Review dates are not bounded by order dates. A window based on orders alone
# would silently skip reviews dated outside it.
# 'YYYY-MM-DD' strings sort chronologically, so min()/max() on strings is safe
# and avoids sorting the whole column.
start_date_str = min(df_orders['date'].min(), df_reviews['date'].min())
end_date_str = max(df_orders['date'].max(), df_reviews['date'].max())
start_date_obj = datetime.datetime.strptime(start_date_str, '%Y-%m-%d')
end_date_obj = datetime.datetime.strptime(end_date_str, '%Y-%m-%d')

# Whole days between the first and last day (0 when they are the same day).
delta = (end_date_obj - start_date_obj).days

In [None]:
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
df_orders_group = df_orders[['customer_id','product_id','price','date']].groupby(by='date')
df_reviews_group = df_reviews[['review_id','review_comment_message','date']].groupby(by='date')
orders_groups = df_orders_group.groups
reviews_groups = df_reviews_group.groups

# Wall-clock seconds spent per simulated day.
SECONDS_PER_DAY = 1


def send_day(producer, grouped, groups, day, topic):
    """Publish every row of `grouped` dated `day` to the Kafka `topic`.

    The first column is the message key. The remaining columns, which must all
    be strings, are joined with '[SEP]' to form the value. Both are UTF-8 encoded.
    Does nothing when `day` is not a key of `groups` (no rows that day).
    """
    if day not in groups:
        return
    for row in grouped.get_group(day).values:
        key = row[0].encode('utf-8')
        value = '[SEP]'.join(row[1:]).encode('utf-8')
        producer.send(topic, key=key, value=value)


try:
    # delta is the day *difference*. The +1 makes the last day inclusive;
    # range(delta) alone never sent it.
    for i in range(delta + 1):
        now_obj = start_date_obj + datetime.timedelta(days=i)
        now_str = now_obj.strftime('%Y-%m-%d')
        print("Now:", now_str)
        send_day(producer, df_orders_group, orders_groups, now_str, 'order')
        send_day(producer, df_reviews_group, reviews_groups, now_str, 'review')
        time.sleep(SECONDS_PER_DAY)
finally:
    # send() only buffers records. Flush so queued messages are delivered even
    # if the replay is interrupted.
    producer.flush()
