# 🦀 Kafka Connect

![Kafka Connect PG](images/connect_pg.png)

Kafka Connect provides prebuilt connectors that integrate Kafka with other systems acting as sources or targets (sources or sinks in Kafka terms). Let's create a PostgreSQL one.

In [None]:
from kafka import KafkaProducer
import json
# Provides hostname, port, cert_folder and topic_name used below
from config.kafka_config import *

# SSL-authenticated producer; keys and values are serialized as JSON bytes
producer = KafkaProducer(
    bootstrap_servers=hostname+":"+str(port),
    security_protocol="SSL",
    ssl_cafile=cert_folder+"/ca.pem",
    ssl_certfile=cert_folder+"/service.cert",
    ssl_keyfile=cert_folder+"/service.key",
    value_serializer=lambda v: json.dumps(v).encode('ascii'),
    key_serializer=lambda v: json.dumps(v).encode('ascii')
)
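
The two serializers turn every key and value into bytes before it is sent. The sketch below runs without a broker and shows why encoding to ASCII is safe even for payloads containing emoji: `json.dumps` escapes non-ASCII characters by default. The name `serialize` is introduced here for illustration only.

```python
import json

# Same logic as the producer's value_serializer, named here for illustration.
def serialize(v):
    return json.dumps(v).encode('ascii')

message = {"name": "👨 Frank", "pizza": "Margherita 🍕"}
raw = serialize(message)

# json.dumps escapes non-ASCII characters (ensure_ascii=True by default),
# so the bytes are pure ASCII and still decode back to the original dict.
print(raw)
print(json.loads(raw) == message)
```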

---

Let's create a new stream, adding the **schema** to it. 

The Kafka Connect JDBC Sink requires a schema attached to the stream that defines its fields in detail. We have two choices:
* Attach the schema to each JSON message
* Use a schema registry with the Avro format

For the sake of this example, we'll include the schema definition in each JSON message. Let's define the schema:

In [None]:
key_schema = {
    "type": "struct",
    "fields": [
        {
            "type": "int32",
            "optional": False,
            "field": "id"
        }
    ]
}

value_schema = {
    "type": "struct",
    "fields": [
        {
            "type": "string",
            "optional": False,
            "field": "name"
        },
        {
            "type": "string",
            "optional": False,
            "field": "pizza"
        }
    ]
}
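
Each message on the topic wraps its data in an envelope with a `schema` entry and a `payload` entry. A small helper can build that envelope and check that the payload carries every non-optional field the schema declares. This is only a sketch: `with_schema` is a hypothetical name, not part of kafka-python or Kafka Connect, it checks field presence only (not types), and the schema is repeated as `example_schema` so the cell runs on its own.

```python
def with_schema(schema, payload):
    """Wrap payload in the {"schema": ..., "payload": ...} envelope."""
    # Collect the names of all fields the schema marks as non-optional.
    required = [f["field"] for f in schema["fields"] if not f.get("optional", False)]
    missing = [name for name in required if name not in payload]
    if missing:
        raise ValueError(f"payload is missing required fields: {missing}")
    return {"schema": schema, "payload": payload}

# Copy of the key schema, repeated so this cell is self-contained.
example_schema = {
    "type": "struct",
    "fields": [{"type": "int32", "optional": False, "field": "id"}],
}

envelope = with_schema(example_schema, {"id": 1})
print(envelope)
```

With such a helper, a key could be built as `with_schema(key_schema, {"id": 1})` instead of writing the envelope by hand in every `producer.send` call.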

And send some data

In [None]:
producer.send(
    topic_name+"_schema", 
    key={"schema": key_schema, "payload": {"id":1}},
    value={"schema": value_schema, 
           "payload": {"name":"👨 Frank", "pizza":"Margherita 🍕"}}
)

producer.send(
    topic_name+"_schema",
    key={"schema": key_schema, "payload": {"id":2}},
    value={"schema": value_schema, 
           "payload": {"name":"👨 Dan", "pizza":"Fries 🍕+🍟"}}
)


producer.send(
    topic_name+"_schema",
    key={"schema": key_schema, "payload": {"id":3}},
    value={"schema": value_schema,
           "payload": {"name":"👨 Jan", "pizza":"Mushrooms 🍕+🍄"}}
)

producer.flush()

Let's start the **Kafka Connect JDBC Connector**

In [None]:
%%bash

source config/profile_info.sh

avn service connector create $KAFKA_NAME @config/kafka_connect_setup.json --project $PROJECT_NAME
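
The contents of `config/kafka_connect_setup.json` are not shown here. As a rough sketch, a JDBC sink configuration for this example might look like the one below. Apart from the connector name `sink_kafka_pg`, which the status command uses, everything is an assumption or a placeholder: the connector class, the connection details, the topic name, and the table-creation and key settings. Check the connector's documentation before relying on any of it.

```python
import json

# Hypothetical JDBC sink configuration; values in <...> are placeholders.
connector_config = {
    "name": "sink_kafka_pg",
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "topics": "<topic_name>_schema",
    "connection.url": "jdbc:postgresql://<pg_host>:<pg_port>/<database>?sslmode=require",
    "connection.user": "<pg_user>",
    "connection.password": "<pg_password>",
    # JSON messages carry their own schema, so schemas.enable must be on.
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    # Assumed choices: create the table if missing, upsert on the record key.
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "id",
}

print(json.dumps(connector_config, indent=2))
```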

Verify the **Connector** status

In [None]:
%%bash

source config/profile_info.sh

avn service connector status $KAFKA_NAME sink_kafka_pg --project $PROJECT_NAME

Let's add another **event** with our **Python Producer**

In [None]:
producer.send(
    topic_name+"_schema",
    key={
        "schema": key_schema,
        "payload": {"id":4}
    },
    value={
        "schema": value_schema,
        "payload": {"name":"👨 Giuseppe", "pizza":"Hawaii 🍕+🍍+🥓"}
    }
)


producer.flush()