In [None]:
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema, NestedField
from pyiceberg.types import StringType, IntegerType, DoubleType
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
import boto3
from io import BytesIO
from pyiceberg.exceptions import NamespaceAlreadyExistsError, TableAlreadyExistsError

# Define the warehouse path and catalog configuration
warehouse_path = "s3://warehouse"
catalog = load_catalog(
    "nessie",
    **{
        "uri": "http://localhost:19120/api/v2",  # Nessie Server URI
        "warehouse": warehouse_path,
        "s3.endpoint": "http://localhost:9000",
        "s3.region": "us-east-1",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    },
)

# Check if the namespace (schema) exists, and create it if it doesn't
namespace = "demosaturday"
try:
    catalog.create_namespace(namespace)
    print(f"Namespace '{namespace}' created successfully.")
except NamespaceAlreadyExistsError:
    print(f"Namespace '{namespace}' already exists.")

# Define the schema for the table using NestedField
schema = Schema(
    NestedField(field_id=1, name="id", field_type=IntegerType(), required=True),
    NestedField(field_id=2, name="name", field_type=StringType(), required=True),
    NestedField(field_id=3, name="value", field_type=DoubleType(), required=True)
)

# Check if the table exists, and drop it if it does
table_name = f"{namespace}.messages"
if table_name in catalog.list_tables(namespace):
    catalog.drop_table(table_name)
    print(f"Table '{table_name}' dropped successfully.")

# Create the table
try:
    table = catalog.create_table(table_name, schema=schema)
    print(f"Table '{table_name}' created successfully.")
except TableAlreadyExistsError:
    print(f"Table '{table_name}' already exists and could not be dropped.")

# Create a sample DataFrame
data = {
    "id": [1, 2, 3],
    "name": ["Alice", "Bob", "Charlie"],
    "value": [123.45, 678.90, 234.56]
}
df = pd.DataFrame(data)

# Convert the DataFrame to a Parquet file in memory
parquet_buffer = BytesIO()
df.to_parquet(parquet_buffer, index=False)
parquet_buffer.seek(0)

# Initialize the S3 client
s3_client = boto3.client(
    "s3",
    aws_access_key_id="admin",
    aws_secret_access_key="password",
    endpoint_url="http://localhost:9000",
    region_name="us-east-1"
)

# Upload the Parquet file to MinIO
s3_client.put_object(Bucket="warehouse", Key=f"{namespace}/messages/data.parquet", Body=parquet_buffer.getvalue())
print("Data uploaded to S3 successfully.")

# Verify metadata in Nessie
import requests

nessie_url = "http://localhost:19120/api/v2/trees/main/entries"
response = requests.get(nessie_url)
if response.status_code == 200:
    print("Metadata for 'demosaturday.messages':", response.json())
else:
    print("Failed to retrieve metadata from Nessie:", response.text)

## Interacting with Nessie

In [1]:
# Regular upload to Nessie - no table or schema specified
import requests
import boto3
import pandas as pd
from io import BytesIO

# Define sensitive variables
# NESSIE_URL = "http://localhost:19120/api/v2"  # Nessie Server URL
# MINIO_URL = "http://localhost:9000"  # MinIO Server URL
# MINIO_ACCESS_KEY = "admin"
# MINIO_SECRET_KEY = "password"
NESSIE_URL = "http://192.168.1.100:19120/api/v2"  # Nessie Server URL
MINIO_URL = "http://192.168.1.100:9000"  # MinIO Server URL
MINIO_ACCESS_KEY = "miniouser"
MINIO_SECRET_KEY = "miniopassword"
WAREHOUSE = "s3://warehouse"

# Initialize the S3 client
s3_client = boto3.client(
    "s3",
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY,
    endpoint_url=MINIO_URL,
    region_name="us-east-1"
)

# Create a sample DataFrame
data = {
    "id": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    "first_name": ["John", "Jane", "Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Henry"],
    "last_name": ["Doe", "Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Lee", "Davis"],
    "age": [28, 34, 22, 45, 30, 25, 32, 29, 27, 38]
}
df = pd.DataFrame(data)

# Convert the DataFrame to a Parquet file in memory
parquet_buffer = BytesIO()
df.to_parquet(parquet_buffer, index=False)
parquet_buffer.seek(0)

# Upload the Parquet file to MinIO
try:
    s3_client.put_object(Bucket="warehouse", Key="demosaturday/people/data.parquet", Body=parquet_buffer.getvalue())
    print("Data uploaded to S3 successfully.")
except Exception as e:
    print(f"Failed to upload data to S3: {e}")

# Verify metadata in Nessie
nessie_url = f"{NESSIE_URL}/trees/main/entries"
response = requests.get(nessie_url)
print(f"Verify metadata response: {response.status_code} - {response.text}")
if response.status_code == 200:
    print("Metadata for 'demosaturday.people':", response.json())
else:
    print("Failed to retrieve metadata from Nessie:", response.text)

Data uploaded to S3 successfully.
Verify metadata response: 200 - {
  "token" : null,
  "entries" : [ ],
  "effectiveReference" : {
    "type" : "BRANCH",
    "name" : "main",
    "hash" : "2e1cfa82b035c26cbbbdae632cea070514eb8b773f616aaeaf668e2f0be8f10d"
  },
  "hasMore" : false
}
Metadata for 'demosaturday.people': {'token': None, 'entries': [], 'effectiveReference': {'type': 'BRANCH', 'name': 'main', 'hash': '2e1cfa82b035c26cbbbdae632cea070514eb8b773f616aaeaf668e2f0be8f10d'}, 'hasMore': False}


In [27]:
# Creating the data in Nessie, including Table
import requests
import boto3
import pandas as pd
from io import BytesIO

# Define sensitive variables
NESSIE_URL = "http://192.168.1.100:19120/api/v2"  # Nessie Server URL
MINIO_URL = "http://192.168.1.100:9000"  # MinIO Server URL
MINIO_ACCESS_KEY = "miniouser"
MINIO_SECRET_KEY = "miniopassword"
WAREHOUSE = "s3://warehouse"

# Initialize the S3 client
s3_client = boto3.client(
    "s3",
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY,
    endpoint_url=MINIO_URL,
    region_name="us-east-1"
)

# Create a sample DataFrame
data = {
    "id": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    "first_name": ["John", "Jane", "Alice", "Bob", "Charlie", "David", "Eve", "Frank", "Grace", "Henry"],
    "last_name": ["Doe", "Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Lee", "Davis"],
    "age": [28, 34, 22, 45, 30, 25, 32, 29, 27, 38]
}
df = pd.DataFrame(data)

# Convert the DataFrame to a Parquet file in memory
parquet_buffer = BytesIO()
df.to_parquet(parquet_buffer, index=False)
parquet_buffer.seek(0)

# Upload the Parquet file to MinIO
try:
    s3_client.put_object(Bucket="warehouse", Key="demosaturday2/people/data.parquet", Body=parquet_buffer.getvalue())
    print("Data uploaded to S3 successfully.")
except Exception as e:
    print(f"Failed to upload data to S3: {e}")

# Define the schema for the table
schema = {
    "type": "struct",
    "fields": [
        {"name": "id", "type": "int", "nullable": False},
        {"name": "first_name", "type": "string", "nullable": False},
        {"name": "last_name", "type": "string", "nullable": False},
        {"name": "age", "type": "int", "nullable": False}
    ]
}

# Create a table in Nessie
table_name = "demosaturday2.people"
table_url = f"{NESSIE_URL}/tables/{table_name}"
table_data = {
    "schema": schema,
    "location": f"{WAREHOUSE}/{table_name}"
}
response = requests.post(table_url, json=table_data)
print(f"Create table response: {response.status_code} - {response.text}")
if response.status_code == 201:
    print(f"Table '{table_name}' created successfully in Nessie.")
elif response.status_code == 409:
    print(f"Table '{table_name}' already exists in Nessie.")
else:
    print(f"Failed to create table '{table_name}' in Nessie: {response.text}")

# Verify metadata in Nessie
nessie_url = f"{NESSIE_URL}/trees/main/entries"
response = requests.get(nessie_url)
print(f"Verify metadata response: {response.status_code} - {response.text}")
if response.status_code == 200:
    print("Metadata for 'demosaturday.people':", response.json())
else:
    print("Failed to retrieve metadata from Nessie:", response.text)

Data uploaded to S3 successfully.
Create table response: 404 - 
Failed to create table 'demosaturday2.people' in Nessie: 
Verify metadata response: 200 - {
  "token" : null,
  "entries" : [ ],
  "effectiveReference" : {
    "type" : "BRANCH",
    "name" : "main",
    "hash" : "2e1cfa82b035c26cbbbdae632cea070514eb8b773f616aaeaf668e2f0be8f10d"
  },
  "hasMore" : false
}
Metadata for 'demosaturday.people': {'token': None, 'entries': [], 'effectiveReference': {'type': 'BRANCH', 'name': 'main', 'hash': '2e1cfa82b035c26cbbbdae632cea070514eb8b773f616aaeaf668e2f0be8f10d'}, 'hasMore': False}


In [8]:
# Get the metadata from Nessie Catalog

import requests

NESSIE_URL = "http://192.168.1.100:19120/api/v2"
nessie_url = f"{NESSIE_URL}/trees/main/entries"
response = requests.get(nessie_url)
if response.status_code == 200:
    print("Metadata for 'demosaturday.people':", response.json())
else:
    print("Failed to retrieve metadata from Nessie:", response.text)

Metadata for 'demosaturday.people': {'token': None, 'entries': [], 'effectiveReference': {'type': 'BRANCH', 'name': 'main', 'hash': '2e1cfa82b035c26cbbbdae632cea070514eb8b773f616aaeaf668e2f0be8f10d'}, 'hasMore': False}


In [11]:
import trino
from trino.auth import BasicAuthentication

# Define connection parameters
host = '192.168.1.100'
port = 8442
user = 'admin'
password = 'password'
catalog = 'iceberg'
schema = 'default'

# Create a Trino connection with SSL verification disabled
conn = trino.dbapi.connect(
    host=host,
    port=port,
    user=user,
    catalog=catalog,
    schema=schema,
    http_scheme='https',
    auth=BasicAuthentication(user, password),
    verify=False  # Disable SSL verification
)

# Create a cursor
cur = conn.cursor()

# Define your query to create the schema
query = 'CREATE SCHEMA demosaturday'

# Execute the query
cur.execute(query)

# Close the cursor and connection
cur.close()
conn.close()



TrinoQueryError: TrinoQueryError(type=INTERNAL_ERROR, name=GENERIC_INTERNAL_ERROR, message="Failed to execute GET request against 'http://nessie:19120/api/v2/trees/main?fetch=MINIMAL'.", query_id=20250211_134157_00004_s3jmf)

In [10]:
# Getting all of the entries from the catalog
import trino
from trino.auth import BasicAuthentication

# Define connection parameters
host = '192.168.1.100'
port = 8442
user = 'admin'
password = 'password'
catalog = 'iceberg'
schema = 'default'

# Create a Trino connection with SSL verification disabled
conn = trino.dbapi.connect(
    host=host,
    port=port,
    user=user,
    catalog=catalog,
    schema=schema,
    http_scheme='https',
    auth=BasicAuthentication(user, password),
    verify=False  # Disable SSL verification
)

# Create a cursor
cur = conn.cursor()

# Define your query to describe the table schema
query = 'DESCRIBE demosaturday.people'

# Execute the query
cur.execute(query)

# Fetch and print the results
rows = cur.fetchall()
for row in rows:
    print(row)

# Close the cursor and connection
cur.close()
conn.close()



TrinoUserError: TrinoUserError(type=USER_ERROR, name=SCHEMA_NOT_FOUND, message="line 1:1: Schema 'demosaturday' does not exist", query_id=20250211_133917_00003_s3jmf)

In [9]:
# Query the data directly
import boto3

# MINIO_URL = "http://localhost:9000"
# MINIO_ACCESS_KEY = "admin"
# MINIO_SECRET_KEY = "password"
MINIO_URL = "http://192.168.1.100:9000"
MINIO_ACCESS_KEY = "miniouser"
MINIO_SECRET_KEY = "miniopassword"

s3_client = boto3.client(
    "s3",
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY,
    endpoint_url=MINIO_URL,
    region_name="us-east-1"
)

# Download the Parquet file from MinIO
response = s3_client.get_object(Bucket="warehouse", Key="demosaturday/people/data.parquet")
data = response['Body'].read()
print(data)

b'PAR1\x15\x04\x15\xa0\x01\x15hL\x15\x14\x15\x00\x12\x00\x00P\x04\x01\x00\t\x01\x00\x02\t\x07\x04\x00\x03\r\x08\x00\x04\r\x08\x00\x05\r\x08\x00\x06\r\x08\x00\x07\r\x08\x00\x08\r\x08<\t\x00\x00\x00\x00\x00\x00\x00\n\x00\x00\x00\x00\x00\x00\x00\x15\x00\x15 \x15$,\x15\x14\x15\x10\x15\x06\x15\x06\x1c\x18\x08\n\x00\x00\x00\x00\x00\x00\x00\x18\x08\x01\x00\x00\x00\x00\x00\x00\x00\x16\x00(\x08\n\x00\x00\x00\x00\x00\x00\x00\x18\x08\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x10<\x02\x00\x00\x00\x14\x01\x04\x05\x102Tv\x98\x00\x00\x00\x15\x04\x15\xac\x01\x15\xa0\x01L\x15\x14\x15\x00\x12\x00\x00V\x1c\x04\x00\x00\x00John\x05\x08pane\x05\x00\x00\x00Alice\x03\x00\x00\x00Bob\x07\x00\x00\x00Charli\x05\x1b\x10David\x01\x1b\x04Ev\x05\x10\x10Frank\x0144Grace\x05\x00\x00\x00Henry\x15\x00\x15 \x15$,\x15\x14\x15\x10\x15\x06\x15\x06\x1c6\x00(\x04John\x18\x05Alice\x00\x00\x00\x10<\x02\x00\x00\x00\x14\x01\x04\x05\x102Tv\x98\x00\x00\x00\x15\x04\x15\xba\x01\x15\xb6\x01L\x15\x14\x15\x00\x12\x00\x00]\x98\x03\x00\x

In [4]:
import trino
from trino.auth import BasicAuthentication

# Define connection parameters
host = '192.168.1.100'
port = 8442
user = 'admin'
password = 'password'
catalog = 'iceberg'  #'nessie'
schema = 'default'

# Create a Trino connection with SSL verification disabled
conn = trino.dbapi.connect(
    host=host,
    port=port,
    user=user,
    catalog=catalog,
    schema=schema,
    http_scheme='https',
    auth=BasicAuthentication(user, password),
    verify=False  # Disable SSL verification
)

# Create a cursor
cur = conn.cursor()

# Define your query
query = 'SELECT * FROM your_table_name LIMIT 10'

# Execute the query
cur.execute(query)

# Fetch and print the results
rows = cur.fetchall()
for row in rows:
    print(row)

# Close the cursor and connection
cur.close()
conn.close()



TrinoQueryError: TrinoQueryError(type=INTERNAL_ERROR, name=GENERIC_INTERNAL_ERROR, message="Failed to execute GET request against 'http://nessie:19120/api/v2/trees/main?fetch=MINIMAL'.", query_id=20250211_133142_00002_s3jmf)