In [1]:
from sqlalchemy import create_engine
import pandas as pd
import os
import time

pwd = 'airflow'
uid = 'airflow'
server = "localhost"
db = "airflow"
port = "50000"

### Setting up debezium connectors

In [2]:
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip_address = s.getsockname()[0]
s.close()

In [3]:
import requests

for table_name in ['stores', 'customers', 'purchases', 'manufacturers', 
                   'categories', 'products', 'purchase_items', 
                   'price_change', 'deliveries']:
    curr_connector = {
        "name": f"{table_name}-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    	"database.hostname": str(local_ip_address),
            "database.port": port,
            "database.user": uid,
    	"database.password": pwd,
            "database.dbname": db,
            "plugin.name": "pgoutput",
            "database.server.name": "source",
            "key.converter.schemas.enable": "false",
            "value.converter.schemas.enable": "false",
            "transforms": "unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    	"table.include.list": f"public.{table_name}",
            "slot.name" : f"dbz_{table_name}_slot"
        }
    }
    r = requests.post('http://localhost:8083/connectors', json=curr_connector)
    print(f'table {table_name} response code: {r.status_code}')

table stores response code: 201
table customers response code: 201
table purchases response code: 201
table manufacturers response code: 201
table categories response code: 201
table products response code: 201
table purchase_items response code: 201
table price_change response code: 201
table deliveries response code: 201


In [4]:
requests.get('http://localhost:8083/connectors').json()

['customers-connector',
 'purchases-connector',
 'categories-connector',
 'price_change-connector',
 'manufacturers-connector',
 'products-connector',
 'stores-connector',
 'deliveries-connector',
 'purchase_items-connector']

### Inserting data to tables

In [5]:
engine = create_engine(f'postgresql://{uid}:{pwd}@{server}:{port}/{db}')

In [6]:
pd.read_sql(f'select * from public.stores', engine)

Unnamed: 0,store_id,store_name,store_country,store_city,store_address


In [7]:
tables_data_samples = {'categories' : [[0], ['example']], 
                       'customers' : [[0], ['example'], ['example'], ['example'], ['example']], 
                       'deliveries' : [[0], [0], [0], ['2023-01-01'], [0]],
                       'manufacturers' : [[0], ['example'], ['example']],
                       'price_change' : [[0], ['2023-01-01'], [0]], 
                       'products' : [[0], [0], [0], ['2023-01-01'], ['example'], ['example'], [0]], 
                       'purchase_items' : [[0], [0], [0], [0]], 
                       'purchases' : [[0], [0], [0], ['2023-01-01'], ['2023-01-01']], 
                       'stores' : [[0], ['example'], ['example'], ['example'], ['example']]}
for table_name in ['stores', 'customers', 'purchases', 'manufacturers', 
                   'categories', 'products', 'purchase_items', 
                   'price_change', 'deliveries']:
    print(f'table_name: {table_name}')
    df = pd.read_sql(f'select * from public.{table_name}', engine)
    
    df_to_append = pd.DataFrame(tables_data_samples[table_name], index=df.columns).T
    
    for index, row in df_to_append.iterrows():
        mod = pd.DataFrame(row.to_frame().T)
        mod.to_sql(f"{table_name}", engine, if_exists='append', index=False)
        print(f"Row Inserted, table '{table_name}'")
        time.sleep(1)

table_name: stores
Row Inserted, table 'stores'
table_name: customers
Row Inserted, table 'customers'
table_name: purchases
Row Inserted, table 'purchases'
table_name: manufacturers
Row Inserted, table 'manufacturers'
table_name: categories
Row Inserted, table 'categories'
table_name: products
Row Inserted, table 'products'
table_name: purchase_items
Row Inserted, table 'purchase_items'
table_name: price_change
Row Inserted, table 'price_change'
table_name: deliveries
Row Inserted, table 'deliveries'


### Kafka

In [8]:
from kafka import KafkaConsumer
bootstrap_servers = ['localhost:29092']
consumer = KafkaConsumer( bootstrap_servers=bootstrap_servers)
consumer.topics()

{'__confluent.support.metrics',
 '_schemas',
 'connect-status',
 'connect_configs',
 'connect_offsets',
 'source.public.categories',
 'source.public.customers',
 'source.public.deliveries',
 'source.public.manufacturers',
 'source.public.price_change',
 'source.public.products',
 'source.public.purchase_items',
 'source.public.purchases',
 'source.public.stores'}

In [13]:
topicName = 'source.public.purchase_items'
# Initialize consumer variable
consumer = KafkaConsumer (topicName , auto_offset_reset='earliest', 
                          bootstrap_servers = bootstrap_servers, group_id=topicName.split('.')[-1]+'_group')

In [14]:
import json
for msg in consumer:
    print(json.loads(msg.value))

{'product_id': 0, 'purchase_id': 0, 'product_count': 0, 'product_price': 'AA=='}


KeyboardInterrupt: 