# Produce messages to an Apache Kafka topic

In this notebook you will learn how to produce messages to an Apache Kafka topic.

![Produce messages to an Apache Kafka Topic](../img/produce.png)

---

## Prerequisites

To start the tutorial you need to:

* Download the Aiven for Apache Kafka SSL certificates as mentioned in the [README instructions](../README.md#Download-the-required-SSL-certificates)
* Retrieve the Aiven for Apache Kafka hostname and port from the service overview page in the [Aiven Console](https://console.aiven.io/)
* Substitute the Apache Kafka hostname and port in the parameters below

In [1]:
# Replace the following two placeholders with your Aiven for Apache Kafka service hostname and port

hostname="kafka-samtest-sam-tha2023.a.aivencloud.com"
port="19266"
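Before going further, it can help to confirm that the certificate files from the first prerequisite are where the producer configuration expects them. The following is a minimal sketch: the `missing_cert_files` helper and the `CERT_FILES` constant are hypothetical names introduced for illustration, and the `../sslcerts` folder and file names are taken from the configuration used later in this notebook.

```python
from pathlib import Path

# File names expected by the producer configuration later in this notebook
CERT_FILES = ("ca.pem", "service.cert", "service.key")

def missing_cert_files(cert_dir="../sslcerts"):
    """Return the expected certificate files that are not present in cert_dir."""
    base = Path(cert_dir)
    return [name for name in CERT_FILES if not (base / name).is_file()]

missing = missing_cert_files()
if missing:
    print("Missing certificate files:", ", ".join(missing))
else:
    print("All certificate files found")
```

If any file is reported as missing, download it again as described in the README before creating the producer.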

---

### Install the required libraries

The following installs the [confluent-kafka library](https://github.com/confluentinc/confluent-kafka-python) we'll use for the tutorial.

In [2]:
!pip install confluent-kafka

Defaulting to user installation because normal site-packages is not writeable


---

## Create an Apache Kafka producer

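The next step is to configure the Kafka producer. The configuration points the client at the hostname and port set above, enables SSL with the downloaded certificates, and serializes both keys and values as JSON.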

In [3]:
import json
from confluent_kafka import SerializingProducer

def json_serializer(msg, s_obj):
    return json.dumps(msg).encode('ascii')

conf = {
    'bootstrap.servers': hostname+":"+port,
    'client.id': 'myclient',
    'security.protocol': 'SSL',
    'ssl.ca.location': '../sslcerts/ca.pem',
    'ssl.certificate.location': '../sslcerts/service.cert',
    'ssl.key.location': '../sslcerts/service.key', 
    'value.serializer': json_serializer,
    'key.serializer': json_serializer
    }

producer = SerializingProducer(conf)
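To see what the serializer actually hands to Kafka, the sketch below calls `json_serializer` directly, outside the producer. It repeats the function from the cell above so that it runs on its own. Passing `None` as the second argument is an assumption made for this offline check; inside the producer, that argument is the serialization context supplied by the library.

```python
import json

def json_serializer(msg, s_obj):
    # Same serializer as in the producer configuration; s_obj is ignored here
    return json.dumps(msg).encode('ascii')

# Sample payloads, for illustration only
key_bytes = json_serializer({"id": "abc-123"}, None)
value_bytes = json_serializer({"symbol": "AMZN", "price": 201.25}, None)

print(key_bytes)
print(value_bytes)

# json.dumps escapes non-ASCII characters by default, so the ASCII encoding is safe
print(json_serializer({"city": "Zürich"}, None))
```

Both keys and values end up as bytes containing JSON text, which a consumer can decode with `json.loads`.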

---
## Push your first message to the `StockTickets` topic

The cell below pushes the first message to the `StockTickets` topic.

> Note: the `StockTickets` topic must exist before you produce to it. Either create it beforehand in Apache Kafka or enable the `kafka.auto_create_topics_enable` parameter.

The `flush` function blocks until every queued message has been delivered or has failed, and returns the number of messages still waiting in the queue. The `0` printed below the cell means nothing is left to send.

In [13]:

import uuid
from datetime import datetime

producer.produce(
    "StockTickets",
    key={"id": str(uuid.uuid4())},

    value={
        "symbol": "AMZN",
        "price": 201.25,
        "quantity": 50,
        "action": "buy",
        "timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')
    }
)

producer.flush()

0
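`flush` reports how many messages are still queued, but not what happened to each one. confluent-kafka accepts an `on_delivery` callback on `produce`, called with an error (or `None`) and the message. The sketch below shows one possible callback. The `delivery_report` name and the `fake_msg` stand-in are hypothetical, used only so the callback can be tried without a broker.

```python
from types import SimpleNamespace

def delivery_report(err, msg):
    """Delivery callback: err is None when the message was delivered."""
    if err is not None:
        line = f"Delivery failed: {err}"
    else:
        line = (f"Delivered to {msg.topic()} "
                f"[partition {msg.partition()}] at offset {msg.offset()}")
    print(line)
    return line

# Stand-in for a delivered message, only for trying the callback offline
fake_msg = SimpleNamespace(
    topic=lambda: "StockTickets",
    partition=lambda: 0,
    offset=lambda: 42,
)
delivery_report(None, fake_msg)
delivery_report("broker unavailable", None)

# With the producer from this notebook, the callback would be passed like this:
# producer.produce("StockTickets", key={"id": "1"}, value={"symbol": "AMZN"},
#                  on_delivery=delivery_report)
# producer.flush()  # callbacks are served while flush() runs
```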

---

## Produce more messages

The cell below queues four more messages and then calls `flush` once to deliver them all.

In [14]:
import uuid
from datetime import datetime

producer.produce(
    "StockTickets",
    key={"id": str(uuid.uuid4())},

    value={
        "symbol": "AAPL",
        "price": 150.15,
        "quantity": 100,
        "action": "buy",
        "timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')
    }
)


producer.produce(
    "StockTickets",
    key={"id": str(uuid.uuid4())},

    value={
        "symbol": "MSFT",
        "price": 285.53,
        "quantity": 120,
        "action": "buy",
        "timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')
    }
)

producer.produce(
    "StockTickets",
    key={"id": str(uuid.uuid4())},

    value={
        "symbol": "GOOGL",
        "price": 140.37,
        "quantity": 20,
        "action": "buy",
        "timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')
    }
)

producer.produce(
    "StockTickets",
    key={"id": str(uuid.uuid4())},

    value={
        "symbol": "TSLA",
        "price": 261.44,
        "quantity": 150,
        "action": "buy",
        "timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')
    }
)

producer.flush()

0
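The four `produce` calls above differ only in symbol, price, and quantity, so the same messages could be built in a loop. The following is a sketch under a few assumptions: the `build_message` helper and the `trades` list are hypothetical names, and `datetime.now(timezone.utc)` is swapped in for `datetime.utcnow()`, which is deprecated since Python 3.12. The `produce` and `flush` calls are commented out so the block runs without a broker.

```python
import uuid
from datetime import datetime, timezone

# (symbol, price, quantity) tuples matching the messages produced above
trades = [
    ("AAPL", 150.15, 100),
    ("MSFT", 285.53, 120),
    ("GOOGL", 140.37, 20),
    ("TSLA", 261.44, 150),
]

def build_message(symbol, price, quantity, action="buy"):
    """Return a (key, value) pair shaped like the messages in this notebook."""
    key = {"id": str(uuid.uuid4())}
    value = {
        "symbol": symbol,
        "price": price,
        "quantity": quantity,
        "action": action,
        "timestamp": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f'),
    }
    return key, value

messages = [build_message(*trade) for trade in trades]

for key, value in messages:
    print(key, value)
    # producer.produce("StockTickets", key=key, value=value)

# producer.flush()
```

Calling `flush` once after the loop, rather than after every message, lets the client batch the sends.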