```
dags/kafka-stream.py
spark-stream.py
------
1. 2 File này là gì, 
2. Vai trò là gì trong luồng 

Nói về luồng tổng quan trước
3. Bỏ đi thì saoa
```

### Tổng quan về luồng dữ liệu

Luồng dữ liệu trong hai file `kafka-stream.py` và `spark-stream.py` là một kiến trúc **real-time streaming** kết hợp giữa Apache Kafka, Apache Spark, và Cassandra. Dưới đây là mô tả tổng quan:

1. **Luồng dữ liệu tổng quan:**
   - **Dữ liệu đầu vào:** Được lấy từ API `https://randomuser.me/api/` (cung cấp dữ liệu người dùng ngẫu nhiên).
   - **Apache Kafka:** Là hệ thống message queue dùng để truyền tải dữ liệu thời gian thực từ `kafka-stream.py` đến `spark-stream.py`.
   - **Apache Spark:** Đọc dữ liệu từ Kafka, xử lý dữ liệu, và lưu vào cơ sở dữ liệu NoSQL (Cassandra).
   - **Cassandra DB:** Lưu trữ dữ liệu đã được xử lý để phục vụ các ứng dụng downstream (phân tích, báo cáo, etc.).

2. **Dòng chảy dữ liệu chính:**
   - `kafka-stream.py` (Producer): 
     1. Lấy dữ liệu từ API.
     2. Định dạng dữ liệu (format).
     3. Gửi dữ liệu đến **Kafka topic `users_created`**.
   - `spark-stream.py` (Consumer):
     1. Kết nối đến Kafka để nhận dữ liệu từ topic `users_created`.
     2. Xử lý dữ liệu (cấu trúc lại schema).
     3. Lưu dữ liệu đã xử lý vào bảng `created_users` trong Cassandra.

---

### 1. Mô tả từng file và vai trò trong luồng

| **File**              | **Vai trò**                                                                                                            |
|-----------------------|------------------------------------------------------------------------------------------------------------------------|
| **`kafka-stream.py`** | Là **producer** trong Kafka. File này lấy dữ liệu từ API, xử lý và đẩy dữ liệu lên Kafka topic (`users_created`).       |
| **`spark-stream.py`** | Là **consumer** trong Kafka. File này đọc dữ liệu từ Kafka, xử lý dữ liệu bằng Apache Spark và lưu kết quả vào Cassandra.|

---

#### **`kafka-stream.py` (Producer)**
- **Vai trò:**
  - Kết nối đến API (`https://randomuser.me/api/`) để lấy dữ liệu người dùng.
  - Xử lý (format) dữ liệu đầu vào thành JSON với các thông tin cụ thể như `first_name`, `address`, `email`, etc.
  - Đẩy dữ liệu lên Kafka topic `users_created`.

- **Các bước chính:**
  1. **Gửi request đến API:** Lấy dữ liệu người dùng ngẫu nhiên.
  2. **Xử lý dữ liệu:** Biến đổi dữ liệu thành định dạng JSON dễ sử dụng.
  3. **Gửi dữ liệu vào Kafka:** Đẩy dữ liệu lên topic `users_created`.

#### **`spark-stream.py` (Consumer)**
- **Vai trò:**
  - Kết nối đến Kafka để nhận dữ liệu từ topic `users_created`.
  - Xử lý dữ liệu bằng Apache Spark (chuyển đổi schema).
  - Lưu dữ liệu đã xử lý vào Cassandra trong bảng `created_users`.

- **Các bước chính:**
  1. **Kết nối Spark với Kafka:** Nhận dữ liệu từ topic.
  2. **Chuyển đổi dữ liệu:** Xác định schema, chuyển đổi JSON thành dataframe.
  3. **Lưu vào Cassandra:** Lưu dữ liệu vào bảng `created_users`.

---

### 2. Vai trò trong luồng tổng quan

| **Thành Phần**           | **Vai Trò**                                                                                                  |
|--------------------------|------------------------------------------------------------------------------------------------------------|
| **API (Random User)**    | Nguồn dữ liệu ban đầu cung cấp thông tin người dùng.                                                       |
| **`kafka-stream.py`**    | Tạo dữ liệu (producer) và gửi lên Kafka để truyền tải tới các thành phần khác trong thời gian thực.          |
| **Kafka (Message Queue)**| Trung gian truyền tải dữ liệu thời gian thực giữa producer (`kafka-stream.py`) và consumer (`spark-stream.py`).|
| **`spark-stream.py`**    | Xử lý dữ liệu từ Kafka, định dạng lại, và lưu trữ vào Cassandra.                                           |
| **Cassandra**            | Cơ sở dữ liệu lưu trữ kết quả cuối cùng của luồng dữ liệu.                                                |

---

### 3. Nếu bỏ đi một trong các thành phần

| **Thành Phần Bị Loại Bỏ** | **Hậu Quả**                                                                                                                                                   |
|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **`kafka-stream.py`**     | - Dữ liệu sẽ không được đẩy lên Kafka, dẫn đến luồng dữ liệu không khởi động. <br>- Consumer (`spark-stream.py`) sẽ không nhận được dữ liệu để xử lý.          |
| **`spark-stream.py`**     | - Dữ liệu từ Kafka không được xử lý và lưu vào Cassandra. <br>- Cassandra sẽ không chứa dữ liệu để phục vụ phân tích hoặc sử dụng downstream.                  |
| **Kafka (Message Queue)** | - Mất kênh trung gian truyền tải dữ liệu giữa producer và consumer. <br>- `spark-stream.py` sẽ không nhận được dữ liệu từ `kafka-stream.py`.                  |
| **Cassandra**             | - Không có nơi lưu trữ kết quả cuối cùng của luồng. <br>- Dữ liệu sẽ chỉ tồn tại trong Spark hoặc Kafka và không được lưu trữ lâu dài để sử dụng sau này.      |

---

### Kết luận
Hai file này đóng vai trò cốt lõi trong kiến trúc **real-time streaming**:
- `kafka-stream.py`: Thu thập và gửi dữ liệu vào Kafka (producer).
- `spark-stream.py`: Nhận và xử lý dữ liệu từ Kafka, sau đó lưu trữ vào Cassandra (consumer).

Nếu bất kỳ thành phần nào bị bỏ đi, luồng dữ liệu sẽ không thể hoạt động hoàn chỉnh. Để đảm bảo luồng hoạt động tốt, cần kiểm tra kỹ các kết nối (Kafka, Cassandra) và định dạng dữ liệu trong cả hai file.

### Hiểu đơn giản về Kafka và cách hoạt động của 2 file `kafka-stream.py` và `spark-stream.py`

---

### **Kafka là gì?**
Hãy tưởng tượng **Kafka** giống như **hệ thống băng chuyền thông minh** trong một nhà máy.  
- **Nhiệm vụ của Kafka:** Vận chuyển dữ liệu từ nơi này đến nơi khác một cách nhanh chóng và liên tục.
- **Producer (Người gửi):** Gửi dữ liệu lên băng chuyền (Kafka).  
- **Consumer (Người nhận):** Lấy dữ liệu từ băng chuyền để xử lý hoặc lưu trữ.

---

### **File 1: `kafka-stream.py`**

#### **Vai trò:**
- File này là **Producer**: Tạo dữ liệu và gửi nó lên **Kafka**.
- Tưởng tượng: Đây là **người gửi hàng** trong nhà máy. Họ lấy thông tin từ nguồn (API) và đặt nó lên băng chuyền (Kafka).

#### **Cách hoạt động:**
1. **Lấy dữ liệu từ API `randomuser.me`:**
   - API này cung cấp thông tin về một người dùng ngẫu nhiên (họ tên, địa chỉ, email, ...).
   - Dữ liệu này giống như "nguyên liệu thô" mà nhà máy cần xử lý.

2. **Định dạng dữ liệu:**
   - File này "đóng gói" thông tin người dùng sao cho dễ hiểu và sẵn sàng để gửi.

3. **Gửi dữ liệu lên Kafka:**
   - Dữ liệu được gửi lên một **topic** tên là `users_created` trong Kafka.
   - Topic này giống như một "khu vực riêng" trên băng chuyền, nơi người gửi và người nhận giao tiếp.

#### **Nếu không có file này:**
- Không ai tạo hoặc gửi dữ liệu lên Kafka. Băng chuyền sẽ trống và hệ thống không hoạt động được.

---

### **File 2: `spark-stream.py`**

#### **Vai trò:**
- File này là **Consumer**: Lấy dữ liệu từ Kafka và xử lý nó.
- Tưởng tượng: Đây là **công nhân nhà máy**, nhận dữ liệu từ băng chuyền và biến nó thành sản phẩm hoàn chỉnh.

#### **Cách hoạt động:**
1. **Kết nối đến Kafka:**
   - File này kết nối tới Kafka và lắng nghe dữ liệu từ topic `users_created`.

2. **Xử lý dữ liệu với Spark:**
   - Dữ liệu được "chuyển đổi" và chuẩn bị cho việc lưu trữ. Spark làm nhiệm vụ này giống như công nhân nhà máy chế biến nguyên liệu.

3. **Lưu dữ liệu vào Cassandra:**
   - Sau khi xử lý xong, dữ liệu được lưu trữ trong cơ sở dữ liệu Cassandra (giống như đưa sản phẩm vào kho).

#### **Nếu không có file này:**
- Không ai nhận và xử lý dữ liệu từ Kafka. Dữ liệu trên băng chuyền sẽ không được sử dụng.

---

### **Tóm lại luồng hoạt động:**

1. **File `kafka-stream.py`:**
   - **Lấy dữ liệu từ API** → **Định dạng dữ liệu** → **Gửi lên Kafka** (băng chuyền).
   - Vai trò: **Producer** (Người gửi).

2. **Kafka:**
   - Lưu trữ và vận chuyển dữ liệu từ **Producer (kafka-stream.py)** đến **Consumer (spark-stream.py)**.

3. **File `spark-stream.py`:**
   - **Nhận dữ liệu từ Kafka** → **Xử lý bằng Spark** → **Lưu vào Cassandra**.
   - Vai trò: **Consumer** (Người nhận).

---

### **Nếu không có Kafka thì sao?**
- Kafka chính là "băng chuyền". Nếu không có Kafka:
  - **Producer** (file `kafka-stream.py`) không có cách nào gửi dữ liệu đi.
  - **Consumer** (file `spark-stream.py`) không nhận được dữ liệu để xử lý.
  - Hệ thống sẽ không hoạt động.

---

### **Tóm tắt dễ hiểu:**
- **Kafka**: Là "băng chuyền" vận chuyển dữ liệu.
- **`kafka-stream.py`**: Là "người gửi hàng", tạo dữ liệu và đặt lên băng chuyền (Kafka).
- **`spark-stream.py`**: Là "công nhân", lấy dữ liệu từ băng chuyền (Kafka), xử lý và lưu lại.

Dựa trên thông tin bạn cung cấp và hình ảnh, vấn đề có thể xảy ra do một số nguyên nhân sau:

---

### **Nguyên nhân 1: Control Center không hiển thị dữ liệu cũ**
- **Control Center chỉ hiển thị dữ liệu được gửi vào sau khi trang được mở**. Dữ liệu đã có sẵn trong topic (xác minh bằng `kafka-console-consumer`) sẽ không được hiển thị nếu chúng đã được gửi trước đó.

#### **Giải pháp:**
1. **Gửi thêm dữ liệu mới vào topic:**
   - Chạy lại `kafka-stream.py` để gửi thêm dữ liệu vào Kafka.
2. **Refresh Control Center:**
   - Mở lại tab `Messages` trong Control Center sau khi dữ liệu mới đã được gửi.

---

### **Nguyên nhân 2: Không có Schema được xác định cho topic**
- **Control Center có thể không hiển thị dữ liệu nếu không có schema (Avro, JSON, etc.) được cấu hình trong Schema Registry**. Dữ liệu được gửi dưới dạng raw binary sẽ không hiển thị trên giao diện.

#### **Giải pháp:**
1. **Kiểm tra định dạng dữ liệu:**
   - Trong script `kafka-stream.py`, kiểm tra định dạng dữ liệu gửi lên Kafka. Đảm bảo rằng dữ liệu được gửi dưới dạng JSON.
   - Dòng gửi dữ liệu phải giống như:
     ```python
     producer.send('users_created', json.dumps(data).encode('utf-8'))
     ```

2. **Xác minh Schema Registry:**
   - Kiểm tra Schema Registry bằng cách truy cập:
     ```
     http://localhost:8081
     ```
   - Đảm bảo rằng topic `users_created` có schema tương ứng được đăng ký.

---

### **Nguyên nhân 3: Consumer đang tiêu thụ dữ liệu**
- **Một Consumer khác có thể đang tiêu thụ hết dữ liệu từ topic `users_created`**, khiến Control Center không thể hiển thị dữ liệu nào trong tab `Messages`.

#### **Giải pháp:**
1. **Kiểm tra consumer:**
   - Chạy lệnh kiểm tra các consumer group đang sử dụng topic `users_created`:
     ```bash
     docker exec -it broker kafka-consumer-groups --bootstrap-server broker:29092 --describe --all-groups
     ```
   - Xem các thông tin như `OFFSET` và `LAG` để kiểm tra dữ liệu đã được tiêu thụ hay chưa.

2. **Tạm dừng consumer khác:**
   - Nếu có consumer đang tiêu thụ dữ liệu, tạm dừng hoặc dừng consumer đó để xác minh.

---

### **Nguyên nhân 4: Offset không được đặt đúng**
- **Control Center có thể không tự động đọc từ offset `earliest`.** Nếu offset mặc định được đặt là `latest`, nó sẽ không hiển thị dữ liệu đã được gửi trước đó.

#### **Giải pháp:**
1. **Đặt lại offset về `earliest`:**
   - Trong Control Center, sử dụng tùy chọn `Jump to offset` và chọn offset `0`.
   - Hoặc chạy lệnh sau để kiểm tra:
     ```bash
     docker exec -it broker kafka-console-consumer \
       --bootstrap-server broker:29092 \
       --topic users_created \
       --from-beginning
     ```

---

### **Nguyên nhân 5: Vấn đề hiển thị trong Control Center**
- **Control Center có thể bị lỗi caching hoặc refresh chậm.**

#### **Giải pháp:**
1. **Xóa cache trình duyệt:**
   - Refresh trang hoặc thử mở lại Control Center trong trình duyệt ẩn danh.
2. **Kiểm tra log Control Center:**
   - Chạy lệnh kiểm tra log:
     ```bash
     docker logs control-center
     ```
   - Xem các lỗi liên quan đến kết nối hoặc hiển thị.

---

### **Tóm tắt các bước cần thực hiện**
1. **Gửi thêm dữ liệu mới vào Kafka và refresh Control Center.**
2. **Đảm bảo dữ liệu được gửi dưới dạng JSON hoặc Avro.**
3. **Kiểm tra các consumer khác đang tiêu thụ topic.**
4. **Đặt offset về `earliest` để xem toàn bộ dữ liệu.**
5. **Kiểm tra log của Control Center để phát hiện lỗi.**

Nếu vẫn không khắc phục được, bạn có thể chia sẻ thêm thông tin log từ Control Center hoặc Kafka để phân tích chi tiết hơn!

## Check thử 
```
2. **Xác minh Schema Registry:**
   - Kiểm tra Schema Registry bằng cách truy cập:
     ```
     http://localhost:8081
     ```
   - Đảm bảo rằng topic `users_created` có schema tương ứng được đăng ký.

---

### **Nguyên nhân 3: Consumer đang tiêu thụ dữ liệu**
- **Một Consumer khác có thể đang tiêu thụ hết dữ liệu từ topic `users_created`**, khiến Control Center không thể hiển thị dữ liệu nào trong tab `Messages`.

#### **Giải pháp:**
1. **Kiểm tra consumer:**
   - Chạy lệnh kiểm tra các consumer group đang sử dụng topic `users_created`:
     ```bash
     docker exec -it broker kafka-consumer-groups --bootstrap-server broker:29092 --describe --all-groups
     ```
   - Xem các thông tin như `OFFSET` và `LAG` để kiểm tra dữ liệu đã được tiêu thụ hay chưa.

2. **Tạm dừng consumer khác:**
   - Nếu có consumer đang tiêu thụ dữ liệu, tạm dừng hoặc dừng consumer đó để xác minh.

```

### Phân tích vấn đề từ dữ liệu bạn cung cấp


#### 2. **Không thấy Schema tại `localhost:8081`:**
- Giao diện Schema Registry tại `http://localhost:8081` trả về `{}`.
- Điều này cho thấy không có schema nào được đăng ký cho topic `users_created`.

**Hệ quả:**
- Nếu topic được ghi dữ liệu dưới dạng `JSON` hoặc `Avro`, nhưng không đăng ký schema, Control Center sẽ không biết cách diễn giải dữ liệu, và do đó không hiển thị dữ liệu trong tab **Messages**.

---

#### 3. **Consumer group `_confluent-controlcenter-7-4-0-lastProduceTimeConsumer` không có thành viên hoạt động:**
- **Thông báo:** `Consumer group '_confluent-controlcenter-7-4-0-lastProduceTimeConsumer' has no active members.`
- Điều này cho thấy consumer group không có ứng dụng nào đang hoạt động để tiêu thụ dữ liệu từ topic `users_created`.

---

### Giải pháp để khắc phục



#### 2. **Đăng ký Schema cho topic `users_created`:**
- Nếu dữ liệu được gửi vào topic dưới dạng JSON hoặc Avro, cần đăng ký schema.
- Đăng ký schema qua lệnh `curl` hoặc REST API của Schema Registry:
  ```bash
  curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
       --data '{
         "schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"gender\",\"type\":\"string\"},{\"name\":\"address\",\"type\":\"string\"},{\"name\":\"post_code\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"username\",\"type\":\"string\"},{\"name\":\"dob\",\"type\":\"string\"},{\"name\":\"registered_date\",\"type\":\"string\"},{\"name\":\"phone\",\"type\":\"string\"},{\"name\":\"picture\",\"type\":\"string\"}]}"
       }' http://localhost:8081/subjects/users_created-value/versions
  ```

**Lưu ý:**
- Thay schema trong JSON trên bằng định dạng thực tế của dữ liệu bạn đang gửi.

---

#### 3. **Khởi động lại consumer group cho topic `users_created`:**
- Đảm bảo consumer group `_confluent-controlcenter-7-4-0-lastProduceTimeConsumer` có thành viên hoạt động.
- Nếu cần, restart `Control Center`:
  ```bash
  docker-compose restart control-center
  ```
- Check lại Control Center


---



### fix 1 hồi thì là do cần phải **Đăng ký Schema cho topic `users_created`:**
```
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
     --data '{
       "schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"gender\",\"type\":\"string\"},{\"name\":\"address\",\"type\":\"string\"},{\"name\":\"post_code\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"username\",\"type\":\"string\"},{\"name\":\"dob\",\"type\":\"string\"},{\"name\":\"registered_date\",\"type\":\"string\"},{\"name\":\"phone\",\"type\":\"string\"},{\"name\":\"picture\",\"type\":\"string\"}]}"
     }' http://localhost:8081/subjects/users_created-value/versions

```