### Connecting from my host machine to my VM using SSH

SSH Command -> ```ssh {USERNAME}@{VIRTUAL_MACHINE_IP_ADDRESS}``` </br>

Enter passphrase for key '/Users/{USERNAME}/.ssh/id_rsa': ```{password}```</br>

Welcome to Ubuntu 20.04.3 LTS (GNU/Linux 5.4.0-100-generic x86_64) </br>

 * Documentation:  https://help.ubuntu.com
 * Management:     https://landscape.canonical.com
 * Support:        https://ubuntu.com/advantage

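### Using Linux systemctl to start, stop, and check the status of Zookeeper and Kafka

`systemctl start` and `systemctl stop` print nothing when they succeed. The unit summaries shown after each command below are the output of `systemctl status` run afterwards.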

### Zookeeper

Starting Zookeeper -> ```sudo systemctl start zookeeper```</br>

```
zookeeper.service
     Loaded: loaded (/etc/systemd/system/zookeeper.service; enabled; vendor preset: enabled)
     Active: active (running) since Sat 2022-03-05 04:57:08 UTC; 4h 21min ago
   Main PID: 19306 (java)
      Tasks: 27 (limit: 2339)
```

</br>

Stopping Zookeeper -> ```sudo systemctl stop zookeeper```</br>

```
zookeeper.service
     Loaded: loaded (/etc/systemd/system/zookeeper.service; enabled; vendor preset: enabled)
     Active: failed (Result: exit-code) since Sat 2022-03-05 09:24:52 UTC; 9s ago
    Process: 19306 ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties (code=exited, status=143)
    Process: 25422 ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh (code=exited, status=0/SUCCESS)
   Main PID: 19306 (code=exited, status=143)
```
</br>
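The `status=143` in the stop output follows the common Unix convention that a process terminated by signal N reports exit status 128 + N: 143 - 128 = 15, which is SIGTERM, i.e. the process was asked to terminate. Java processes typically exit with 143 after handling SIGTERM, and systemd treats any nonzero exit status as a failure unless the unit lists that code under `SuccessExitStatus=`, which is why the unit shows `failed` even though the stop script returned `0/SUCCESS`. The hypothetical helper below decodes such a status using only the standard `signal` module:

```python
import signal

def describe_exit_status(status: int) -> str:
    """Explain a shell-style exit status such as the 143 reported by systemd."""
    # Convention (not a guarantee): statuses above 128 mean "killed by signal status - 128".
    if status > 128:
        try:
            return signal.Signals(status - 128).name
        except ValueError:
            return f"unknown signal {status - 128}"
    return "SUCCESS" if status == 0 else f"exit code {status}"

print(describe_exit_status(143))
print(describe_exit_status(0))
```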

Checking Zookeeper Status -> ```sudo systemctl status zookeeper```</br>

### Kafka

Starting Kafka -> ```sudo systemctl start kafka``` </br>

```
kafka.service
     Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
     Active: active (running) since Sat 2022-03-05 05:12:46 UTC; 4h 9min ago
   Main PID: 22340 (sh)
      Tasks: 71 (limit: 2339)
     Memory: 359.6M
```


Stopping Kafka -> ```sudo systemctl stop kafka``` </br>

```
kafka.service
     Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
     Active: inactive (dead) since Sat 2022-03-05 09:24:52 UTC; 2min 12s ago
    Process: 22340 ExecStart=/bin/sh -c /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/kafka.log 2>&1 (code=killed,>
    Process: 25408 ExecStop=/opt/kafka/bin/kafka-server-stop.sh (code=exited, status=0/SUCCESS)
   Main PID: 22340 (code=killed, signal=TERM)
```

</br>

Checking Kafka Status -> ```sudo systemctl status kafka``` </br>
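For scripting, `systemctl is-active <unit>` prints only the unit's state (for example `active`, `inactive` or `failed`) instead of the full `status` report, and it does not need `sudo`. The sketch below wraps it with Python's `subprocess`; the function name `service_state` is illustrative, and its `run` parameter exists only so the function can be exercised without systemd:

```python
import subprocess

def service_state(unit: str, run=subprocess.run) -> str:
    """Return the single-word state printed by `systemctl is-active <unit>`."""
    # is-active exits non-zero for inactive/failed units, so check=True is not used;
    # the state word on stdout is what we want either way.
    result = run(["systemctl", "is-active", unit], capture_output=True, text=True)
    return result.stdout.strip()
```

On the VM, `service_state("zookeeper")` and `service_state("kafka")` should return `active` while both services are running.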

### Confirming that Zookeeper is listening on port 2181 and Kafka on port 9092
Confirming Zookeeper is listening on port 2181 -> ```sudo netstat -tunlp | grep 2181```</br>
Confirming Kafka is listening on port 9092 -> ```sudo netstat -tunlp | grep 9092```</br>
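`netstat` needs `sudo` to show the owning process, and on recent Ubuntu releases it may require installing the `net-tools` package (`ss -tunlp` is the iproute2 equivalent). A quick alternative from Python is to attempt a TCP connection to each port. `is_listening` is a hypothetical helper built on the standard `socket.create_connection`; it only shows that something accepts connections on the port, not which process it is:

```python
import socket

def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        # Completes a full TCP handshake, then closes the socket on exit.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, unreachable host, or timeout.
        return False

for name, port in (("Zookeeper", 2181), ("Kafka", 9092)):
    print(name, port, is_listening("localhost", port))
```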

### Create two new topics

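```python
# Use confluent-kafka's AdminClient to create two topics
from confluent_kafka.admin import AdminClient, NewTopic

admin_client = AdminClient({
    "bootstrap.servers": "localhost:9092"
})

# One partition and a replication factor of 1 (single-broker setup)
topic_list = []
topic_list.append(NewTopic("Topic1", num_partitions=1, replication_factor=1))
topic_list.append(NewTopic("Topic2", num_partitions=1, replication_factor=1))
# create_topics() is asynchronous: it returns a dict of {topic: Future}
admin_client.create_topics(topic_list)
```

```
{'Topic1': <Future at 0x7f585402c910 state=running>,
 'Topic2': <Future at 0x7f58456261f0 state=running>}
```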

After publishing and consuming the messages, the consumer printed the output below. (I'm attaching separate Python scripts because I couldn't publish messages for both problems 3 and 4 from the Jupyter notebook: the kernel runs one cell at a time, and the consumer loop blocks it until it is interrupted.)

```
got msg ConsumerRecord(topic='Test3', partition=0, offset=32, timestamp=1646496365256, timestamp_type=0, key=None, value=b'test3 message', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=13, serialized_header_size=-1)
0 32
```
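One way to stay inside a single notebook, assuming you are willing to manage a thread, is to run the polling loop in a background thread and stop it with a `threading.Event`. This is a sketch: `start_background_poller` is a hypothetical helper, `poll` would be something like `lambda: consumer.poll(timeout_ms=200)`, and because kafka-python consumers are not thread-safe, only the background thread should touch the consumer while it runs:

```python
import threading

def start_background_poller(poll, handle, stop_event):
    """Call poll() repeatedly in a daemon thread until stop_event is set.

    poll() must return a dict of {partition: [record, ...]}, like
    KafkaConsumer.poll(); handle(record) is called once per record.
    """
    def loop():
        while not stop_event.is_set():
            # poll() should block briefly (e.g. timeout_ms=200) so this
            # loop does not spin when no messages arrive.
            for records in poll().values():
                for record in records:
                    handle(record)

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread
```

In the notebook this could look like `stop = threading.Event()` and `t = start_background_poller(lambda: consumer.poll(timeout_ms=200), print, stop)`, then running the producer cells, and finally `stop.set(); t.join(); consumer.close()`.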

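```python
from kafka import KafkaProducer

topic = 'Test3'
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# send() is asynchronous and returns a FutureRecordMetadata;
# producer.flush() blocks until buffered messages are delivered
producer.send(topic, b'test3 message')
```

```
<kafka.producer.future.FutureRecordMetadata at 0x7f584443cfd0>
```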

```python
from kafka import KafkaConsumer
from kafka.errors import OffsetOutOfRangeError
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

def main2(broker_str, topic):
    consumer = None
    try:
        # Connect to the broker passed in instead of a hard-coded address
        consumer = KafkaConsumer(group_id='my-group', bootstrap_servers=[broker_str], auto_offset_reset="latest")
        consumer.subscribe([topic])
        while True:
            try:
                # poll() returns {TopicPartition: [ConsumerRecord, ...]}
                k_msg = consumer.poll(timeout_ms=200)
            except OffsetOutOfRangeError:
                log.info("Offset out of range")
            else:
                for msgs in k_msg.values():
                    for msg in msgs:
                        print("got msg", str(msg))
                        print(msg.partition, msg.offset)
    except KeyboardInterrupt:
        # Interrupting the kernel is the only way out of the polling loop
        print("consumer stopped")
    finally:
        if consumer is not None:
            consumer.close()
```

```python
main2('localhost:9092', 'Test3')
```
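The nested loops in `main2` are there because `poll()` returns a dict mapping each `TopicPartition` to a list of `ConsumerRecord`s. A small generator (hypothetical name `iter_messages`) can flatten that structure and decode the raw `bytes` value, so the loop body deals with one message at a time. It assumes no `value_deserializer` was configured, so `record.value` is `bytes` or `None`:

```python
def iter_messages(batch, encoding="utf-8"):
    """Flatten the {TopicPartition: [ConsumerRecord, ...]} dict returned by poll().

    Yields (partition, offset, value) tuples with the value decoded to str.
    """
    for records in batch.values():
        for record in records:
            # Without a value_deserializer, record.value is raw bytes (or None).
            value = record.value.decode(encoding) if record.value is not None else None
            yield record.partition, record.offset, value
```

For example, `for partition, offset, value in iter_messages(consumer.poll(timeout_ms=200)): print(partition, offset, value)` would print `0 32 test3 message` for the record shown earlier.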

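```python
# Create the topic used by the producer and consumer below.
# admin_client, NewTopic and topic_list are reused from the first cell.

# from confluent_kafka.admin import AdminClient, NewTopic
# admin_client = AdminClient({
#     "bootstrap.servers": "localhost:9092"
# })

# topic_list is not reset here, so Topic1 and Topic2 are submitted again
# together with Test22.
# topic_list = []
topic_list.append(NewTopic("Test22", 1, 1))
admin_client.create_topics(topic_list)
```

```
{'Topic1': <Future at 0x7f58443db430 state=finished raised KafkaException>,
 'Topic2': <Future at 0x7f58443db4f0 state=finished raised KafkaException>,
 'Test22': <Future at 0x7f58443db5e0 state=finished raised KafkaException>}
```

All three futures finished with a `KafkaException`. For Topic1 and Topic2 this is most likely because the first cell already created them. The reason Test22 failed is not visible in this repr; calling `.result()` on its future raises the exception and shows the broker's error.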
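`create_topics()` returns immediately with one `concurrent.futures.Future` per topic, so the reprs above only say whether each future is still running or has raised. Waiting on each future surfaces the actual outcome. `wait_for_topics` is an illustrative helper, not part of confluent-kafka:

```python
def wait_for_topics(futures, timeout=None):
    """Wait on the {topic: Future} dict returned by AdminClient.create_topics().

    Returns {topic: None} for topics that were created and
    {topic: exception} for topics whose creation failed.
    """
    outcome = {}
    for topic, future in futures.items():
        try:
            # result() returns None on success and re-raises the KafkaException otherwise.
            future.result(timeout=timeout)
            outcome[topic] = None
        except Exception as exc:
            outcome[topic] = exc
    return outcome
```

For example, `wait_for_topics(admin_client.create_topics([NewTopic("Test22", 1, 1)]))` returns `{'Test22': None}` if the topic was created, or maps `'Test22'` to the `KafkaException` (such as "topic already exists") otherwise.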

#### Producer Code from page 53 of the lecture notes

To get the code below to work, we had to change how the byte-string prefix and the message number are concatenated in the second argument to `producer.send`: Python 3 cannot concatenate `bytes` and `str` directly, so the number has to be encoded to bytes first.

```python
from kafka import KafkaProducer
import time

topic = 'Test22'
producer = KafkaProducer(bootstrap_servers='localhost:9092')

for batch in range(3):
    print('Starting batch #' + str(batch))
    for i in range(4):
        print('sending message #' + str(i))
        # bytes + str raises TypeError, so encode the number before concatenating
        producer.send(topic, b'test message #' + str(i).encode('utf-8'))
    print('Finished batch #' + str(batch))
    print('Sleeping for 5 seconds ...')
    time.sleep(5)
producer.flush()  # block until every buffered message has been delivered
print('Done sending messages')
```

```
Starting batch #0
sending message #0
sending message #1
sending message #2
sending message #3
Finished batch #0
Sleeping for 5 seconds ...
Starting batch #1
sending message #0
sending message #1
sending message #2
sending message #3
Finished batch #1
Sleeping for 5 seconds ...
Starting batch #2
sending message #0
sending message #1
sending message #2
sending message #3
Finished batch #2
Sleeping for 5 seconds ...
Done sending messages
```
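Instead of building bytes by hand, kafka-python's `KafkaProducer` accepts a `value_serializer` callable that is applied to every value passed to `send()`, so the loop can work with plain strings. The sketch below assumes that setup; `serialize_value` and `send_batches` are hypothetical names, and the 5-second pause between batches is left out:

```python
def serialize_value(value: str) -> bytes:
    """Serializer for KafkaProducer(value_serializer=...): str -> UTF-8 bytes."""
    return value.encode("utf-8")

def send_batches(producer, topic, batches=3, per_batch=4):
    """Send batches * per_batch string messages, then flush once at the end."""
    for batch in range(batches):
        for i in range(per_batch):
            # The producer's value_serializer turns this str into bytes.
            producer.send(topic, f"test message #{i}")
    producer.flush()  # block until all buffered records are delivered
```

With `producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=serialize_value)`, calling `send_batches(producer, 'Test22')` would send twelve messages of the form `test message #<i>`.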


#### Consumer Code from pages 54 and 55 of the lecture notes

To get the code below working, I added the missing `except` clause to the `try` block and called `main` with the broker string as the first argument and the topic as the second.

```python
from kafka import KafkaConsumer
from kafka.errors import OffsetOutOfRangeError
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

def main(broker_str, topic):
    group = "my-group1"
    bootstrap_servers = [broker_str]
    print('Topic is: ', topic)
    print('Group is: ', group)

    consumer = None
    try:
        consumer = KafkaConsumer(group_id=group, bootstrap_servers=bootstrap_servers, auto_offset_reset="latest")
        consumer.subscribe([topic])
        while True:
            # Process messages
            try:
                k_msg = consumer.poll(timeout_ms=200)
            except OffsetOutOfRangeError:
                log.info("Offset out of range. Seeking to beginning")
                # With no arguments, seek_to_beginning() rewinds all assigned partitions
                consumer.seek_to_beginning()
            else:
                for msgs in k_msg.values():
                    for msg in msgs:
                        print('got msg: ', str(msg))
                        print('partition: ', msg.partition, 'message offset: ', msg.offset)
    except KeyboardInterrupt:
        # Interrupting the kernel is the only way to leave the polling loop
        print("execution completed")
    finally:
        if consumer is not None:
            consumer.close()
```

```python
main('localhost:9092', 'Test22')
```

```
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.consumer.subscription_state:Updating subscribed topics to: ['Test22']
INFO:kafka.cluster:Group coordinator for my-group1 is BrokerMetadata(nodeId='coordinator-0', host='localhost', port=9092, rack=None)
INFO:kafka.coordinator:Discovered coordinator coordinator-0 for group my-group1
INFO:kafka.coordinator:Starting new heartbeat thread
INFO:kafka.coordinator.consumer:Revoking previously assigned partitions set() for group my-group1
INFO:kafka.conn:<BrokerConnection no

Topic is:  Test22
Group is:  my-group1

INFO:kafka.coordinator:(Re-)joining group my-group1
INFO:kafka.coordinator:Successfully joined group my-group1 with generation 45
INFO:kafka.consumer.subscription_state:Updated partition assignment: [TopicPartition(topic='Test22', partition=0)]
INFO:kafka.coordinator.consumer:Setting newly assigned partitions {TopicPartition(topic='Test22', partition=0)} for group my-group1
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
```
