Properties ๊ฐ์ฒด๋ฅผ ์์ฑํ์ฌ Kafka ์๋ฒ์ ๊ด๋ จ๋ ์ค์ ์ ํ๋ค.
Kafka ํด๋ฌ์คํฐ์ ์ฐ๊ฒฐํ๊ธฐ ์ํ ์ฃผ์๋ค.
์ฌ๊ธฐ์๋ "localhost:9092"๋ก ๋ก์ปฌ์์ ์คํ ์ค์ธ Kafka ์๋ฒ์ ์ฐ๊ฒฐํ๋ค.
Consumer Group์ ID๋ฅผ ์ค์ ํ๋ค.
Consumer Group์ ์ฌ๋ฌ Consumer ์ธ์คํด์ค๋ฅผ ๋ฌถ์ด ์ฒ๋ฆฌ๋์ ํ์ฅํ๊ฑฐ๋ ๋ฉ์์ง๋ฅผ ๋ถ๋ฐฐํ๋ ๋ฐ ์ฌ์ฉ๋๋ค.
Kafka ๋ฉ์์ง๋ ํค์ ๊ฐ์ ์์ผ๋ก ๊ตฌ์ฑ๋๋ค.
์ด๋ค์ Kafka๋ก๋ถํฐ ๋ฉ์์ง๋ฅผ ์์ ํ ๋ ์ด๋ป๊ฒ ์ง๋ ฌํ ํด์ ํ ์ง ๊ฒฐ์ .
์์์๋ ๋ฌธ์์ด๋ก ์ง๋ ฌํ ํด์ ํ๋ StringDeserializer๋ฅผ ์ฌ์ฉ.
์์์ ์ ์ํ properties๋ฅผ ์ฌ์ฉํ์ฌ KafkaConsumer ๊ฐ์ฒด๋ฅผ ์์ฑํ๋ค.
Consumer๊ฐ ๋ฉ์์ง๋ฅผ ์์ ํ ํ ํฝ์ ๊ตฌ๋ ํ๋ค. ์ ์์ ์์๋ "simple"์ด๋ผ๋ ํ ํฝ์ ๊ตฌ๋ ํ๋ค.
while ๋ฃจํ ๋ด์์ consumer.poll() ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ Kafka๋ก๋ถํฐ ๋ฉ์์ง๋ฅผ ์์ ํ๋ค.
poll() ๋ฉ์๋๋ ์ง์ ๋ ์๊ฐ ๋์ ๋ฉ์์ง๋ฅผ ๋๊ธฐํ๋ฉฐ, ์ฌ๊ธฐ์๋ 100๋ฐ๋ฆฌ์ด ๋์ ๋๊ธฐํ๋ค.
์์ ๋ ๊ฐ ๋ฉ์์ง(๋ ์ฝ๋)์ ๋ํด, ๊ทธ ๋ด์ฉ์ ์ถ๋ ฅํ๋ค. ์ฌ๊ธฐ์๋ ๋ฉ์์ง์ ๊ฐ, ํ ํฝ, ํํฐ์ ๋ฒํธ, ์คํ์ ์ ์ถ๋ ฅํ๋ค.
์์ ์ด ๋๋๋ฉด Consumer๋ฅผ ์ข ๋ฃํ๋ค.
์ด ์ฝ๋๋ Kafka์์ ๋ฉ์์ง๋ฅผ ์์ ํ๋ ๊ธฐ๋ณธ์ ์ธ ๋ฐฉ๋ฒ์ ๋ณด์ฌ์ค๋ค.
Consumer Group A ๋ด์๋ Consumer1 ํ๋๋ง ์กด์ฌํ๋ฉฐ, ์ด Consumer๋ ๋ ๊ฐ์ ํํฐ์ ๋ชจ๋์์ ๋ฉ์์ง๋ฅผ ์์ ํ๊ณ ์๋ค.
ํํฐ์ ์ ์ด์ ๋ Consumer ์ฌ์ด์ ๊ท ๋ฑํ๊ฒ ๋ถ๋ฐฐ๋๋ค. Consumer1์ topic1 - p0 ํํฐ์ ์์, Consumer2๋ topic1 - p1 ํํฐ์ ์์ ๋ฉ์์ง๋ฅผ ์์ ํ๋ค.
์ฃผ์ํ ์ ์ ํ์ฌ ํ ํฝ topic1์๋ ํํฐ์ ์ด 2๊ฐ๋ง ์๊ธฐ ๋๋ฌธ์, ํ Consumer๋ ํํฐ์ ์ ํ ๋น๋ฐ์ง ๋ชปํ๋ค. ๋ฐ๋ผ์ Consumer1๊ณผ Consumer2๋ ๊ฐ๊ฐ ํ๋์ ํํฐ์ ์์ ๋ฉ์์ง๋ฅผ ๊ณ์ ์์ ํ๋ฉฐ, Consumer3๋ ํํฐ์ ์ ํ ๋น๋ฐ์ง ๋ชปํด ๋ฉ์์ง๋ฅผ ์์ ํ์ง ์๋๋ค.
์ ์ด๋ฏธ์ง๋ ์นดํ์นด Consumer์ ๋์ ์๋ฆฌ ์ค, ์คํ์ ๊ด๋ฆฌ์ ๋ฉ์์ง ๊ฐ์ ธ์ค๊ธฐ์ ๊ณผ์ ์ ๊ฐ๋ตํ ๋ํ๋ด๊ณ ์๋ค.
poll ํธ์ถ์ ํตํด ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์ฌ ๋๋ง๋ค, Consumer๋ ์ด๋๊น์ง ๋ฉ์์ง๋ฅผ ์ฝ์๋์ง(์ด์ ์ปค๋ฐ ์คํ์ ), ๊ทธ๋ฆฌ๊ณ ์ด๋๊น์ง ๋ฉ์์ง๋ฅผ ์ฝ์ ์ ์๋์ง(์คํ์ ์ปค๋ฐ)๋ฅผ ์ ๋ฐ์ดํธํ๋ฉฐ ๋์ํ๋ค.
์ ์ด๋ฏธ์ง๋ ์นดํ์นด(Kafka)์์์ Consumer ๋์ ๋ฐฉ์ ์ค ์คํ์ ์ด๊ธฐํ ์ ๋ต(auto.offset.reset ์์ฑ)์ ๊ดํ ๋ด์ฉ์ ์ค๋ช .
๊ฐ์ฅ ์ด๊ธฐ์ ์คํ์ ๋ถํฐ ๋ฉ์์ง๋ฅผ ์์ํ์ฌ ๊ฐ์ ธ์จ๋ค. ์ ์ด๋ฏธ์ง์์๋ 0์ด๋ผ๋ ์์น๋ก ํ์๋์ด ์๋ค. ์ด ์ค์ ์ Consumer๊ฐ ์ฒ์ ์์๋ ๋๋, ์ ์ฅ๋ ์คํ์ ์ด ๋ ์ด์ ์ ํจํ์ง ์์ ๊ฒฝ์ฐ(์: ํด๋น ์คํ์ ์ ๋ฉ์์ง๊ฐ ์ด๋ฏธ ์ญ์ ๋ ๊ฒฝ์ฐ) ์ฌ์ฉ๋๋ค.
๊ฐ์ฅ ์ต๊ทผ์ ์คํ์ ๋ถํฐ ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์จ๋ค. ์ด๋ Consumer๊ฐ ๋์ ์ค์ผ ๋ ์๋ก์ด ๋ฉ์์ง๋ง ๊ฐ์ ธ์ค๊ณ ์ถ์ ๋ ์ฌ์ฉํ๋ค.
์ด ์ค์ ์ ์ ์ฅ๋ ์คํ์ ์ด ์์ ๋ ์์ธ๋ฅผ ๋ฐ์์ํต๋ค. ์ฆ, ์ ์ฅ๋ ์คํ์ ์ ๋ณด๊ฐ ์๊ฑฐ๋ ์ ํจํ์ง ์์ ๊ฒฝ์ฐ Consumer๋ ๋์์ ์ค์งํ๊ฒ ๋๋ค.
์ ์ค์ ์ Consumer๊ฐ ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์ค๊ธฐ ์์ํ๋ ์์น๋ฅผ ๊ฒฐ์ ํ๋ ์ค์ํ ์ญํ ์ ํ๋ค. ์๋ฅผ ๋ค์ด, earliest๋ฅผ ์ค์ ํ๋ฉด Consumer๋ ํ ํฝ์ ์์๋ถํฐ ๋ชจ๋ ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์ค๊ฒ ๋๋ฉฐ, latest๋ฅผ ์ค์ ํ๋ฉด Consumer๋ ๊ฐ์ฅ ์ต๊ทผ์ ์ถ๊ฐ๋ ๋ฉ์์ง๋ง ๊ฐ์ ธ์ค๊ฒ ๋๋ค.
์ด๋ฌํ ์ ํ์ ์ ํ๋ฆฌ์ผ์ด์ ์ ์๊ตฌ์ฌํญ๊ณผ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์ ๋ต์ ๋ฐ๋ผ ๊ฒฐ์ ๋๋ค.
์ ์ด๋ฏธ์ง๋ ์นดํ์นด(Kafka) Consumer์ ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ(fetch) ๊ด๋ จ ์ค์ ์ ๋ํด ์ค๋ช ํ๊ณ ์๋ค.
Consumer๊ฐ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๊ธฐ ์์ํ๊ธฐ ์ํด ํ์ํ ์ต์ ๋ฐ์ดํธ ์๋ค.
๊ธฐ๋ณธ๊ฐ: 1
: ์ด๋ Consumer๊ฐ ์ต์ 1๋ฐ์ดํธ์ ๋ฐ์ดํฐ๋ง ์์ด๋ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ฌ ์ ์๋ค๋ ๊ฒ์ ์๋ฏธํ๋ค.
์ฃผ์ ํฌ์ธํธ
: ๊ธฐ๋ณธ๊ฐ ์ค์ ์ ๋๊ฒ ํ๋ฉด, Consumer๋ ๋ ํฐ ์์ ๋ฐ์ดํฐ๊ฐ ์์ผ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ ๋ค์ ๊ฐ์ ธ์ค๊ฒ ๋๋ค. ์ด๊ฐ์ ๋ฐฉ๋ฒ์ ๋คํธ์ํฌ I/O๋ฅผ ์ค์ด๊ณ ํจ์จ์ฑ์ ๋์ด๊ธฐ ์ํ ์ ๋ต์ผ๋ก ์ฌ์ฉ๋ ์ ์๋ค.
Consumer๊ฐ fetch.min.bytes ์ค์ ์ ๋ฐ๋ฅธ ์ต์ ๋ฐ์ดํธ ์๋ฅผ ์ถฉ์กฑํ๊ธฐ ์ํด ๊ธฐ๋ค๋ฆฌ๋ ์ต๋ ์๊ฐ์ด๋ค.
๊ธฐ๋ณธ๊ฐ: 500 ๋ฐ๋ฆฌ์ด(ms)
์ฃผ์ ํฌ์ธํธ
: ๋ง์ฝ ์ค์ ๋ ์๊ฐ ๋์ fetch.min.bytes์ ์ง์ ๋ ๋ฐ์ดํธ ์๊ฐ ์ถฉ์กฑ๋์ง ์๋๋ผ๋, ํด๋น ์๊ฐ์ด ์ง๋๋ฉด Consumer๋ ํ์ฌ ์ฌ์ฉ ๊ฐ๋ฅํ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๊ฒ ๋๋ค.
Consumer๊ฐ ํ ๋ฒ์ poll() ์์ฒญ์ผ๋ก ๊ฐ ํํฐ์ ์์ ๊ฐ์ ธ์ฌ ์ ์๋ ์ต๋ ๋ฐ์ดํธ ์๋ค.
๊ธฐ๋ณธ๊ฐ: 1048576 ๋ฐ์ดํธ, ์ฆ 1MB
์ฃผ์ ํฌ์ธํธ
: ์ด ์ค์ ์ ๊ฐ ํํฐ์
์์ ๊ฐ์ ธ์ค๋ ๋ฐ์ดํฐ์ ํฌ๊ธฐ๋ฅผ ์ ํํ๋ค. ์ด๋ฅผ ํตํด Consumer๊ฐ ๋๋ฌด ๋ง์ ๋ฐ์ดํฐ๋ฅผ ํ ๋ฒ์ ์ฒ๋ฆฌํ๋ ๊ฒ์ ๋ฐฉ์งํ๋ฉฐ, ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ๋์ ๊ด๋ฆฌํ ์ ์๋ค.
์ด ์ค์ ๋ค์ ์นดํ์นด Consumer์ ์ฑ๋ฅ๊ณผ ํจ์จ์ฑ์ ์ต์ ํํ๊ธฐ ์ํด ์ค์ํ๋ค.
์ด ๋ ๋ฉ์๋๋ offset ์ปค๋ฐ๊ณผ ๊ด๋ จ๋ ์์ ์ ์ํํ๋ค.
poll(): Consumer๊ฐ ์นดํ์นด์์ ์๋ก์ด ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์ค๋ ๋์, ์ค์ ๋ auto.commit.interval.ms ์ฃผ๊ธฐ์ ๋ฐ๋ผ offset์ ์๋ ์ปค๋ฐํ ์ ์๋ค.
close(): Consumer๊ฐ ์ข ๋ฃ๋๊ธฐ ์ ์ ๋ง์ง๋ง์ผ๋ก ์ฝ์ ๋ฉ์์ง์ offset์ ์ปค๋ฐํ๋ค.
์ฃผ์ ํฌ์ธํธ: ์ฌ๋ฐ๋ฅธ offset ๊ด๋ฆฌ๋ ๋ฉ์์ง ์ค๋ณต ์ฒ๋ฆฌ๋ ๋๋ฝ์ ๋ฐฉ์งํ๋๋ฐ ์ค์ํ๋ค.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Kafka์ ํ ํฝ์์ ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์ค๋ ์ฝ๋๋ค. ์ ์ฝ๋๋ 1์ด ๋์ ๋ฉ์์ง๋ฅผ ๊ธฐ๋ค๋ฆฐ๋ค.
for (ConsumerRecord<String, String> record : records) { ... }:
๊ฐ์ ธ์จ ๋ฉ์์ง๋ฅผ ์ํํ๋ฉฐ ์ฒ๋ฆฌํ๋ ๋ฃจํ๋ค.
consumer.commitSync();
์ฒ๋ฆฌํ ๋ฉ์์ง์ offset์ ๋๊ธฐ์ ์ผ๋ก ์ปค๋ฐํ๋ค.
commitSync ๋ฉ์๋๋ ์ปค๋ฐ ์์ฒญ์ด ์ฑ๊ณตํ ๋๊น์ง ๋ธ๋กํน๋๋ค. ๋ง์ฝ ์ปค๋ฐ์ด ์คํจํ๋ฉด ์์ธ๊ฐ ๋ฐ์ํ๋ค.
๋๊ธฐ ์ปค๋ฐ์ ์ฒ๋ฆฌ๊ฐ ํ์คํ๊ณ ์์ธก ๊ฐ๋ฅํ์ง๋ง, ์ปค๋ฐ์ ์คํจํ ๊ฒฝ์ฐ ๋ฆฌํธ๋ผ์ด ๋ฉ์ปค๋์ฆ์ ๊ตฌํํด์ผ ํ๋ค.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
์์ ์ฝ๋์ ๋ง์ฐฌ๊ฐ์ง๋ก Kafka์ ํ ํฝ์์ ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์ค๋ ์ฝ๋๋ค.
for (ConsumerRecord<String, String> record : records) { ... }
๊ฐ์ ธ์จ ๋ฉ์์ง๋ฅผ ์ํํ๋ฉฐ ์ฒ๋ฆฌํ๋ ๋ฃจํ๋ค.
consumer.commitAsync(OffsetCommitCallback callback);
์ฒ๋ฆฌํ ๋ฉ์์ง์ offset์ ๋น๋๊ธฐ์ ์ผ๋ก ์ปค๋ฐํ๋ค.
commitAsync๋ ์ฆ์ ๋ฐํ๋๋ฉฐ, ์ฑ๊ณต ๋๋ ์คํจ์ ๋ํ ์ฝ๋ฐฑ์ ๋ฐ์ ์ ์๋ค.
๋น๋๊ธฐ ์ปค๋ฐ์ ๋น ๋ฅด๊ณ , ๋ธ๋กํน์ด ์์ผ๋ฏ๋ก ์ฒ๋ฆฌ ์๋๊ฐ ๋น ๋ฅด๋ค. ํ์ง๋ง ์คํจํ ์ปค๋ฐ์ ๋ํด ์๋์ผ๋ก ์ฌ์๋ํ์ง ์๋๋ค.
๋ฉฑ๋ฑ์ฑ์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ์์ ๋งค์ฐ ์ค์ํ ๊ฐ๋ ์ด๋ค.
์๋ฅผ ๋ค์ด, ํ ์ฌ์ฉ์๊ฐ ๋ฌผ๊ฑด์ ๊ตฌ๋งคํ๋ ๊ฒฐ์ ์์ฒญ์ ๋ณด๋์ ๋, ๋คํธ์ํฌ ๋ฌธ์ ๋ก ์ธํด ๊ฐ์ ์์ฒญ์ ์ฌ๋ฌ ๋ฒ ๋ณด๋ด๊ฒ ๋ ์ ์๋ค. ๋ฉฑ๋ฑ์ฑ์ด ๋ณด์ฅ๋์ง ์์ผ๋ฉด, ์ฌ์ฉ์๋ ๊ฐ์ ๋ฌผ๊ฑด์ ์ฌ๋ฌ ๋ฒ ๊ตฌ๋งคํ๊ฒ ๋ ์ ์๋ค.
๋ฐ๋ผ์, ์์คํ ์ ๊ฐ์ ์์ฒญ์ ์ฌ๋ฌ ๋ฒ ๋ฐ์๋ ํ ๋ฒ๋ง ์ฒ๋ฆฌํ๊ฑฐ๋, ๋์ผํ ๊ฒฐ๊ณผ๋ฅผ ๋ฐํํด์ผ ํ๋ค.
์์ ๊ฐ๋ ์ ๋ฐ์ดํฐ๋ฅผ ์์ ํ๊ฒ ์ฒ๋ฆฌํ๊ธฐ ์ํด ์ค์ํ๋ฉฐ, ํนํ ๋ถ์ฐ ์์คํ , ๋คํธ์ํฌ ํต์ , ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฑ์์ ์์ฃผ ๋ค๋ฃจ์ด์ง๋ค.
Kafka์์ ๋ฆฌ๋ฐธ๋ฐ์ฑ์ Consumer Group ๋ด์ Consumer๋ค ์ฌ์ด์ ํํฐ์ ์ ์ฌ๋ถ๋ฐฐํ๋ ๊ณผ์ ์ ๋งํ๋ค.
Consumer ์ถ๊ฐ/์ ๊ฑฐ
: Consumer Group์ ์๋ก์ด Consumer๊ฐ ์ถ๊ฐ๋๊ฑฐ๋ ๊ธฐ์กด์ Consumer๊ฐ ์ ๊ฑฐ๋ ๋, ํํฐ์
์ ๋ถ๋ฐฐ๊ฐ ํ์ํ๋ค.
ํํฐ์
์ถ๊ฐ
: ํ ํฝ์ ์๋ก์ด ํํฐ์
์ด ์ถ๊ฐ๋ ๊ฒฝ์ฐ, ์ด ์ ํํฐ์
์ ์๋นํ๊ธฐ ์ํด ํํฐ์
๋ถ๋ฐฐ๊ฐ ํ์ํ๋ค.
Consumer ์ฅ์
: ์ด๋ค Consumer๊ฐ ์ฅ์ ๋ก ์ธํด ๋์ํ์ง ์๊ฒ ๋๋ฉด, ๊ทธ Consumer๊ฐ ๋ด๋นํ๋ ํํฐ์
๋ค์ ๋ค๋ฅธ Consumer๋ค์๊ฒ ๋ถ๋ฐฐํด์ผ ํ๋ค.
๋ฆฌ๋ฐธ๋ฐ์ฑ ์์ ์, Consumer Group ๋ด์ ๋ชจ๋ Consumer๋ ํ์ฌ ์๋น ์ค์ธ ํํฐ์ ์์ ๋ถ๋ฆฌ๋๋ค.
๊ทธ ํ, Group Coordinator๋ ์๋ก์ด ํํฐ์ ํ ๋น์ ๊ฒฐ์ ํ๊ณ ๊ฐ Consumer์๊ฒ ์๋ฆฐ๋ค.
๊ฐ Consumer๋ ์๋ก ํ ๋น๋ ํํฐ์ ๋ค์ ์์์ (์: ๊ฐ์ฅ ๋ง์ง๋ง์ผ๋ก ์ปค๋ฐ๋ ์คํ์ )๋ถํฐ ์๋นํ๊ธฐ ์์ํ๋ค.
๋ฆฌ๋ฐธ๋ฐ์ฑ์ ํ์ํ ์์ ์ด์ง๋ง, ๊ณผ์ ์ค์๋ ํด๋น Consumer Group์ ๋ฉ์์ง ์๋น๊ฐ ์ผ์์ ์ผ๋ก ์ค๋จ๋ ์ ์๋ค.
๋ฆฌ๋ฐธ๋ฐ์ฑ์ด ๋น๋ฒํ๊ฒ ๋ฐ์ํ๋ฉด, ์ ์ฒด ์์คํ ์ ์ฒ๋ฆฌ๋๊ณผ ์ฑ๋ฅ์ ์ํฅ์ ์ค ์ ์๊ธฐ ๋๋ฌธ์, ์ด๋ฅผ ์ต์ํํ๋ ์ค์ ๊ณผ ์ ๋ต์ด ํ์ํ๋ค.
์ ์ด๋ฏธ์ง์์ ์ธ๊ธ๋ session.timeout.ms ๋ฐ heartbeat.interval.ms๋ ๋ฆฌ๋ฐธ๋ฐ์ฑ๊ณผ ๊ด๋ จ๋ ์ค์ ๊ฐ์ด๋ค.
heartbeat.interval.ms
๋ Consumer๊ฐ Group Coordinator์๊ฒ "๋ ์์ง ์ด์์์ด!"๋ผ๊ณ ์๋ฆฌ๋ ์ ํธ๋ฅผ ๋ณด๋ด๋ ์ฃผ๊ธฐ๋ฅผ ์ค์ ํ๋ฉฐ, session.timeout.ms
๋ ํด๋น ์ ํธ๋ฅผ ๋ฐ์ง ๋ชปํ์ ๋ Consumer๋ฅผ ์ฅ์ ๋ก ํ๋จํ๊ณ ๋ฆฌ๋ฐธ๋ฐ์ฑ์ ์์ํ๋ ์๊ฐ์ ๊ฒฐ์ ํ๋ค.
Apache Kafka์์์ Consumer๋ ์ค๋ ๋์ ์์ ํ์ง ์๋ค. ์ด๋ KafkaConsumer์ ์ธ์คํด์ค๊ฐ ์ฌ๋ฌ ์ค๋ ๋์ ์ํด ๋์์ ์ก์ธ์ค๋ ๋ ์์ ์ ์ธ ๋์์ ๋ณด์ฅํ์ง ์๋๋ค๋ ๊ฒ์ ์๋ฏธํ๋ค.
KafkaConsumer์ ๋ด๋ถ ๊ตฌ์กฐ์ ์ํ ๊ด๋ฆฌ ๋ฐฉ์์ ๋จ์ผ ์ค๋ ๋์์ ๋์ํ๊ธฐ ์ํด ์ค๊ณ๋์๋ค.
์ฌ๋ฌ ์ค๋ ๋์์ ๋์์ KafkaConsumer์ ๋ฉ์๋๋ฅผ ํธ์ถํ๊ฒ ๋๋ฉด, ๊ทธ ์ํ์ ๋์์ด ์๊ธฐ์น ์๊ฒ ๋ณ๊ฒฝ๋ ์ ์์ด ๋ฐ์ดํฐ ์์ค ๋๋ ์ค๋ณต ์ฒ๋ฆฌ์ ๊ฐ์ ๋ฌธ์ ๊ฐ ๋ฐ์ํ ์ ์๋ค.
๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ์ํด์ ๊ฐ ์ค๋ ๋๋ง๋ค ๋ณ๋์ KafkaConsumer ์ธ์คํด์ค๋ฅผ ์ฌ์ฉํด์ผ ํฉ๋๋ค.
์ด๋ ๊ฒ ํ๋ฉด ๊ฐ ์ค๋ ๋๋ ๋ ๋ฆฝ์ ์ผ๋ก ๋ฉ์์ง๋ฅผ ์๋นํ ์ ์๋ค.
wakeup()์ KafkaConsumer๊ฐ poll() ๋ฉ์๋๋ฅผ ํธ์ถ ์ค์ผ ๋, ์ด๋ฅผ ์ค๋จ์ํค๊ธฐ ์ํ ๋ฉ์๋๋ค.
๋ค๋ฅธ ์ค๋ ๋์์ ์ด ๋ฉ์๋๋ฅผ ํธ์ถํ๋ฉด poll()์ด ์คํ ์ค์ธ ์ค๋ ๋๋ WakeupException์ ๋ฐ์์ํค๊ณ ์ค๋จ๋๋ค.
์ด ๊ธฐ๋ฅ์ ์ฃผ๋ก KafkaConsumer๊ฐ ๊ธด poll() ์์ ์ ์ํํ๋ ๋์ ๊ทธ ์์ ์ ๊ฐ์ ๋ก ์ข ๋ฃ์ํค๊ธฐ ์ํด ์ฌ์ฉ๋๋ค. ์๋ฅผ ๋ค์ด, ์ฆ๊ฐ์ ์ธ ์ข ๋ฃ๊ฐ ํ์ํ ์ํฉ์์ ์ ์ฉํฉ๋๋ค.
๊ฒฐ๋ก ์ ์ผ๋ก, KafkaConsumer๋ฅผ ์ฌ์ฉํ ๋๋ ์ค๋ ๋์ ์์ ํ๊ฒ ๋์ํ๋๋ก ์ฃผ์๊ฐ ํ์ํ๋ฉฐ, ๋ณ๋ ฌ ์ฒ๋ฆฌ๊ฐ ํ์ํ ๊ฒฝ์ฐ ๊ฐ ์ค๋ ๋์ ๋ ๋ฆฝ์ ์ธ KafkaConsumer ์ธ์คํด์ค๋ฅผ ํ ๋นํด์ผ ํ๋ค.