# In [0]:
# 01_table_manager.py
from pyspark.sql import SparkSession
from typing import List
import re

class InvalidNamespaceError(ValueError):
    """Signal that a catalog, schema, or table identifier failed validation."""

class TableManager:
    """
    Manage Unity Catalog catalog/schema/table objects using Spark SQL.

    The catalog and schema are validated once at construction time; every
    operation then switches the session to that namespace before running.

    Usage:
        tm = TableManager(spark, catalog="demo_catalog", schema="demo_schema")
        tm.create_managed_table("demo_table", "id INT, name STRING")
        tm.list_tables()
    """

    DOT_ALLOWED = re.compile(r"^[A-Za-z0-9_]+$")  # simple allowed-name check

    def __init__(self, spark: SparkSession, catalog: str, schema: str) -> None:
        """
        Store the session and the (whitespace-stripped) catalog and schema.

        spark: object exposing ``sql(str)``; used for every statement.
        catalog: catalog name, a single identifier.
        schema: schema name, a single identifier.

        Raises InvalidNamespaceError if either name is empty, contains a dot,
        or contains characters other than letters, digits and underscore.
        """
        self.spark = spark
        self.catalog = catalog.strip()
        self.schema = schema.strip()
        self._validate_namespace()
        # NOTE: __init__ must return None; returning a value raises TypeError
        # on instantiation.

    def _validate_namespace(self) -> None:
        """Raise InvalidNamespaceError unless catalog and schema are valid."""
        # Ensure neither catalog nor schema is empty and they do not contain dots
        if not self.catalog or not self.schema:
            raise InvalidNamespaceError("Catalog and schema must be non-empty.")
        if "." in self.catalog or "." in self.schema:
            raise InvalidNamespaceError("Catalog and schema must not contain dots. Use catalog and schema separately.")
        # Both names are interpolated into SQL text, so restrict the alphabet.
        if not self.DOT_ALLOWED.match(self.catalog) or not self.DOT_ALLOWED.match(self.schema):
            raise InvalidNamespaceError("Catalog/schema names contain invalid characters. Use alphanumeric and underscore only.")

    @staticmethod
    def _validate_table_name(table_name: str) -> None:
        """Raise InvalidNamespaceError if table_name is a dotted (qualified) name."""
        if "." in table_name:
            raise InvalidNamespaceError("table_name must be a single identifier (no dots).")

    @property
    def full_schema(self) -> str:
        """Return the two-part namespace ``<catalog>.<schema>``."""
        return f"{self.catalog}.{self.schema}"

    def use_schema(self) -> None:
        """Switch current session to the catalog and schema."""
        # Use uppercase SQL keywords to be clear
        self.spark.sql(f"USE CATALOG {self.catalog}")
        self.spark.sql(f"USE SCHEMA {self.catalog}.{self.schema}")

    def create_managed_table(self, table_name: str, columns_sql: str) -> None:
        """
        Create a managed Delta table in the configured catalog.schema.

        table_name: single identifier (no dots)
        columns_sql: e.g. "id INT, name STRING, amount DOUBLE"; inserted
            verbatim into the statement, so it must be trusted SQL.

        Raises InvalidNamespaceError if table_name contains a dot.
        """
        self._validate_table_name(table_name)
        self.use_schema()
        # Qualify with this instance's namespace rather than a fixed one so
        # the table is created where the manager was configured to work.
        create_sql = (
            f"CREATE TABLE IF NOT EXISTS {self.full_schema}.{table_name} "
            f"({columns_sql}) USING DELTA"
        )
        self.spark.sql(create_sql)

    def drop_table(self, table_name: str) -> None:
        """
        Drop table_name from the configured schema if it exists.

        Raises InvalidNamespaceError if table_name contains a dot.
        """
        self._validate_table_name(table_name)
        self.use_schema()
        self.spark.sql(f"DROP TABLE IF EXISTS {table_name}")

    def list_tables(self) -> List[str]:
        """
        Return a list of table names in the configured catalog.schema.

        Implementation notes:
          - Uses SHOW TABLES IN <catalog>.<schema> then extracts tableName column.
          - Avoids nested/empty namespace mistakes by using validated full_schema.
        """
        self.use_schema()
        df = self.spark.sql(f"SHOW TABLES IN {self.full_schema}")
        rows = df.collect()  # collect once; both branches read the same rows
        # The result is expected to expose a tableName column.
        if "tableName" in df.columns:
            return [row["tableName"] for row in rows]
        # Unexpected result layout: fall back to the first column of each row.
        return [row[0] for row in rows]

    def describe_table(self, table_name: str) -> list:
        """
        Return the collected rows of DESCRIBE TABLE for table_name.

        Raises InvalidNamespaceError if table_name contains a dot.
        """
        self._validate_table_name(table_name)
        self.use_schema()
        return self.spark.sql(f"DESCRIBE TABLE {table_name}").collect()

    def run_query(self, sql: str):
        """Run an arbitrary SQL statement in this schema (switches to schema first)."""
        self.use_schema()
        return self.spark.sql(sql)
