In [0]:
def connect_to_azure():
    """Open an Azure SQL connection and verify it with a trivial query.

    Returns:
        The object returned by ``get_connection()`` when the probe query
        ``SELECT 1 AS test`` yields at least one row. ``None`` when the probe
        yields nothing, or when any exception is raised along the way (the
        error message and traceback are logged in that case).
    """
    try:
        # The Spark session, DBUtils and Settings objects are built before
        # the connection is requested. None of them is passed on explicitly;
        # presumably Settings prepares state get_connection() relies on --
        # TODO confirm before removing.
        session = SparkSession.builder.getOrCreate()
        utils = DBUtils(session)
        _settings = Settings(utils)
        logger.info("Attempting to connect to Azure SQL")

        conn = get_connection()

        # Smoke test: a healthy connection must return a non-empty result.
        probe = conn.execute_query("SELECT 1 AS test")
        probe_ok = bool(probe) and probe.count() > 0
        if not probe_ok:
            logger.error("Connection test failed.")
            return None

        logger.info("Azure Connection successful")
        return conn
    except Exception as exc:
        import traceback

        # Failures are reported through the log and a None return rather
        # than propagated to the caller.
        logger.error(f"Error connecting to Azure SQL: {exc!s}")
        logger.error(traceback.format_exc())
        return None

In [0]:
def push_to_azure(connection, resource_name, df, target_table=None, dry_run=False, batch_size=10000):
    """Upsert the rows of a DataFrame into an Azure SQL table.

    The primary-key columns are looked up in ``SCHEMA[resource_name]`` (every
    entry under ``"fields"`` whose ``"primaryKey"`` property is truthy) and
    the actual write is delegated to ``process_records_chunk``.

    Args:
        connection: Connection/engine object, handed unchanged to
            ``process_records_chunk``. Not used in dry-run mode.
        resource_name: Key into ``SCHEMA``; also the default table name.
        df: Data to push. Must expose ``.empty``, ``len()`` and
            ``.to_dict(orient='records')`` (i.e. a pandas-style DataFrame).
        target_table: Destination table name; defaults to ``resource_name``.
        dry_run: When true, only log what would be written and return the
            row count without touching the schema or the database.
        batch_size: Batch size forwarded to ``process_records_chunk``.
            Defaults to 10000, the value that was previously hard-coded.

    Returns:
        int: ``0`` when ``df`` is ``None``/empty or when the upsert fails
        (the error and traceback are logged, not raised); ``len(df)`` in
        dry-run mode; otherwise the ``'upserted'`` count reported by
        ``process_records_chunk``.
    """
    if df is None or df.empty:
        logger.warning(f"No data to push for {resource_name}")
        return 0

    table_name = target_table or resource_name
    row_count = len(df)
    logger.info(f"Pushing {row_count} {resource_name} records to Azure SQL")

    if dry_run:
        logger.info(f"DRY RUN: Would upsert {row_count} rows to {table_name} table")
        return row_count

    try:
        # Imported lazily so the dry-run and empty-input paths work even
        # when these project modules are unavailable.
        from azure.azure_upsert import process_records_chunk
        from schema.schema_definitions import SCHEMA

        # Collect the primary key column(s) declared for this resource.
        resource_schema = SCHEMA[resource_name]
        fields = resource_schema["fields"] if "fields" in resource_schema else {}
        key_columns = [
            field_name
            for field_name, field_props in fields.items()
            if field_props.get("primaryKey", False)
        ]
        if not key_columns:
            raise ValueError(f"No primary key columns found for resource '{resource_name}'")

        # One dict per row, keyed by column name.
        records = df.to_dict(orient='records')

        result = process_records_chunk(
            records, resource_name, table_name, key_columns, connection, batch_size
        )

        rows_affected = result['upserted']
        logger.info(f"Successfully upserted {rows_affected} rows to {table_name} table")
        return rows_affected
    except Exception as e:
        # Best-effort contract: any failure (missing schema entry, missing
        # primary key, import or database error) is logged and reported as
        # zero rows written instead of being raised.
        logger.error(f"Error upserting {resource_name} data: {str(e)}")
        import traceback
        logger.error(traceback.format_exc())
        return 0