###✅ S3 이벤트 발생 : .fin 파일 Write to S3 

####✔ Workflow 이벤트 실행 : 플래그 파일 적재   

In [0]:
# ✔ 1) 플래그 파일 S3 경로 
output_path = "s3://act-dip-stg-dbrx-aws-ldz-bkt/HB1/HB1_INVOICE/dt=202505/database=MZ/payer=000126249136/event.fin"

# ✔ 2) Databricks dbutils 이용 플래그 파일 생성
dbutils.fs.put(output_path, "", overwrite=True)

print(f"✔ 플래그 파일 생성 완료: {output_path}")

####✔ ASP 이벤트 호출 테스트 

In [0]:
# ✔ 1) .parquet 파일 S3 경로 
output_path = "s3://act-dip-stg-dbrx-aws-ldz-bkt/ASP/HB1_INVOICE/dt=202505/database=MZ/event.parquet"

# ✔ 2) Databricks dbutils 이용 플래그 파일 생성
dbutils.fs.put(output_path, "", overwrite=True)

print(f"✔ 플래그 파일 생성 완료: {output_path}")


###✅ 수행결과 확인

####✔ 워크플로우 실행 로그 확인

In [0]:
%sql 

   SELECT * 
     FROM act_dip_stg_dbrx_wks.sbadm.tbb_etl_task_log
    WHERE job_nm = 'wf_hb1_tbs_aws_hb1_ym_inv_sm_d'
    and    inst_ddtm >= '2025-06-09 12:00:00.000'
 ORDER BY srt_ddtm DESC 


####✔ 워크플로우 실행 로그 테이블 초기화 

In [0]:
%sql 
DELETE FROM act_dip_stg_dbrx_wks.sbadm.tbb_etl_task_log

###✅ 데이터 검증

####✔ 랜딩존 (prd, stg) parquet 파일 확인 


In [0]:
prd_path_inv = "s3://act-dip-prd-dbrx-aws-ldz-bkt/HB1/HB1_INVOICE/dt=202505"
stg_path_inv = "s3://act-dip-stg-dbrx-aws-ldz-bkt/HB1/HB1_INVOICE/dt=202505"

prd_df_inv = spark.read.option("pathGlobFilter", "*.parquet").parquet(prd_path_inv)
stg_df_inv = spark.read.option("pathGlobFilter", "*.parquet").parquet(stg_path_inv)

# 임시 뷰 등록
prd_df_inv.createOrReplaceTempView("prd_inv_data")
stg_df_inv.createOrReplaceTempView("stg_inv_data")

# Invoice 
total = spark.sql("""
  SELECT 'production landing zone s3 invoice', COUNT(*), SUM(all_unblendedcost) AS usageamount
  FROM prd_inv_data
  
  UNION

  SELECT 'staging landing zone s3 invoice', COUNT(*), SUM(all_unblendedcost) AS usageamount
  FROM stg_inv_data
""")

# 결과 출력
display(total) 

####✔ prd, stg 영역 Bronze, Silver 영역 데이터 확인  

In [0]:
%sql 

select 'stg invoice bronze', COUNT(*), sum(all_unblendedcost)
from act_dip_stg_dbrx_aws_cat.sbaws.tbb_aws_hb1_ym_inv_sm  where dt = '202505' 

union 

select 'stg invoice silver', COUNT(*), sum(all_unblendedcost)
from act_dip_stg_dbrx_aws_cat.ssaws.tbs_aws_hb1_ym_inv_sm  where dt = '202505' 

union 

select 'prd invoice bronze', COUNT(*), sum(all_unblendedcost)
from act_dip_prd_dbrx_aws_cat.sbaws.tbb_aws_hb1_ym_inv_sm  where dt = '202505' 

union 

select 'prd invoice silver', COUNT(*), sum(all_unblendedcost)
from act_dip_prd_dbrx_aws_cat.ssaws.tbs_aws_hb1_ym_inv_sm  where dt = '202505' 

####✔ ASP 영역 전달 parquet 파일 확인 

In [0]:
# 결과 파일 경로 
prd_path_mz     = "s3://act-dip-prd-dbrx-aws-ldz-bkt/ASP/HB1_INVOICE/dt=202505/database=MZ"
prd_path_nexon  = "s3://act-dip-prd-dbrx-aws-ldz-bkt/ASP/HB1_INVOICE/dt=202505/database=NEXON"
stg_path_mz     = "s3://act-dip-stg-dbrx-aws-ldz-bkt/ASP/HB1_INVOICE/dt=202505/database=MZ"
stg_path_nexon  = "s3://act-dip-stg-dbrx-aws-ldz-bkt/ASP/HB1_INVOICE/dt=202505/database=NEXON"
stg_mz_bad_file    = stg_path_mz    + "/event.parquet"
stg_nexon_bad_file = stg_path_nexon + "/event.parquet"
prd_mz_bad_file    = prd_path_mz    + "/event.parquet"
prd_nexon_bad_file = prd_path_nexon + "/event.parquet"

# 플래그 파일 삭제 후 적재 parquet파일 값 확인 ( recurse=True : 디렉터리/파일 상관없이 강제 삭제) 
dbutils.fs.rm(stg_mz_bad_file   , recurse=True)
dbutils.fs.rm(stg_nexon_bad_file, recurse=True)
dbutils.fs.rm(prd_mz_bad_file   , recurse=True)
dbutils.fs.rm(prd_nexon_bad_file, recurse=True)

# dataframe 생성 
stg_mz_df_invoice    = spark.read.option("pathGlobFilter", "*.parquet").parquet(stg_path_mz   )
stg_nexon_df_invoice = spark.read.option("pathGlobFilter", "*.parquet").parquet(stg_path_nexon)
prd_mz_df_invoice    = spark.read.option("pathGlobFilter", "*.parquet").parquet(prd_path_mz   )
prd_nexon_df_invoice = spark.read.option("pathGlobFilter", "*.parquet").parquet(prd_path_nexon)

# 임시 뷰 등록
stg_mz_df_invoice.createOrReplaceTempView(   "stg_mz_inv_data"   )
stg_nexon_df_invoice.createOrReplaceTempView("stg_nexon_inv_data")
prd_mz_df_invoice.createOrReplaceTempView(   "prd_mz_inv_data"   )
prd_nexon_df_invoice.createOrReplaceTempView("prd_nexon_inv_data")

# Invoice 
total = spark.sql("""
  SELECT 'stg asp (MZ)    s3 invoice', COUNT(*), SUM(all_unblendedcost) AS usageamount  FROM stg_mz_inv_data 
  UNION ALL 
  SELECT 'stg asp (NEXON) s3 invoice', COUNT(*), SUM(all_unblendedcost) AS usageamount  FROM stg_nexon_inv_data 
  UNION ALL 
  SELECT 'prd asp (MZ)    s3 invoice', COUNT(*), SUM(all_unblendedcost) AS usageamount  FROM prd_mz_inv_data 
  UNION ALL 
  SELECT 'prd asp (NEXON) s3 invoice', COUNT(*), SUM(all_unblendedcost) AS usageamount  FROM prd_nexon_inv_data 
""")

# 결과 출력
display(total) 

###🚧 테스트 데이터 이관

####✔ Invoice 데이터 이관

In [0]:
# 원본 및 대상 S3 경로
prd_path = "s3://act-dip-prd-dbrx-aws-ldz-bkt/HB1/HB1_INVOICE/dt=202505"
stg_path = "s3://act-dip-stg-dbrx-aws-ldz-bkt/HB1/HB1_INVOICE/dt=202505"


def copy_s3_folder(src, dst):
    files = dbutils.fs.ls(src)
    for f in files:
        if f.isDir():
            # 재귀적으로 하위 폴더 복사
            new_dst = f"{dst}/{f.name}"
            copy_s3_folder(f.path, new_dst)
        else:
            # 파일 직접 복사
            dbutils.fs.cp(f.path, f"{dst}/{f.name}")

# 기존 stg 경로 삭제(옵션)
dbutils.fs.rm(stg_path, True)

print(f"Copying files from {prd_path} to {stg_path} ...")
copy_s3_folder(prd_path, stg_path)
print("✅ 1:1 파일 복사 완료")


####✔ 이관 Invoice 데이터 비교 

In [0]:

import re

databases = ['MZ', 'NEXON']                                      # 파티션(데이터베이스) 목록
dt        = '202505'                                             # 수행 월 
result    = []                                                   # 결과 저장 리스트
base_path = "s3://act-dip-stg-dbrx-aws-ldz-bkt/HB1/HB1_INVOICE"  # S3 경로

for db in databases:

    payer_root    = f"{base_path}/dt={dt}/database={db}/"
    payer_folders = [f.path for f in dbutils.fs.ls(payer_root) if f.isDir()]

    for payer_folder in payer_folders:

        match         = re.search(r'payer-([0-9]+)', payer_folder)
        payer_id      = match.group(1) if match else payer_folder.rstrip('/').split('/')[-1]          # payer_id 추출
        parquet_files = [f.path for f in dbutils.fs.ls(payer_folder) if f.path.endswith('.parquet')]  # payer 폴더 내 Parquet 파일 나열

        for file_path in parquet_files:
            try:
                df        = spark.read.parquet(file_path)
                row_count = df.count()
            except Exception as e:
                row_count = 0  
            result.append((db, payer_id, file_path, row_count))

# 결과 DataFrame
schema    = ["database", "payer_id", "file_path", "row_count"]
result_df = spark.createDataFrame(result, schema=schema)

result_df.display()

###🚧 파일 위치 확인 