<a href="https://colab.research.google.com/github/HieuNguyenPhi/ADJ_JOBS/blob/main/notebooks/ADJUST_JOB.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
from azure.storage.blob import BlobServiceClient

# Storage credentials are read from the environment. Fail fast with a clear
# message instead of building a connection string containing the text "None".
account_name = os.getenv('ACCOUNT_NAME')
account_key = os.getenv('ACCOUNT_KEY')
if not account_name or not account_key:
    raise RuntimeError("ACCOUNT_NAME and ACCOUNT_KEY environment variables must be set")

connect_str = (
    "DefaultEndpointsProtocol=https;"
    f"AccountName={account_name};AccountKey={account_key};"
    "EndpointSuffix=core.windows.net"
)
blob_service_client = BlobServiceClient.from_connection_string(connect_str)

container_name = "adjuststbuatprocessed"  # formerly os.getenv('CONTAINER_NAME')
container_client = blob_service_client.get_container_client(container_name)

# Dates already merged into `output/<YYYY-MM-DD>.parquet`.
# - Filter on the prefix server-side instead of listing every blob.
# - Sort explicitly so that [-1] / [-2] are the latest dates (ISO dates sort
#   chronologically as strings).
already_processed = sorted(
    blob.name.split('/')[1].split('.')[0]
    for blob in container_client.list_blobs(name_starts_with='output/')
)
already_processed[-5:]

In [None]:
from datetime import date
import pandas as pd

today = date.today().strftime('%Y-%m-%d')

# Start from the second-to-last processed date, so that date is rebuilt too
# (presumably to pick up late-arriving files — TODO confirm intent).
# - With only one processed date, start from it.
# - With none there is no anchor, so stop with a clear message.
if not already_processed:
    raise RuntimeError("No output/<date>.parquet found; cannot determine the start date")
start_date = already_processed[-2] if len(already_processed) >= 2 else already_processed[0]

need_process = pd.date_range(start=start_date, end=today).strftime('%Y-%m-%d').to_list()
need_process

In [None]:
from collections import defaultdict

container_name_uat = "adjuststbuat"
container_client_uat = blob_service_client.get_container_client(container_name_uat)

# Group raw blob names by the token after the first "_". That token is used
# downstream as the timestamp (its first 10 chars are the date). The exact
# naming scheme is assumed — TODO confirm.
# Blobs without such a token are reported and skipped instead of crashing the
# whole job with an IndexError.
files = [blob.name for blob in container_client_uat.list_blobs()]
groups = defaultdict(list)
for f in files:
    parts = f.split('_')
    if len(parts) < 2:
        print(f"Skipping blob without a timestamp token: {f}")
        continue
    dt = parts[1]
    groups[dt].append(f)

# Preview the group of the last grouped blob (empty list if nothing was grouped).
groups[dt] if groups else []

In [None]:
from pathlib import Path
root = Path.cwd()
process_path = f'{root}/process/adjust_uat'
from tqdm import tqdm
import polars as pl

os.makedirs(process_path, exist_ok=True)

storage_options = {
    "account_name": account_name,
    "account_key":  account_key,
}

# Set for O(1) membership tests inside the loop.
dates_to_process = set(need_process)

# Convert each timestamp group of raw CSV blobs into one local Parquet file
# at process_path/dt=<date>/<ts>.parquet.
for ts, blob_names in tqdm(groups.items()):
    dt = ts[:10]  # first 10 chars of the timestamp -> partition "dt=YYYY-MM-DD"
    if dt not in dates_to_process:
        continue
    partition_dir = os.path.join(process_path, f"dt={dt}")
    os.makedirs(partition_dir, exist_ok=True)
    out_file = os.path.join(partition_dir, f"{ts}.parquet")

    # These are lazy scans: nothing is read until sink_parquet executes the query.
    # infer_schema_length=0 reads every column directly as String. This means:
    # - values such as zero-padded IDs are not altered by a numeric round-trip;
    # - a column whose type was mis-inferred cannot abort the parse.
    lazy_frames = [
        pl.scan_csv(
            f"az://{container_name_uat}/{name}",
            storage_options=storage_options,
            has_header=True,
            null_values=["", "NULL"],  # empty cells and "NULL" become null
            infer_schema_length=0,
        ).select(pl.all().cast(pl.Utf8))  # guarantee an all-String schema
        for name in blob_names
    ]

    # Diagonal concat fills columns missing from some files with nulls.
    df_all = pl.concat(lazy_frames, how="diagonal")
    df_all.sink_parquet(out_file, compression="snappy")
    print(f'Done dt={dt}/{ts}.parquet')


In [None]:
local_folder_path = f"{root}/process/adjust_uat"

# Destination "folder" (blob-name prefix) inside the processed container.
azure_folder_name = "processing"

# Upload every file under local_folder_path, preserving its relative path.
# The walk variables are deliberately NOT named `root`/`files`. Rebinding the
# global `root` here would make later cells build paths under the last walked
# sub-directory.
for dirpath, _dirnames, filenames in tqdm(os.walk(local_folder_path)):
    for filename in filenames:
        local_file_path = os.path.join(dirpath, filename)

        # Blob names always use "/", whatever the local OS path separator is.
        relative_path = os.path.relpath(local_file_path, local_folder_path)
        blob_name = f"{azure_folder_name}/{relative_path.replace(os.sep, '/')}"
        blob_client = container_client.get_blob_client(blob_name)

        print(f"Uploading {local_file_path} to {container_name}/{blob_name}")

        with open(local_file_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True)

print("\nFolder upload complete.")

In [None]:
import glob  # no active cell above imports it

# Merge the per-timestamp Parquet files uploaded under `processing/dt=<date>/`
# into a single `output/<date>.parquet` for each date processed locally.
# - `processing/` is the prefix the upload cell writes to.
# - `output/<date>.parquet` is the naming the first cell parses back
#   (`name.split('/')[1].split('.')[0]`) to find the last processed date.
for dt_folder in tqdm(sorted(glob.glob(os.path.join(process_path, "dt=*")))):
    dt = os.path.basename(dt_folder)[3:]  # "dt=2025-06-25" -> "2025-06-25"
    prefix = f"processing/dt={dt}/"
    blob_names = [
        blob.name
        for blob in container_client.list_blobs(name_starts_with=prefix)
        if blob.name.endswith(".parquet")
    ]
    if not blob_names:
        print(f"No processed files under {container_name}/{prefix}; skipping dt={dt}")
        continue

    # Scan each file separately and concat diagonally. Files can carry
    # different column sets, which a single multi-file glob scan may reject.
    lf_day = (
        pl.concat(
            [
                pl.scan_parquet(f"az://{container_name}/{name}", storage_options=storage_options)
                for name in blob_names
            ],
            how="diagonal",
        )
        .select(pl.all().cast(pl.Utf8))        # every column as String
        .with_columns(pl.lit(dt).alias("dt"))  # add the partition date as a column
    )
    lf_day.sink_parquet(
        f"az://{container_name}/output/{dt}.parquet",
        storage_options=storage_options,
        compression="snappy",
    )
# output_path = f'{root}/output/adjust_uat'
# os.makedirs(output_path, exist_ok=True)
# import glob
# import shutil
# for dt_folder in tqdm(glob.glob(os.path.join(process_path, "dt=*"))):
#     dt = os.path.basename(dt_folder)[3:]                 # "2025-06-25"
#     files_pq = glob.glob(os.path.join(dt_folder, "*T*.parquet"))
#     if not files_pq:
#         continue

#     out_path = os.path.join(output_path, f"{dt}.parquet")

#     # Nếu trước đó lỡ tạo cùng tên dưới dạng DIR → xoá
#     if os.path.isdir(out_path):
#         shutil.rmtree(out_path)

#     # ---------- ❶  Lazy scan tất cả Parquet ----------
#     lfs = [pl.scan_parquet(f) for f in files_pq]          # mỗi file → LazyFrame

#     # ---------- ❷  Concat diagonal + giữ schema linh hoạt ----------
#     lf_day = (
#         pl.concat(lfs, how="diagonal")                    # tự thêm null cột thiếu
#         .select(pl.all().cast(pl.Utf8))                   # đảm bảo mọi cột = string
#         .with_columns(pl.lit(dt).alias("dt"))             # thêm cột partition (tuỳ)
#     )

#     # ---------- ❸  Ghi duy nhất 1 Parquet ----------
#     lf_day.sink_parquet(out_path, compression="snappy")

In [None]:

# local_folder_path = f"{root}/output/adjust_uat"

# # Replace with the desired folder name in the Azure container
# azure_folder_name = "output"

# # Iterate through files in the local folder and upload them
# for root, dirs, files in tqdm(os.walk(local_folder_path)):
#     for file in files:
#         # Construct the full local file path
#         local_file_path = os.path.join(root, file)

#         # Construct the blob name (path within the Azure container)
#         # This preserves the folder structure from the local path
#         relative_path = os.path.relpath(local_file_path, local_folder_path)
#         blob_name = os.path.join(azure_folder_name, relative_path)
#         # print(blob_name)
#         # Create a blob client for the current file
#         blob_client = container_client.get_blob_client(blob_name)

#         print(f"Uploading {local_file_path} to {container_name}/{blob_name}")

# #         # Upload the file
#         with open(local_file_path, "rb") as data:
#             blob_client.upload_blob(data, overwrite=True)

# print("\nFolder upload complete.")

In [None]:
import shutil  # no active cell above imports it
from pathlib import Path

# Delete the local working folders created by this run.
# The base directory is derived from `process_path` rather than `root`,
# because the upload cell's `os.walk` loop may have rebound `root` to a walked
# sub-directory.
base_dir = Path(process_path).parent.parent
local_folder_paths = [
    base_dir / "process" / "adjust_uat",
    base_dir / "output" / "adjust_uat",
]
for local_folder_path in local_folder_paths:
    if os.path.exists(local_folder_path):
        print(f"Deleting local folder: {local_folder_path}")
        shutil.rmtree(local_folder_path)
        print("Local folder deleted.")
    else:
        print(f"Local folder not found: {local_folder_path}")

# Live

In [None]:
# already_processed = [file.name.split('/')[-1].split('.')[0] for file in container_client.list_blobs() if file.name[:12] == 'live/output/']
# already_processed[-5:]

In [None]:
# need_process = pd.date_range(start=already_processed[-1], end=today).strftime('%Y-%m-%d').to_list()
# need_process

In [None]:
# container_name_uat = "adjuststblive"
# container_client_uat = blob_service_client.get_container_client(container_name_uat)
# from collections import defaultdict
# files = [i.name for i in container_client_uat.list_blobs()]
# groups = defaultdict(list)
# for f in files:
#     dt = f.split('_')[1]
#     groups[dt].append(f)
# groups[dt]

In [None]:
# process_path = f'{root}/process/adjust_live'
# os.makedirs(process_path, exist_ok=True)

# for ts, files in tqdm(groups.items()):
#     # if ts not in need_process:
#     #     continue
#     dt = ts[:10]                       # "2025-06-25" -> partition dt=...
#     # print(ts)
#     # break
#     if dt not in need_process:
#         continue
#     # break
#     partition_dir = os.path.join(process_path, f"dt={dt}")
#     os.makedirs(partition_dir, exist_ok=True)
#     out_file = os.path.join(partition_dir, f"{ts}.parquet")

#     dfs = []
#     for f in files:
#         df = (pl.scan_csv(f"az://adjuststblive/{f}",                             # eager
#                           storage_options=storage_options,
#                           has_header=True,
#                           null_values=["", "NULL"],
#                           ignore_errors=True)       # rỗng → null
#                 .select(pl.all().cast(pl.Utf8)))          # tất cả cột → string
#         dfs.append(df)

#     df_all = pl.concat(dfs, how="diagonal")               # tự thêm null cột thiếu
#     df_all.sink_parquet(out_file, compression="snappy")
#     print(f'Done dt={dt}/{ts}.parquet')

In [None]:
# output_path = f'{root}/output/adjust_live'
# os.makedirs(output_path, exist_ok=True)
# for dt_folder in tqdm(glob.glob(os.path.join(process_path, "dt=*"))):
#     dt = os.path.basename(dt_folder)[3:]                 # "2025-06-25"
#     files_pq = glob.glob(os.path.join(dt_folder, "*T*.parquet"))
#     if not files_pq:
#         continue

#     out_path = os.path.join(output_path, f"{dt}.parquet")

#     # Nếu trước đó lỡ tạo cùng tên dưới dạng DIR → xoá
#     if os.path.isdir(out_path):
#         shutil.rmtree(out_path)

#     # ---------- ❶  Lazy scan tất cả Parquet ----------
#     lfs = [pl.scan_parquet(f) for f in files_pq]          # mỗi file → LazyFrame

#     # ---------- ❷  Concat diagonal + giữ schema linh hoạt ----------
#     lf_day = (
#         pl.concat(lfs, how="diagonal")                    # tự thêm null cột thiếu
#         .select(pl.all().cast(pl.Utf8))                   # đảm bảo mọi cột = string
#         .with_columns(pl.lit(dt).alias("dt"))             # thêm cột partition (tuỳ)
#     )

#     # ---------- ❸  Ghi duy nhất 1 Parquet ----------
#     lf_day.sink_parquet(out_path, compression="snappy")

In [None]:

# local_folder_path = f"{root}/process/adjust_live"

# # Replace with the desired folder name in the Azure container
# azure_folder_name = "live/processing"

# # Iterate through files in the local folder and upload them
# for root, dirs, files in tqdm(os.walk(local_folder_path)):
#     for file in files:
#         # Construct the full local file path
#         local_file_path = os.path.join(root, file)

#         # Construct the blob name (path within the Azure container)
#         # This preserves the folder structure from the local path
#         relative_path = os.path.relpath(local_file_path, local_folder_path)
#         blob_name = os.path.join(azure_folder_name, relative_path)
#         # print(blob_name)
#         # Create a blob client for the current file
#         blob_client = container_client.get_blob_client(blob_name)

#         print(f"Uploading {local_file_path} to {container_name}/{blob_name}")

# #         # Upload the file
#         with open(local_file_path, "rb") as data:
#             blob_client.upload_blob(data, overwrite=True)

# print("\nFolder upload complete.")

In [None]:
# local_folder_path = f"{root}/output/adjust_live"

# # Replace with the desired folder name in the Azure container
# azure_folder_name = "live/output"

# # Iterate through files in the local folder and upload them
# for root, dirs, files in tqdm(os.walk(local_folder_path)):
#     for file in files:
#         # Construct the full local file path
#         local_file_path = os.path.join(root, file)

#         # Construct the blob name (path within the Azure container)
#         # This preserves the folder structure from the local path
#         relative_path = os.path.relpath(local_file_path, local_folder_path)
#         blob_name = os.path.join(azure_folder_name, relative_path)
#         # print(blob_name)
#         # Create a blob client for the current file
#         blob_client = container_client.get_blob_client(blob_name)

#         print(f"Uploading {local_file_path} to {container_name}/{blob_name}")

# #         # Upload the file
#         with open(local_file_path, "rb") as data:
#             blob_client.upload_blob(data, overwrite=True)

# print("\nFolder upload complete.")

In [None]:
# # Replace with the path to the local folder you want to delete
# local_folder_paths = [f"{root}/process/adjust_live",f"{root}/output/adjust_live"]
# # local_folder_path = f"data/process/adjust_live"
# for local_folder_path in local_folder_paths:
#     if os.path.exists(local_folder_path):
#         print(f"Deleting local folder: {local_folder_path}")
#         shutil.rmtree(local_folder_path)
#         print("Local folder deleted.")
#     else:
#         print(f"Local folder not found: {local_folder_path}")