In [None]:
import random


In [None]:
# Load the streaming_jupyter_integrations IPython extension; it presumably provides
# the %load_config_file magic used in the next cell — confirm in its docs.
%reload_ext streaming_jupyter_integrations.magics

In [None]:
# Load connection settings from ./config-local.json. Later cells use kafka_bootstrap,
# schema_registry_url, postgres_username, postgres_password and postgres_url_short,
# which are not assigned anywhere else in this notebook — presumably this magic
# injects them from the file's keys; confirm against config-local.json.
%load_config_file --path ./config-local.json

In [None]:
%%bash -s {kafka_bootstrap}
# Drop the 'traffic' topic so each run starts from an empty log.
# --if-exists makes this a no-op on a fresh cluster instead of a non-zero exit
# (which fails the cell and stops "Run All").
kafka-topics --bootstrap-server "$1" --delete --topic traffic --if-exists


In [None]:
%%bash -s {kafka_bootstrap}
# Recreate 'traffic' with a single partition/replica.
# 604800000000 ms is ~7000 days, presumably chosen so segments and data are
# never deleted or rolled during the demo.
# NOTE(review): Kafka deletes topics asynchronously; if this fails with
# "marked for deletion", wait a moment and re-run this cell.
kafka-topics --bootstrap-server "$1" --create --topic traffic --partitions 1 --replication-factor 1 \
    --config file.delete.delay.ms=604800000000 \
    --config segment.ms=604800000000 \
    --config retention.ms=604800000000


In [None]:
%%bash
# Stop on the first failure (including a failed kubectl/base64 pipeline).
set -euo pipefail

# Fetch the postgres password from the 'postgresql' secret. Assigning before
# exporting keeps a kubectl failure from being masked by `export`'s exit status.
PGPASSWORD=$(kubectl get secret --namespace postgresql postgresql -o jsonpath="{.data.postgres-password}" | base64 -d)
export PGPASSWORD

# NOTE(review): assumes Postgres is reachable on 127.0.0.1:5432
# (e.g. via kubectl port-forward) — confirm.
run_sql() {
    psql --host 127.0.0.1 -U postgres -d postgres -p 5432 -c "$1"
}

# Recreate the sink tables; column names must match the row dicts produced by
# users_mapper_function / OrderMapper.orders_mapper_function.
run_sql 'drop table if exists users'
run_sql 'drop table if exists orders'
run_sql 'create table users (user_id int, timestamp bigint, country varchar(20), platform varchar(20))'
run_sql 'create table orders (order_id int, timestamp bigint, user_id int)'



In [None]:
%%bash
# Show up to 10 rows of the users table as a quick sanity check.
PGPASSWORD=$(kubectl get secret --namespace postgresql postgresql -o jsonpath="{.data.postgres-password}" | base64 -d)
export PGPASSWORD
psql --host 127.0.0.1 --username postgres --dbname postgres --port 5432 --command "select * from users limit 10"

In [None]:
import json
from dataclasses import dataclass
from random import randrange
from typing import Any, Dict, Optional
from urllib.parse import quote

from doge_datagen import (
    DataOnlineGenerator,
    DbSinkFactory,
    KafkaAvroSinkFactory,
    Subject,
    SubjectFactory,
    Transition,
)

# Countries and platforms assigned to simulated users; UserFactory picks one of
# each with random.choice for every new user.
countries = ['Poland', 'Germany', 'Czechia']
platforms = ['Mobile', 'PC', 'TV']

@dataclass
class User:
    """Simulated user that the generator moves between page states.

    ``site`` holds the page most recently assigned by a transition callback and
    is ``None`` for a freshly created user.
    """

    user_id: int
    country: str
    platform: str
    # Optional: users are created before any page has been assigned.
    site: Optional[str] = None

    def __hash__(self):
        # Hash by id only. The dataclass-generated __eq__ compares every field, so
        # equal users always share a hash, and the hash stays stable while `site`
        # changes as the user moves between pages.
        return self.user_id


class UserFactory:
    """Creates ``User`` subjects with sequential ids and a random country/platform.

    Every user starts with ``site=None``; transition callbacks assign it later.
    """

    def __init__(self, country_choices=None, platform_choices=None, rng=None, start_id=0):
        """
        Args:
            country_choices: Sequence to draw ``country`` from. ``None`` (default)
                uses the module-level ``countries`` list, looked up on each ``create``.
            platform_choices: Same for ``platform`` and the module-level ``platforms``.
            rng: Object with a ``choice`` method, e.g. ``random.Random(seed)``, for
                reproducible users. ``None`` (default) uses the global ``random`` module.
            start_id: ``user_id`` given to the first user created.
        """
        super().__init__()
        self.current_id = start_id
        self.country_choices = country_choices
        self.platform_choices = platform_choices
        self.rng = random if rng is None else rng

    def create(self) -> User:
        """Return a new ``User`` with the next id and advance the counter."""
        country_pool = countries if self.country_choices is None else self.country_choices
        platform_pool = platforms if self.platform_choices is None else self.platform_choices
        # Draw order matters for reproducibility with a seeded rng: country first, then platform.
        user = User(self.current_id, self.rng.choice(country_pool), self.rng.choice(platform_pool), None)
        self.current_id += 1
        return user


def key_function(subject: User, transition: Transition) -> Dict[str, Any]:
    """Build a message key dict holding the subject's user id as a string.

    ``transition`` is accepted for callback-signature compatibility and ignored.
    """
    user_key = str(subject.user_id)
    return dict(key=user_key)


def get_schema(schema_path, encoding='utf-8'):
    """Return the full text of a schema file (e.g. an Avro ``.avsc``).

    Args:
        schema_path: Path (str or path-like) of the file to read.
        encoding: Text encoding. Defaults to UTF-8 so the result does not depend
            on the platform's locale encoding.

    Returns:
        The file contents as a string.
    """
    with open(schema_path, encoding=encoding) as schema_file:
        return schema_file.read()



In [None]:
# Factory for the Avro-encoding Kafka sinks, configured with the bootstrap server
# and schema registry URL from the loaded config.
# NOTE(review): meaning of the third positional argument 'feast-avro-demo'
# (client id?) is defined by doge_datagen — confirm before changing.
kafka_avro_factory = KafkaAvroSinkFactory([kafka_bootstrap], schema_registry_url, 'feast-avro-demo')


In [None]:
# Echo the Kafka bootstrap server and schema registry URL, one per line.
print(kafka_bootstrap, schema_registry_url, sep='\n')

In [None]:
# Connection URL for the Postgres sinks. Username and password are percent-encoded
# (quote with safe='') so characters such as '@', ':' or '/' in a generated
# password cannot be misread as URL delimiters.
db_factory = DbSinkFactory(
    f'postgresql://{quote(postgres_username, safe="")}:{quote(postgres_password, safe="")}'
    f'@{postgres_url_short}/postgres'
)


In [None]:
# Raw text of the Avro key schema; passed as the key schema of the traffic sink.
key_schema = get_schema('./avro/Key.avsc')



In [None]:
# Traffic definition
# Raw text of the Avro value schema for events written to the 'traffic' topic.
traffic_event_schema = get_schema('./avro/Traffic.avsc')
class TrafficMapper:
    """Builds key/value payloads for the Kafka ``traffic`` sink and tracks user sessions.

    All state is stored on the class, so it is shared by every instance:

    * ``current_id`` -- sequence number used for the message key and ``traffic_id``;
      advanced once per ``traffic_value_function`` call.
    * ``session_status`` -- ``user_id`` -> page-view counters for that user's
      current session plus the timestamp of their last event.

    A new session starts when a user's event arrives more than
    ``SESSION_TIMEOUT_MS`` after their previous one; the counters restart at zero.
    """

    session_status = {}
    current_id = 0
    # Inactivity gap that starts a new session. Event timestamps are presumably
    # epoch milliseconds (3600000 == 1 hour) -- confirm against doge_datagen.
    SESSION_TIMEOUT_MS = 3600000
    # Page name (User.site) -> session counter it increments. Other pages
    # (e.g. 'order_page') are still emitted but not counted.
    _PAGE_VIEW_COUNTERS = {
        'listing_page': 'session_listing_page_views',
        'product_page': 'session_product_page_views',
        'photo_page': 'session_photo_page_views',
    }

    @staticmethod
    def traffic_key_function(subject: User, transition: Transition) -> Dict[str, Any]:
        """Return the message key: the current traffic sequence number as a string.

        Does not advance ``current_id``.
        NOTE(review): the key equals the value's ``traffic_id`` only if the sink
        calls this before ``traffic_value_function`` for the same event -- confirm
        the call order in doge_datagen.
        """
        return {'key': str(TrafficMapper.current_id)}

    @staticmethod
    def traffic_value_function(timestamp: int, subject: User, transition: Transition) -> Dict[str, Any]:
        """Update ``subject``'s session counters and return the traffic event value.

        Args:
            timestamp: Event time supplied by the generator.
            subject: User whose ``site`` is the page just entered.
            transition: Unused.

        Returns:
            Dict with ``traffic_id``, ``timestamp``, ``user_id``, ``event`` (the page
            name) and the user's running per-session page-view counts, including
            this event. Advances ``current_id`` by one.
        """
        sessions = TrafficMapper.session_status
        session = sessions.get(subject.user_id)
        if session is None or timestamp > session['last_timestamp'] + TrafficMapper.SESSION_TIMEOUT_MS:
            # First event for this user, or the previous session timed out.
            session = {
                'last_timestamp': timestamp,
                'session_listing_page_views': 0,
                'session_product_page_views': 0,
                'session_photo_page_views': 0,
            }
            sessions[subject.user_id] = session
        else:
            session['last_timestamp'] = timestamp

        counter = TrafficMapper._PAGE_VIEW_COUNTERS.get(subject.site)
        if counter is not None:
            session[counter] += 1

        value = {
            'traffic_id': str(TrafficMapper.current_id),
            'timestamp': timestamp,
            'user_id': str(subject.user_id),
            'event': str(subject.site),
            'session_listing_page_views': session['session_listing_page_views'],
            'session_product_page_views': session['session_product_page_views'],
            'session_photo_page_views': session['session_photo_page_views'],
        }
        TrafficMapper.current_id += 1
        return value


# Dedicated name so a later cell reusing a generic name cannot shadow this mapper.
traffic_mapper = TrafficMapper()

# Kafka sink for page-view events on the 'traffic' topic.
traffic_sink = kafka_avro_factory.create(
    "traffic",
    traffic_mapper.traffic_key_function,
    key_schema,
    traffic_mapper.traffic_value_function,
    traffic_event_schema,
)


In [None]:
def users_mapper_function(timestamp: int, subject: User, transition: Transition) -> Dict[str, Any]:
    """Map a user-registration event to a row for the ``users`` table.

    Args:
        timestamp: Event time supplied by the generator (stored in ``timestamp bigint``).
        subject: The user being registered.
        transition: Unused.

    Returns:
        Column -> value dict for ``users(user_id int, timestamp bigint, country, platform)``.
    """
    return {
        # Pass the int itself to match the `user_id int` column rather than a
        # string the database has to cast implicitly.
        'user_id': subject.user_id,
        'timestamp': timestamp,
        'country': subject.country,
        'platform': subject.platform,
    }


# Postgres sink for the `users` table; each row is built by users_mapper_function.
users_sink = db_factory.create("users", users_mapper_function)

In [None]:
class OrderMapper:
    """Builds rows for the ``orders`` table with sequential order ids.

    ``current_id`` is stored on the class, so it is shared by all instances.
    """

    current_id = 0

    @staticmethod
    def orders_mapper_function(timestamp: int, subject: User, transition: Transition) -> Dict[str, Any]:
        """Map an order event to a row for ``orders(order_id int, timestamp bigint, user_id int)``.

        Args:
            timestamp: Event time supplied by the generator.
            subject: The ordering user.
            transition: Unused.

        Returns:
            Column -> value dict; advances ``current_id`` by one.
        """
        row = {
            'order_id': OrderMapper.current_id,
            'timestamp': timestamp,
            # Pass the int itself to match the `user_id int` column.
            'user_id': subject.user_id,
        }
        OrderMapper.current_id += 1
        return row

# Dedicated name instead of a generic `mapper` that other cells also bind.
order_mapper = OrderMapper()
# Postgres sink for the `orders` table.
orders_sink = db_factory.create("orders", order_mapper.orders_mapper_function)

In [None]:
class CallbackFactory:
    """Produces transition callbacks that set a subject's ``site`` to a fixed page."""

    def __init__(self, target_page):
        self.target_page = target_page

    def get_callback(self):
        """Return a callback that assigns ``target_page`` to the subject and returns True.

        ``target_page`` is read from this factory each time the callback runs.
        """
        factory = self

        def move_to_page(subject: User, transition: Transition):
            subject.site = factory.target_page
            return True

        return move_to_page
       

In [None]:
# Doge configuration
def create_example_data_online_generator():
    """Build the demo DataOnlineGenerator state machine (not started).

    States: register, offline, listing_page, product_page, photo_page and
    order_page. Page-entering transitions set ``User.site`` through a
    CallbackFactory callback and emit to the traffic sink. Registrations go to
    the users sink and order placements also go to the orders sink.
    """
    # NOTE(review): the numeric positional arguments are passed through unchanged;
    # their meanings are defined by doge_datagen — confirm there before tuning.
    generator = DataOnlineGenerator(
        ['register', 'offline', 'listing_page', 'product_page', 'photo_page', 'order_page'],
        'register',
        UserFactory(),
        1000,
        60000,
        50000,
        1600000000000,
    )

    # (name, from_state, to_state, weight, page set on entry or None, sinks or None).
    # `weight` is add_transition's fourth positional argument.
    transition_table = [
        ('register_user', 'register', 'offline', 100, None, [users_sink]),
        ('login', 'offline', 'listing_page', 0.01, 'listing_page', [traffic_sink]),
        ('logout', 'listing_page', 'offline', 1, None, None),
        ('listing_to_product_page', 'listing_page', 'product_page', 9, 'product_page', [traffic_sink]),
        ('product_to_listing_page', 'product_page', 'listing_page', 2, 'listing_page', [traffic_sink]),
        ('product_to_photo_page', 'product_page', 'photo_page', 6, 'photo_page', [traffic_sink]),
        ('product_to_order_page', 'product_page', 'order_page', 2, 'order_page', [traffic_sink, orders_sink]),
        ('photo_to_product_page', 'photo_page', 'product_page', 5, 'product_page', [traffic_sink]),
        ('photo_to_order_page', 'photo_page', 'order_page', 5, 'order_page', [traffic_sink, orders_sink]),
        ('order_to_listing_page', 'order_page', 'listing_page', 50, 'listing_page', [traffic_sink]),
        ('order_to_offline_page', 'order_page', 'offline', 50, None, None),
    ]

    for name, source, target, weight, page, sinks in transition_table:
        # Only pass the keyword arguments a transition actually uses.
        options = {}
        if page is not None:
            options['action_callback'] = CallbackFactory(page).get_callback()
        if sinks is not None:
            options['event_sinks'] = sinks
        generator.add_transition(name, source, target, weight, **options)

    return generator



In [None]:
# Build the configured generator; it is started in the next cell.
datagen = create_example_data_online_generator()



In [None]:
# Run the generator, feeding the traffic (Kafka) and users/orders (Postgres) sinks.
# NOTE(review): this may block the kernel until generation finishes — confirm in doge_datagen.
datagen.start()
