No description, website, or topics provided.
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
Code
Docker
.DS_Store
README.md

README.md

Kafka

學習目標

  • Kafka 簡介與組成元件

Contents

簡介

Top

  • 由 LinkedIn 所開發並於 2011 年開源
  • 是一個分散式串流平台
  • 其平台使用 Scala 與 Java 所撰寫

特性

Top

  • 分散式 ( Distributed )
  • 容錯 ( fault tolerant )
  • 水平擴展 ( Horizontal scalability )
  • 即時 ( real-time )
  • 低延遲 ( low-latency )
  • 高吞吐量 ( high-throughput )

元件說明

Top

  • 示意圖

  • Record

    • 每個發佈到 Kafka 的訊息稱為 Record
    • Push 到 Kafka 的 Record 包含 Key、Value 與 timestamp 三個部份; Consumer 從 Kafka 取得的 Record 中包含 Key、Value、timestamp 與 offset 四個部份
    • 將不含 Key 的 Record Push 到擁有多個 Partition 的 Topic 中,則 Kafka 會透過 Round-Robin 的方式將此筆 Record 派送到 Partition 中;如指定 Key,則 Record 會被分派到相同的 Partition
  • Producer

    • 向 Kafka 發佈 Topic Record 的角色
  • Consumer

    • 向 Kafka 訂閲 Topic 並處理其發佈 Record 的角色
  • Consumer Groups

    • Consumer Group 中的 Consumer 稱為 Consumer Instance,亦即是,Consumer Group 是由一個或多個 Consumer Instance 組合而成
    • 每個 Record 可以分送給不同的 Group,但每個 Group 中只會有一個 Consumer 可以訂閲此 Record
    • 當 Consumer 處理完從 Kafka 所接受的資料後,會傳送 offset 的 commit 給 Kafka
    • Consumer 透過 topic-partition 方式確保所收到的資訊是先進先出 ( FIFO - First In, First Out )
  • Broker

    • Kafka Cluster 是由一台或多台 Broker 所組成
    • 每個 Broker 都可透過一個整數 ID ( Integer ) 來識別
    • 每個 Broker 包含特定的 Topic
    • 建議至少有 3 個 Broker
    • acknowledgement
      • acks = 0,Producer 不等待確認 ( 資料有可能遺失 )
      • acks = 1,Producer 等待 leader 確認 ( 資料遺失有限 )
      • acks = all (-1),leader 與副本確認 ( 資料不會遺失 )
  • Topic

    • Record 的不同分類
    • 建立 Topic 時要考慮到 Partitions Count 與 Replication Factor 這 2 個參數
  • Partitions

    • 一個 Topic 可以分為多個 Partition
    • 每個 Partition 是一個有序的 Queue
    • Partition 中的每個 Record 都會被分配一個有序的 id 又稱為 offset,亦即是,每增加一個 Record,則 offset 增加一
    • 此 offset 用來識別 Partition 中的每個 Record
    • 相同 Record 在不同的 Partition 中的 offset 不見得相同
    • Record 預設保留一週
    • Record 被寫到 Partition 中,則無法進行修改
    • Record 隨機寫到 Partition,除非有提供 Key
    • 在 Topic replication factor 機制中只會有一個 Leader 與多個 ISR ( in-sync replica )
    • 將每個 offset 的 commit 紀錄存放在 __consumer_offsets
    • 多 Partition 的優缺點
      • 優點
        • 較好的平行運作 ( better parallelism ) 與高吞吐量 ( better throughput )
      • 缺點
        • 會有更多的開啟檔案在系統中
        • 如果 Broker Fail 時,會有很多的 leader 選舉的發生
        • 增加副本延遲時間
      • Partition 數量
        • 建議每個 Topic 的 Partition 數目約為 Broker 數的一到兩倍
    • 在 Partition 中有多個 Segment,其中 Segment 是由多個 offset 所組成,但只會有一個 Segment 是 ACTIVE 狀態,亦即是,此 Segment 正可以被寫入的狀態
      • Segment 設定
        • log.segment.bytes
          • 單一 Segment 的最大大小
          • 預設為 1GB
        • log.segment.ms
          • 在 Commit 之前,如果 Segment 不為空,則 Kafka 等待的時間
        • log.roll.hours 或 log.roll.ms
          • 預設為 1 星期
        • index 種類
          • position index
            • 允許 Kafka 透過位置去找到訊息
          • timestamp index
            • 允許 Kafka 透過 timestamp 去找到訊息
        • log 清除 ( Cleanup )
          • 目的
            • 刪除過時的資料以控制硬碟上資料的容量
          • 特點
            • 常發在 Partition Segment
            • 因為使用 CPU 與 RAM 資源所以不應常常發生
            • 預設每 15 秒進行清除的確認工作
              • 在 log.cleaner.backoff.ms 中設定
          • 設定方式
            • log.cleanup.policy=delete
              • 以時間為依據
                • 設定位置
                  • log.retention.hours
                • 特性
                  • 根據資料存在的時間 ( 預設 7 天 - 168 小時 )
                  • 數字較高代表需要較多的硬碟空間
                  • 數字較小代表保留的資料較少
              • 以空間為依據
                • 設定位置
                  • log.retention.bytes
                • 特性
                  • 意指每個 Partition 的最大容量
                  • log 的最大容量 ( 預設為 -1,代表不限 ) 來刪除
                  • 用於將 log 的容量保持在閾值 ( threshold ) 之下
              • 範例
                • 保留一周
                  • log.retention.hours = 168 與 log.retention.bytes = -1
                • 保留時間不限,但容量設為 500 MB
                  • log.retention.hours = 17520 與 log.retention.bytes = 524288000
            • log.cleanup.policy=compact
              • 特性
                • log 壓縮能確保在 Partition 中特定 key 的最後數值
                • 適用於需要 SNAPSHOT,而不是完整歷程 ( history ) 時
                • 在 ACTIVE Segment 被 commit 之後將刪除舊有重複的資料,亦即是,只處理 INACTIVE Segment 的部份,而不處理 ACTIVE Segment 的部份
                • log 壓縮只移除部份訊息,但訊息的順序還是保持著
                • 訊息的 offset 是不變的,亦即是,當訊息移失時, offset 只是被跳過
                • 任何的 Consumer 從訊息開頭處進行讀取,則還是會看到傳送到這個 Topic 的所有訊息
              • 圖示
        • log 壓縮
          • 設定
            • compression.type
            • 選項有 gzip、snappy、lz4、uncompressed、producer
          • 壓縮與解壓縮位置
            • 壓縮交由 Producer、解壓縮交由 Consumer
          • 壓縮範圍
            • 只對 non-binary 資料進行壓縮,例如:JSON、XML、text 等,不對 binary 資料進行壓縮,例如:parquet、protobuf、avro 等
  • Zookeeper

    • 為開源管理分散式的服務套件,用來協調分散式應用程式的工作
  • Rebalance

    • 當 Consumer 或 Partition 數目變動時,則 Zookeeper 會進行 Rebalance 的動作
  • Mirror Maker

    • 做為不同 Kafka Cluster 資料同步的元件

Topic replication factor (副本數)

Top

  • 示意圖

  • 建立 Topic 時可同步設定 Topic 的副本數,以避免某個 Broker 發生問題時 Consumer 無法取得資料
  • replication factor 數量通常設定為 2 或 3
  • 每個 Partition 只會有一個 Leader,但有多個 ISR ( in-sync replica )
  • 當 replication factor 設定為 n,則 Producer 與 Consumer 能容忍 n-1 個 Broker 停機

透過 Docker 安裝 Kafka 與 Zookeeper

Top

  • 開始 Zookeeper 與 Kafka
    • docker-compose up -d
  • 確認 Zookeeper 與 Kafka 服務
    • docker-compose ps
  • 進到 Docker container
    • docker exec -it env_kafka_1 bash
  • 建立 topic
    • kafka-topics --create
      --topic test
      --replication-factor 1
      --partitions 1
      --zookeeper zookeeper:2181
  • 發佈訊息
    • kafka-console-producer
      --broker-list kafka:9092
      --topic test
  • 訂閱資訊
    • kafka-console-consumer
      --bootstrap-server kafka:9092
      --topic test
      --from-beginning
  • 停止 Zookeeper 與 Kafka
    • docker-compose stop
  • 開始存在的 Zookeeper 與 Kafka
    • docker-compose start
  • 移除 Zookeeper 與 Kafka Container
    • docker-compose down

啟動 docker-compose 發生 ERROR: Couldn’t connect to Docker daemon at http+docker://localunixsocket - is it running? 錯誤

ERROR: Couldn’t connect to Docker daemon at http+docker://localunixsocket - is it running?

If it’s at a non-standard location, specify the URL with the DOCKER_HOST environment variable.

Apache NiFi

  • 安裝 OpenJDK 8

    • sudo apt update
    • sudo apt install openjdk-8-jdk
    • java -version
    • sudo update-alternatives --config java
    • sudo nano /etc/environment
    • JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64/jre/"
    • source /etc/environment
    • echo $JAVA_HOME
  • 下載

    • wget ftp://ftp.twaren.net/Unix/Web/apache/nifi/1.8.0/nifi-1.8.0-bin.tar.gz
  • 解壓縮

    • tar zxvf nifi-1.8.0-bin.tar.gz
  • 啟動

    • cd nifi-1.8.0
    • bin/nifi.sh start
  • 停止

    • cd nifi-1.8.0
    • bin/nifi.sh stop
  • Browse NiFi

mysql-connector