In [0]:
%pip install -r ../../requirements.txt
# Restart the Python process after the install above.
# NOTE(review): presumably so the newly installed packages are picked up by later cells — confirm.
dbutils.library.restartPython()

In [0]:
import os
import time
import requests
import logging
from typing import Dict, Any, List, Optional, Tuple
from pathlib import Path
from typing import Tuple
import os
import re
import shutil
import logging
from datetime import datetime
import pandas as pd

# Configure logging for the notebook: INFO level, "<time> <level> <message>" lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s"
)
logger = logging.getLogger(__name__)

# Workspace host name, read from the 'browserHostName' tag of the notebook context.
instance_name = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('browserHostName')
# HTTPS base URL built from the host name above.
DATABRICKS_INSTANCE = f"https://{instance_name}"
# API token taken from the notebook context.
# NOTE(review): this is a credential — avoid printing or logging it.
TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
# Working directory at the time this cell runs.
CURRENT_PATH = os.getcwd()
# "notebooks" directory located two levels above the working directory.
NOTEBOOK_PATH = os.path.join(Path(CURRENT_PATH).parent.parent, "notebooks")
# NOTE(review): looks like a cluster node type; not referenced in the visible cells — confirm usage.
NODE_TYPE_ID = "Standard_DS3_v2"
# Catalog and schema names used to build the /Volumes path in the __main__ guard.
CATALOG_NAME = "dbxmetagen"
SCHEMA_NAME = "metadata_results"

In [0]:
# Same logging setup as in the previous cell, repeated so this cell can run on
# its own. logging.basicConfig() does nothing when the root logger already has
# handlers, so running both cells does not duplicate log output.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s"
)
logger = logging.getLogger(__name__)

def sanitize_email(email: str) -> str:
    """
    Return ``email`` with every character outside a-z, A-Z and 0-9
    swapped for an underscore.
    """
    non_alphanumeric = re.compile(r'[^a-zA-Z0-9]')
    return non_alphanumeric.sub('_', email)

def get_current_date_str() -> str:
    """
    Format the current local date as an eight-digit YYYYMMDD string.
    """
    right_now = datetime.now()
    return format(right_now, "%Y%m%d")

def list_files_with_extensions(folder: str, extensions: List[str]) -> List[str]:
    """
    List the names of regular files directly inside ``folder`` that end with
    one of ``extensions``.

    Matching is case-insensitive on both the file name and the extension, so
    ".XLSX" and ".xlsx" are treated alike. Sub-directories are not descended
    into and are never returned, even if their name matches.

    Args:
        folder: Directory to scan.
        extensions: Suffixes to accept, e.g. [".xlsx", ".tsv"].

    Returns:
        Matching file names (not full paths) in sorted order; an empty list
        when ``folder`` is missing or is not a directory.
    """
    # isdir (rather than exists) also rejects a path that points at a file,
    # which would otherwise make os.listdir raise.
    if not os.path.isdir(folder):
        logger.warning(f"Folder does not exist: {folder}")
        return []
    # Normalise once; str.endswith accepts a tuple of candidate suffixes.
    suffixes = tuple(ext.lower() for ext in extensions)
    # Sorted so the result does not depend on the directory listing order.
    return sorted(
        name for name in os.listdir(folder)
        if name.lower().endswith(suffixes)
        and os.path.isfile(os.path.join(folder, name))
    )

def count_rows_in_file(filepath: str) -> Tuple[int, Optional[bool]]:
    """
    Count the rows in a file, dispatching on its (case-insensitive) extension.

    * ``.xlsx`` / ``.tsv``: loaded with pandas; the count is the number of
      DataFrame rows (the header line is not counted) and ``has_header`` is True.
    * ``.sql``: the count is the number of text lines and ``has_header`` is False.

    Args:
        filepath: Path of the file to inspect.

    Returns:
        ``(row_count, has_header)``. ``(0, None)`` is returned when the
        extension is unsupported or when reading the file fails; the failure
        is logged rather than raised.
    """
    ext = os.path.splitext(filepath)[1].lower()
    try:
        if ext == ".xlsx":
            return len(pd.read_excel(filepath)), True
        if ext == ".tsv":
            return len(pd.read_csv(filepath, sep='\t')), True
        if ext == ".sql":
            # Stream the file instead of materialising every line in memory.
            with open(filepath, 'r', encoding='utf-8') as f:
                line_count = sum(1 for _ in f)
            # Heuristic: .sql files may not have a header
            return line_count, False
    except Exception as e:
        # Deliberately best-effort: callers only log the count.
        logger.error(f"Failed to count rows in {filepath}: {e}")
        return 0, None
    logger.warning(f"Unsupported file extension: {ext}")
    return 0, None

def ensure_empty_folder(folder: str) -> None:
    """
    Ensure ``folder`` exists and contains no entries.

    The folder is created (with parents) when missing. Real sub-directories
    are removed recursively; everything else — regular files and symbolic
    links, including links to directories and dangling links — is unlinked,
    so a link's target is never touched.

    Args:
        folder: Directory to create and/or empty.

    Failures on individual entries are logged and do not stop the cleanup.
    """
    os.makedirs(folder, exist_ok=True)
    for name in os.listdir(folder):
        path = os.path.join(folder, name)
        try:
            # shutil.rmtree refuses symlinks, so only real directories go
            # through it; links are removed like files.
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)
        except Exception as e:
            # Best-effort: keep going with the remaining entries.
            logger.error(f"Failed to clean folder {folder}: {e}")

def copy_file(src: str, dst_folder: str) -> None:
    """
    Copy ``src`` into the directory ``dst_folder``, keeping its file name.

    The destination directory is created when missing. Without that step,
    shutil.copy2 would treat a non-existent ``dst_folder`` as a target file
    name and write the copy there instead of inside a folder.

    Args:
        src: Path of the file to copy.
        dst_folder: Directory that receives the copy.

    Failures are logged and not raised.
    """
    try:
        os.makedirs(dst_folder, exist_ok=True)
        shutil.copy2(src, dst_folder)
        logger.info(f"Copied {src} to {dst_folder}")
    except Exception as e:
        # Best-effort: the caller does not expect an exception here.
        logger.error(f"Failed to copy {src} to {dst_folder}: {e}")

def main(
    base_volume_path: str,
    current_user: str
) -> None:
    """
    Report on today's generated files for a user and stage the run log.

    Steps, in order:
      1. Build ``<base_volume_path>/<sanitized user>/<YYYYMMDD>`` and log every
         .xlsx/.tsv/.sql file found there together with its row count.
      2. Look in the ``exportable_run_logs`` sub-folder of that dated folder;
         log an error and stop unless it holds exactly one such file.
      3. Log the run log's format, row count and header flag, then call
         ``ensure_empty_folder`` on ``<user folder>/reviewed_outputs`` and
         ``copy_file`` to place the run log there.
    """
    tracked_suffixes = [".xlsx", ".tsv", ".sql"]

    user_dir = os.path.join(base_volume_path, sanitize_email(current_user))
    dated_dir = os.path.join(user_dir, get_current_date_str())
    logs_dir = os.path.join(dated_dir, "exportable_run_logs")
    reviewed_dir = os.path.join(user_dir, "reviewed_outputs")

    # Step 1: inventory of today's outputs.
    dated_names = list_files_with_extensions(dated_dir, tracked_suffixes)
    logger.info(f"Found {len(dated_names)} file(s) in {dated_dir}: {dated_names}")
    for name in dated_names:
        n_rows, _ = count_rows_in_file(os.path.join(dated_dir, name))
        logger.info(f"File: {name} | Rows: {n_rows}")

    # Step 2: exactly one run log is required to continue.
    candidates = list_files_with_extensions(logs_dir, tracked_suffixes)
    if len(candidates) != 1:
        logger.error(f"Expected 1 file in {logs_dir}, found {len(candidates)}")
        return

    # Step 3: describe the run log, then stage it for review.
    log_name = candidates[0]
    log_path = os.path.join(logs_dir, log_name)
    n_rows, header_flag = count_rows_in_file(log_path)
    log_suffix = os.path.splitext(log_name)[1]
    logger.info(f"Run log file: {log_name} | Format: {log_suffix} | Rows: {n_rows} | Header: {header_flag}")

    ensure_empty_folder(reviewed_dir)
    copy_file(log_path, reviewed_dir)

# Entry point when the cell/module is executed directly.
if __name__ == "__main__":
    # `spark` is not defined in this file.
    # NOTE(review): presumably the session injected by the notebook runtime — confirm.
    # The user name is sanitized here and again inside main(); the second
    # pass leaves an already-sanitized value unchanged.
    current_user = sanitize_email(spark.sql("SELECT current_user()").collect()[0][0])
    # Root folder under which main() looks for per-user, per-date sub-folders.
    base_volume_path = f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/generated_metadata/"
    main(base_volume_path, current_user)
