# Kafka Streams (basic intro)

### 📘 Kafka Streams – Basic Introduction

**Kafka Streams** is a Java client library that ships with Apache Kafka for building **real-time stream processing applications** on top of Kafka topics.



### 🔹 What Is Kafka Streams?

Kafka Streams allows you to:

* **Read** data from Kafka topics
* **Process/transform/analyze** that data
* **Write** the results back to Kafka (or other systems)

> It runs **on top of Kafka** itself, so it needs no separate processing cluster (unlike Apache Flink, Spark, etc.).



### 🔧 Key Features

| Feature                            | Description                                                              |
| ---------------------------------- | ------------------------------------------------------------------------ |
| **No cluster required**            | Runs inside a normal Java application – no Spark or Flink cluster needed |
| **Scalable**                       | Scales by adding app instances, up to the number of input partitions     |
| **Fault-tolerant**                 | Built-in stateful recovery using Kafka changelog topics                  |
| **Exactly-once semantics**         | Opt-in via the `processing.guarantee` setting                            |
| **Windowing, Joins, Aggregations** | Built-in support for complex operations                                  |



### 📊 Real-World Use Case

**Example: Fraud Detection in a Bank**

1. Read **transaction events** from topic `transactions`
2. Group by user/account ID
3. Apply logic: e.g., more than 5 transactions within 10 seconds → suspicious
4. Emit alert into a topic `fraud_alerts`

This is a **Kafka Streams** application: reading from Kafka, analyzing in real-time, writing back to Kafka.
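
The windowing rule in step 3 can be sketched in plain Python. This is not the Kafka Streams API (which is Java); it only illustrates the per-account sliding-window logic such an application would express. The function name `detect_suspicious` and the `(account_id, timestamp)` event shape are assumptions made for this example.

```python
from collections import defaultdict, deque

WINDOW_SECONDS = 10   # window length from the example above
MAX_TRANSACTIONS = 5  # more than this many inside one window is suspicious


def detect_suspicious(transactions):
    """Return (account_id, timestamp) alerts for bursts of transactions.

    `transactions` is an iterable of (account_id, timestamp_seconds) pairs,
    assumed to arrive in timestamp order.
    """
    recent = defaultdict(deque)  # account_id -> timestamps still in the window
    alerts = []
    for account_id, ts in transactions:
        window = recent[account_id]
        window.append(ts)
        # Evict timestamps that are 10 or more seconds older than this event.
        while ts - window[0] >= WINDOW_SECONDS:
            window.popleft()
        if len(window) > MAX_TRANSACTIONS:
            alerts.append((account_id, ts))
    return alerts


# Six transactions in six seconds for acct-1; two far-apart ones for acct-2.
events = [("acct-1", t) for t in range(6)] + [("acct-2", 0), ("acct-2", 30)]
print(detect_suspicious(events))
```

In a real Kafka Streams app, the grouping and windowing would be handled by the DSL and backed by a state store, rather than an in-memory dictionary.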



### 🧱 Core Building Blocks

| Component       | Description                                                         |
| --------------- | ------------------------------------------------------------------- |
| `KStream`       | A stream of events/messages                                         |
| `KTable`        | A table abstraction (like materialized views)                       |
| `GlobalKTable`  | A broadcasted table accessible to all tasks                         |
| `Processor API` | Low-level API for full control over processing                      |
| `DSL API`       | High-level API for declarative logic: `map()`, `filter()`, `join()` |
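
The difference between a `KStream` and a `KTable` is easiest to see on a tiny data set. The sketch below is plain Python, not the Kafka Streams API: a list stands in for the stream, a dictionary for the table, and `stream_to_table` is a hypothetical helper name.

```python
def stream_to_table(events):
    """Fold a stream of (key, value) events into a table of latest values."""
    table = {}
    for key, value in events:
        if value is None:
            table.pop(key, None)  # a None value acts as a delete (tombstone)
        else:
            table[key] = value
    return table


# Stream view: every event is kept, in order.
balance_events = [("alice", 100), ("bob", 50), ("alice", 120), ("bob", None)]

# Table view: only the latest value per key survives.
print(stream_to_table(balance_events))
```

The stream keeps all four events, while the table ends up with a single row for `alice`.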



### 🔄 Kafka Streams vs Kafka Consumer API

| Kafka Consumer API                       | Kafka Streams                            |
| ---------------------------------------- | ---------------------------------------- |
| Manual offset handling                   | Built-in offset tracking                 |
| Single-message handling                  | Supports stateful, windowed processing   |
| Needs extra setup for joins, aggregation | Built-in joins, aggregations, windowing  |
| Suitable for simple consumers            | Ideal for stream processing applications |



# Kafka Connect

### 🔌 Kafka Connect – Introduction

**Kafka Connect** is a **framework** provided by Apache Kafka to **stream data between Kafka and external systems** (databases, cloud storage, key-value stores, file systems, etc.) without writing custom code.



### 🚀 Why Kafka Connect?

Kafka Connect is designed to:

* Simplify **data ingestion** into Kafka
* Simplify **data export** from Kafka to other systems
* Allow **scalable, fault-tolerant, distributed** data pipelines



### 🔁 What Does It Do?

Kafka Connect supports **two types of connectors**:

| Connector Type       | Purpose                                       | Example               |
| -------------------- | --------------------------------------------- | --------------------- |
| **Source Connector** | Pulls data from an external system into Kafka | MySQL → Kafka         |
| **Sink Connector**   | Pushes data from Kafka to an external system  | Kafka → Elasticsearch |



### 🏗️ Architecture Overview

Kafka Connect runs in two modes:

* **Standalone Mode**: For testing or simple use cases (single machine)
* **Distributed Mode**: For production (clustered and scalable)

Each Connect instance can run:

* **Multiple connectors**
* **Multiple tasks** per connector (for parallelism)



### ⚙️ Components

| Component       | Description                                                   |
| --------------- | ------------------------------------------------------------- |
| **Connector**   | High-level abstraction, defines where data comes from/goes to |
| **Task**        | Actual work unit that does the data movement                  |
| **Worker**      | The Kafka Connect process that manages connectors and tasks   |
| **Plugin Path** | Directory containing connector JARs                           |



### 🧰 Example Use Cases

| Use Case                           | Connector                          |
| ---------------------------------- | ---------------------------------- |
| Stream data from MySQL → Kafka     | MySQL Source Connector             |
| Dump Kafka data into Elasticsearch | Elasticsearch Sink Connector       |
| Backup Kafka data to AWS S3        | S3 Sink Connector                  |
| Monitor changes in Postgres (CDC)  | Debezium Postgres Source Connector |



### 📝 Sample Flow

Imagine you want to sync real-time updates from **MySQL** to **Kafka**:

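1. Install a MySQL source connector (such as Debezium)
2. Save the connector configuration as a JSON file (here `mysql-source.json`, the name used by the `curl` command in step 3):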

   ```json
   {
     "name": "mysql-connector",
     "config": {
       "connector.class": "io.debezium.connector.mysql.MySqlConnector",
       "database.hostname": "localhost",
       "database.user": "root",
       ...
       "database.server.name": "mysql",
       "database.include.list": "salesdb",
       "table.include.list": "salesdb.orders"
     }
   }
   ```
3. Start the connector via REST API:

   ```bash
   curl -X POST -H "Content-Type: application/json" \
     --data @mysql-source.json \
     http://localhost:8083/connectors
   ```

Now all changes in the `orders` table stream to a Kafka topic. With Debezium's default naming (server name, database, table), that topic is `mysql.salesdb.orders`.
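
The same REST call can be made from Python using only the standard library. This is a minimal sketch: `build_create_connector_request` is a hypothetical helper, the Connect URL is the one from the `curl` example, and settings elided in the original config are left out here as well, so the payload is not a complete Debezium configuration.

```python
import json
import urllib.request


def build_create_connector_request(payload, connect_url="http://localhost:8083"):
    """Build (but do not send) the POST request that registers a connector."""
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


payload = {
    "name": "mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "localhost",
        "database.user": "root",
        # Settings elided in the original config are not filled in here.
        "database.server.name": "mysql",
        "database.include.list": "salesdb",
        "table.include.list": "salesdb.orders",
    },
}

request = build_create_connector_request(payload)
print(request.get_method(), request.full_url)

# To submit it against a running Connect worker:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```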



### 📦 Popular Kafka Connectors

| Connector     | Type        | Purpose                                 |
| ------------- | ----------- | --------------------------------------- |
| Debezium      | Source      | CDC from DBs (MySQL, Postgres, MongoDB) |
| JDBC          | Source/Sink | Generic database support                |
| Elasticsearch | Sink        | Search engine/data analytics            |
| Amazon S3     | Sink        | Backup Kafka data                       |
| FileStream    | Source/Sink | Read/write from local files             |



### 📚 When to Use Kafka Connect?

Use Kafka Connect when:

* You want **ETL-like pipelines** without building custom code
* You need **high-throughput data sync**
* You want **out-of-the-box fault tolerance and scalability**
* You need to **plug into enterprise systems (DBs, NoSQL, cloud storage, etc.)**



# Schema Registry and Avro

### 📘 Schema Registry and Avro in Apache Kafka

Apache Kafka is often used to exchange structured data between different systems. To ensure **data consistency, versioning, and compatibility**, two tools are commonly used together: Avro and Schema Registry.



## 📦 What is Avro?

**Apache Avro** is a **data serialization format** commonly used with Kafka. It allows you to:

* Define a **schema** (structure) for your data
* Serialize/deserialize data efficiently
* Evolve schema over time (add/remove fields)

### ✅ Why Avro?

* **Compact**: Binary format is much smaller than JSON
* **Schema-based**: Helps consumers understand the structure
* **Supports schema evolution**: Compatible changes over time

### 📄 Example Avro Schema

```json
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
```
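
To see why Avro is compact, here is a hand-rolled sketch of Avro's binary encoding for this `User` record, compared with the JSON text for the same data. It covers only the two field types in the schema (`string` and `int`) and is meant as an illustration, not a replacement for a real Avro library. The function names are made up for this example.

```python
import json


def zigzag_varint(n):
    """Encode an integer the way Avro encodes int/long: zigzag, then varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag maps small negatives to small positives
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_user(user):
    """Encode a User record: field values in schema order, no field names."""
    name_bytes = user["name"].encode("utf-8")
    # A string is its byte length (as a zigzag varint) followed by the bytes.
    return zigzag_varint(len(name_bytes)) + name_bytes + zigzag_varint(user["age"])


user = {"name": "Alice", "age": 30}
avro_bytes = encode_user(user)
json_bytes = json.dumps(user).encode("utf-8")
print(len(avro_bytes), "bytes as Avro vs", len(json_bytes), "bytes as JSON")
```

The Avro bytes carry no field names at all, which is why the reader needs the schema to decode them.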



## 🧠 What is Schema Registry?

**Confluent Schema Registry** is a separate service that stores and manages **Avro schemas** (also supports Protobuf & JSON Schema).

### 🎯 Purpose

* Give serializers a registered schema to validate messages against before sending
* Store **multiple versions** of a schema
* Ensure **backward/forward compatibility**
* Enable Kafka consumers to **auto-discover schema**



## 🧭 How It Works

1. **Producer** serializes data using Avro + schema
2. **Schema is registered** with Schema Registry and assigned an ID
3. Message is sent to Kafka with the schema ID embedded
4. **Consumer** reads the message and retrieves the schema using the ID
5. Data is deserialized using the correct schema
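
Steps 3 and 4 rely on a small framing convention used by Confluent's serializers: one magic byte (`0`), then the schema ID as a 4-byte big-endian integer, then the Avro payload. A minimal sketch of that framing, with the payload treated as opaque bytes and the schema ID `42` chosen arbitrarily for illustration:

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent wire format


def frame(schema_id, avro_payload):
    """Prefix an Avro-encoded payload with the magic byte and schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def unframe(message):
    """Split a framed message into (schema_id, avro_payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("not a Schema Registry framed message")
    return schema_id, message[5:]


message = frame(42, b"opaque-avro-bytes")
schema_id, payload = unframe(message)
print(schema_id, payload)
```

A consumer would use the recovered `schema_id` to fetch the writer's schema from the registry (usually cached) before decoding the payload.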



## 🔐 Benefits

| Feature              | Benefit                                         |
| -------------------- | ----------------------------------------------- |
| Central Schema Store | Avoid schema conflicts across teams             |
| Versioning           | Update schema without breaking consumers        |
| Compatibility Checks | Ensure schema changes won’t break existing apps |
| Interoperability     | Allows different teams/languages to work safely |



## 🛠 Kafka + Avro + Schema Registry Stack

| Component               | Tool                                                         |
| ----------------------- | ------------------------------------------------------------ |
| Serialization           | Avro                                                         |
| Schema Storage          | Confluent Schema Registry                                    |
| Kafka Producer/Consumer | `confluent-kafka-python` (built-in Avro serializers), or `kafka-python` plus a separate Avro library |



## 📦 Sample Python Producer with Avro

Using `confluent_kafka`:

```python
from confluent_kafka import avro  # provides avro.loads() used below
from confluent_kafka.avro import AvroProducer  # legacy API; newer releases favor AvroSerializer

schema_str = """
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
"""

value_schema = avro.loads(schema_str)

producer = AvroProducer(
    {
        'bootstrap.servers': 'localhost:9092',
        'schema.registry.url': 'http://localhost:8081'
    },
    default_value_schema=value_schema
)

producer.produce(topic='users', value={"name": "Alice", "age": 30})
producer.flush()
```



## 🧪 Avro vs JSON in Kafka

| Feature          | Avro | JSON         |
| ---------------- | ---- | ------------ |
| Compact          | ✅    | ❌ (verbose)  |
| Schema Support   | ✅    | ❌ (implicit) |
| Schema Evolution | ✅    | ❌            |
| Performance      | High | Medium       |



# Monitoring Kafka

### 📊 Monitoring Apache Kafka (Beginner to Pro Guide)

Monitoring Kafka is essential to ensure it's running efficiently, detect issues early, and scale smoothly. Here’s a detailed breakdown:



## 🎯 Why Monitor Kafka?

Kafka is a **distributed system**, so failures or performance issues can occur in:

* Brokers
* Producers
* Consumers
* Topics/Partitions

**Monitoring helps with:**

* Latency tracking
* Message throughput
* Broker health
* Consumer lag
* Disk usage
* Cluster availability



## 🔍 What to Monitor?

### 1. **Broker Metrics**

* **Under-replicated partitions** – Critical for reliability
* **Offline partitions** – Data unavailable
* **Request rate/latency** – API performance (produce, fetch)
* **Disk usage** – Log directory filling up
* **Garbage collection (GC)** – JVM performance

### 2. **Producer Metrics**

* **Record send rate**
* **Request latency**
* **Error rate** (timeouts, retries)

### 3. **Consumer Metrics**

* **Consumer lag** – Messages not yet processed
* **Throughput** – Messages per second
* **Commit rate** – Frequency of offset commits
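
Consumer lag is simple arithmetic once you have the offsets: for each partition, the log end offset minus the group's committed offset. A minimal sketch, assuming both sets of offsets have already been fetched (for example with the admin tools) into dictionaries. The function name and the sample numbers are hypothetical.

```python
def consumer_lag(end_offsets, committed_offsets):
    """Return per-partition lag: log end offset minus committed offset.

    Both arguments map (topic, partition) -> offset. A partition with no
    committed offset is treated as unread from offset 0, which is an
    assumption made for this sketch.
    """
    return {
        tp: end - committed_offsets.get(tp, 0)
        for tp, end in end_offsets.items()
    }


end_offsets = {("orders", 0): 1500, ("orders", 1): 1200}
committed = {("orders", 0): 1450, ("orders", 1): 1200}

lag = consumer_lag(end_offsets, committed)
print(lag, "total:", sum(lag.values()))
```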

### 4. **ZooKeeper Metrics** (ZooKeeper-based clusters only)

* **Session count**
* **Request latency**
* **Ephemeral nodes count**

### 5. **Cluster-Level Health**

* Number of active brokers
* Controller availability
* Partition distribution



## 🛠 Tools for Kafka Monitoring

### 1. **JMX (Java Management Extensions)**

Kafka exposes its metrics via JMX. Monitoring tools such as the following can collect or complement them:

| Tool                         | Features                             |
| ---------------------------- | ------------------------------------ |
| **Prometheus + Grafana**     | Open-source, real-time dashboards    |
| **Confluent Control Center** | Kafka-native, rich UI, alerts        |
| **Datadog**                  | Cloud-based, Kafka plugins           |
| **LinkedIn Cruise Control**  | Cluster rebalancing and monitoring   |
| **Elastic Stack (ELK)**      | Log analysis with Kafka plugins      |
| **Burrow**                   | Specifically tracks **consumer lag** |



## 📊 Example: Prometheus + Grafana Monitoring Stack

### 1. JMX Exporter (JMX → Prometheus)

Use the [JMX Exporter](https://github.com/prometheus/jmx_exporter) to convert Kafka JMX metrics to Prometheus metrics.

```yaml
# jmx_exporter.yml (Sample config)
rules:
  - pattern: "kafka.server<type=(.+), name=(.+)><>(Count|Value)"
    name: kafka_$1_$2
    type: GAUGE
```
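
To get a feel for what this rule does, the sketch below applies the same pattern and name template with Python's `re` module. The input strings approximate how the exporter presents an MBean attribute (`domain<properties><>attribute`); the exporter itself uses Java regular expressions and may further sanitize names, so treat the result as illustrative only.

```python
import re

# Same pattern and name template as the rule above, written for Python's `re`.
PATTERN = re.compile(r"kafka.server<type=(.+), name=(.+)><>(Count|Value)")
NAME_TEMPLATE = r"kafka_\1_\2"  # Python spelling of kafka_$1_$2


def metric_name(mbean_attribute):
    """Return the Prometheus metric name for a matching MBean attribute."""
    match = PATTERN.match(mbean_attribute)
    return match.expand(NAME_TEMPLATE) if match else None


matched = metric_name(
    "kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count"
)
skipped = metric_name(
    "kafka.network<type=RequestMetrics, name=RequestsPerSec><>Count"
)
print(matched)
print(skipped)
```

Attributes outside the `kafka.server` domain do not match this single rule, so a real config usually lists several patterns.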

### 2. Start Kafka with the JMX Exporter Agent

```bash
KAFKA_OPTS="-javaagent:/path/jmx_prometheus_javaagent-0.18.0.jar=7071:/path/jmx_exporter.yml" \
bin/kafka-server-start.sh config/server.properties
```

### 3. Grafana Dashboard

* Use Kafka dashboards from [Grafana.com](https://grafana.com/grafana/dashboards/)
* Connect Prometheus as data source



## 🚨 Alerting (Recommended)

Set up alerts in Grafana/Prometheus or any other tool for:

* High consumer lag
* Offline brokers
* Under-replicated partitions
* JVM heap usage > 85%
* Disk usage > 90%
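
The conditions above boil down to a handful of threshold checks. A rough sketch of that logic in Python, assuming a metrics snapshot has already been collected into a dictionary. The key names and the lag threshold are hypothetical, since what counts as "high" lag depends on the workload.

```python
HEAP_LIMIT = 0.85   # JVM heap usage threshold from the list above
DISK_LIMIT = 0.90   # disk usage threshold from the list above
LAG_LIMIT = 10_000  # hypothetical: pick a value that fits your workload


def firing_alerts(snapshot):
    """Return the names of alert conditions that hold for a metrics snapshot."""
    checks = {
        "high_consumer_lag": snapshot["consumer_lag"] > LAG_LIMIT,
        "offline_brokers": snapshot["active_brokers"] < snapshot["expected_brokers"],
        "under_replicated_partitions": snapshot["under_replicated_partitions"] > 0,
        "jvm_heap_high": snapshot["heap_used_ratio"] > HEAP_LIMIT,
        "disk_usage_high": snapshot["disk_used_ratio"] > DISK_LIMIT,
    }
    return [name for name, firing in checks.items() if firing]


snapshot = {
    "consumer_lag": 250,
    "active_brokers": 2,
    "expected_brokers": 3,
    "under_replicated_partitions": 4,
    "heap_used_ratio": 0.70,
    "disk_used_ratio": 0.93,
}
print(firing_alerts(snapshot))
```

In practice these rules would live in Prometheus alerting rules or Grafana alerts rather than application code.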



## ✅ Best Practices

| Tip                                            | Why                          |
| ---------------------------------------------- | ---------------------------- |
| Monitor consumer lag per topic                 | Ensures timely processing    |
| Enable log rotation                            | Avoid disk overflow          |
| Monitor GC & heap                              | Prevent broker crashes       |
| Automate alerts                                | Immediate response to issues |
| Track schema errors (if using Schema Registry) | Avoid breaking changes       |



# Kafka Security Basics (SSL, SASL)

### 🔐 Kafka Security Basics: SSL, SASL Made Simple

Apache Kafka provides robust security mechanisms to ensure secure data transmission, client authentication, and access control. The core pillars of Kafka security are SSL/TLS encryption, SASL authentication, and ACL-based authorization:



## 🛡️ 1. SSL (Secure Sockets Layer)

### 🔍 What is SSL?

SSL/TLS secures communication between Kafka clients (producers/consumers) and brokers by **encrypting** the data.

### 🔐 Features:

* Encrypts data-in-transit
* Prevents MITM (Man-in-the-Middle) attacks
* Can also be used for **client authentication**

### 🔧 How to Enable SSL (Broker Side):

1. Generate a keystore & truststore:

```bash
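# -alias must match the alias used by the export/import commands below
keytool -genkey -alias localhost -keyalg RSA -keystore kafka.server.keystore.jks -validity 365 -storepass password -keypass password -dname "CN=localhost"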
keytool -export -alias localhost -keystore kafka.server.keystore.jks -file cert-file
keytool -import -alias localhost -file cert-file -keystore kafka.server.truststore.jks
```

2. Update `server.properties`:

```properties
listeners=SSL://localhost:9093
security.inter.broker.protocol=SSL
ssl.keystore.location=/path/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
ssl.truststore.location=/path/kafka.server.truststore.jks
ssl.truststore.password=password
```



## 🔐 2. SASL (Simple Authentication and Security Layer)

### 🔍 What is SASL?

SASL provides **authentication** for Kafka clients using multiple mechanisms:

* **PLAIN** – Simple username/password (sent unencrypted unless combined with SSL, i.e. `SASL_SSL`)
* **SCRAM** – Secure hash-based auth (recommended)
* **GSSAPI** – Kerberos-based authentication
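
What "hash-based" means for SCRAM: the server never stores the password itself, only keys derived from it with a salt and many hash iterations. The sketch below follows the key derivation in RFC 5802 for SCRAM-SHA-256 using the Python standard library. It skips password normalization and the challenge-response exchange, and the iteration count of 4096 is an assumption for this example.

```python
import hashlib
import hmac
import os


def scram_sha256_credentials(password, salt, iterations=4096):
    """Derive the keys a SCRAM-SHA-256 server stores (per RFC 5802)."""
    salted = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    client_key = hmac.new(salted, b"Client Key", hashlib.sha256).digest()
    stored_key = hashlib.sha256(client_key).digest()  # used to verify the client
    server_key = hmac.new(salted, b"Server Key", hashlib.sha256).digest()
    return stored_key, server_key


salt = os.urandom(16)  # a fresh random salt per user
stored_key, server_key = scram_sha256_credentials("example-password", salt)
print(stored_key.hex())
```

Because only `stored_key`, `server_key`, the salt, and the iteration count are kept, a leaked credential store does not directly reveal passwords, unlike the plain-text entries used by PLAIN.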

### 🧪 Example: SASL/PLAIN (Client + Broker)

#### 🔧 Broker (`server.properties`)

```properties
listeners=SASL_PLAINTEXT://localhost:9094
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
```

#### 🔧 JAAS Config (broker)

Create `kafka_server_jaas.conf`:

```ini
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_user1="user1-secret";
};
```

Run Kafka with:

```bash
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/kafka_server_jaas.conf"
bin/kafka-server-start.sh config/server.properties
```

#### 🔧 Client Properties:

```properties
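security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
# Credentials must match a user_<name> entry in the broker's JAAS config
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user1" password="user1-secret";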
```



## 🔏 3. ACLs (Access Control Lists)

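You can define **fine-grained permissions** for producers and consumers. The command here targets a ZooKeeper-based cluster; newer Kafka versions take `--bootstrap-server` in place of `--authorizer-properties`: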

```bash
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:producer1 --operation Write --topic my-topic
```
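
Conceptually, an ACL check is a lookup with two rules: an explicit deny wins, and no matching rule means no access. The Python sketch below models only that core idea. It ignores super users, wildcard and prefixed resources, and host restrictions, and the tuple layout is an assumption made for this example.

```python
def is_authorized(acls, principal, operation, topic):
    """Decide access the way a deny-by-default authorizer would.

    `acls` is a list of (permission, principal, operation, topic) tuples,
    where permission is "Allow" or "Deny".
    """
    matching = [
        permission
        for permission, p, op, t in acls
        if (p, op, t) == (principal, operation, topic)
    ]
    if "Deny" in matching:
        return False  # an explicit deny wins over any allow
    return "Allow" in matching  # no matching rule means no access


# The rule added by the kafka-acls.sh command above.
acls = [("Allow", "User:producer1", "Write", "my-topic")]

print(is_authorized(acls, "User:producer1", "Write", "my-topic"))
print(is_authorized(acls, "User:producer1", "Read", "my-topic"))
```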



## 🔐 Best Practices Summary

| Feature           | Use For                 | Benefit                       |
| ----------------- | ----------------------- | ----------------------------- |
| SSL               | Encryption + Auth       | Secure data-in-transit        |
| SASL/PLAIN        | Authentication          | Simple user-based auth        |
| SASL/SCRAM        | Stronger Auth           | Secure hashed credentials     |
| ACLs              | Authorization           | Access control per topic/user |
| Kerberos (GSSAPI) | Enterprise environments | Centralized identity mgmt     |

