In [44]:
from datetime import date
import pandas as pd

from py2k.record import PandasToRecordsTransformer

Create a pandas DataFrame to use as example data:

In [45]:
# Column-oriented sample data: one entry per person in every list.
# The literal 0 in 'value' sits among floats, so the column is float64.
df = pd.DataFrame(
    {
        'name': ['Daniel', 'David', 'Felipe', 'Ruslan'],
        'cool_level': ['low', 'high', 'high', 'high'],
        'value': [27.1, 100.0, 0, 9000.0],
        'date': [date(2021, 3, 1) for _ in range(4)],
    }
)
df

Unnamed: 0,name,cool_level,value,date
0,Daniel,low,27.1,2021-03-01
1,David,high,100.0,2021-03-01
2,Felipe,high,0.0,2021-03-01
3,Ruslan,high,9000.0,2021-03-01


We create a basic `PandasToRecordsTransformer` by passing it the dataframe and a `record_name`. The `record_name` is used to automatically assign the "name" and "namespace" within the generated schema.

In [46]:
# Show the pandas dtype of each column; note that the `date` column holds
# Python `datetime.date` objects and is therefore reported as `object`.
df.dtypes

name           object
cool_level     object
value         float64
date           object
dtype: object

In [47]:
# Wrap the dataframe in a transformer; `record_name` becomes the name of the
# generated record class.
record_transformer = PandasToRecordsTransformer(df, record_name='KafkaRecord')

In [48]:
# Convert the dataframe into a list of record objects, one per row.
records = record_transformer.from_pandas()

In [49]:
# Display the generated records (only four rows, so showing them all is fine).
records

[KafkaRecord(name='Daniel', cool_level='low', value=27.1, date=datetime.date(2021, 3, 1)),
 KafkaRecord(name='David', cool_level='high', value=100.0, date=datetime.date(2021, 3, 1)),
 KafkaRecord(name='Felipe', cool_level='high', value=0.0, date=datetime.date(2021, 3, 1)),
 KafkaRecord(name='Ruslan', cool_level='high', value=9000.0, date=datetime.date(2021, 3, 1))]

In [50]:
# Each record exposes the schema generated for it via `.schema()`.
# All records above are `KafkaRecord` instances, so the first one is representative.
records[0].schema()

{'type': 'record',
 'name': 'KafkaRecord',
 'namespace': 'python.kafka.kafkarecord',
 'fields': [{'default': '', 'type': 'string', 'name': 'name'},
  {'default': '', 'type': 'string', 'name': 'cool_level'},
  {'default': -1.0, 'type': 'double', 'name': 'value'},
  {'default': '0001-01-01',
   'type': 'string',
   'format': 'date',
   'name': 'date'}]}

To write these records to Kafka, we need to create a `KafkaWriter` object. To do this, we need to specify a `producer_config` and a `schema_registry_config`. The available configuration options are described in the documentation for the [python confluent kafka](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html) package.

In [51]:
from py2k.writer import KafkaWriter

In [54]:
topic = 'py2k-test-topic'

# Connection settings for the Schema Registry and the Kafka producer.
# These were previously undefined in the notebook, so this cell failed with a
# NameError on a fresh kernel. The values below are local-development
# placeholders -- replace them with the addresses of your own Schema Registry
# and Kafka brokers (see the confluent-kafka documentation for all options).
schema_registry_config = {'url': 'http://localhost:8081'}
producer_config = {'bootstrap.servers': 'localhost:9092'}

# The writer publishes records to `topic` using the two configs above.
writer = KafkaWriter(topic=topic,
                     schema_registry_config=schema_registry_config,
                     producer_config=producer_config)

In [55]:
# Send all records to the configured topic; a progress bar is printed while writing.
writer.write(records)

100%|██████████| 4/4 [00:00<00:00,  7.69it/s]
