In [None]:
# Deployment-specific settings.
# Each value can be overridden with an environment variable so the notebook
# runs unchanged on another machine; the fallbacks are the original values.
import os

# Name of the database inside the Postgres container.
postgres_container_database_name = os.environ.get('POSTGRES_DB', 'mydb')

# IP address of the MinIO container. It is assigned by Docker and can change
# when containers are recreated; look it up with:
#   docker inspect minio | grep IPAddress
minio_ip = os.environ.get('MINIO_IP', "172.18.0.4")

# MinIO bucket used as the warehouse location for the exported tables.
data_bucket_name_in_minio = os.environ.get('MINIO_BUCKET', "mybucket")


# Uploading all tables from a Folder to a Database

In [None]:
# The CSV folder must exist in BOTH containers before running this notebook:
# this kernel reads the files to infer column types, and the Postgres server
# reads them again through COPY ... FROM.
# docker cp tables spark:/workspace/tables
tables_folder_spark = '/workspace/tables'
# docker cp tables postgres:/tables
tables_folder_postgres = '/tables'

# Sanity check: list the CSV folder as seen from this (Spark/notebook) side.
# This does not verify the copy inside the Postgres container.
import os
tables_folder = tables_folder_spark
files = os.listdir(tables_folder)
print(files)

In [None]:
!pip install psycopg2-binary
import psycopg2
import pandas as pd

# ----------------------------------Establishing a connection to the db--------------------------------
# Set PostgreSQL connection parameters
db_config = {
    'host': 'postgres',         # presumably the Postgres container name on the Docker network -- confirm
    'port': 5432,               # Default PostgreSQL port
    'dbname': postgres_container_database_name,
    'user': 'myuser',
    'password': 'mypassword'
}

# Start from a known state: if connecting fails, `conn` is None instead of
# being undefined (fresh kernel) or a stale handle from an earlier run.
conn = None

# Connect to PostgreSQL
try:
    conn = psycopg2.connect(**db_config)
    print("✅ Connection successful!")

# Only database errors are reported here; anything else is a programming
# error and should surface with its traceback.
except psycopg2.Error as e:
    print("connection failed ❌ Error:", e)

In [None]:
def delete_all_tables(conn):
    """Drop every base table in the ``public`` schema after interactive confirmation.

    The user must answer exactly ``Y`` at the prompt; any other answer cancels.
    All drops run in one transaction: it is committed when every statement
    succeeded and rolled back otherwise, so the schema is never left half-deleted.

    Parameters:
    - conn: open DB-API connection (psycopg2), or None

    Returns:
    - None. Progress and errors are reported with print(); database errors are
      caught and reported, not raised.
    """
    if conn is None:
        print("⚠️ No active database connection. Please run the connection code first.")
        return

    confirm = input("⚠️ WARNING: This will delete ALL tables! Type Y to confirm: ")
    if confirm.strip() != "Y":
        print("Table deletion cancelled.")
        return

    cursor = None
    try:
        cursor = conn.cursor()

        # Get all tables
        cursor.execute("""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public'
            AND table_type = 'BASE TABLE';
        """)
        tables = cursor.fetchall()

        if not tables:
            print("No tables to delete.")
            return

        # Disable foreign key checks (if needed).
        # NOTE(review): this setting normally requires elevated privileges -- confirm
        # the notebook user has them, otherwise the whole deletion is rolled back.
        cursor.execute("SET session_replication_role = 'replica';")

        for (table_name,) in tables:
            # Names come back exactly as stored, so quote them: this keeps
            # mixed-case, reserved-word and special-character names valid.
            quoted_name = '"' + table_name.replace('"', '""') + '"'
            cursor.execute(f"DROP TABLE IF EXISTS {quoted_name} CASCADE;")
            print(f"Dropped table: {table_name}")

        # Re-enable foreign key checks
        cursor.execute("SET session_replication_role = 'origin';")

        # Without an explicit commit the drops are discarded when the
        # connection is closed or rolled back.
        conn.commit()
        print(f"\n✅ Successfully dropped {len(tables)} tables.")

    except Exception as e:
        # Undo the partial work and leave the connection usable for later cells.
        if cursor is not None and not getattr(conn, "closed", 0):
            conn.rollback()
        print("❌ Error deleting tables:", e)
    finally:
        # cursor stays None when conn.cursor() itself failed.
        if cursor is not None:
            cursor.close()
def list_database_tables(conn, schema='public'):
    """
    List all base tables in the specified database schema.

    Parameters:
    - conn: Database connection object (a psycopg2 connection in this notebook)
    - schema: Schema name (defaults to 'public')

    Returns:
    - Pandas DataFrame with a single `table_name` column, or None if no tables found
    """
    # The schema is sent as a bound parameter instead of being pasted into the
    # SQL text, so a quote or other special character in `schema` cannot
    # change the statement. `%(name)s` is psycopg2's named placeholder style.
    query = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = %(schema)s
        AND table_type = 'BASE TABLE';
    """

    tables_found = pd.read_sql(query, conn, params={"schema": schema})

    if tables_found.empty:
        print("No tables found in the specified schema.")
        return None
    return tables_found
# Destructive helper, intentionally left commented out; uncomment to drop
# every table in the public schema (asks for confirmation first).
# delete_all_tables(conn)
# Show which tables currently exist in the public schema.
list_database_tables(conn)

In [None]:
# ---------------------Uploading all tables from a folder--------------------------------
# function to Infer SQL data types
def infer_sql_type(series):
    """Map a pandas Series to a PostgreSQL column type name.

    Parameters:
    - series: pandas Series whose dtype (and, for integers, value range) is inspected

    Returns:
    - "INTEGER" for integer data that fits in 32 bits (also for empty/all-null
      integer columns), "BIGINT" for integer data outside that range,
      "FLOAT", "BOOLEAN", "TIMESTAMP", or "TEXT" as the fallback.
    """
    if pd.api.types.is_integer_dtype(series):
        # PostgreSQL INTEGER is 32-bit while pandas integer columns are
        # usually 64-bit; a value outside the 32-bit range would make the
        # later bulk load fail, so widen the column when the data needs it.
        values = series.dropna()
        if not values.empty:
            lowest = int(values.min())
            highest = int(values.max())
            if lowest < -2_147_483_648 or highest > 2_147_483_647:
                return "BIGINT"
        return "INTEGER"
    elif pd.api.types.is_float_dtype(series):
        return "FLOAT"
    elif pd.api.types.is_bool_dtype(series):
        return "BOOLEAN"
    elif pd.api.types.is_datetime64_any_dtype(series):
        return "TIMESTAMP"
    else:
        return "TEXT"
# function to Create query to table
def generate_create_table_sql(csv_path_generate_create_table_sql, table_name_generate_create_table_sql=None):
    """Build a CREATE TABLE statement whose columns mirror a CSV file.

    The CSV is read with pandas; each column gets the SQL type chosen by
    infer_sql_type and a NOT NULL constraint when the data contains no
    missing value.

    Parameters:
    - csv_path_generate_create_table_sql: path of the CSV file to inspect
    - table_name_generate_create_table_sql: table name to use; when None it is
      derived from the file name (extension removed, spaces replaced by "_")

    Returns:
    - (create_statement, table_name) tuple

    Column names are emitted as quoted identifiers so that headers which are
    reserved words, start with a digit or contain punctuation still produce
    valid SQL. The table name is emitted unquoted, as before, so the returned
    name can still be embedded directly in follow-up statements.
    """
    def _quote_column(raw_name):
        # Same clean-up as before: trim and turn spaces into underscores.
        cleaned = raw_name.strip().replace(" ", "_")
        # Fold ASCII letters to lower case so the quoted name resolves exactly
        # like the unquoted name did (PostgreSQL lower-cases unquoted
        # identifiers). Only ASCII is folded -- NOTE(review): confirm this
        # matches the server's folding if the database is not UTF-8.
        folded = "".join(ch.lower() if "A" <= ch <= "Z" else ch for ch in cleaned)
        return '"' + folded.replace('"', '""') + '"'

    df = pd.read_csv(csv_path_generate_create_table_sql)
    if table_name_generate_create_table_sql is None:
        table_name_generate_create_table_sql = os.path.splitext(os.path.basename(csv_path_generate_create_table_sql))[0].replace(" ", "_")

    column_definitions = []
    for col in df.columns:
        parts = [_quote_column(str(col)), infer_sql_type(df[col])]
        if not df[col].isnull().any():
            parts.append("NOT NULL")
        # Two-space indent keeps the generated DDL readable when printed.
        column_definitions.append("  " + " ".join(parts))

    create_stmt = (
        f"CREATE TABLE IF NOT EXISTS {table_name_generate_create_table_sql} (\n"
        + ",\n".join(column_definitions)
        + "\n);"
    )
    return create_stmt, table_name_generate_create_table_sql
# function to create table from csv and copy data
def create_table_and_copy_data(csv_path, csv_name, connection=None, postgres_folder=None):
    """Create a table from a CSV file and bulk-load the file into it.

    The CREATE TABLE statement is generated from the CSV at `csv_path` (read
    by this kernel); the rows are then loaded with a server-side COPY, which
    reads `<postgres_folder>/<csv_name>` on the Postgres host. The first two
    rows are printed as a preview.

    Parameters:
    - csv_path: path of the CSV as seen by this notebook
    - csv_name: file name of the same CSV inside the Postgres-side folder
    - connection: DB-API connection to use; defaults to the notebook-level `conn`
    - postgres_folder: folder holding the CSV on the Postgres host; defaults to
      the notebook-level `folder_path_postgres`

    Raises:
    - Re-raises any error from table creation or loading, after rolling the
      transaction back, so callers can report the failure and the connection
      stays usable for the next file.

    Note: the table is created with IF NOT EXISTS but COPY always runs, so
    calling this twice for the same file appends the rows a second time.
    """
    # Fall back to the notebook globals only when no explicit value is given.
    db = conn if connection is None else connection
    server_folder = folder_path_postgres if postgres_folder is None else postgres_folder

    create_sql, table_name = generate_create_table_sql(csv_path)
    # print('create statement:', create_sql)
    print('table name:', table_name)

    server_path = server_folder + '/' + csv_name
    # Double any single quote so a file name cannot break out of the SQL literal.
    escaped_server_path = server_path.replace("'", "''")
    copy_sql = f"""
        COPY {table_name}
        FROM '{escaped_server_path}'
        DELIMITER ','
        CSV HEADER;
        """

    cur = None
    try:
        cur = db.cursor()

        cur.execute(create_sql)
        db.commit()
        print(f"✅ Table `{table_name}` created.")

        cur.execute(copy_sql)
        db.commit()
        print(f"📥 Data copied from `{csv_path}` to `{table_name}`.")

        # Preview data
        cur.execute(f"SELECT * FROM {table_name} LIMIT 2;")
        for row in cur.fetchall():
            print(row)

    except Exception as e:
        print("❌ Error:", e)
        # A failed statement leaves the transaction aborted; without this
        # rollback every later statement on the connection would fail too.
        if not getattr(db, "closed", 0):
            db.rollback()
        raise
    finally:
        # Close only the cursor: the connection is shared with the rest of
        # the notebook and must stay open.
        if cur is not None:
            cur.close()
            
# The code above only infers column data types (plus NOT NULL where a column has no missing values); add any other integrity constraints (primary/foreign keys, UNIQUE, CHECK) yourself.

In [None]:
# Run the pipeline for all CSV files in the directory.
# `tables_folder` is the folder as seen by this kernel; `folder_path_postgres`
# is the same folder as seen by the Postgres server (read by COPY).
folder_path_postgres = tables_folder_postgres
for filename in os.listdir(tables_folder):
    if not filename.endswith(".csv"):
        continue

    csv_file_path = os.path.join(tables_folder, filename)
    print(f"\nProcessing file: {filename}")

    try:
        create_table_and_copy_data(csv_file_path, filename)
        print(f"✅ Successfully processed {filename}")
    except Exception as e:
        print(f"❌ Error processing {filename}: {str(e)}")
    finally:
        # Reset the transaction state after EVERY file. A failed statement
        # leaves the transaction aborted, and doing this only once after the
        # loop made every file after the first failure fail as well. It is a
        # no-op after a successful, already committed load.
        conn.rollback()

# check if tables are created properly
list_database_tables(conn)

In [17]:
# Release the psycopg2 connection; the remaining cells read Postgres through Spark's JDBC reader instead.
conn.close()

# Uploading a running Database to Minio

In [19]:
import pyspark
from pyspark.sql import SparkSession

In [23]:
# Database to export: the same one the CSV tables were loaded into. Used to build the JDBC URL.
postgres_database_name = postgres_container_database_name

In [24]:
# Endpoints and locations consumed by the Spark catalog configuration.
CATALOG_URI = "http://nessie:19120/api/v1"  # Nessie server URI
WAREHOUSE = f"s3://{data_bucket_name_in_minio}/"  # S3 address the tables are written to
STORAGE_URI = f"http://{minio_ip}:9000"  # set as the catalog's S3 endpoint

In [None]:
# Build the Spark configuration one setting at a time.
conf = pyspark.SparkConf()
conf.setAppName('app_name')

# Packages: PostgreSQL JDBC driver, Iceberg Spark runtime, Nessie Spark
# extensions, and the AWS SDK pieces used for S3 access.
conf.set('spark.jars.packages', 'org.postgresql:postgresql:42.7.3,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.77.1,software.amazon.awssdk:bundle:2.24.8,software.amazon.awssdk:url-connection-client:2.24.8')

# SQL extensions for Iceberg and Nessie.
conf.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')

# Catalog named "nessie": an Iceberg SparkCatalog backed by the Nessie
# server, storing table files through the S3 endpoint configured earlier.
conf.set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
conf.set('spark.sql.catalog.nessie.uri', CATALOG_URI)
conf.set('spark.sql.catalog.nessie.ref', 'main')
conf.set('spark.sql.catalog.nessie.authentication.type', 'NONE')
conf.set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
conf.set('spark.sql.catalog.nessie.s3.endpoint', STORAGE_URI)
conf.set('spark.sql.catalog.nessie.warehouse', WAREHOUSE)
conf.set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')

# Start the Spark session, or reuse the one already running in this kernel.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

In [None]:
# JDBC (Java Database Connectivity) URL of the Postgres database, plus the
# credentials and driver class Spark uses to connect.
jdbc_url = f"jdbc:postgresql://postgres:5432/{postgres_database_name}"
properties = {
    "user": "myuser",
    "password": "mypassword",
    "driver": "org.postgresql.Driver",
}

# Ask Postgres for its table names through Spark: an aliased, parenthesised
# subquery is passed in place of a table name.
tables_df = spark.read.jdbc(
    jdbc_url,
    "(SELECT table_name FROM information_schema.tables WHERE table_schema='public' AND table_type='BASE TABLE') AS table_list",
    properties=properties,
)
table_names = [record['table_name'] for record in tables_df.collect()]
print(table_names)

In [None]:
# Copy every Postgres table into an Iceberg table of the same name in the
# "nessie" catalog, replacing any existing table.
for table_name in table_names:
    print(f"Processing table: {table_name}")

    # information_schema reports names exactly as stored, so quote them on
    # both sides. Unquoted, a mixed-case, reserved-word or special-character
    # name would be re-parsed and fail (or resolve to a different table).
    postgres_identifier = '"' + table_name.replace('"', '""') + '"'
    iceberg_identifier = "nessie.`" + table_name.replace("`", "``") + "`"

    # Load table from Postgres
    postgres_df = spark.read.jdbc(url=jdbc_url, table=postgres_identifier, properties=properties)

    # Write to Iceberg via Nessie
    postgres_df.writeTo(iceberg_identifier).createOrReplace()

    # Show contents of Iceberg table
    print(f"Contents of table: {table_name}")
    spark.read.table(iceberg_identifier).show()


In [33]:
# Shut down the Spark session once the export loop has finished.
spark.stop()