In [1]:
!pip3 install -r requirements.txt

Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip' command.[0m


In [2]:
import os
import json
import requests
from sqlalchemy import create_engine 
import pandas as pd
from dotenv import load_dotenv

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [3]:
load_dotenv()

True

In [4]:
conn_string = f"postgresql://{os.getenv('DB__USER')}:{os.getenv('DB__PASSW')}@{os.getenv('LOCAL__HOST')}:{os.getenv('PG__PORT')}/{os.getenv('DB__NAME')}"
print(conn_string)
db = create_engine(conn_string) 
conn = db.connect() 

postgresql://postgres:postgres@127.0.0.1:5432/trades


In [5]:
df = pd.DataFrame(
    [
        (1, 'SBOL', 1973453, 'DFGHrthfgfTJ', 1702339200000),
        (2, 'TINKOFF', 1412434, 'ergerRhdsht', 1702339276000),
        (3, 'ALFA', 2542352, 'ewtegHNTH', 1702339239000)
    ],
    columns = ['id', 'ticker', 'securityid', 'tradecode', 'datepost']
)
df.sample()

Unnamed: 0,id,ticker,securityid,tradecode,datepost
2,3,ALFA,2542352,ewtegHNTH,1702339239000


In [7]:
df.to_sql('quotes', conn, if_exists='append', index=False)

3

In [8]:
with open('connector_psql.json', 'r') as f:
    data = json.load(f)
data

{'name': 'movies-db-connector',
 'config': {'connector.class': 'io.debezium.connector.postgresql.PostgresConnector',
  'plugin.name': 'pgoutput',
  'tasks.max': '1',
  'database.hostname': 'postgres_db',
  'database.port': '5432',
  'database.user': 'postgres',
  'database.password': 'postgres',
  'database.dbname': 'trades',
  'database.server.name': 'postgres',
  'table.include.list': 'public.quotes',
  'database.history.kafka.bootstrap.servers': 'broker:9092',
  'database.history.kafka.topic': 'schema-changes.quotes',
  'topic.prefix': 'uat.psql',
  'topic.creation.enable': 'true',
  'topic.creation.default.replication.factor': '1',
  'topic.creation.default.partitions': '1',
  'topic.creation.default.cleanup.policy': 'delete',
  'topic.creation.default.retention.ms': '604800000',
  'key.converter': 'io.confluent.connect.avro.AvroConverter',
  'value.converter': 'io.confluent.connect.avro.AvroConverter',
  'key.converter.schema.registry.url': 'http://schema-registry:8081',
  'value.

## Connect Debezium to create PSQL producer

In [10]:

url = f"http://{os.getenv('LOCAL__HOST')}:{os.getenv('DEBEZIUM__PORT')}/connectors"
url

'http://127.0.0.1:8083/connectors'

In [11]:
resp = requests.post(
    url = url, 
    headers = {'Content-Type':'application/json','Accept':'application/json'},
    json = data
)
resp.text

'{"name":"movies-db-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","plugin.name":"pgoutput","tasks.max":"1","database.hostname":"postgres_db","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"trades","database.server.name":"postgres","table.include.list":"public.quotes","database.history.kafka.bootstrap.servers":"broker:9092","database.history.kafka.topic":"schema-changes.quotes","topic.prefix":"uat.psql","topic.creation.enable":"true","topic.creation.default.replication.factor":"1","topic.creation.default.partitions":"1","topic.creation.default.cleanup.policy":"delete","topic.creation.default.retention.ms":"604800000","key.converter":"io.confluent.connect.avro.AvroConverter","value.converter":"io.confluent.connect.avro.AvroConverter","key.converter.schema.registry.url":"http://schema-registry:8081","value.converter.schema.registry.url":"http://schema-registry:8081","name":"movies-db-connecto

## Kafka and fastavro consumer

In [12]:
import io
from kafka import KafkaConsumer, TopicPartition
import fastavro
from confluent_kafka.schema_registry import SchemaRegistryClient

In [13]:
def get_schema_from_schema_registry(
                            schema_registry_url: str = 'http://schema-registry:8081',
                            schema_registry_subject: str = ''):
    
    sr = SchemaRegistryClient({'url': schema_registry_url})
    latest_version = sr.get_latest_version(schema_registry_subject)

    return sr, latest_version

In [14]:
_, schema_response = get_schema_from_schema_registry(
    schema_registry_url= f"http://{os.getenv('LOCAL__HOST')}:8081",
    schema_registry_subject = 'uat.psql.public.quotes-value')
print(schema_response.schema.schema_str)

{"type":"record","name":"Envelope","namespace":"uat.psql.public.quotes","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"id","type":"long"},{"name":"ticker","type":"string"},{"name":"securityid","type":"long"},{"name":"tradecode","type":"string"},{"name":"datepost","type":"long"}],"connect.name":"uat.psql.public.quotes.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.postgresql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":n

In [15]:
## Consumer kafka
kafka_cons = KafkaConsumer(
    auto_offset_reset='earliest',
    bootstrap_servers=f"{os.getenv('LOCAL__HOST')}:9092",
    security_protocol="PLAINTEXT",
    sasl_mechanism='PLAINTEXT'
)

In [16]:
topic = "uat.psql.public.quotes"
partitions = kafka_cons.partitions_for_topic(topic)
partitions  

{0}

In [17]:
tp = TopicPartition(topic, 0)
kafka_cons.assign([tp])
kafka_cons.seek_to_beginning(tp)

In [18]:
dc, i = {}, 0
for msg in kafka_cons:
    dc[i] = msg.value
    i += 1
    
i

KeyboardInterrupt: 

In [19]:
dc

{0: b'\x00\x00\x00\x00\x02\x00\x02\x02\x08SBOL\x9a\xf3\xf0\x01\x18DFGHrthfgfTJ\x80\x90\x96\xb5\x8bc\x162.4.1.Final\x14postgresql\x10uat.psql\x96\xbf\xbc\x8a\xb8c\x00\nfirst\x0ctrades\x02"[null,"22987440"]\x0cpublic\x0cquotes\x02\xe6\x07\x02\xe0\x8a\xf6\x15\x00\x02r\x02\xb6\xc0\xbc\x8a\xb8c\x00',
 1: b'\x00\x00\x00\x00\x02\x00\x02\x04\x0eTINKOFF\xa4\xb5\xac\x01\x16ergerRhdsht\xc0\xb3\x9f\xb5\x8bc\x162.4.1.Final\x14postgresql\x10uat.psql\x96\xbf\xbc\x8a\xb8c\x00\x08true\x0ctrades\x02"[null,"22987440"]\x0cpublic\x0cquotes\x02\xe6\x07\x02\xe0\x8a\xf6\x15\x00\x02r\x02\xb8\xc0\xbc\x8a\xb8c\x00',
 2: b'\x00\x00\x00\x00\x02\x00\x02\x06\x08ALFA\xa0\xac\xb6\x02\x12ewtegHNTH\xb0\xf1\x9a\xb5\x8bc\x162.4.1.Final\x14postgresql\x10uat.psql\x96\xbf\xbc\x8a\xb8c\x00\x08last\x0ctrades\x02"[null,"22987440"]\x0cpublic\x0cquotes\x02\xe6\x07\x02\xe0\x8a\xf6\x15\x00\x02r\x02\xb8\xc0\xbc\x8a\xb8c\x00'}

In [20]:
schema_fastavro = fastavro.parse_schema(json.loads(schema_response.schema.schema_str))

In [21]:
string_io = io.BytesIO(dc[0])
string_io.seek(5)

5

In [22]:
fastavro.schemaless_reader(string_io, schema_fastavro)

{'before': None,
 'after': {'id': 1,
  'ticker': 'SBOL',
  'securityid': 1973453,
  'tradecode': 'DFGHrthfgfTJ',
  'datepost': 1702339200000},
 'source': {'version': '2.4.1.Final',
  'connector': 'postgresql',
  'name': 'uat.psql',
  'ts_ms': 1708334223307,
  'snapshot': 'first',
  'db': 'trades',
  'sequence': '[null,"22987440"]',
  'schema': 'public',
  'table': 'quotes',
  'txId': 499,
  'lsn': 22987440,
  'xmin': None},
 'op': 'r',
 'ts_ms': 1708334223387,
 'transaction': None}