In [1]:
import os
import pandas as pd
import clickhouse_connect

In [None]:
client = clickhouse_connect.get_client(
    host=os.getenv("CLICKHOUSE_HOST"),
    port=int(os.getenv("CLICKHOUSE_PORT")),
    username=os.getenv("CLICKHOUSE_USER"),
    password=os.getenv("CLICKHOUSE_PASSWORD")
)

alert = client.query_df("""
SELECT server_ip, time_window
FROM cem_kpi.debug_alert_p20
WHERE alert IN (2,3)
GROUP BY server_ip, time_window 
ORDER BY server_ip, time_window
""")
#kết nối đến clickhouse và lấy dữ liệu từ bảng debug_alert_p20 với điều kiện alert_server là 2 hoặc 3, lưu vào dataframe alert

In [4]:
alert.head(100000).to_csv('alert_p20.csv', index=False)

In [5]:
def get_raw_table(time_window):
    if time_window.month == 12 and time_window.day == 26:
        return None
    date_str=time_window.strftime('%Y_%d_%m')
    return f"cem_network_test.result_full_flow_{date_str}"
#hàm để lấy dữ liệu thô từ bảng tương ứng với time_window và trả về bảng dữ liệu chứa dataframe tương ứng

In [13]:
# duyệt qua từng dòng trong dataframe alert với index là chỉ số của dòng và row là nội dung dòng
for idx, row in alert.iterrows():
    #kiểm tra tiến trình mỗi 10 dòng một lần
    if idx % 10 == 0:
        print(f"Xử lý: {idx}/{len(alert)}")
    server_ip = row['server_ip']
    time_window = row['time_window']
    #gọi hàm get_raw_table để lấy bảng dữ liệu thô tương ứng với time_window
    table = get_raw_table(time_window)
    if table is None:
        continue
    start_ts = int(time_window.timestamp())
    end_ts = start_ts + 1800
    query = f"""
    SELECT
        client_ip,
        client_mac,
        tls_domain,
        dns_domain,
        COUNT(*) as total_flow,
        '{server_ip}' as server_ip,
        toDateTime({start_ts}) as time_window
    FROM {table}
    WHERE 
        server_ip = '{server_ip}'
        AND from_time >= {start_ts}
        AND from_time < {end_ts}
    GROUP BY client_ip, client_mac, tls_domain, dns_domain
    LIMIT 10000
    """
    # bắt đầu khối lệnh có thể gây lỗi và xử lý ngoại lệ
    try:
        agg = client.query_df(query)
        print(f"  → {len(agg)} grouped rows cho {server_ip}")
    except Exception as e:
        print(f"  ✗ Lỗi: {e}")
        continue
    
    if agg.empty:
        continue
    
    # Insert trực tiếp
    try:
        client.insert_df('cem_kpi.alert_client_trace_test', agg[[
            'time_window',
            'client_ip',
            'server_ip',
            'client_mac',
            'dns_domain',
            'tls_domain',
            'total_flow',
        ]])
        print(f"  ✓ Insert {len(agg)} rows")
    except Exception as e:
        print(f"  ✗ Lỗi insert: {e}")

print("✓ Hoàn thành!")

Xử lý: 0/273
  → 10000 grouped rows cho 101.47.17.233
  ✓ Insert 10000 rows
  → 10000 grouped rows cho 101.91.111.172
  ✓ Insert 10000 rows
  → 10000 grouped rows cho 101.91.22.249
  ✓ Insert 10000 rows
  → 10000 grouped rows cho 101.91.22.249
  ✓ Insert 10000 rows
  → 0 grouped rows cho 103.151.4.115
  → 10000 grouped rows cho 111.91.32.34
  ✓ Insert 10000 rows
  ✗ Lỗi: Received ClickHouse exception, code: 241, server response: Code: 241. DB::Exception: (total) memory limit exceeded: would use 6.52 GiB (attempt to allocate chunk of 4.05 MiB), current RSS: 1.91 GiB, maximum: 6.52 GiB. OvercommitTracker decision: Query was selected to stop by OvercommitTracker: while reading column from_time at /var/lib/clickhouse/store/e90/e90c06e1-697e-40e0-a946-3376517b6403/: While executing Log. (MEMORY_LIMIT_EXCEEDED) (for url http://192.168.100.8:8123)
  ✗ Lỗi: Received ClickHouse exception, code: 241, server response: Code: 241. DB::Exception: (total) memory limit exceeded: would use 6.53 GiB (at