# Kafka with Python
- https://kafka-python.readthedocs.io/en/master/index.html
- https://github.com/dpkp/kafka-python

In [None]:
# !pip install kafka

In [None]:
# !pip install msgpack

In [465]:
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.client import KafkaClient
from kafka import TopicPartition

import json
import msgpack

<br></br><br></br>

## 0. Broker
- topic을 생성한 이후부터 확인 가능

In [300]:
zookeeper_servers = ["13.209.19.6:2181"]
bootstrap_servers = ["13.125.54.81:9092"]

### 0.1 브로커의 상태확인

In [466]:
client = KafkaClient(bootstrap_servers=bootstrap_servers)

In [467]:
client.cluster.brokers()

{BrokerMetadata(nodeId=1, host='ip-172-31-31-106.ap-northeast-2.compute.internal', port=9092, rack=None)}

In [468]:
## 브로커 아이디 입력
client.connected(1)

True

### 0.2 브로커의 정보확인

In [469]:
## 브로커 아이디 입력
metadata = client.cluster.broker_metadata(1)

In [470]:
print(metadata.nodeId)
print(metadata.host)
print(metadata.port)

1
ip-172-31-31-106.ap-northeast-2.compute.internal
9092


### 0.3 브로커의 파티션 확인

In [471]:
## 브로커 아이디 입력
metadata2 = client.cluster.partitions_for_broker(1)
metadata2

{TopicPartition(topic='topic-180812-1', partition=0),
 TopicPartition(topic='topic-180818-1', partition=0),
 TopicPartition(topic='topic-180818-2', partition=0),
 TopicPartition(topic='topic-180818-byte', partition=0)}

### 0.4 브로커의 토픽 확인

In [472]:
metadata3 = client.cluster.topics()
metadata3

{'topic-180812-1', 'topic-180818-1', 'topic-180818-2', 'topic-180818-byte'}

In [474]:
metadata4 = client.cluster.partitions_for_topic("topic-180812-1")
print(metadata4)

{0}


<br></br><br></br>

## 1. Producer
- https://github.com/dpkp/kafka-python/issues/601

### 1.1 producer 객체 생성
- 브로커의 ip주소와 port번호 입력

In [309]:
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

### 1.2 메시지 전송
- 브로커에게 메시지를 보낼 때는 byte자료형이나 json처럼 직렬화가 가능한 메시지만  전송가능
- int, string 등의 자료형은 직렬화가 불가능하기 때문에 에러 발생
- topic이 없을 경우에는 자동 생성
    - server.properties에 지정된 partition수 만큼 partition 생성
    
    
- 먼저 kafka에 "topic-180818-1"이라는 topic을 만들어준 후, 실습을 진행
    - replication factor = 1
    - partitions = 3

In [310]:
# 에러발생
# producer.send("topic-180818-1", 'error')

In [311]:
for i in range(4) :
    producer.send("topic-180818-byte", b'byte-msg-%d'%i)

In [312]:
for c in "5678" :
    producer.send("topic-180818-1", json.dumps("msg-{}".format(c)).encode('utf-8'))

### 1.3 직렬화 설정
- 객체를 생성할 때 직렬화 방식을 선택할 수 있음
- 직렬화란?
    - 객체의 내용을 바이트 단위로 변환하여 파일 또는 네트워크를 통해서 스트림(송수신)이 가능하게 하는것을 의미
    - 잡한 객체의 내용을 저장/복원하거나, 네트워크로 전송하기 위해서는 객체의 멤버변수의 각 내용을 일정한 형식으로 만들어 전송

In [313]:
producer = KafkaProducer(bootstrap_servers=bootstrap_servers, 
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

In [314]:
for i in range(9,13) :
    producer.send("topic-180818-1", 'seriazlized-msg-{}'.format(i))

In [315]:
#producer = KafkaProducer(bootstrap_servers=bootstrap_servers, 
#                         value_serializer=msgpack.dumps)

In [316]:
#producer.send('topic-180818-key-value./binka', {'key1': 'value1'})

### 1.4 acks
- acks : kafka가 producer에 메시지 write에 대한 결과를 보내기 위해 필요한 승인(복사) 횟수
    - acks = 0 : 결과에 상관 없이 다음 메시지 전송
    - acks = 1 : 리더에게만 write가 되면 다음 메시지 전송
    - acks = all : ISR의 팔로워 한 대에 복사본이 만들어지면 다음 메시지 전송

In [317]:
producer = KafkaProducer(bootstrap_servers=bootstrap_servers, 
                         acks=1,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

In [318]:
for i in range(13,17) :
    producer.send("topic-180818-1", 'acks-msg-{}'.format(i))

### 1.5 key
- key를 지정하면 같은 key를 가진 데이터는 해당 topic의 같은 partition으로 넘어감

In [319]:
for i in range(17,21) :
    if i%2 == 1 :
        producer.send("topic-180818-1", 
                      key='1', 
                      value='key1-msg-{}'.format(i))
    else :
        producer.send("topic-180818-1", 
                      key='2', 
                      value='key2-msg-{}'.format(i))

### 1.6 동기 전송과 비동기 전송
- 전송결과를 확인
    - 동기 전송 : 전송결과가 도착하면, 다음 메시지 전송
    - 비동기 전송 : 전송결과에 상관없이 다음 메시지 전송

#### 1.6.1 메타데이터(전송결과) 확인

In [320]:
metadata = producer.send("topic-180818-1", 'sync msg')

In [321]:
print(metadata.get())
print("")

print(metadata.get().topic)
print(metadata.get().partition)
print(metadata.get().offset)

RecordMetadata(topic='topic-180818-1', partition=0, topic_partition=TopicPartition(topic='topic-180818-1', partition=0), offset=12, timestamp=-1, checksum=1620915337, serialized_key_size=-1, serialized_value_size=10)

topic-180818-1
0
12


#### 1.6.2 동기전송

In [322]:
producer = KafkaProducer(bootstrap_servers=bootstrap_servers, 
                         acks=1,
                         retries=3,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

In [323]:
for i in range(4) :
    metadata = producer.send("topic-180818-1", "sync msg-{}".format(i))
    
    try:
        result = metadata.get()
        print("Success", i)
        print(result.topic)
        print(result.partition)
        print(result.offset)
        print("")
    except KafkaError:
        print("Failure", i)
        log.exception()
        print("")

Success 0
topic-180818-1
0
13

Success 1
topic-180818-1
0
14

Success 2
topic-180818-1
0
15

Success 3
topic-180818-1
0
16



#### 1.6.3 비동기전송
- acks=0

In [324]:
producer = KafkaProducer(bootstrap_servers=bootstrap_servers, 
                         acks=0,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

In [325]:
def on_send_success(record_metadata):
    print("Success")
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)
    print("")

def on_send_error(excp):
    print("Failure")
    log.error('error', exc_info=excp)
    print("")

In [326]:
for _ in range(10):
    producer.send("topic-180818-1", 'async msg-{}'.format(_))

In [327]:
producer.send("topic-180818-1", 'asnync with callback').add_callback(on_send_success).add_errback(on_send_error)

<kafka.producer.future.FutureRecordMetadata at 0x7f4b9a96de80>

Success
topic-180818-1
0
-1



### 1.7 여러 개의 topic으로 메시지 보내기

In [328]:
for i in range(5):
    producer.send('topic-180818-1', 'producer to two topic msg-{}'.format(i))
    producer.send('topic-180818-2', 'producer to two topic msg-{}'.format(i))

### 1.8 한 토픽에 여러 producer가 메시지 보내기

In [329]:
producer1 = KafkaProducer(bootstrap_servers=bootstrap_servers, 
                          value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer2 = KafkaProducer(bootstrap_servers=bootstrap_servers, 
                          value_serializer=lambda v: json.dumps(v).encode('utf-8'))

In [330]:
meta1 = producer1.send('topic-180818-1', 'producer1 msg')
meta2 = producer2.send('topic-180818-1', 'producer2 msg')

In [331]:
print(meta1.get())
print("")
print(meta2.get())

RecordMetadata(topic='topic-180818-1', partition=0, topic_partition=TopicPartition(topic='topic-180818-1', partition=0), offset=34, timestamp=-1, checksum=1380428995, serialized_key_size=-1, serialized_value_size=15)

RecordMetadata(topic='topic-180818-1', partition=0, topic_partition=TopicPartition(topic='topic-180818-1', partition=0), offset=33, timestamp=-1, checksum=-724312467, serialized_key_size=-1, serialized_value_size=15)


### 1.9 topic의 partition 확인

In [332]:
producer.partitions_for("topic-180818-1")

{0}

In [333]:
producer.partitions_for("topic-180818-2")

{0}

### 1.10 producer의 환경 설정 확인
- 자유롭게 설정 가능

In [334]:
producer.config

{'acks': 0,
 'api_version': (0, 11, 0),
 'api_version_auto_timeout_ms': 2000,
 'batch_size': 16384,
 'bootstrap_servers': ['13.125.54.81:9092'],
 'buffer_memory': 33554432,
 'client_id': 'kafka-python-producer-30',
 'compression_type': None,
 'connections_max_idle_ms': 540000,
 'key_serializer': None,
 'linger_ms': 0,
 'max_block_ms': 60000,
 'max_in_flight_requests_per_connection': 5,
 'max_request_size': 1048576,
 'metadata_max_age_ms': 300000,
 'metric_reporters': [],
 'metrics_num_samples': 2,
 'metrics_sample_window_ms': 30000,
 'partitioner': <kafka.partitioner.default.DefaultPartitioner at 0x7f4bc0142160>,
 'receive_buffer_bytes': None,
 'reconnect_backoff_max': 1000,
 'reconnect_backoff_ms': 50,
 'request_timeout_ms': 30000,
 'retries': 0,
 'retry_backoff_ms': 100,
 'sasl_mechanism': None,
 'sasl_plain_password': None,
 'sasl_plain_username': None,
 'security_protocol': 'PLAINTEXT',
 'selector': selectors.EpollSelector,
 'send_buffer_bytes': None,
 'socket_options': [(6, 1, 1)]

### 1.7 버퍼가 채워지지 않더라도 메시지 보내기

In [335]:
#producer.flush()

<br></br><br></br>

## Consumer

### 1.1 consumer객체 생성
- 브로커의 ip주소와 port번호 입력
- consumer group id 설정
- topic 설정

In [357]:
consumer_byte = KafkaConsumer("topic-180818-byte",
                              bootstrap_servers=bootstrap_servers, 
                              group_id = None,
                              enable_auto_commit=True,
                              auto_offset_reset='earliest',
                              consumer_timeout_ms = 5000
                             )

### 1.2 메시지 가져오기
- 브로커에게 메시지를 보낼 때는 byte자료형이나 json처럼 직렬화가 가능한 메시지만 전송가능
- int, string 등의 자료형은 직렬화가 불가능하기 때문에 에러 발생

In [358]:
try : 
    for msg in consumer_byte :
        print(msg.value.decode("utf-8"))
        print(msg.topic, msg.partition, msg.offset, msg.key)
        print("")
    
except :
    consumer.close()
    print("finished --- 1")
    
finally :
    consumer.close()
    print("finished --- 2")

msg-0
topic-180818-byte 0 0 None

msg-1
topic-180818-byte 0 1 None

msg-2
topic-180818-byte 0 2 None

msg-3
topic-180818-byte 0 3 None

finished --- 2


In [359]:
consumer = KafkaConsumer("topic-180818-1",
                         bootstrap_servers=bootstrap_servers, 
                         group_id = None,
                         enable_auto_commit=True,
                         auto_offset_reset='earliest',
                         consumer_timeout_ms = 5000
                        )

In [360]:
try : 
    for msg in consumer :
        print(msg.value.decode("utf-8"))
        print(msg.topic)
        print(msg.partition)
        print(msg.offset)
        print(msg.key)
        print("")
    
except :
    print("finished --- 1")
    consumer.close()
    
finally :
    print("finished --- 2")
    consumer.close()

"byte-msg-5"
topic-180818-1
0
0
None

"byte-msg-6"
topic-180818-1
0
1
None

"byte-msg-7"
topic-180818-1
0
2
None

"byte-msg-8"
topic-180818-1
0
3
None

"seriazlized-msg-9"
topic-180818-1
0
4
None

"seriazlized-msg-10"
topic-180818-1
0
5
None

"seriazlized-msg-11"
topic-180818-1
0
6
None

"seriazlized-msg-12"
topic-180818-1
0
7
None

"acks-msg-13"
topic-180818-1
0
8
None

"acks-msg-14"
topic-180818-1
0
9
None

"acks-msg-15"
topic-180818-1
0
10
None

"acks-msg-16"
topic-180818-1
0
11
None

"sync msg"
topic-180818-1
0
12
None

"sync msg-0"
topic-180818-1
0
13
None

"sync msg-1"
topic-180818-1
0
14
None

"sync msg-2"
topic-180818-1
0
15
None

"sync msg-3"
topic-180818-1
0
16
None

"async msg-0"
topic-180818-1
0
17
None

"async msg-1"
topic-180818-1
0
18
None

"async msg-2"
topic-180818-1
0
19
None

"async msg-3"
topic-180818-1
0
20
None

"async msg-4"
topic-180818-1
0
21
None

"async msg-5"
topic-180818-1
0
22
None

"async msg-6"
topic-180818-1
0
23
None

"async msg-7"
topic-180818-1
0
24


### 1.3 여러 topic으로부터 메시지 가져오기

In [364]:
consumer = KafkaConsumer("topic-180818-1", "topic-180818-2",
                         bootstrap_servers=bootstrap_servers, 
                         group_id = None,
                         enable_auto_commit=True,
                         auto_offset_reset='earliest',
                         consumer_timeout_ms = 5000
                        )

In [365]:
for msg in consumer :
    print(msg.value.decode("utf-8"))
    print(msg.topic, msg.partition, msg.offset, msg.key)
    print("")
    
consumer.close()

"producer to two topic msg-0"
topic-180818-2 0 0 None

"producer to two topic msg-1"
topic-180818-2 0 1 None

"producer to two topic msg-2"
topic-180818-2 0 2 None

"producer to two topic msg-3"
topic-180818-2 0 3 None

"producer to two topic msg-4"
topic-180818-2 0 4 None

"byte-msg-5"
topic-180818-1 0 0 None

"byte-msg-6"
topic-180818-1 0 1 None

"byte-msg-7"
topic-180818-1 0 2 None

"byte-msg-8"
topic-180818-1 0 3 None

"seriazlized-msg-9"
topic-180818-1 0 4 None

"seriazlized-msg-10"
topic-180818-1 0 5 None

"seriazlized-msg-11"
topic-180818-1 0 6 None

"seriazlized-msg-12"
topic-180818-1 0 7 None

"acks-msg-13"
topic-180818-1 0 8 None

"acks-msg-14"
topic-180818-1 0 9 None

"acks-msg-15"
topic-180818-1 0 10 None

"acks-msg-16"
topic-180818-1 0 11 None

"sync msg"
topic-180818-1 0 12 None

"sync msg-0"
topic-180818-1 0 13 None

"sync msg-1"
topic-180818-1 0 14 None

"sync msg-2"
topic-180818-1 0 15 None

"sync msg-3"
topic-180818-1 0 16 None

"async msg-0"
topic-180818-1 0 17 None


### 1.4 현재 구독하는 topic 확인

In [363]:
consumer.subscription()

{'topic-180818-1', 'topic-180818-2', 'topic-180818-byte'}

### 1.5 구독하는 topic 추가
- Consumer를 생성할 때, 객체 생성을 해주지 않았을 경우에 사용

In [370]:
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, 
                         group_id = None,
                         enable_auto_commit=True,
                         auto_offset_reset='earliest',
                         consumer_timeout_ms = 5000
                        )

In [371]:
consumer.subscribe("topic-180818-byte")
consumer.subscription()

{'topic-180818-byte'}

In [372]:
for msg in consumer :
    print(msg.value.decode("utf-8"))
    print(msg.topic, msg.partition, msg.offset, msg.key)
    print("")
    
consumer.close()

msg-0
topic-180818-byte 0 0 None

msg-1
topic-180818-byte 0 1 None

msg-2
topic-180818-byte 0 2 None

msg-3
topic-180818-byte 0 3 None



### 1.6 여러 컨슈머가 한 토픽으로 부터 메시지 가져오기

In [490]:
consumer1 = KafkaConsumer("topic-180818-1",
                          bootstrap_servers=bootstrap_servers, 
                          group_id = None,
                          enable_auto_commit=True,
                          auto_offset_reset='earliest',
                          consumer_timeout_ms = 5000
                         )

consumer2 = KafkaConsumer("topic-180818-1",
                          bootstrap_servers=bootstrap_servers, 
                          group_id = None,
                          enable_auto_commit=True,
                          auto_offset_reset='earliest',
                          consumer_timeout_ms = 5000
                         )

In [383]:
print("-----Consuemr 1-----")
for msg in consumer1 :
    print(msg.value.decode("utf-8"))
    print(msg.topic, msg.partition, msg.offset, msg.key)
    print("")

consumer1.close()

print("\n\n")
print("-----Consuemr 2-----")
for msg in consumer2 :
    print(msg.value.decode("utf-8"))
    print(msg.topic, msg.partition, msg.offset, msg.key)
    print("")

consumer2.close()

-----Consuemr 1-----
"byte-msg-5"
topic-180818-1 0 0 None

"byte-msg-6"
topic-180818-1 0 1 None

"byte-msg-7"
topic-180818-1 0 2 None

"byte-msg-8"
topic-180818-1 0 3 None

"seriazlized-msg-9"
topic-180818-1 0 4 None

"seriazlized-msg-10"
topic-180818-1 0 5 None

"seriazlized-msg-11"
topic-180818-1 0 6 None

"seriazlized-msg-12"
topic-180818-1 0 7 None

"acks-msg-13"
topic-180818-1 0 8 None

"acks-msg-14"
topic-180818-1 0 9 None

"acks-msg-15"
topic-180818-1 0 10 None

"acks-msg-16"
topic-180818-1 0 11 None

"sync msg"
topic-180818-1 0 12 None

"sync msg-0"
topic-180818-1 0 13 None

"sync msg-1"
topic-180818-1 0 14 None

"sync msg-2"
topic-180818-1 0 15 None

"sync msg-3"
topic-180818-1 0 16 None

"async msg-0"
topic-180818-1 0 17 None

"async msg-1"
topic-180818-1 0 18 None

"async msg-2"
topic-180818-1 0 19 None

"async msg-3"
topic-180818-1 0 20 None

"async msg-4"
topic-180818-1 0 21 None

"async msg-5"
topic-180818-1 0 22 None

"async msg-6"
topic-180818-1 0 23 None

"async msg-7"

### 1.7 컨슈머에게 읽어야하는 토픽과 partition 배정하기
- 특정 토픽에 대하여 원하는 파티션만 읽기

In [462]:
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, 
                          group_id = None,
                          enable_auto_commit=True,
                          auto_offset_reset='earliest',
                          consumer_timeout_ms = 5000
                         )

In [463]:
consumer.assign([TopicPartition("topic-180818-2", 0), TopicPartition("topic-180818-byte", 0)])

In [464]:
for msg in consumer :
    print(msg.value.decode("utf-8"))
    print(msg.topic, msg.partition, msg.offset, msg.key)
    print("")

consumer.close()

msg-0
topic-180818-byte 0 0 None

msg-1
topic-180818-byte 0 1 None

msg-2
topic-180818-byte 0 2 None

msg-3
topic-180818-byte 0 3 None

"producer to two topic msg-0"
topic-180818-2 0 0 None

"producer to two topic msg-1"
topic-180818-2 0 1 None

"producer to two topic msg-2"
topic-180818-2 0 2 None

"producer to two topic msg-3"
topic-180818-2 0 3 None

"producer to two topic msg-4"
topic-180818-2 0 4 None



### 1.8 원하는 파티션의 특정 offset만 확인하기
- 지정된 offset 이후의 메시지만 확인가능

In [578]:
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, 
                         group_id = None,
                         enable_auto_commit=True,
                         consumer_timeout_ms = 5000,
                         auto_offset_reset='smallest'
                        )

In [579]:
consumer.assign([TopicPartition("topic-180818-2", 0), TopicPartition("topic-180818-byte", 0)])

In [580]:
consumer.seek(TopicPartition("topic-180818-2", 0), 4)

In [581]:
for msg in consumer :
    print(msg.value.decode("utf-8"))
    print(msg.topic, msg.partition, msg.offset, msg.key)
    print("")

consumer.close()

msg-0
topic-180818-byte 0 0 None

msg-1
topic-180818-byte 0 1 None

msg-2
topic-180818-byte 0 2 None

msg-3
topic-180818-byte 0 3 None

"producer to two topic msg-4"
topic-180818-2 0 4 None



### 1.9 원하는 파티션의 처음부터 읽기

In [560]:
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, 
                         group_id = None,
                         enable_auto_commit=True,
                         consumer_timeout_ms = 5000,
                         auto_offset_reset='smallest'
                        )

In [561]:
consumer.assign([TopicPartition("topic-180818-2", 0), TopicPartition("topic-180818-byte", 0)])

In [562]:
consumer.seek_to_beginning(TopicPartition("topic-180818-2", 0))

In [563]:
for msg in consumer :
    print(msg.value.decode("utf-8"))
    print(msg.topic, msg.partition, msg.offset, msg.key)
    print("")

consumer.close()

"producer to two topic msg-0"
topic-180818-2 0 0 None

"producer to two topic msg-1"
topic-180818-2 0 1 None

"producer to two topic msg-2"
topic-180818-2 0 2 None

"producer to two topic msg-3"
topic-180818-2 0 3 None

"producer to two topic msg-4"
topic-180818-2 0 4 None

msg-0
topic-180818-byte 0 0 None

msg-1
topic-180818-byte 0 1 None

msg-2
topic-180818-byte 0 2 None

msg-3
topic-180818-byte 0 3 None



### 1.10 원하는 파티션의 마지막부터 읽기
- 해당 파티션은 모두 읽었다고 처리하는 것

In [574]:
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, 
                         group_id = None,
                         enable_auto_commit=True,
                         consumer_timeout_ms = 5000,
                         auto_offset_reset='smallest'
                        )

In [575]:
consumer.assign([TopicPartition("topic-180818-2", 0), TopicPartition("topic-180818-byte", 0)])

In [576]:
consumer.seek_to_end(TopicPartition("topic-180818-byte", 0))

In [577]:
for msg in consumer :
    print(msg.value.decode("utf-8"))
    print(msg.topic, msg.partition, msg.offset, msg.key)
    print("")

consumer.close()

"producer to two topic msg-0"
topic-180818-2 0 0 None

"producer to two topic msg-1"
topic-180818-2 0 1 None

"producer to two topic msg-2"
topic-180818-2 0 2 None

"producer to two topic msg-3"
topic-180818-2 0 3 None

"producer to two topic msg-4"
topic-180818-2 0 4 None



### 1.9 토픽 파티션의 offset 확인하기
- 토픽의 파티션 정보
- 지정된 시간 이후의 가장 빠른 offset
- 해당 partition의 마지막 offset
- 현재 컨슈머가 commit한 offet 위치

In [508]:
consumer = KafkaConsumer("topic-180818-2",
                         bootstrap_servers=bootstrap_servers, 
                         group_id = None,
                         enable_auto_commit=True,
                         auto_offset_reset='earliest',
                         consumer_timeout_ms = 5000
                        )

In [514]:
# 토픽의 파티션 정보
consumer.partitions_for_topic("topic-180818-2")

{0}

In [515]:
# 지정된 시간 이후의 가장 빠른 offset
consumer.offsets_for_times({TopicPartition("topic-180818-2", 0) :1000})

{TopicPartition(topic='topic-180818-2', partition=0): OffsetAndTimestamp(offset=0, timestamp=1534532425705)}

In [516]:
# 해당 partition의 마지막 offset
consumer.end_offsets([TopicPartition("topic-180818-2", 0)])

{TopicPartition(topic='topic-180818-2', partition=0): 5}

In [517]:
# 현재 컨슈머가 commit한 offet 위치
consumer.position(TopicPartition("topic-180818-2", 0))

0

### 1.10 토픽 commit하기

In [None]:
#consumer.commit()

### 1.11 환경설정 확인하기

In [390]:
consumer.config

{'api_version': (0, 11, 0),
 'api_version_auto_timeout_ms': 2000,
 'auto_commit_interval_ms': 5000,
 'auto_offset_reset': 'earliest',
 'bootstrap_servers': ['13.125.54.81:9092'],
 'check_crcs': True,
 'client_id': 'kafka-python-1.3.5',
 'connections_max_idle_ms': 540000,
 'consumer_timeout_ms': 5000,
 'default_offset_commit_callback': <function kafka.consumer.group.KafkaConsumer.<lambda>>,
 'enable_auto_commit': True,
 'exclude_internal_topics': True,
 'fetch_max_bytes': 52428800,
 'fetch_max_wait_ms': 500,
 'fetch_min_bytes': 1,
 'group_id': None,
 'heartbeat_interval_ms': 3000,
 'key_deserializer': None,
 'max_in_flight_requests_per_connection': 5,
 'max_partition_fetch_bytes': 1048576,
 'max_poll_records': 500,
 'metadata_max_age_ms': 300000,
 'metric_group_prefix': 'consumer',
 'metric_reporters': [],
 'metrics_num_samples': 2,
 'metrics_sample_window_ms': 30000,
 'partition_assignment_strategy': (kafka.coordinator.assignors.range.RangePartitionAssignor,
  kafka.coordinator.assigno