In [0]:
# Notebook parameters, read from the Databricks widgets named "catalog",
# "schema" and "table". CATALOG_NAME and SCHEMA_NAME are reused by later cells
# both to build table names and to scope the tag lookup query.
CATALOG_NAME = dbutils.widgets.get("catalog")
SCHEMA_NAME = dbutils.widgets.get("schema")
table = dbutils.widgets.get("table")

# Three-part name (catalog.schema.table) of the table read in the next cell.
# NOTE(review): the parts are interpolated without backtick quoting, so names
# containing special characters would presumably need quoting - confirm.
path = f"{CATALOG_NAME}.{SCHEMA_NAME}.{table}"

In [0]:
# Load the table identified by `path` and render it. A later cell reads the
# "url" column of this DataFrame to decide which CSV files to ingest.
df = spark.read.table(path)
display(df)

In [0]:
import os
from urllib.parse import urlparse

def _table_name_from_filename(filename):
    """Derive a table name from a CSV file name.

    Only a trailing ``.csv`` extension (any letter case) is removed; hyphens
    are then mapped to underscores. Using ``str.replace('.csv', '')`` here
    would also delete ``.csv`` occurring in the middle of the name.
    """
    stem = filename[:-len('.csv')] if filename.lower().endswith('.csv') else filename
    return stem.replace('-', '_')


# Ingest every CSV listed in the "url" column of `df`: copy it to a temporary
# DBFS location, load it, and (re)create a Delta table named after the file.
# A failure on one URL is reported and the loop moves on to the next one.
urls = df.select("url").collect()

for row in urls:
    url = row['url']

    try:
        # Extract filename
        parsed_url = urlparse(url)
        filename = os.path.basename(parsed_url.path)
        if not filename:
            # A null URL or one ending in "/" yields no file name. Continuing
            # would make the temporary path "/tmp/" itself, so the copy, read
            # and remove calls below would operate on the whole directory.
            print(f"Skipping {url}: URL does not contain a file name")
            continue

        table_name = _table_name_from_filename(filename)
        table_path = f"{CATALOG_NAME}.{SCHEMA_NAME}.{table_name}"

        # Download to DBFS
        dbfs_path = f"/tmp/{filename}"
        dbfs_uri = f"dbfs:{dbfs_path}"

        try:
            dbutils.fs.cp(url, dbfs_uri)
        except Exception as download_error:
            print(f"Skipping {filename}: File not found or download failed - {str(download_error)}")
            continue

        # From here on a temporary file may exist; the `finally` clause removes
        # it on every exit path (success, read failure, or a failing
        # DROP / write / ALTER statement).
        try:
            # Load CSV
            try:
                df_csv = (
                    spark.read.format("csv")
                    .option("header", "true")
                    .option("inferSchema", "true")
                    .load(dbfs_uri)
                )
            except Exception as read_error:
                print(f"Skipping {filename}: Failed to read CSV - {str(read_error)}")
                continue

            # Drop table if it exists to avoid schema conflicts
            spark.sql(f"DROP TABLE IF EXISTS {table_path}")

            # Create new table and tag it so later cells can find it
            df_csv.write.format("delta").mode("overwrite").saveAsTable(table_path)
            spark.sql(f"ALTER TABLE {table_path} SET TAGS ('concurso' = 'bcie')")

            print(f"Created table: {table_name}")
        finally:
            # Clean up; a failed removal is reported but never aborts the loop.
            try:
                dbutils.fs.rm(dbfs_uri)
            except Exception as cleanup_error:
                print(f"Warning: Could not clean up temporary file {dbfs_path} - {str(cleanup_error)}")

    except Exception as general_error:
        print(f"Unexpected error processing {url}: {str(general_error)}")

In [0]:
# List the tables in the configured catalog/schema that carry the tag
# 'concurso' = 'bcie', i.e. the tag set by the ingestion loop.
_bcie_tables_query = f"""
SELECT
  tt.catalog_name,
  tt.schema_name,
  tt.table_name,
  tt.tag_name,
  tt.tag_value
FROM system.information_schema.table_tags tt
WHERE tt.tag_name = 'concurso'
  AND tt.tag_value = 'bcie'
  -- Optional scoping:
  AND tt.catalog_name = '{CATALOG_NAME}'
  AND tt.schema_name = '{SCHEMA_NAME}'
ORDER BY tt.catalog_name, tt.schema_name, tt.table_name;
"""

tablas_bcie = spark.sql(_bcie_tables_query)
display(tablas_bcie)

In [0]:
from pyspark.sql.functions import col, trim, lit
from pyspark.sql import SparkSession
from concurrent.futures import ThreadPoolExecutor, as_completed
import traceback

# Bind `spark` to the active SparkSession (getOrCreate returns the existing
# session when there is one, otherwise it builds a new one). The helper
# functions below read this module-level name.
# NOTE(review): earlier cells already use `spark`, so this presumably rebinds
# the same session the notebook runtime provides - confirm.
spark = SparkSession.builder.getOrCreate()

def get_table_description(table_name, catalog_name, schema_name):
    """
    Look up a table's description through ``DESCRIBE DETAIL``.

    Args:
        table_name: Unqualified table name.
        catalog_name: Catalog containing the table.
        schema_name: Schema containing the table.

    Returns:
        The value of the ``description`` column of the first result row, or
        ``None`` when there is no row, the value is empty/null, or the lookup
        raised (in which case a warning is printed).
    """
    try:
        qualified_name = f"{catalog_name}.{schema_name}.{table_name}"
        collected = (
            spark.sql(f"DESCRIBE DETAIL {qualified_name}")
            .select("description")
            .collect()
        )
        first_description = collected[0]["description"] if collected else None
        # Empty strings are reported as "no description", same as null.
        return first_description or None
    except Exception as e:
        print(f"Warning: Could not get description for {table_name}: {str(e)}")
        return None

def process_single_table(table_name, catalog_name, schema_name):
    """
    Collect the commented columns of one table together with its description.

    Runs ``DESCRIBE TABLE EXTENDED`` and keeps only rows that have both a
    non-blank column name and a non-blank comment, so columns without a
    comment are not reported.

    Args:
        table_name: Unqualified table name.
        catalog_name: Catalog containing the table.
        schema_name: Schema containing the table.

    Returns:
        ``{"table": table_name, "rows": [...]}`` on success, where each row has
        the fields catalog, schema, table_name, table_description,
        column_name, data_type and comment; or
        ``{"table": table_name, "error": str, "trace": str}`` when anything
        raised. The function itself never raises.
    """
    try:
        # Get table description first
        table_description = get_table_description(table_name, catalog_name, schema_name)

        # Get column metadata using DESCRIBE TABLE EXTENDED
        metadata = spark.sql(f"DESCRIBE TABLE EXTENDED {catalog_name}.{schema_name}.{table_name}")

        has_name = col("col_name").isNotNull() & (trim(col("col_name")) != "")
        has_comment = col("comment").isNotNull() & (trim(col("comment")) != "")
        # DESCRIBE TABLE emits section rows whose col_name starts with "#"
        # (e.g. "# Partition Information" and a "# col_name" header whose
        # comment cell is the literal text "comment"). They are not columns.
        is_section_row = trim(col("col_name")).startswith("#")

        # Filter and transform metadata
        table_metadata = metadata.filter(
            has_name & has_comment & ~is_section_row
        ).select(
            lit(catalog_name).alias("catalog"),
            lit(schema_name).alias("schema"),
            lit(table_name).alias("table_name"),
            lit(table_description).alias("table_description"),
            col("col_name").alias("column_name"),
            col("data_type").alias("data_type"),
            col("comment").alias("comment")
        )

        # Collect rows to driver. Partition columns are listed a second time
        # in the partition section; drop those exact repeats while keeping the
        # original order (Row is a hashable tuple subclass).
        rows = list(dict.fromkeys(table_metadata.collect()))
        return {"table": table_name, "rows": rows}

    except Exception as e:
        return {"table": table_name, "error": str(e), "trace": traceback.format_exc()}

def process_tables_parallel(table_list, catalog_name, schema_name, max_workers=6):
    """
    Run ``process_single_table`` for every table using a thread pool.

    Args:
        table_list: Table names to process.
        catalog_name: Catalog passed through to ``process_single_table``.
        schema_name: Schema passed through to ``process_single_table``.
        max_workers: Size of the thread pool.

    Returns:
        A tuple ``(all_rows, errors)``: the concatenated rows of every table
        that succeeded, and the result dictionaries of those that failed.
        A progress line is printed per table as results complete.
    """
    all_rows, errors = [], []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = [
            pool.submit(process_single_table, tbl, catalog_name, schema_name)
            for tbl in table_list
        ]
        for done in as_completed(pending):
            outcome = done.result()
            if "rows" not in outcome:
                errors.append(outcome)
                print(f"✗ {outcome['table']}: {outcome['error']}")
                continue
            table_rows = outcome["rows"]
            all_rows.extend(table_rows)
            print(f"✓ {outcome['table']}: {len(table_rows)} columns")

    return all_rows, errors

from pyspark.sql.types import StructType, StructField, StringType

# Main execution: gather column metadata for every tagged table found by the
# tag lookup query.
table_list = [tag_row["table_name"] for tag_row in tablas_bcie.select("table_name").collect()]
rows, errors = process_tables_parallel(table_list, CATALOG_NAME, SCHEMA_NAME)

# Every output field is a string; the order matches the columns selected in
# process_single_table (table_description included).
schema = StructType([
    StructField(field_name, StringType())
    for field_name in (
        "catalog",
        "schema",
        "table_name",
        "table_description",
        "column_name",
        "data_type",
        "comment",
    )
])

# Persist the collected metadata, replacing any previous contents, then show it.
final_df = spark.createDataFrame(rows, schema)
final_df.write.mode("overwrite").saveAsTable(f"{CATALOG_NAME}.{SCHEMA_NAME}.table_metadata")
display(final_df)