

#### **2. Batch Layer**

```
1. Start the Apache Airflow instance:
docker-compose up -d
Access the Apache Airflow web UI (localhost:8080) and run the DAG

@docker-compose.yaml 


2. Start Apache Spark
spark-shell

3. Start Apache zookeeper
zookeeper-server-start.bat C:/kafka_2.13_2.6.0/config/zookeeper.properties

4. Start Kafka server
kafka-server-start.bat C:/kafka_2.13_2.6.0/config/server.properties
Run the kafka producer
kafka-console-producer.bat --topic smartphoneTopic --bootstrap-server localhost:9092
Run the kafka consumer
kafka-console-consumer.bat --topic smartphoneTopic --from-beginning --bootstrap-server localhost:9092

5. Run HDFS and yarn (start-all or start-dfs and start-yarn)
start-all  

======

Viết thành @docker-compose-batch.yml 
```


```
1. Check: Airflow và Dags: 
- Lỗi đường dẫn nên mình đã sửa bằng cách đưa hàm batch_layer vào dags luôn. 
- Check = PORT: 8080
2. Check: 
Apach Spark Master và Apach Spark Worker là gì ?? 
- Master: 7077 (bug), và 9090:8080  (ok)
- Worker: ??? ko có port

"""
Apache Spark Master và Worker là hai thành phần chính trong kiến trúc phân tán của Apache Spark:

1. **Spark Master (Node chủ)**:
- Là trung tâm điều phối của cụm Spark
- Nhiệm vụ chính:
  - Phân phối công việc cho các Worker nodes
  - Quản lý tài nguyên trong cụm
  - Theo dõi trạng thái của các Worker
  - Điều phối việc thực thi các task
- Giống như "người quản lý" điều phối công việc

2. **Spark Worker (Node công nhân)**:
- Là các node thực thi công việc
- Nhiệm vụ chính:
  - Thực thi các task được Master giao
  - Báo cáo trạng thái cho Master
  - Quản lý tài nguyên cục bộ (CPU, RAM)
- Giống như "công nhân" thực hiện công việc

Ví dụ hoạt động:
```
1. Client gửi job đến Master
2. Master phân tích job và chia thành các task
3. Master phân phối tasks cho các Worker
4. Workers thực thi tasks và gửi kết quả về
5. Master tổng hợp kết quả và trả về Client
```

Trong docker-compose của bạn:
```yaml
spark-master:
    environment:
      - SPARK_MODE=master
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1

spark-worker:
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=1G
```

- Master được cấu hình với 1GB RAM và 1 core
- Worker kết nối đến Master qua port 7077
- Worker cũng được cấp 1GB RAM để xử lý task

"""

"""
Ah, tôi hiểu vấn đề của bạn:

1. **Port 7077**:
- Port này là RPC port (Remote Procedure Call) dùng cho giao tiếp nội bộ giữa Master và Worker
- Không phải là Web UI port nên không thể truy cập bằng browser
- Chỉ dùng cho Spark internal communication

2. **Worker Port**:
- Thiếu port mapping cho Worker UI
- Sửa lại docker-compose như sau:

```yaml:Main/Lambda/Batch_layer/docker-compose-batch.yml
spark-worker:
    image: bitnami/spark:latest
    container_name: spark-worker
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_MESSAGE_MAX_SIZE=2047
      - SPARK_NETWORK_MAX_MESSAGE_SIZE=2047
      # Thêm port cho Worker UI
      - SPARK_WORKER_WEBUI_PORT=8081
    ports:
      - "8081:8081"  # Export Worker UI port
    depends_on:
      - spark-master
    networks:
      - batch_network
```

Sau khi sửa:
1. Spark Master UI: `http://localhost:9090`
2. Spark Worker UI: `http://localhost:8081`
3. Port 7077: Chỉ dùng cho internal communication

Kiểm tra:
```bash
# Restart services
docker-compose -f docker-compose-batch.yml down
docker-compose -f docker-compose-batch.yml up -d

# Check logs
docker logs spark-master
docker logs spark-worker
```

Bây giờ bạn có thể:
- Xem Master UI ở port 9090
- Xem Worker UI ở port 8081
- Port 7077 vẫn hoạt động cho internal communication

"""



Check UI tại: 
- Master: `http://localhost:9090`
- Worker: `http://localhost:8081`



### 3. Apach Kafka: 

Các thay đổi chính:
Tăng MESSAGE_MAX_BYTES và các giới hạn liên quan lên 2GB
Tăng buffer size cho socket
Tăng fetch size cho partition

Nếu vẫn gặp lỗi, có thể:
- Giảm kích thước message gửi đi
- Hoặc tăng thêm các giới hạn:





Memory Manager trong Kafka batch có các thành phần chính:

1. **Heap Memory Management**:
```yaml
KAFKA_HEAP_OPTS: "-Xmx4G -Xms4G"
```
- `-Xmx4G`: Maximum heap size (tối đa 4GB)
- `-Xms4G`: Initial heap size (khởi tạo 4GB)
- Dùng cho: Java runtime, broker operations, cache

2. **Network Memory**:
```yaml
KAFKA_SOCKET_RECEIVE_BUFFER_BYTES: 1024000
KAFKA_SOCKET_SEND_BUFFER_BYTES: 1024000
```
- Buffer cho network operations
- Xử lý incoming/outgoing requests

3. **Message Size Management**:
```yaml
KAFKA_MESSAGE_MAX_BYTES: 2000000000        # Max message size
KAFKA_MAX_REQUEST_SIZE: 2000000000         # Max request size
KAFKA_MAX_PARTITION_FETCH_BYTES: 2000000000 # Max data per partition fetch
```

4. **Thread Management**:
```yaml
KAFKA_NUM_NETWORK_THREADS: 3    # Network threads
KAFKA_NUM_IO_THREADS: 8         # I/O threads
KAFKA_BACKGROUND_THREADS: 10    # Background processing
```

5. **Queue Management**:
```yaml
KAFKA_QUEUED_MAX_REQUESTS: 500  # Max requests in queue
```

Cách hoạt động:
1. Network threads nhận requests
2. Requests được đưa vào queue
3. I/O threads xử lý từ queue
4. Background threads xử lý maintenance tasks

Khi gặp vấn đề memory:
1. Tăng heap size
2. Giảm message size
3. Điều chỉnh số lượng threads
4. Monitor memory usage



### 4. Apach Zookeeper: 
- Vẫn còn lỗi "Len error 1195725856" với Zookeeper. 
- Có thể do các config không đúng. 
- Cần kiểm tra lại các config trong docker-compose-batch.yml và các file cấu hình khác. 


Vẫn còn lỗi "Len error 1195725856" với Zookeeper. Hãy thử điều chỉnh lại cấu hình Zookeeper như sau:

```yaml:Main/Lambda/Batch_layer/docker-compose-batch.yml
zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper-batch
    ports:
      - "2182:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_INIT_LIMIT: 5
      
      # Tăng buffer size và memory
      JVMFLAGS: >-
        -Djava.awt.headless=true
        -Xmx2G -Xms2G
        -Dzookeeper.maxClientCnxns=60
        -Dzookeeper.jute.maxbuffer=8388608
        -XX:+UseG1GC
        -XX:MaxGCPauseMillis=200
      
      # Giới hạn kết nối và timeout
      ZOOKEEPER_MAX_CLIENT_CNXNS: 60
      ZOOKEEPER_MAX_SESSION_TIMEOUT: 60000
      ZOOKEEPER_MIN_SESSION_TIMEOUT: 6000
      
      # Snapshot và cleanup
      ZOOKEEPER_SNAP_RETAIN_COUNT: 5
      ZOOKEEPER_PURGE_INTERVAL: 12
      ZOOKEEPER_AUTOPURGE_PURGEINTERVAL: 12
      ZOOKEEPER_AUTOPURGE_SNAPRETAINCOUNT: 5

    volumes:
      - zookeeper_data:/var/lib/zookeeper/data
      - zookeeper_log:/var/lib/zookeeper/log
    ulimits:
      nofile:
        soft: 65536
        hard: 65536
    networks:
      - batch_network
```

Các thay đổi chính:

1. Tăng memory và buffer:
   - Heap size: 2GB
   - jute.maxbuffer: 8MB
   - Thêm G1GC collector

2. Điều chỉnh timeout và connections:
   - maxClientCnxns: 60 
   - session timeout: 6s-60s

3. Thêm cấu hình cleanup:
   - Tự động purge mỗi 12 giờ
   - Giữ 5 snapshots

4. Thêm ulimits để tăng số file descriptors

5. Thêm các tham số JVM tối ưu

Sau khi sửa:

```bash
# Xóa volumes cũ
docker-compose -f docker-compose-batch.yml down -v

# Restart với cấu hình mới
docker-compose -f docker-compose-batch.yml up -d

# Kiểm tra logs
docker logs zookeeper-batch
```

Nếu vẫn gặp lỗi:
1. Tăng jute.maxbuffer lên 16MB hoặc 32MB
2. Tăng heap size lên 4GB
3. Giảm số lượng kết nối đồng thời
4. Kiểm tra message size từ Kafka







Trong Kafka, việc có 2 ports 9092 và 9093 là để phục vụ 2 mục đích khác nhau:

1. **Port 9092 (PLAINTEXT://kafka:9092)**:
- Dùng cho giao tiếp nội bộ giữa các containers trong Docker network
- Các services trong cùng network như Spark, Airflow sẽ dùng port này
- Địa chỉ `kafka:9092` chỉ có thể truy cập được từ bên trong Docker network

2. **Port 9093 (PLAINTEXT_HOST://localhost:9093)**:
- Dùng cho giao tiếp từ bên ngoài Docker (từ máy host)
- Cho phép các ứng dụng chạy trên máy host kết nối đến Kafka
- Địa chỉ `localhost:9093` có thể truy cập từ máy host

Điều này được cấu hình trong:
````yaml
environment:
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
  KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
````

Ví dụ sử dụng:
1. Từ container khác:
````python
# Dùng port 9092
kafka_bootstrap_servers = 'kafka:9092'
````

2. Từ máy host:
````python
# Dùng port 9093 
kafka_bootstrap_servers = 'localhost:9093'
````


Để xem UI của Kafka, chúng ta cần thêm Kafka UI tool như Kafdrop hoặc AKHQ. Thêm service này vào docker-compose:

```yaml
# Thêm vào docker-compose-batch.yml
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    depends_on:
      - kafka
    ports:
      - "9000:9000"  # Port cho Web UI
    environment:
      KAFKA_BROKERCONNECT: kafka:9092
      JVM_OPTS: "-Xms32M -Xmx64M"
      SERVER_SERVLET_CONTEXTPATH: "/"
    networks:
      - batch_network
```

Sau đó:

1. Restart services:
```bash
docker-compose -f docker-compose-batch.yml up -d
```

2. Truy cập Kafka UI:
- Mở browser và truy cập: http://localhost:9000

Qua UI này bạn có thể:
- Xem danh sách topics
- Xem/tạo partitions 
- Xem messages trong topics
- Xem broker metrics
- Quản lý consumer groups

Port 9092/9093 chỉ là ports để kết nối tới Kafka broker, không phải để hiển thị UI.


- View Kafka on Confluence 

update Confluent Control Center into @docker-compose-batch.yml  (giữ nguyên các phần khác) 



Lỗi này xảy ra do Kafka Streams đang cố gắng rebalance. Cần thêm một số cấu hình cho Control Center và Kafka:

````yaml:Main/Lambda/Batch_layer/docker-compose-batch.yml
  kafka:
    # ... các config cũ giữ nguyên
    environment:
      # Thêm các config sau
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.4.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - kafka
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      CONFLIENT_METRICS_ENABLE: 'false'
      # Thêm các config sau
      CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1
      CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 1
      CONTROL_CENTER_METRICS_TOPIC_REPLICATION: 1
      CONTROL_CENTER_STREAMS_CONSUMER_REQUEST_TIMEOUT_MS: 960000
      CONTROL_CENTER_STREAMS_PRODUCER_REQUEST_TIMEOUT_MS: 960000
      PORT: 9021
    networks:
      - batch_network
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:9021/health" ]
      interval: 30s
      timeout: 10s
      retries: 5
````

Sau đó:

````bash
# Xóa volumes cũ
docker volume rm $(docker volume ls -q)

# Restart
docker-compose -f docker-compose-batch.yml down
docker-compose -f docker-compose-batch.yml up -d

# Check logs
docker logs control-center
````

Các thay đổi chính:
1. Thêm cấu hình replication và transaction cho Kafka
2. Thêm cấu hình streams và timeout cho Control Center
3. Đặt replication factor = 1 vì chỉ có 1 broker
