<h1 align="center"> Lesson 2 - Kafka Topology and Components </h1>


As mentioned in Lesson 1, Kafka is a Publish-Subscribe messaging system used with real-time streaming data (in addition to its ability to process batch data).

To handle massive volumes of data arriving at high velocity, a robust Kafka system is set up from several components.

As a quick reminder, below is the overall Kafka topology at a high-level:

<p align="center">

<img src= "images/Kafka_Architecture2.png">

</p>


## Main Concepts and Terminology

An __Event__ records the fact that "something happened" in the world or in your business. It is also called a record or a message in the documentation. When you read or write data to Kafka, you do this in the form of Events.

Conceptually, an Event has a _Key_, _Value_, _Timestamp_, and optional _Metadata headers_. Here's an example Event:

- Event Key: "Alice"

- Event Value: "Made a payment of $200 to Bob"

- Event Timestamp: "Jun. 25, 2020 at 2:06 p.m."
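
To make the structure of an Event concrete, here is a minimal Python sketch of the example above. The `Event` class is a hypothetical stand-in written for this lesson, not a class from any Kafka client library, and the header type is an assumption.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Optional


@dataclass(frozen=True)
class Event:
    """Illustrative stand-in for a Kafka record; not a Kafka client class."""
    key: Optional[str]
    value: str
    timestamp: datetime
    headers: Dict[str, bytes] = field(default_factory=dict)  # optional metadata


# The example Event from the list above
payment = Event(
    key="Alice",
    value="Made a payment of $200 to Bob",
    timestamp=datetime(2020, 6, 25, 14, 6),
)
print(payment.key, "->", payment.value)
```

The headers default to an empty dictionary because they are optional.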

__Producers__ are those client applications that publish (write) Events to Kafka, and __Consumers__ are those that subscribe to (read and process) these Events. 
In Kafka, Producers and Consumers are fully decoupled and agnostic of each other, which is a key design element to achieve the high scalability that Kafka is known for. For example, Producers never need to wait for Consumers. Kafka provides various guarantees such as the ability to process events exactly-once.

Events are organized and durably stored in __Topics__. Very simplified, a Topic is similar to a folder in a filesystem, and the Events are the files in that folder. An example Topic name could be "payments". Topics in Kafka are always multi-producer and multi-subscriber: a Topic can have zero, one, or many Producers that write Events to it, as well as zero, one, or many Consumers that subscribe to these Events. 

Unlike in traditional messaging systems, Events in a Topic can be read as often as needed because they are not deleted after consumption. Instead, you define how long Kafka should retain your Events through a per-Topic configuration setting, after which old Events are discarded. Kafka's performance is effectively constant with respect to data size, so storing data for a long time is perfectly fine.
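
The difference between reading and retention can be sketched in a few lines of Python. This is a simplified model under stated assumptions: the 168-hour retention period and the event values are made up for illustration, and a real Broker applies retention to whole log files rather than to individual Events.

```python
from datetime import datetime, timedelta


def apply_retention(events, now, retention):
    """Keep only events whose age does not exceed the retention period."""
    return [(ts, value) for ts, value in events if now - ts <= retention]


now = datetime(2020, 6, 25, 12, 0)
topic_log = [
    (now - timedelta(hours=200), "old payment"),
    (now - timedelta(hours=2), "recent payment"),
]

# Reading is non-destructive: two reads see the same events.
first_read = [value for _, value in topic_log]
second_read = [value for _, value in topic_log]

# Only the retention policy (168 hours here, an assumed value) removes data.
retained = apply_retention(topic_log, now, timedelta(hours=168))
print([value for _, value in retained])
```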

When a new Event is published to a Topic, it is actually appended to one of the Topic's __Partitions__. Events with the same Event Key (e.g., a customer or vehicle ID) are written to the same Partition, and Kafka guarantees that any consumer of a given Topic-partition will always read that Partition's Events in exactly the same order as they were written.

Below is a visual representation of what Partitions look like:
<p align="center">
<img src= "images/Kafka_Topics.png" width=600>
</p>
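
The rule that Events with the same key land in the same Partition can be sketched as a hash of the key modulo the number of Partitions. The sketch below uses CRC32 as a stand-in hash, which is an assumption made for simplicity (Kafka's default partitioner uses a murmur2 hash), and the keys and values are invented.

```python
import zlib


def choose_partition(key: str, num_partitions: int) -> int:
    # Stand-in hash: Kafka's default partitioner uses murmur2, not CRC32.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


NUM_PARTITIONS = 3
partitions = {p: [] for p in range(NUM_PARTITIONS)}

events = [
    ("vehicle-1", "speed=49"),
    ("vehicle-2", "speed=37"),
    ("vehicle-1", "speed=51"),
    ("vehicle-1", "speed=48"),
]
for key, value in events:
    # Appending preserves the write order within each Partition.
    partitions[choose_partition(key, NUM_PARTITIONS)].append((key, value))

target = choose_partition("vehicle-1", NUM_PARTITIONS)
print([v for k, v in partitions[target] if k == "vehicle-1"])
```

All `vehicle-1` Events end up in one Partition and keep their write order, which is what makes per-key ordering possible.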

To recap what we've covered so far in Lessons 1 and 2, there are __five main components__ in a Kafka System:

__1.	Broker:__ 
- A Broker is a Kafka node or server which is part of the Kafka system
- A Kafka __Cluster__ is usually composed of multiple Brokers
- Each Broker has a unique ID
- Brokers store the Topic log Partitions
- Brokers handle all requests from Clients (produce, consume, and metadata) and keep data replicated within the cluster.
- There can be one or more Brokers in a cluster.

For a video explanation on Brokers, please watch the following:

- [__Brokers Introduction Video__](https://www.youtube.com/embed/jHnyBSUVcOU)


A Broker can be configured and the properties can be updated in the __server.properties__ file.

__Core configurations for a Broker consist of:__
1. `broker.id`
    - This is an integer that must be set to a unique value for each Broker
2. `listeners`
    - This is the address that the socket server listens on (hostname and port)
3. `advertised.listeners`
    - This is the hostname and port the Broker will advertise to the Producers and Consumers
4. `log.dirs`
    - This is a comma-separated list of directories under which to store the Log files
5. `num.partitions`
    -  The default number of Partitions per Topic
6. `log.retention.hours`
    - The minimum age of a Log file, in hours, before it becomes eligible for deletion
7. `zookeeper.connect`
    - A comma-separated list of host:port pairs, each corresponding to a Zookeeper server
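
To show how these settings fit together, here is a small Python sketch that parses a `server.properties`-style text. The values are illustrative assumptions rather than recommendations, and `parse_properties` is a hypothetical helper that only handles simple `key=value` lines.

```python
SAMPLE_SERVER_PROPERTIES = """\
# Illustrative values only; adjust for your environment.
broker.id=0
listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/tmp/kafka-logs
num.partitions=1
log.retention.hours=168
zookeeper.connect=localhost:2181
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


broker = parse_properties(SAMPLE_SERVER_PROPERTIES)
log_dirs = broker["log.dirs"].split(",")               # comma-separated list
retention_days = int(broker["log.retention.hours"]) / 24
print(broker["broker.id"], log_dirs, retention_days)
```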




__2.	Zookeeper:__
- Kafka uses Zookeeper to manage service discovery for Brokers (e.g. if a new Broker joins, or a Broker dies etc.)
- Zookeeper originated in the Hadoop technology stack (it is external to the core Kafka components, yet required by the setup used in this lesson) and is also used as a coordination service by other Big Data components.
- Maintains the state of the cluster (Brokers, Topics, Users).


Let's take a look at what Zookeeper configurations look like.  These can be found in the __zookeeper.properties__ file.

__Core configurations for Zookeeper include:__
1. `dataDir`
    -   The directory where the snapshots will be stored
2. `clientPort`
    -   The Port which clients will use to connect
3. `maxClientCnxns`
    -   The per-IP limit on the number of client connections (a value of 0 disables the limit)
4. `admin.enableServer`
    -   Enables/disables the AdminServer (it can be disabled to avoid port conflicts)



## Exercise
We'll continue the hands-on exercise where we left off in Lesson 1, so make sure that Kafka is downloaded and untarred on your Linux machine.

You'll need to create a new "data" folder which will store the data records.

Within the "data" folder, you'll need to create "kafka" and "zookeeper" subfolders.  These are required in order to update the Configuration files.

Go ahead and run the below commands:

In [None]:
mkdir data
mkdir data/kafka
mkdir data/zookeeper
ls -al

Now, your output should be similar to:

![](images/kafka-data.png)

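Next, we'll need to update the __Zookeeper Properties__ (`zookeeper.properties`) and the __Server Properties__ (`server.properties`) files to point to the new data directories we just created: `dataDir` should point to the "zookeeper" subfolder and `log.dirs` to the "kafka" subfolder.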

To do this, type the following commands and update the path in both files.  Make sure to save the files:

In [None]:
cd config
vim zookeeper.properties
vim server.properties

*_Note_: VIM is just one way to access and edit text files in Linux.  You can use the tool of your choice.
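
If you prefer to script the change instead of editing by hand, the update amounts to replacing one `key=value` line per file. The following Python sketch works on in-memory text. The `set_property` helper and the `/home/user/...` base path are assumptions for illustration, so substitute the location of your own Kafka folder.

```python
def set_property(text, key, value):
    """Return properties text with `key` set to `value` (appended if absent)."""
    lines, found = [], False
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped.startswith("#") and stripped.split("=", 1)[0].strip() == key:
            lines.append(f"{key}={value}")
            found = True
        else:
            lines.append(line)  # keep comments and other settings untouched
    if not found:
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n"


# Shortened stand-ins for the two configuration files
zookeeper_text = "# snapshot directory\ndataDir=/tmp/zookeeper\nclientPort=2181\n"
server_text = "broker.id=0\nlog.dirs=/tmp/kafka-logs\n"

base = "/home/user/kafka_2.13-3.0.0/data"  # hypothetical install location
new_zookeeper = set_property(zookeeper_text, "dataDir", f"{base}/zookeeper")
new_server = set_property(server_text, "log.dirs", f"{base}/kafka")
print(new_zookeeper)
print(new_server)
```

To apply this to the real files, you would read each file's text, pass it through `set_property`, and write the result back.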

One final update is to make the Server Properties file (`server.properties`) point to localhost.  To do this, go to line 31 and uncomment the line.

Next, update line 31 to the following and save:


In [None]:
listeners=PLAINTEXT://127.0.0.1:9092




The next step is to start the actual Kafka Cluster.

To achieve this, first we need to run __Zookeeper__.

This can be done by using the following command:

In [None]:
bin/zookeeper-server-start.sh config/zookeeper.properties

The __`zookeeper-server-start.sh`__ command does the following:
- Starts the Zookeeper Server
- Takes as an argument the location of the Zookeeper properties file

To read more about the __`zookeeper-server-start.sh`__ command and its various properties, please check the official documentation in the following link:
- __[Zookeeper official documentation](https://zookeeper.apache.org/doc/r3.6.3/zookeeperTools.html)__

*Note: `zookeeper-server-start.sh` and `zkServer.sh` are essentially the same.  The naming difference is due to the fact that the former is the script shipped with the Kafka distribution, while the latter is the name used by Zookeeper itself.




Zookeeper will print a long stream of log messages while it starts up.

Now, if everything works correctly you should see an output similar to the following:

![](images/kafka-zookeeper.png)


After displaying several updates, the terminal will remain open and the cursor will be blinking.  This is normal and to be expected.  We'll leave this terminal open and continue.

The next step is to run the Kafka Broker.

Open a second Ubuntu terminal session (and make sure to keep the Zookeeper one open) and type the following command:

In [None]:
cd kafka_2.13-3.0.0 
bin/kafka-server-start.sh config/server.properties

The __`kafka-server-start.sh`__ command starts the Kafka server.

It takes the following arguments:
- The path to the Server Properties file
- `--override property=value` (optional), which overrides a value from the properties file
 


If all runs correctly, you should see a long output that looks similar to the following:

![](images/kafka-server.png)


__3.	Topic:__
- A Topic is a category/feed name under which data records are published and stored.
- All Kafka data records are organized into Topics.
- Producers write data to the Topics and Consumers read data from the Topics
- Data records published in the Cluster remain persisted until a specified Retention Period has passed.
- Each Topic is divided into _Partitions_, which contain records in an unchangeable sequence. Each record within a Partition is assigned and identified by a unique Offset (ID)
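
The idea of an unchangeable sequence with Offsets can be sketched as an append-only list in which a record's Offset is simply its position. `PartitionLog` below is a hypothetical teaching class, not Kafka's actual storage engine.

```python
class PartitionLog:
    """Toy append-only log; a teaching sketch, not Kafka's storage engine."""

    def __init__(self):
        self._records = []

    def append(self, record):
        """Store a record at the end and return the Offset assigned to it."""
        offset = len(self._records)
        self._records.append(record)
        return offset

    def read(self, start_offset, max_records=10):
        """Return (offset, record) pairs starting at start_offset."""
        end = start_offset + max_records
        return list(enumerate(self._records[start_offset:end], start=start_offset))


log = PartitionLog()
offsets = [log.append(record) for record in ["a", "b", "c"]]
print(offsets)
print(log.read(1))
```

Because records are only ever appended, an Offset always refers to the same record, which lets a Consumer resume from a saved position.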

Below is a visual representation of how these different components interact with one another:


<p align="center">
<img src= "images/Kafka Zookeeper Brokers.png" width=600>
</p>

For a video explanation on Topics, please watch the following video:

- [__Topic Introduction Video__](https://www.youtube.com/embed/kj9JH3ZdsBQ)


Now we have both the Zookeeper and Kafka Server running.  This prepares us to start Producing and Consuming messages!

The next step is to open another Ubuntu terminal and create a Kafka __Topic__ which we'll use to share messages between the local Producer and Consumer.

There are some required parameters such as the __Partition number__ and the __Replication Factor__ along with the __Topic name__ and the __Server details__.

For the time being, we'll keep things simple and limit the number of Partitions to 1 and the Replication Factor to 1 also.

To achieve this, type the following command:

In [None]:
cd kafka_2.13-3.0.0 
bin/kafka-topics.sh --create --topic MyFirstKafkaTopic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092

The __`kafka-topics.sh`__ command, as the name implies, is used to create and configure Kafka Topics.

You can read more about __`kafka-topics.sh`__ in this link:
- __[Kafka Topic Documentation](https://jaceklaskowski.gitbooks.io/apache-kafka/content/kafka-admin-TopicCommand.html)__

It has several required and optional arguments, which provide flexibility for Data Engineers.

__Required Arguments:__
- `--create`:
    -   Creates a new Topic.  This is only needed the first time a Topic is used.
- `--topic`:
    -   The Topic name to create, alter, describe or delete.
- `--partitions`:
    -   The number of Partitions for the Topic being created or altered.
- `--replication-factor`:
    -   The replication factor for each Partition in the Topic being created.  This is mandatory if there is no default set in the cluster itself.
- `--bootstrap-server`:
    -   The Kafka server to connect to.  `localhost:9092` is to be used in case of a local stand-alone environment.

__Important Optional Arguments:__
- `--alter`:
    -   Alters the number of Partitions, replica assignment and/or configuration for the Topic
- `--config`:
    -   A Topic configuration override for the Topic being created or altered.  Allows configurations for:
        -   Cleanup policy
        -   Compression type
        -   Delete Retention time
        -   Flushing messages
- `--delete`:
    -   Deletes a Topic
- `--describe`:
    -   Lists the details for a particular Topic
- `--list`:
    -   Lists all the available Topics
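
As a quick reference for how these arguments combine, the Python sketch below assembles a few `kafka-topics.sh` command lines as text without running them. `kafka_topics_command` is a hypothetical helper written for this lesson, and it assumes the commands are run from the Kafka folder against a local Broker.

```python
import shlex


def kafka_topics_command(action, topic=None, bootstrap_server="localhost:9092", **options):
    """Build (but do not run) a kafka-topics.sh command line."""
    args = ["bin/kafka-topics.sh", f"--{action}"]
    if topic is not None:
        args += ["--topic", topic]
    for name, value in options.items():
        # Python keyword replication_factor becomes --replication-factor
        args += [f"--{name.replace('_', '-')}", str(value)]
    args += ["--bootstrap-server", bootstrap_server]
    return args


create = kafka_topics_command("create", "MyFirstKafkaTopic", partitions=1, replication_factor=1)
describe = kafka_topics_command("describe", "MyFirstKafkaTopic")
list_all = kafka_topics_command("list")

for command in (create, describe, list_all):
    print(shlex.join(command))
```

The first printed line matches the `--create` command used in this exercise, while the other two show the same tool describing one Topic and listing all Topics.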






Assuming the `--create` command above ran correctly, your output should look something like:

![](images/kafka-topic.png)


__4.  Producer:__ 
- Connects to a Kafka cluster through one or more Kafka Brokers (the `bootstrap.servers` list)
- Sends records (data) to a Broker.

For a video explanation on Producers, please watch the following:

- [__Producer Introduction Video__](https://www.youtube.com/embed/I7zm3on_cQQ)

A Producer can be configured and the properties can be updated in the __producer.properties__ file.

__Core configurations for a Producer consist of:__
1. `bootstrap.servers`
    - This is a list of Brokers used for bootstrapping knowledge about the rest of the cluster.
2. `compression.type`
    - Allows specifying the compression codec for all data generated (none, gzip, snappy, lz4, zstd)
3. `partitioner.class`
    - Name of the Partitioner class to be used for partitioning Events (by default, Events with a key are assigned by hashing the key, and Events without a key are spread across the Partitions)
4. `request.timeout.ms`
    - The maximum amount of time the client will wait for the response of a request
5. `buffer.memory`
    -  The total bytes of memory the Producer can use to buffer records waiting to be sent to the server



Now, we need to open two additional terminals (so the total opened Linux terminals will be five).  One terminal will be for the Producer, and the second is for the Consumer.

We will emulate how Kafka operates in Production.  Messages entered into the Producer console will automatically arrive and be displayed in the Consumer console.

To try this, open a new Ubuntu terminal for the __Producer__ by using the following commands:


In [None]:
cd kafka_2.13-3.0.0/bin 
bash kafka-console-producer.sh --topic MyFirstKafkaTopic --bootstrap-server localhost:9092 

The __`kafka-console-producer.sh`__ command is used to initiate the Producer from a user console and provide it with the required configuration.

For further information about this command, check the following link:
- __[Kafka Producer Documentation](https://docs.cloudera.com/runtime/7.2.0/kafka-managing/topics/kafka-manage-cli-producer.html?)__

__Required Arguments:__
- `--topic`:
    -   The name of the Topic to which the Producer will send data
- `--bootstrap-server`:
    -   The Kafka server to connect to


__Important Optional Arguments:__
- `--batch-size`:
    -   Number of messages to send in a single batch if they're not being sent synchronously (default is set at 200)
- `--compression-codec`:
    -   The data compression codec used.  Can be one of the following (if the flag is given without a value, gzip is used):
        - none
        - gzip
        - snappy
        - lz4
        - zstd
- `--max-memory-bytes`:
    -   The total memory used by the Producer to buffer records waiting to be sent to the server
- `--max-partition-memory-bytes`:
    -   The buffer memory size allocated for a Partition.  When data records are received which are smaller than this size, the Producer will attempt to group them together until the specified size is reached
- `--property`:
    -   A mechanism to pass user-defined properties in the form of key=value to the message reader.  This allows custom configurations for a user-defined message reader
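
The grouping behaviour described for the per-Partition buffer can be sketched as size-based batching. This Python sketch is a simplified model under stated assumptions: the 16-byte limit and the record sizes are invented, and a real Producer also takes time limits into account when deciding to send a batch.

```python
def batch_records(records, max_batch_bytes):
    """Group encoded records into batches of at most max_batch_bytes each.

    A record larger than the limit is placed in a batch of its own.
    """
    batches, current, current_size = [], [], 0
    for record in records:
        size = len(record)
        if current and current_size + size > max_batch_bytes:
            batches.append(current)          # current batch is full: close it
            current, current_size = [], 0
        current.append(record)
        current_size += size
    if current:
        batches.append(current)              # flush the final partial batch
    return batches


records = [b"a" * 6, b"b" * 6, b"c" * 6, b"d" * 20]
batches = batch_records(records, max_batch_bytes=16)
print([len(batch) for batch in batches])
```

Sending fewer, larger batches reduces the number of requests to the Broker, which is the motivation behind these buffer settings.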


__5.	Consumer:__ 
- Consumes batches of records (data) from the Broker.
- Consumers can specify both the Topic and Partition from which they will consume data
- Older Kafka releases distinguished two types of Consumers:
    - Low-level
    - High-level

For a video explanation on Consumers, please watch the following:


- [__Consumer Introduction Video__](https://www.youtube.com/embed/Z9g4jMQwog0)

A Consumer can be configured and the properties can be updated in the __consumer.properties__ file.

__Core configurations for a Consumer consist of:__
1. `bootstrap.servers`
    - This is a list of Brokers used for bootstrapping knowledge about the rest of the cluster.
2. `group.id`
    - The Consumer group ID
3. `auto.offset.reset`
    - Tells the Consumer what to do when there is no initial offset in Kafka or if the current offset does not exist anymore on the server.  Options include latest, earliest, none


To continue with our hands-on example, the final remaining step is to call the Kafka Consumer.  

To achieve this, in parallel, open a new Ubuntu terminal for the __Consumer__ by using the following commands:

In [None]:
cd kafka_2.13-3.0.0/bin 
bash kafka-console-consumer.sh --topic MyFirstKafkaTopic --from-beginning --bootstrap-server localhost:9092 

The __`kafka-console-consumer.sh`__ command is used to initiate the Consumer, which will then read (consume) the data records from the specified Topic.

__Required Arguments:__
- `--topic`:
    -   Topic name from which to consume the data records.
- `--bootstrap-server`:
    -   The Kafka server to connect to.

__Important Optional Arguments:__
- `--consumer-property`:
    -   A mechanism to pass user-defined properties in the form of key=value to the Consumer.
- `--consumer.config`:
    -   The Consumer configuration properties file.  Note that the `--consumer-property` settings take precedence over this file.
- `--from-beginning`:
    -   Tells the Consumer that if it doesn't already have an established Offset to consume from, start with the earliest message present in the log.
- `--max-messages`:
    -   The maximum number of messages to consume before exiting.  If it's not set, consumption is continual.
- `--offset`:
    -   A non-negative number representing the Offset to consume data records from.  Can also use 'earliest' or 'latest'.  The default is 'latest'.
- `--partition`:
    -   The Partition to consume data records from. The default is for consumption to start from the end of the Partition.

Make sure to check [the documentation](https://docs.cloudera.com/runtime/7.2.10/kafka-managing/topics/kafka-manage-cli-consumer.html) for more detail - get used to it!

Now, with both terminal windows side by side, click on the __Producer__ terminal and enter the following JSON data. Note that the console Producer sends each line you enter as a separate message:

`{
  "vehicleId":"0bf45cac-d1b8-4364-a906-980e1c2bdbcb",
  "vehicleType":"Taxi",
  "routeId":"Route-37",
  "longitude":"-95.255615",
  "latitude":"33.49808",
  "timestamp":"2017-10-16 12:31:03",
  "speed":49.0,
  "fuelLevel":38.0
}`

`{
  "vehicleId":"d5fd4b42-3742-11ec-8d3d-0242ac130003",
  "vehicleType":"Bus",
  "routeId":"Route-32",
  "longitude":"-81.615234",
  "latitude":"13.56599",
  "timestamp":"2017-10-17 14:22:03",
  "speed":37.0,
  "fuelLevel":19.0
}`

`{
  "vehicleId":"04be0177-8326-4b59-a15d-19f015c0be63",
  "vehicleType":"Passenger",
  "routeId":"Route-19",
  "longitude":"-15.611331",
  "latitude":"44.59816",
  "timestamp":"2017-10-18 09:07:08",
  "speed":75.0,
  "fuelLevel":48.0
}`
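
The console Producer reads its input one line at a time, so a JSON object is easiest to send as one message when it is written on a single line. A minimal Python sketch for compacting the first record above into one line:

```python
import json

vehicle_event = {
    "vehicleId": "0bf45cac-d1b8-4364-a906-980e1c2bdbcb",
    "vehicleType": "Taxi",
    "routeId": "Route-37",
    "longitude": "-95.255615",
    "latitude": "33.49808",
    "timestamp": "2017-10-16 12:31:03",
    "speed": 49.0,
    "fuelLevel": 38.0,
}

# Compact separators remove the extra whitespace; no newlines are emitted.
line = json.dumps(vehicle_event, separators=(",", ":"))
print(line)
```

The printed line can be pasted into the Producer terminal, where it arrives at the Consumer as a single message.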

You should see the messages automatically show up on the Consumer terminal, similar to the output below:

![](images/kafka-producer-consumer.png)


__Congratulations! You've just successfully completed your first Kafka hands-on exercise!__

# Data Engineer vs. DevOps in Organizations

In the real world, it is not that common for Data Engineers to create an entirely new Kafka cluster from scratch.

Rather, in large organizations such as Fortune 500 companies, there are multiple teams that are responsible for setting up the technology infrastructure and foundation for the Data Engineers to use.

These teams are often called "Infrastructure Operations" or "DevOps" teams.  These teams work with the senior Architects and are the ones responsible for creating the technology roadmap, system design and infrastructure for the rest of the organization.  For example, they will be responsible for creating and maintaining the Hadoop Cluster, Apache Spark environment, Kafka Cluster etc. They are also responsible for backups, disaster recovery, fault tolerance and the stability of the various systems.

The Data Engineering team mainly focuses on system, software, and ETL Development activities.  They will mainly be users of the corporate systems by obtaining access to the various servers and tools.  The Data Engineering team will also generally provide the technical requirements and specifications for the environment (such as Kafka Topic details, retention period etc.) during the system design process and report any feedback they have on the current system.

In smaller to mid-size companies, a Data Engineer may also help create infrastructure components, such as deploying an Apache Spark cluster and connecting it to a Kafka cluster. This is sometimes called a "DataOps" role.

What is important for the Data Engineer to understand are the __configuration properties__ that each component provides and how to edit/update those properties so we're able to use them in the Data Pipeline.

Here is a brief description of a typical role a DevOps Engineer plays in large organizations:
- __[DevOps Job Description](https://targetjobs.co.uk/careers-advice/job-descriptions/devops-engineer-job-description)__

#  Key Lesson Takeaways

By completing this lesson, you've achieved the following:
- Understood in greater detail the Kafka Topology and how each component interacts with other components.
- Reviewed Kafka Broker concepts and the various configurable parameters available. 
- Reviewed the Kafka Zookeeper concepts and the various configurable parameters available.
- Reviewed the Kafka Topic and Partition concepts and the various configurable parameters available.
- Reviewed the Kafka Producer concepts and the various configurable parameters available.
- Reviewed the Kafka Consumer concepts and the various configurable parameters available.
- Executed a Kafka hands-on exercise using a local device, created a new Topic, and produced/consumed JSON messages in real-time.
- Bonus: Set up a Kafka cluster in an Amazon cloud and went through the various steps to configure and run the system
- Understood the differences between a Data Engineer and a DevOps Engineer job description in corporations.