In [1]:
# Install required packages (run once)
# !pip install psycopg2-binary pandas pyarrow pyiceberg

import psycopg2
import pandas as pd
from datetime import datetime

# PostgreSQL connection parameters
PG_CONFIG = {
    'host': 'localhost',
    'port': 5433,
    'database': 'postgres',
    'user': 'postgres',
    'password': 'password'
}

print("‚úì Packages imported successfully")
print(f"Connecting to PostgreSQL at {PG_CONFIG['host']}:{PG_CONFIG['port']}")

‚úì Packages imported successfully
Connecting to PostgreSQL at localhost:5433


In [8]:
# Connect to PostgreSQL
conn = psycopg2.connect(**PG_CONFIG)
cursor = conn.cursor()

# Get table schema
cursor.execute("""
    SELECT column_name, data_type, character_maximum_length, is_nullable
    FROM information_schema.columns
    WHERE table_name = 'test_cdc' AND table_schema = 'public'
    ORDER BY ordinal_position;
""")

schema_df = pd.DataFrame(
    cursor.fetchall(),
    columns=['Column', 'Data Type', 'Max Length', 'Nullable']
)

print("üìä Source Table Schema:")
print("="*60)
display(schema_df)

# Get row count
cursor.execute("SELECT COUNT(*) FROM test_cdc;")
row_count = cursor.fetchone()[0]
print(f"\nüìà Total rows in source table: {row_count}")

üìä Source Table Schema:


Unnamed: 0,Column,Data Type,Max Length,Nullable
0,id,uuid,,NO
1,data,jsonb,,NO
2,meta,json,,YES
3,created_at,timestamp without time zone,,YES



üìà Total rows in source table: 2


In [9]:
# Query source data
query = """
    SELECT 
        id::text as uuid_id,
        data::text as jsonb_column,
        meta::text as json_column,
        created_at,
        pg_typeof(id) as uuid_type,
        pg_typeof(data) as jsonb_type,
        pg_typeof(meta) as json_type
    FROM test_cdc 
    ORDER BY created_at;
"""

source_df = pd.read_sql_query(query, conn)

print("üìã Source Data Sample:")
print("="*80)
display(source_df)

print("\nüîç Key Observations:")
print(f"  ‚Ä¢ UUID column type: {source_df['uuid_type'].iloc[0]}")
print(f"  ‚Ä¢ JSONB column type: {source_df['jsonb_type'].iloc[0]}")
print(f"  ‚Ä¢ JSON column type: {source_df['json_type'].iloc[0]}")
print(f"  ‚Ä¢ These complex types will be properly handled by OLake's pgtype integration")

# Close connection for now
cursor.close()
conn.close()

üìã Source Data Sample:


Unnamed: 0,uuid_id,jsonb_column,json_column,created_at,uuid_type,jsonb_type,json_type
0,e61b23d7-2bed-4594-8243-9d389511c688,"{""user"": ""alice"", ""action"": ""login""}","{""source"": ""web""}",2026-02-07 12:54:30.368422,uuid,jsonb,json
1,5f199f4c-cc2b-487c-bbb2-612ffa920e17,"{""user"": ""bob"", ""action"": ""purchase""}","{""source"": ""mobile""}",2026-02-07 12:54:30.368422,uuid,jsonb,json



üîç Key Observations:
  ‚Ä¢ UUID column type: uuid
  ‚Ä¢ JSONB column type: jsonb
  ‚Ä¢ JSON column type: json
  ‚Ä¢ These complex types will be properly handled by OLake's pgtype integration


In [19]:
import subprocess
import time

print("üöÄ Starting OLake Sync Process...")
print("="*80)

# Build the sync command
sync_command = [
    ".\\drivers\\postgres\\postgres.exe",
    "sync",
    "--config", "demo_source_config.json",
    "--destination", "demo_destination_config.json",
    "--catalog", "demo_catalog.json"
]

try:
    # Run the sync command
    start_time = time.time()
    result = subprocess.run(
        sync_command,
        capture_output=True,
        text=True,
        timeout=120
    )
    
    elapsed_time = time.time() - start_time
    
    # Display output
    if result.stdout:
        print("üìù Sync Output:")
        print(result.stdout)
    
    if result.returncode == 0:
        print(f"\n‚úÖ Sync completed successfully in {elapsed_time:.2f} seconds!")
    else:
        print(f"\n‚ùå Sync failed with exit code {result.returncode}")
        if result.stderr:
            print(f"Error: {result.stderr}")
            
except subprocess.TimeoutExpired:
    print("‚è±Ô∏è Sync command timed out after 120 seconds")
except FileNotFoundError:
    print("‚ö†Ô∏è OLake binary not found. Please ensure postgres.exe is built.")
    print("   Run: cd drivers\\postgres && go build -o postgres.exe .")
except Exception as e:
    print(f"‚ùå Error running sync: {e}")

üöÄ Starting OLake Sync Process...
üìù Sync Output:
[90m2026-02-10T10:21:13Z[0m [32mINFO[0m Running sync with state: {"type":"STREAM"}
[90m2026-02-10T10:21:13Z[0m [32mINFO[0m Standard Replication is selected
[90m2026-02-10T10:21:13Z[0m [32mINFO[0m Starting discover for Postgres database postgres
[90m2026-02-10T10:21:13Z[0m [32mINFO[0m Valid selected streams are public.test_cdc
[90m2026-02-10T10:21:13Z[0m [32mINFO[0m Clearing state for full refresh streams
[90m2026-02-10T10:21:13Z[0m [32mINFO[0m Iceberg JAR file found in base directory: c:\Users\diino\Documents\Contributions\olake/olake-iceberg-java-writer.jar
[90m2026-02-10T10:21:14Z[0m [32mINFO[0m Thread[iceberg_destination_drop:50051] 05:21:14.973 [main] INFO  io.debezium.server.iceberg.OlakeRpcServer - Logs will be output to console only
[90m2026-02-10T10:21:15Z[0m [32mINFO[0m Thread[iceberg_destination_drop:50051] 05:21:15.673 [main] INFO  org.apache.iceberg.CatalogUtil - Loading custom FileIO impl

In [6]:
from pyiceberg.catalog import load_catalog
import warnings
warnings.filterwarnings('ignore')

# Configure PyIceberg to connect to REST catalog
catalog_config = {
    'uri': 'http://localhost:8181',
    'warehouse': 's3://warehouse/',
    's3.endpoint': 'http://localhost:9090',
    's3.access-key-id': 'minio',
    's3.secret-access-key': 'minio123',
    's3.path-style-access': 'true'
}

try:
    # Load the catalog
    catalog = load_catalog('demo_catalog', **catalog_config)
    print("‚úì Connected to Iceberg REST Catalog")
    
    # List namespaces
    namespaces = catalog.list_namespaces()
    print(f"\nüìÅ Available Namespaces: {namespaces}")
    
    # List tables in the postgres namespace (created by OLake)
    expected_namespace = ('postgres_postgres', 'public')
    try:
        tables = catalog.list_tables(expected_namespace)
        print(f"\nüìä Tables in {expected_namespace}: {tables}")
    except Exception as e:
        print(f"‚ö†Ô∏è Note: {e}")
        print("   Listing all tables in all namespaces...")
        for ns in namespaces:
            try:
                tables = catalog.list_tables(ns)
                if tables:
                    print(f"   {ns}: {tables}")
            except:
                pass
                
except Exception as e:
    print(f"‚ùå Error connecting to Iceberg catalog: {e}")
    print("\nüí° Alternative: Use Trino or Spark to query Iceberg tables")
    print("   Trino Web UI: http://localhost:8080")
    print("   SQLPad: http://localhost:3000")

‚úì Connected to Iceberg REST Catalog

üìÅ Available Namespaces: [('postgres_postgres_public',), ('test_olake',)]
‚ö†Ô∏è Note: NoSuchNamespaceException: Namespace does not exist: postgres_postgres.public
   Listing all tables in all namespaces...
   ('postgres_postgres_public',): [('postgres_postgres_public', 'test_cdc')]
   ('test_olake',): [('test_olake', 'test_olake')]


In [7]:
# Query the synced table
table_id = ('postgres_postgres_public', 'test_cdc')
table = catalog.load_table(table_id)

print(f"‚úÖ Successfully loaded table: {table_id}")
print(f"   Location: {table.location()}")
print(f"   Schema: {table.schema()}\n")

# Query the data
scan = table.scan()
df_iceberg = scan.to_pandas()

print(f"üìä Retrieved {len(df_iceberg)} rows from Iceberg\n")
print("="*80)
display(df_iceberg)

# Store for comparison
globals()['df_iceberg'] = df_iceberg

‚úÖ Successfully loaded table: ('postgres_postgres_public', 'test_cdc')
   Location: s3://warehouse/postgres_postgres_public/test_cdc
   Schema: table {
  1: created_at: optional timestamptz
  2: id: optional string
  3: _olake_timestamp: optional timestamptz
  4: data: optional string
  5: _op_type: optional string
  6: _olake_id: required string
  7: meta: optional string
}

üìä Retrieved 2 rows from Iceberg



Unnamed: 0,created_at,id,_olake_timestamp,data,_op_type,_olake_id,meta
0,2026-02-07 12:54:30.368422+00:00,e61b23d7-2bed-4594-8243-9d389511c688,2026-02-10 10:23:41.550269+00:00,"{""user"": ""alice"", ""action"": ""login""}",r,e61b23d7-2bed-4594-8243-9d389511c688,"{""source"": ""web""}"
1,2026-02-07 12:54:30.368422+00:00,5f199f4c-cc2b-487c-bbb2-612ffa920e17,2026-02-10 10:23:41.550269+00:00,"{""user"": ""bob"", ""action"": ""purchase""}",r,5f199f4c-cc2b-487c-bbb2-612ffa920e17,"{""source"": ""mobile""}"


In [11]:
# Reconnect to PostgreSQL to get fresh data
conn = psycopg2.connect(**PG_CONFIG)
source_df_compare = pd.read_sql_query(
    "SELECT id::text, data::text, meta::text, created_at FROM test_cdc ORDER BY created_at",
    conn
)
conn.close()

print("üîç Data Integrity Verification")
print("="*80)

# Compare row counts
source_count = len(source_df_compare)
try:
    dest_count = len(df_iceberg)
    print(f"\nüìä Row Count Comparison:")
    print(f"   Source (PostgreSQL):  {source_count} rows")
    print(f"   Destination (Iceberg): {dest_count} rows")
    
    if source_count == dest_count:
        print("   ‚úÖ Row counts match!")
    else:
        print("   ‚ö†Ô∏è Row count mismatch detected")
    
    # Compare schemas
    print(f"\nüìã Column Comparison:")
    print(f"   Source columns: {list(source_df_compare.columns)}")
    print(f"   Destination columns: {list(df_iceberg.columns)}")
    
    # Sample data comparison
    print(f"\nüî¨ Sample Data Verification:")
    print("\nSource (PostgreSQL) - First Row:")
    display(source_df_compare.head(1))
    print("\nDestination (Iceberg) - First Row:")
    display(df_iceberg.head(1))
    
except NameError:
    print("‚ö†Ô∏è Iceberg data not available for comparison")
    print(f"   Source contains {source_count} rows")

print("\n" + "="*80)
print("Complete!")

üîç Data Integrity Verification

üìä Row Count Comparison:
   Source (PostgreSQL):  2 rows
   Destination (Iceberg): 2 rows
   ‚úÖ Row counts match!

üìã Column Comparison:
   Source columns: ['id', 'data', 'meta', 'created_at']
   Destination columns: ['created_at', 'id', '_olake_timestamp', 'data', '_op_type', '_olake_id', 'meta']

üî¨ Sample Data Verification:

Source (PostgreSQL) - First Row:


Unnamed: 0,id,data,meta,created_at
0,e61b23d7-2bed-4594-8243-9d389511c688,"{""user"": ""alice"", ""action"": ""login""}","{""source"": ""web""}",2026-02-07 12:54:30.368422



Destination (Iceberg) - First Row:


Unnamed: 0,created_at,id,_olake_timestamp,data,_op_type,_olake_id,meta
0,2026-02-07 12:54:30.368422+00:00,e61b23d7-2bed-4594-8243-9d389511c688,2026-02-10 10:23:41.550269+00:00,"{""user"": ""alice"", ""action"": ""login""}",r,e61b23d7-2bed-4594-8243-9d389511c688,"{""source"": ""web""}"



Complete!
