In [13]:
from google.cloud import bigquery
from google.oauth2 import service_account
import os
from dotenv import load_dotenv
# Nạp biến môi trường
load_dotenv()

# Bigquery credentials
TYPE_BQ = os.getenv("TYPE_BQ")   
PROJECT_ID_BQ = os.getenv("PROJECT_ID_BQ")  
PRIVATE_KEY_ID_BQ = os.getenv("PRIVATE_KEY_ID_BQ")  
PRIVATE_KEY_BQ = os.getenv("PRIVATE_KEY_BQ").replace("\\n", "\n")
CLIENT_EMAIL_BQ = os.getenv("CLIENT_EMAIL_BQ")  
CLIENT_ID_BQ = os.getenv("CLIENT_ID_BQ")  
AUTH_URI_BQ = os.getenv("AUTH_URI_BQ")  
TOKEN_URI_BQ = os.getenv("TOKEN_URI_BQ")  
AUTH_PROVIDER_X509_CERT_URL_BQ = os.getenv("AUTH_PROVIDER_X509_CERT_URL_BQ")  
CLIENT_X509_CERT_URL_BQ = os.getenv("CLIENT_X509_CERT_URL_BQ")  
UNIVERSE_DOMAIN_BQ = os.getenv("UNIVERSE_DOMAIN_BQ")   

# --- Cấu hình BigQuery ---
# Lấy thông tin credentials từ biến môi trường
try:
    # Tạo thông tin credentials từ biến môi trường
    credentials_dict = {
        "type": TYPE_BQ,
        "project_id": PROJECT_ID_BQ,
        "private_key_id": PRIVATE_KEY_ID_BQ,
        "private_key": PRIVATE_KEY_BQ,
        "client_email": CLIENT_EMAIL_BQ,
        "client_id": CLIENT_ID_BQ,
        "auth_uri": AUTH_URI_BQ,
        "token_uri": TOKEN_URI_BQ,
        "auth_provider_x509_cert_url": AUTH_PROVIDER_X509_CERT_URL_BQ,
        "client_x509_cert_url": CLIENT_X509_CERT_URL_BQ,
        "universe_domain": UNIVERSE_DOMAIN_BQ,
    }
    # Tạo credentials từ dictionary
    credentials = service_account.Credentials.from_service_account_info(credentials_dict)
    PROJECT_ID = credentials_dict['project_id']

except Exception as e:
    print(f"Lỗi khi nạp thông tin BigQuery credentials từ biến môi trường: {e}")
    exit() # Thoát nếu không thể nạp credentials

# Khởi tạo client BigQuery
try:
    client = bigquery.Client(credentials=credentials, project=PROJECT_ID)
    print(f"Kết nối BigQuery thành công tới Project: {PROJECT_ID}")
except Exception as e:
    print(f"Lỗi khi kết nối tới BigQuery: {e}")
    exit() # Thoát nếu không thể kết nối

# Thay thế {DATASET_ID_BQ} bằng ID Dataset thực tế của bạn trong BigQuery
DATASET_ID_BQ = os.getenv("DATASET_ID_BQ") # Nên đặt Dataset ID vào biến môi trường
if not DATASET_ID_BQ:
    print("Lỗi: Biến môi trường DATASET_ID_BQ chưa được đặt.")
    exit()

Kết nối BigQuery thành công tới Project: gen-lang-client-0478626769


# Xóa Database cũ

In [14]:
# Lấy danh sách tất cả các bảng trong dataset
try:
    tables = list(client.list_tables(DATASET_ID_BQ))
    if not tables:
        print(f"Dataset '{DATASET_ID_BQ}' không có bảng nào.")
    else:
        for table in tables:
            table_id = f"{PROJECT_ID}.{DATASET_ID_BQ}.{table.table_id}"
            client.delete_table(table_id, not_found_ok=True)
            print(f"✅ Đã xóa bảng: {table_id}")
except Exception as e:
    print(f"❌ Lỗi khi xóa bảng trong dataset: {e}")


Dataset 'warehouse_loc' không có bảng nào.


# Tạo database

In [15]:
create_tables_bq = f"""
-- BigQuery Standard SQL for creating tables

-- Replace `{DATASET_ID_BQ}` with your actual BigQuery dataset ID
-- Example: `my_gcp_project.my_dataset.group_mc`

-- 1. Create group_mc table
-- Referenced by 'machine' and 'machine_pos'
CREATE TABLE IF NOT EXISTS {DATASET_ID_BQ}.group_mc (
    id INT64, -- Corresponds to INTEGER PRIMARY KEY
    mc_name STRING -- Corresponds to VARCHAR(255)
);

-- 2. Create machine_type table
-- Referenced by 'spare_parts'
CREATE TABLE IF NOT EXISTS {DATASET_ID_BQ}.machine_type (
    id INT64, -- Corresponds to INTEGER PRIMARY KEY
    machine STRING -- Corresponds to VARCHAR(255)
);

-- 3. Create machine table
-- References 'group_mc'
CREATE TABLE IF NOT EXISTS {DATASET_ID_BQ}.machine (
    name STRING, -- Corresponds to VARCHAR(255) PRIMARY KEY
    group_mc_id INT64 -- Corresponds to INTEGER
    -- FOREIGN KEY constraint is not enforced in BigQuery
);

-- 4. Create employees table
-- Referenced by 'import_export'
CREATE TABLE IF NOT EXISTS {DATASET_ID_BQ}.employees (
    amann_id STRING, -- Corresponds to VARCHAR(255) PRIMARY KEY
    name STRING, -- Corresponds to VARCHAR(255)
    title STRING, -- Corresponds to VARCHAR(255)
    level STRING, -- Corresponds to VARCHAR(255)
    active BOOL, -- Corresponds to INTEGER (boolean 0/1), mapped to BOOL TRUE/FALSE
    birthday DATE, -- Corresponds to TEXT (ISO8601 date 'YYYY-MM-DD'), mapped to DATE
    start_date DATE, -- Corresponds to TEXT (ISO8601 date 'YYYY-MM-DD'), mapped to DATE
    check_in_time TIME, -- Corresponds to TEXT (ISO8601 time 'HH:MM:SS'), mapped to TIME
    check_out_time TIME, -- Corresponds to TEXT (ISO8601 time 'HH:MM:SS'), mapped to TIME
    address STRING, -- Corresponds to VARCHAR(255)
    phone_number STRING, -- Corresponds to VARCHAR(50)
    email STRING, -- Corresponds to VARCHAR(255)
    gender STRING, -- Corresponds to VARCHAR(10)
    years_worked INT64, -- Corresponds to INTEGER
    age INT64 -- Corresponds to INTEGER
);

-- 5. Create machine_pos table
-- References 'group_mc', Referenced by 'import_export'
CREATE TABLE IF NOT EXISTS {DATASET_ID_BQ}.machine_pos (
    mc_pos STRING, -- Corresponds to VARCHAR(255) PRIMARY KEY
    mc_id INT64 -- Corresponds to INTEGER
    -- FOREIGN KEY constraint is not enforced in BigQuery
);

-- 6. Create spare_parts table
-- References 'machine_type', Referenced by 'import_export'
CREATE TABLE IF NOT EXISTS {DATASET_ID_BQ}.spare_parts (
    material_no STRING, -- Corresponds to VARCHAR(255) PRIMARY KEY
    part_no STRING, -- Corresponds to VARCHAR(255)
    description STRING, -- Corresponds to TEXT
    machine_type_id INT64, -- Corresponds to INTEGER
    bin STRING, -- Corresponds to VARCHAR(50)
    cost_center STRING, -- Corresponds to VARCHAR(255)
    price FLOAT64, -- Corresponds to REAL/NUMERIC, mapped to FLOAT64
    stock INT64, -- Corresponds to INTEGER
    safety_stock INT64, -- Corresponds to INTEGER
    safety_stock_check BOOL, -- Corresponds to INTEGER (boolean 0/1), mapped to BOOL TRUE/FALSE
    image_url STRING, -- Corresponds to VARCHAR(255)
    import_date TIMESTAMP, -- Corresponds to TEXT (datetime), mapped to TIMESTAMP
    export_date TIMESTAMP -- Corresponds to TEXT (datetime), mapped to TIMESTAMP
    -- FOREIGN KEY constraint is not enforced in BigQuery
);

-- 7. Create import_export table
-- References 'spare_parts', 'machine_pos', and 'employees'
CREATE TABLE IF NOT EXISTS {DATASET_ID_BQ}.import_export (
    part_id STRING, -- Corresponds to VARCHAR(255)
    quantity INT64, -- Corresponds to INTEGER
    mc_pos_id STRING, -- Corresponds to VARCHAR(255)
    empl_id STRING, -- Corresponds to VARCHAR(255)
    date DATE, -- Corresponds to DATE, mapped to DATE
    reason STRING, -- Corresponds to TEXT
    im_ex_flag BOOL -- Corresponds to INTEGER (boolean 0/1), mapped to BOOL TRUE/FALSE
    -- FOREIGN KEY constraints are not enforced in BigQuery
);

-- CREATE INDEX is not used in BigQuery.
-- Performance is managed via clustering and partitioning.
"""

# Insert dữ liệu

In [16]:
insert_data_bq = f"""
-- BigQuery Standard SQL for inserting data

-- Replace `{DATASET_ID_BQ}` with your actual BigQuery dataset ID
-- Example: `my_gcp_project.my_dataset.group_mc`

-- Insert data into group_mc table
INSERT INTO {DATASET_ID_BQ}.group_mc (id, mc_name)
VALUES
(1, 'Chang Yi'),
(2, 'Fong'),
(3, 'SSM'),
(4, 'Graf'),
(5, 'Ugolini'),
(6, 'Tecnorama'),
(7, 'Fadis'),
(8, 'DETTIN');

-- Insert data into machine_type table
INSERT INTO {DATASET_ID_BQ}.machine_type (id, machine)
VALUES
(1, 'Chang Yi'),
(2, 'Fong'),
(3, 'SSM'),
(4, 'Graf'),
(5, 'Ugolini'),
(6, 'Tecnorama'),
(7, 'Fadis'),
(8, 'DETTIN');

-- Insert data into machine table
INSERT INTO {DATASET_ID_BQ}.machine (name, group_mc_id)
VALUES
('Chang Yi-1', 1),
('Chang Yi-2', 1),
('Chang Yi-3', 1),
('Chang Yi-4', 1),
('Chang Yi-5', 1),
('Fong-1', 2),
('Fong-2', 2),
('Fong-3', 2),
('Fong-4', 2),
('SSM-1', 3),
('SSM-2', 3),
('SSM-3', 3),
('SSM-4', 3),
('Graf-1', 4),
('Graf-2', 4),
('Graf-3', 4),
('Graf-4', 4),
('Ugolini-1', 5),
('Ugolini-2', 5),
('Ugolini-3', 5),
('Tecnorama-1', 6),
('Tecnorama-2', 6),
('Fadis-1', 7),
('Fadis-2', 7),
('DETTIN-1', 8);

-- Insert data into spare_parts table
-- Added image_url, import_date, export_date columns with NULL values as they were not in the original insert
INSERT INTO {DATASET_ID_BQ}.spare_parts (
    material_no, part_no, description, machine_type_id, bin, cost_center, price, stock, safety_stock, safety_stock_check, image_url, import_date, export_date
) VALUES
('SP001', 'P001', 'M0F67-MECHANICALSEAL', 1, 'A1', 'D.10', 50.00, 10, 20, TRUE, NULL, NULL, NULL), -- 1 becomes TRUE
('SP002', 'P002', 'MF86E-SL2HOSESCREWONCONNECTIONWITHMALETHREAD618', 2, 'A2', 'CC002', 75.00, 150, 30, FALSE, NULL, NULL, NULL), -- 0 becomes FALSE
('SP003', 'P003', 'MC1E9-SL2CERAMICSPLITEYELETD68H6D28MM', 1, 'B1', 'D.11', 20.00, 60, 15, TRUE, NULL, NULL, NULL), -- 1 becomes TRUE
('SP004', 'P004', 'MC29E-SL2AGCERAMICSPLITEYELETD65H68D38MM', 1, 'B2', 'CC003', 25.00, 80, 20, TRUE, NULL, NULL, NULL), -- 1 becomes TRUE
('SP005', 'P005', 'MD7A5-SL2PUGLASSCOVERGEARS', 2, 'B3', 'CC004', 18.00, 40, 10, FALSE, NULL, NULL, NULL), -- 0 becomes FALSE
('SP006', 'P006', 'M1E71-SL2SEALRUBBER', 2, 'B4', 'D.12', 12.50, 90, 25, FALSE, NULL, NULL, NULL), -- 0 becomes FALSE
('SP007', 'P007', 'MB509-BEARING60002Z', 1, 'C1', 'D.13', 10.00, 100, 30, TRUE, NULL, NULL, NULL), -- 1 becomes TRUE
('SP008', 'P008', 'MB862-HOSEPVCTRANSPARENT64X1MM', 3, 'C2', 'D.14', 15.00, 70, 20, TRUE, NULL, NULL, NULL), -- 1 becomes TRUE
('SP009', 'P009', 'MAB39-NEEDLEFORPWINDING69MMLI', 3, 'C3', 'CC005', 7.50, 30, 10, FALSE, NULL, NULL, NULL), -- 0 becomes FALSE
('SP010', 'P010', 'MC40B-SUPPLYPACKAGEPLATE', 1, 'C4', 'D.15', 22.00, 45, 15, TRUE, NULL, NULL, NULL), -- 1 becomes TRUE
('SP011', 'P011', 'M5BF7-NAN', 2, 'D1', 'CC006', 30.00, 50, 20, FALSE, NULL, NULL, NULL); -- 0 becomes FALSE

-- Insert data into machine_pos table
INSERT INTO {DATASET_ID_BQ}.machine_pos (mc_pos, mc_id)
VALUES
('POS1', 1),
('POS2', 1),
('POS3', 1),
('POS5', 1),
('FO1', 2),
('FO2', 2),
('FO3', 2),
('SM1', 3),
('SM2', 3),
('SM3', 3),
('GR1', 4),
('GR2', 4),
('GR3', 4),
('GR4', 4),
('GR5', 4),
('UGO1', 5),
('UGO2', 5),
('UGO3', 5),
('UGO4', 5);

-- Insert data into import_export table
INSERT INTO {DATASET_ID_BQ}.import_export (part_id, quantity, mc_pos_id, empl_id, date, reason, im_ex_flag)
VALUES
    ('SP001', 100, 'POS1', 'EMP001', '2025-05-10', 'Nhập hàng để thay thế', TRUE), -- 1 becomes TRUE
    ('SP002', 50, 'FO1', 'EMP002', '2025-05-11', 'Xuất hàng cho bảo trì', FALSE), -- 0 becomes FALSE
    ('SP003', 200, 'SM1', 'EMP003', '2025-05-12', 'Nhập hàng để bổ sung kho', TRUE), -- 1 becomes TRUE
    ('SP004', 10, 'GR1', 'EMP004', '2025-05-13', 'Xuất hàng cho sửa chữa', FALSE), -- 0 becomes FALSE
    ('SP005', 150, 'UGO1', 'EMP005', '2025-05-14', 'Nhập hàng từ nhà cung cấp', TRUE); -- 1 becomes TRUE

-- Insert data into employees table
-- Calculate age and years_worked directly during insertion
INSERT INTO {DATASET_ID_BQ}.employees (amann_id, name, title, level, active, birthday, start_date, check_in_time, check_out_time, address, phone_number, email, gender, years_worked, age)
VALUES
('EMP001', 'Nguyễn Văn A', 'Senior Engineer', 'Senior', TRUE, '1991-01-10', '2019-03-15', '08:00:00', '17:00:00', 'Tam Kỳ', '0901234501', 'nguyenvana@amann.com', 'Nam', EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '2019-03-15')), EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '1991-01-10'))),
('EMP002', 'Trần Thị B', 'Middle Technician', 'Middle', TRUE, '1992-02-12', '2020-04-18', '08:00:00', '17:00:00', 'Đà Nẵng', '0901234502', 'tranthib@amann.com', 'Nữ', EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '2020-04-18')), EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '1992-02-12'))),
('EMP003', 'Hoàng Trung', 'Lead Supervisor', 'Lead', TRUE, '1993-03-20', '2021-05-10', '08:30:00', '17:30:00', 'Tam Kỳ', '0901234503', 'hoangtrung@amann.com', 'Nam', EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '2021-05-10')), EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '1993-03-20'))),
('EMP004', 'Lê Hoàng', 'Senior Specialist', 'Senior', TRUE, '1995-06-25', '2018-06-01', '08:00:00', '16:30:00', 'TP.HCM', '0901234504', 'lehoang@amann.com', 'Nam', EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '2018-06-01')), EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '1995-06-25'))),
('EMP005', 'Ngọc Lan', 'Middle Operator', 'Middle', TRUE, '1990-07-15', '2020-01-15', '09:00:00', '18:00:00', 'Tam Kỳ', '0901234505', 'ngoclan@amann.com', 'Nữ', EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '2020-01-15')), EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '1990-07-15'))),
('EMP006', 'Thanh Nguyên', 'Middle Engineer', 'Middle', TRUE, '1994-10-05', '2021-07-20', '08:00:00', '17:00:00', 'Số 16, Quận 6, TP.HCM', '0901234506', 'thanhnguyen@amann.com', 'Nam', EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '2021-07-20')), EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '1994-10-05'))),
('EMP007', 'Khánh Hòa', 'Junior Technician', 'Junior', TRUE, '1996-11-23', '2022-08-30', '08:30:00', '17:30:00', 'Tam Kỳ', '0901234507', 'khanhhoa@amann.com', 'Nữ', EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '2022-08-30')), EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '1996-11-23'))),
('EMP008', 'Đức Phát', 'Middle Specialist', 'Middle', TRUE, '1990-12-01', '2019-09-10', '08:00:00', '16:30:00', 'Đà Nẵng', '0901234508', 'ducphat@amann.com', 'Nam', EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '2019-09-10')), EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '1990-12-01'))),
('EMP009', 'Tùng Tiến', 'Junior Operator', 'Junior', TRUE, '1995-09-18', '2021-04-01', '09:00:00', '18:00:00', 'Đà Nẵng', '0901234509', 'tungtien@amann.com', 'Nam', EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '2021-04-01')), EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '1995-09-18'))),
('EMP010', 'Thùy Linh', 'Middle Engineer', 'Middle', TRUE, '1990-05-10', '2020-02-20', '08:30:00', '17:30:00', 'Huế', '0901234510', 'thuylinh@amann.com', 'Nữ', EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '2020-02-20')), EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '1990-05-10'))),
('EMP011', 'Bảo Hiền', 'Junior Technician', 'Junior', TRUE, '1992-04-22', '2021-09-05', '08:00:00', '16:30:00', 'Huế', '0901234511', 'baohien@amann.com', 'Nữ', EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '2021-09-05')), EXTRACT(YEAR FROM CURRENT_DATE()) - EXTRACT(YEAR FROM PARSE_DATE('%Y-%m-%d', '1992-04-22')));

-- No separate UPDATE needed for age/years_worked as they are calculated during INSERT now.
"""

In [17]:
def execute_bigquery_sql(sql_string, client):
    """Thực thi chuỗi SQL trong BigQuery."""
    # Tách các câu lệnh SQL dựa trên dấu chấm phẩy, nhưng cẩn thận với dấu chấm phẩy trong chuỗi
    # Đây là một cách đơn giản; cho các script phức tạp hơn, cần parser tốt hơn
    statements = [stmt.strip() for stmt in sql_string.split(';') if stmt.strip()]

    for statement in statements:
        print(f"Đang thực thi: {statement[:100]}...") # In vài ký tự đầu để debug
        try:
            query_job = client.query(statement)
            query_job.result() # Chờ cho đến khi query hoàn thành
            print("Thành công.")
        except Exception as e:
            print(f"Lỗi khi thực thi câu lệnh SQL: {e}")
            # Tùy chọn: Tiếp tục hoặc dừng lại khi gặp lỗi
            # raise e # uncomment để dừng lại khi có lỗi

# --- Thực thi quy trình ---

print("Bắt đầu tạo bảng trong BigQuery...")
execute_bigquery_sql(create_tables_bq, client)
print("\nĐã hoàn thành tạo bảng.")

print("\nBắt đầu chèn dữ liệu vào BigQuery...")
execute_bigquery_sql(insert_data_bq, client)
print("\nĐã hoàn thành chèn dữ liệu.")

print("\nHoàn thành quá trình mô phỏng dữ liệu và tải lên BigQuery.")

Bắt đầu tạo bảng trong BigQuery...
Đang thực thi: -- BigQuery Standard SQL for creating tables

-- Replace `warehouse_loc` with your actual BigQuery d...
Thành công.
Đang thực thi: -- 2. Create machine_type table
-- Referenced by 'spare_parts'
CREATE TABLE IF NOT EXISTS warehouse_...
Thành công.
Đang thực thi: -- 3. Create machine table
-- References 'group_mc'
CREATE TABLE IF NOT EXISTS warehouse_loc.machine...
Thành công.
Đang thực thi: -- 4. Create employees table
-- Referenced by 'import_export'
CREATE TABLE IF NOT EXISTS warehouse_l...
Thành công.
Đang thực thi: -- 5. Create machine_pos table
-- References 'group_mc', Referenced by 'import_export'
CREATE TABLE ...
Thành công.
Đang thực thi: -- 6. Create spare_parts table
-- References 'machine_type', Referenced by 'import_export'
CREATE TA...
Thành công.
Đang thực thi: -- 7. Create import_export table
-- References 'spare_parts', 'machine_pos', and 'employees'
CREATE ...
Thành công.
Đang thực thi: -- CREATE INDEX is not used in Bi