In [0]:
def _quote_pg_identifier(name):
    """Return *name* as a double-quoted PostgreSQL identifier, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def _pg_string_literal(value):
    """Return str(*value*) as a single-quoted PostgreSQL string literal, doubling embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def ExtractPostgreSQL(table, mode="full", date_column=None):
    """Extract a table from the PostgreSQL ``public`` schema into a volume as parquet.

    Data is written to ``/Volumes/dev/bronze/<table lower-cased>``. The volume
    ``dev.bronze.<table lower-cased>`` is created first if it does not exist.

    Args:
        table: Table name in the ``public`` schema. It is double-quoted in the
            query, so it is matched case-sensitively.
        mode: ``"delta"`` for an incremental load; any other value performs a
            full load that overwrites the volume contents.
        date_column: Required for ``"delta"`` mode. Rows whose value in this
            column is strictly greater than the maximum already stored in the
            volume are extracted and appended.

    Returns:
        The Spark DataFrame read over JDBC. It is lazy, so evaluating it again
        (e.g. with ``display``) re-runs the query against PostgreSQL.

    Raises:
        ValueError: If ``mode == "delta"`` and ``date_column`` is not given.
        Errors from reading the existing parquet data (e.g. ``date_column`` not
        present in it) are propagated rather than triggering a full overwrite.
    """
    # The connection details are stored in a Databricks secret scope.
    pg_user = dbutils.secrets.get(scope="postgres_secrets", key="pg_user")
    pg_password = dbutils.secrets.get(scope="postgres_secrets", key="pg_password")
    pg_host = dbutils.secrets.get(scope="postgres_secrets", key="pg_host")
    pg_port = dbutils.secrets.get(scope="postgres_secrets", key="pg_port")
    pg_database = dbutils.secrets.get(scope="postgres_secrets", key="pg_database")

    jdbc_url = f"jdbc:postgresql://{pg_host}:{pg_port}/{pg_database}"

    # Files are stored in a volume named after the lower-cased table name.
    volume_name = table.lower()
    volume_path = f"/Volumes/dev/bronze/{volume_name}"

    # IF NOT EXISTS makes this a no-op for an existing volume, so no separate
    # existence check is needed. Backticks are doubled to keep the name quoted.
    quoted_volume = volume_name.replace("`", "``")
    spark.sql(f"CREATE VOLUME IF NOT EXISTS `dev`.`bronze`.`{quoted_volume}`")
    print(f"Volume dev.bronze.{volume_name} is ready.")

    source_table = f"public.{_quote_pg_identifier(table)}"

    if mode == "delta":
        if not date_column:
            raise ValueError("date_column parameter is required to determine the delta")

        # Only the directory listing is guarded: failing to list the path means
        # there is no previous extract. Errors while reading existing data are
        # deliberately NOT caught, so a typo in date_column cannot silently turn
        # into a full overwrite of the volume.
        try:
            files = dbutils.fs.ls(volume_path)
        except Exception as e:
            print(f"No existing data found: {e}")
            files = None

        if files is None:
            print("Performing full load for first run...")
            dbtable = source_table
            write_mode = "overwrite"
        elif not any(f.path.endswith(".parquet") or f.isDir() for f in files):
            print("Volume does not have any data. Performing full load for first run...")
            dbtable = source_table
            write_mode = "append"
        else:
            existing_df = spark.read.parquet(volume_path)
            max_date = existing_df.agg({date_column: "max"}).collect()[0][0]
            print(f"Found existing data. Max {date_column}: {max_date}")

            if max_date is None:
                # Every stored value is NULL, so there is no watermark to filter
                # on; reload the whole table and replace the stored copy.
                print(f"No non-null {date_column} in existing data. Performing full load (overwrite mode)...")
                dbtable = source_table
                write_mode = "overwrite"
            else:
                # NOTE: the comparison is strict (>), so rows that share the
                # stored maximum but arrive after the previous run are not picked up.
                dbtable = (
                    f"(SELECT * FROM {source_table} "
                    f"WHERE {_quote_pg_identifier(date_column)} > {_pg_string_literal(max_date)}) AS new_data"
                )
                write_mode = "append"
    else:
        dbtable = source_table
        write_mode = "overwrite"
        print("Performing full load (overwrite mode)...")

    # Read from PostgreSQL using the table/subquery chosen above.
    df = (
        spark.read
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", dbtable)
        .option("user", pg_user)
        .option("password", pg_password)
        .option("driver", "org.postgresql.Driver")
        .load()
    )

    # Count once: every action on a JDBC DataFrame re-runs the query.
    # NOTE: when the source returns no rows nothing is written, so in overwrite
    # mode the previous extract is left in place.
    row_count = df.count()
    if row_count > 0:
        df.write.mode(write_mode).format("parquet").save(volume_path)
        print(f"Data written to {volume_path} in {write_mode} mode. Rows: {row_count}")
    else:
        print("No new data to write.")

    return df

In [0]:
# Incrementally extract the Customers table, using modified_on to find new rows.
getdata = ExtractPostgreSQL(
    "Customers",
    mode="delta",
    date_column="modified_on",
)
display(getdata)

In [0]:
# Drop every volume in the dev.raw schema.
# WARNING: destructive — this drops all volumes under dev.raw without asking
# for confirmation.
try:
    volumes_df = spark.sql("SHOW VOLUMES IN `dev`.`raw`")
except Exception as e:
    # Only the listing is treated as a "schema missing / inaccessible" problem.
    print(f"Error: {e}")
    print("Make sure the dev.raw schema exists.")
else:
    volumes = [row.volume_name for row in volumes_df.collect()]

    if not volumes:
        print("No volumes found in dev.raw schema.")
    else:
        print(f"Found {len(volumes)} volume(s) to delete: {volumes}")

        failed = []
        for volume in volumes:
            print(f"Dropping volume dev.raw.{volume}...")
            # Double backticks so an unusual name cannot break out of the quoted identifier.
            quoted = volume.replace("`", "``")
            try:
                spark.sql(f"DROP VOLUME IF EXISTS `dev`.`raw`.`{quoted}`")
            except Exception as e:
                # Keep going so one failure does not leave the remaining volumes untouched.
                failed.append(volume)
                print(f"Failed to drop volume dev.raw.{volume}: {e}")
            else:
                print(f"Volume dev.raw.{volume} deleted successfully.")

        if failed:
            print(f"\n{len(failed)} volume(s) could not be deleted: {failed}")
        else:
            print("\nAll volumes in dev.raw have been deleted.")