In [None]:
# Generate & write yml to a file
import pandas
# Module-level session; the generator function below runs its queries through `session.sql(...)`.
# NOTE(review): `get_active_session` is not imported in the visible cells - presumably
# snowflake.snowpark.context.get_active_session; confirm it is in scope before running.
session = get_active_session()
def generate_and_write_source_yml(schema_name, catalog, name=None):
    """
    Generate a dbt-style source YAML configuration for every table in a schema.

    The tables of ``schema_name`` and the columns of each table are looked up in
    ``{catalog}.INFORMATION_SCHEMA`` through the module-level ``session``. The
    resulting YAML is printed to stdout. Writing to disk is currently disabled
    (the ``write_to_file`` call is commented out), so no file is created even
    though the returned message names a target path.

    ``schema_name`` and ``catalog`` are interpolated verbatim into the SQL text,
    so only pass trusted identifiers.

    Args:
        schema_name (str): The schema name, used as-is in the ``table_schema`` filter
        catalog (str): The catalog/database name
        name (str, optional): Source name. Defaults to schema_name if not provided.

    Returns:
        str: Status message naming the path the YAML file is meant to be written to
    """
    # Define name and tables
    name = name or schema_name
    get_tables_query = f"""
    SELECT table_name FROM {catalog}.INFORMATION_SCHEMA.TABLES WHERE table_schema = '{schema_name}'
    """
    tables = session.sql(get_tables_query).to_pandas()

    # Build starting yml
    sources_yaml = [
        'version: 2\n',
        'sources:',
        f'  - name: {name.lower()}',
        f'    schema: {schema_name.lower()}',
        f'    database: {catalog.lower()}',
        '    tables:',
    ]

    # Add tables and their columns; the table name is the first (only) result column
    for table_name in tables.iloc[:, 0]:
        sources_yaml.append(f'      - name: {table_name.lower()}')
        sources_yaml.append('        columns:')

        # The original-case table name is used in the filter; only the emitted YAML is lowercased
        get_columns_query = f"""
        SELECT column_name, data_type, ordinal_position FROM {catalog}.INFORMATION_SCHEMA.COLUMNS 
        WHERE table_schema = '{schema_name}'
        AND table_name = '{table_name}'
        order by ordinal_position
        """
        columns = session.sql(get_columns_query).to_pandas()

        # Result columns 0 and 1 are column_name and data_type, per the SELECT list above
        for column_name, data_type in zip(columns.iloc[:, 0], columns.iloc[:, 1]):
            sources_yaml.append(f'          - name: {column_name.lower()}')
            # data_type is a property of the same column entry, not a second list item;
            # the trailing newline leaves a blank line between columns
            sources_yaml.append(f'            data_type: {data_type.lower()}\n')

    # Join the list into a single string
    yaml_output = '\n'.join(sources_yaml)
    yaml_path = f"{schema_name}/_{schema_name}_sources.yml"

    # NOTE: persisting is disabled; re-enable the next line to write the file
    # write_to_file(yaml_output, yaml_path)
    print(yaml_output)
    return f"YAML file written to: models/staging/{yaml_path}"

In [None]:
# Generate & write sql to a file
import pandas
# Rebinds the module-level `session` for this cell; the function below queries through `session.sql(...)`.
# NOTE(review): `get_active_session` is not imported in the visible cells - confirm it is in scope.
session = get_active_session()
def generate_and_write_staging_sql(schema_name, catalog, name=None, quote_columns=False):
    """
    Generate a staging SQL transformation for every table in a schema.

    For each table of ``schema_name`` found in ``{catalog}.INFORMATION_SCHEMA``
    (queried through the module-level ``session``), builds a
    ``with source as (...), renamed as (...) select * from renamed`` model that
    selects every column in ordinal order, and prints it to stdout. Writing to
    disk is currently disabled (the ``write_to_file`` call is commented out),
    so no file is created even though the printed messages name a target path.

    ``schema_name`` and ``catalog`` are interpolated verbatim into the SQL text,
    so only pass trusted identifiers.

    Args:
        schema_name (str): The schema name, used as-is in the ``table_schema`` filter
        catalog (str): The catalog/database name
        name (str, optional): Source name. Defaults to schema_name if not provided.
        quote_columns (bool, optional): If true, emit each column name in its
            original case wrapped in double quotes; otherwise emit it lowercased
            and unquoted. Defaults to False.

    Returns:
        str: Fixed completion message (the generated SQL itself is only printed)
    """
    # Define name and tables
    name = name or schema_name
    get_tables_query = f"""
    SELECT table_name FROM {catalog}.INFORMATION_SCHEMA.TABLES WHERE table_schema = '{schema_name}'
    """
    tables = session.sql(get_tables_query).to_pandas()

    # One model per table; the table name is the first (only) result column
    for table_name in tables.iloc[:, 0]:
        model_table_name = table_name.lower()

        # Build the SQL transformation; the quadrupled braces render as a literal {{ ... }}
        staging_sql = [
            "with\n",
            "source as (\n",
            f"    select * from {{{{ source('{name}', '{model_table_name}') }}}}\n",
            "),\n",
            "renamed as (\n",
            "    select",
        ]

        # The original-case table name is used in the filter
        get_columns_query = f"""
        SELECT column_name, ordinal_position FROM {catalog}.INFORMATION_SCHEMA.COLUMNS 
        WHERE table_schema = '{schema_name}'
        AND table_name = '{table_name}'
        order by ordinal_position
        """
        columns = session.sql(get_columns_query).to_pandas()

        # Leading-comma style: every column except the first gets a ', ' prefix.
        # enumerate() is used so this does not depend on the DataFrame's index labels.
        for position, column_name in enumerate(columns.iloc[:, 0]):
            prefix = '        ' if position == 0 else '        , '
            if quote_columns:
                rendered = f""" "{column_name}" """
            else:
                rendered = column_name.lower()
            staging_sql.append(f"{prefix}{rendered}")

        # Complete the SQL transformation
        staging_sql.extend([
            "\n    from source",
            "\n)\n",
            "select * from renamed\n",
        ])

        # Join the list into a single string
        sql_output = '\n'.join(staging_sql)
        # Name the file after the table (not the row's positional index)
        sql_path = f"{schema_name}/stg_{schema_name}__{model_table_name}.sql"

        # NOTE: persisting is disabled; re-enable the next line to write the file
        # write_to_file(sql_output, sql_path)
        print(sql_output)
        print(f"SQL file written to: models/staging/{sql_path}")

    return "All sql files written successfully"

In [None]:
# Write to file
def write_to_file(file_content, file_path):
    """
    Write content to a file under ``../../models/staging/``.

    The target is resolved relative to the current working directory. Missing
    parent directories are created, and an existing file is overwritten.

    Args:
        file_content (str): content to write
        file_path (str): Path and name of the file, relative to ``models/staging/``
    """
    # Imported here because no cell visible in this notebook imports `os`
    import os

    file_path = f"../../models/staging/{file_path}"

    # Ensure the directory exists
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    # Write to file; explicit encoding keeps the output identical across platforms
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(file_content)

In [None]:
# Generate staging SQL for every table in schema DBT_SAMHARTING of database DEVELOPMENT,
# keeping original column names wrapped in double quotes (quote_columns=True).
generate_and_write_staging_sql('DBT_SAMHARTING','DEVELOPMENT', quote_columns=True)