In [None]:
from pyspark.sql.functions import col, when
from pyspark.sql.types import IntegerType, FloatType, BooleanType, DateType
import datetime

def profile_and_cast_dataframe(df):
    """
    Profile a PySpark DataFrame of string columns and cast columns to inferred types.

    For each column, up to 100 distinct values are collected to the driver and
    classified as integer, float, boolean or date (``YYYY-MM-DD``). Nulls and
    values that match none of these are ignored during inference. A column is
    cast when all classified samples agree on one type; a mix of integers and
    decimals is promoted to float. Any other mix leaves the column untouched.

    The same regular expression classifies the sampled values and guards the
    Spark cast, so a value that drove the inference is never rejected by the
    guard afterwards. Values in a cast column that do not match the pattern
    (including values that were not part of the sample) become null.

    Parameters:
        df (DataFrame): Input PySpark DataFrame with all string columns.

    Returns:
        DataFrame: A new DataFrame with columns cast to inferred data types.
    """
    import re  # local import: only this function needs it

    # (type, pattern) pairs in classification priority order. Integer must come
    # before float because every integer literal also matches the float pattern.
    # NOTE(review): digit strings outside the 32-bit range still match the
    # integer pattern but do not fit IntegerType -- confirm whether LongType
    # is wanted for such columns.
    type_patterns = [
        (IntegerType(), r'^-?\d+$'),
        (FloatType(), r'^-?(\d+(\.\d+)?|\.\d+)$'),
        (BooleanType(), r'(?i)^(true|false)$'),
        (DateType(), r'^\d{4}-\d{2}-\d{2}$'),
    ]
    # re.ASCII restricts \d to [0-9] so the driver-side check agrees with the
    # Java regex engine that evaluates rlike on the executors.
    matchers = [
        (data_type, re.compile(pattern, re.ASCII))
        for data_type, pattern in type_patterns
    ]
    pattern_for_type = {data_type: pattern for data_type, pattern in type_patterns}

    def is_calendar_date(text):
        # The date pattern only checks the shape; reject e.g. month 13.
        try:
            datetime.datetime.strptime(text, '%Y-%m-%d')
        except ValueError:
            return False
        return True

    def infer_data_type(value):
        if value is None:
            return None
        # The contract is string input; stringify anything else instead of
        # failing on str-only operations.
        text = value if isinstance(value, str) else str(value)
        for data_type, matcher in matchers:
            if matcher.search(text):
                if isinstance(data_type, DateType) and not is_calendar_date(text):
                    return None
                return data_type
        return None

    numeric_mix = {IntegerType(), FloatType()}
    inferred_types = {}
    for column in df.columns:
        sample_values = (
            df.select(column).distinct().limit(100)
            .rdd.flatMap(lambda row: row).collect()
        )
        types = {
            data_type
            for data_type in map(infer_data_type, sample_values)
            if data_type is not None
        }
        # "1" next to "2.5" is a float column, not an ambiguous one.
        if types == numeric_mix:
            types = {FloatType()}
        if len(types) == 1:
            inferred_types[column] = types.pop()

    for column, data_type in inferred_types.items():
        conforms = col(column).rlike(pattern_for_type[data_type])
        # No otherwise(): rows that fail the guard are deliberately set to null.
        df = df.withColumn(column, when(conforms, col(column).cast(data_type)))

    return df

In [None]:
from delta.tables import DeltaTable
import os

def list_delta_tables_in_abfs_directory(spark, abfs_directory):
    """
    Lists all Delta tables in a given ABFS directory.

    Every direct child of ``abfs_directory`` is inspected; a child is reported
    as a Delta table when it contains a ``_delta_log`` entry. The listing is
    not recursive.

    Parameters:
        spark (SparkSession): The active Spark session. Currently unused; kept
            so existing callers keep working.
        abfs_directory (str): The ABFS directory path (e.g., 'abfss://container@account.dfs.core.windows.net/path/').

    Returns:
        List[str]: List of Delta table paths found in the directory.
    """
    # NOTE(review): `dbutils` is not defined in this file; presumably it is
    # provided by the notebook runtime -- confirm.
    delta_tables = []
    for entry in dbutils.fs.ls(abfs_directory):
        # Entries are URIs, so join with "/" explicitly instead of relying on
        # os.path.join, whose separator depends on the host platform.
        delta_log_path = entry.path.rstrip("/") + "/_delta_log"
        # NOTE(review): confirm that this runtime's dbutils.fs offers
        # `exists`; it is not among the commonly documented dbutils.fs commands.
        if dbutils.fs.exists(delta_log_path):
            delta_tables.append(entry.path)
    return delta_tables

# Example usage:
# abfs_directory = "abfss://container@account.dfs.core.windows.net/source-directory/"
# delta_tables = list_delta_tables_in_abfs_directory(spark, abfs_directory)
# print(delta_tables)
def get_delta_table_key_columns(spark, table_path):
    """
    Attempts to infer key columns for a Delta table by checking for columns with unique values.

    A column is a candidate key when its number of distinct values equals the
    table's row count. An empty table yields no candidates, because every
    column would otherwise qualify trivially (0 distinct values == 0 rows).

    This is best-effort: if the table cannot be read or profiled, the failure
    is logged as a warning and an empty list is returned.

    Parameters:
        spark (SparkSession): The active Spark session.
        table_path (str): Path of the Delta table to inspect.

    Returns:
        List[str]: Candidate key columns (could be empty if none found).
    """
    import logging  # local import: only needed for the failure path

    try:
        df = spark.read.format("delta").load(table_path)
        row_count = df.count()
        if row_count == 0:
            return []
        # One distinct-count job per column.
        return [
            col_name
            for col_name in df.columns
            if df.select(col_name).distinct().count() == row_count
        ]
    except Exception:
        # Keep the best-effort contract, but leave a trace so "no key found"
        # can be told apart from "table could not be inspected".
        logging.getLogger(__name__).warning(
            "Could not infer key columns for Delta table %s",
            table_path,
            exc_info=True,
        )
        return []

# Example usage: print the candidate key column(s) of each discovered Delta table.
# NOTE(review): relies on `spark` and `delta_tables` already existing in the
# notebook session (e.g. from the commented-out listing example) -- confirm.
for table_path in delta_tables:
    key_columns = get_delta_table_key_columns(spark, table_path)
    print(f"Delta table: {table_path}")
    if not key_columns:
        print("  No unique key column found.")
    elif len(key_columns) == 1:
        print(f"  Unique key column: {key_columns[0]}")
    else:
        print(f"  Candidate key columns: {key_columns}")