In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
import os

import socket

## DEFINE CONNECTION SETTINGS
# Host name and port of the MinIO (S3-compatible) service as reachable from this kernel.
MINIO_HOST = 'minio-lake'
MINIO_PORT = 9000

# Resolve the MinIO host name to an IP address in-process instead of parsing
# shell/curl output. socket.gethostbyname raises socket.gaierror when the name
# cannot be resolved, rather than silently producing an endpoint with no host.
MINIO_END_POINT = f'http://{socket.gethostbyname(MINIO_HOST)}:{MINIO_PORT}'

# Configure Spark. Only the app name and the Nessie catalog's S3 endpoint are set
# here; the remaining Iceberg/Nessie catalog settings are presumably provided by
# the cluster's spark-defaults — TODO confirm.
conf = (
    pyspark.SparkConf()
        .setAppName('sales_data_app')
        .set('spark.sql.catalog.nessie.s3.endpoint', MINIO_END_POINT)
)

# Start (or reuse, via getOrCreate) the Spark session with the configuration above.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Session Started")

In [None]:
# Define a schema for the sales data
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("product", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("order_date", StringType(), True)
])

# Deliberately messy sample sales data: it contains a duplicate row and rows
# with missing values. All columns are declared nullable above, so the None
# entries are accepted.
sales_data = [
    (1, 101, "Laptop", 1, 1000.00, "2023-08-01"),
    (2, 102, "Mouse", 2, 25.50, "2023-08-01"),
    (3, 103, "Keyboard", 1, 45.00, "2023-08-01"),
    (1, 101, "Laptop", 1, 1000.00, "2023-08-01"),  # Duplicate
    (4, 104, "Monitor", None, 200.00, "2023-08-02"),  # Missing quantity
    (5, None, "Mouse", 1, 25.50, "2023-08-02")  # Missing customer_id
]

# Convert the data into a DataFrame
sales_df = spark.createDataFrame(sales_data, schema)

# Create the "sales" namespace. IF NOT EXISTS keeps this cell re-runnable:
# without it, every run after the first fails because the namespace already exists.
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.sales")

# Write the DataFrame to an Iceberg table in the Nessie catalog, replacing any
# existing table with the same name.
# NOTE: data is written to MinIO (S3) before it is registered in Nessie, so if the
# Nessie namespace does not exist, Spark will end up writing the data to MinIO twice.
sales_df.writeTo("nessie.sales.sales_data_abc").createOrReplace()

# Verify by reading back from the Iceberg table
spark.read.table("nessie.sales.sales_data_abc").show()

# Stop the session. The first cell must be re-run before Spark can be used again.
spark.stop()

In [None]:
import boto3
import json


import os

# Credentials are read from the environment; os.environ.get yields None if unset.
MINIO_ACCESS_KEY = os.environ.get('MINIO_ACCESS_KEY')
MINIO_SECRET_KEY = os.environ.get('MINIO_SECRET_KEY')

# Create a boto3 S3 client pointed at MinIO. MINIO_END_POINT ('http://<ip>:<port>')
# is built in the first cell; boto3's endpoint_url must include the scheme.
minio_client = boto3.client(
    's3',
    endpoint_url=MINIO_END_POINT,
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY,
    region_name='ap-southeast-1',
)

# Bucket and Iceberg metadata file to inspect.
# NOTE(review): this key embeds table/metadata UUIDs from one particular run and
# refers to a `sales_data_raw` table (not the `sales_data_abc` table written above);
# update it to match the current catalog state before running.
bucket_name = 'datalakehouse'
metadata_file_key = 'sales/sales_data_raw_7720cf28-1423-431e-83ab-587e1da2ed48/metadata/00000-d50b2fc1-a0b9-468d-b01b-49b3ed5d1ad4.metadata.json'  # Example metadata path

# Download the metadata file, closing the streaming body even if reading fails.
metadata_file = minio_client.get_object(Bucket=bucket_name, Key=metadata_file_key)
metadata_body = metadata_file['Body']
try:
    metadata_content = metadata_body.read().decode('utf-8')
finally:
    metadata_body.close()

# Parse and pretty-print the metadata content
metadata_json = json.loads(metadata_content)
print(json.dumps(metadata_json, indent=4))

In [None]:
from minio import Minio
from minio.error import S3Error
import json

import os
import socket

## DEFINE CONNECTION SETTINGS
# The minio client expects 'host:port' without a scheme. This is stored under its
# own name so this cell does not overwrite MINIO_END_POINT, which includes 'http://'
# and is used as boto3's endpoint_url. Overwriting it made the boto3 cell fail on re-run.
# The host name is resolved in-process; socket.gaierror is raised if it cannot be resolved.
MINIO_HOST_PORT = f"{socket.gethostbyname('minio-lake')}:9000"
MINIO_ACCESS_KEY = os.environ.get('MINIO_ACCESS_KEY')
MINIO_SECRET_KEY = os.environ.get('MINIO_SECRET_KEY')

# Define the MinIO client (plain HTTP, since secure=False)
minio_client = Minio(
    MINIO_HOST_PORT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=False,
)

# Bucket and Iceberg metadata file to inspect.
# NOTE(review): the key embeds table/metadata UUIDs from one particular run;
# update it to match the current catalog state before running.
bucket_name = 'datalakehouse'
metadata_file_key = 'sales/sales_data_abc_6f787c15-8d54-44e4-b4b7-22e11cbda9e2/metadata/00000-731e5735-bd02-44b0-a011-354164361fa9.metadata.json'  # Example metadata path

# Download the metadata file. The minio SDK requires the response to be closed
# and its connection released back to the pool once it has been read.
response = minio_client.get_object(bucket_name, metadata_file_key)
try:
    metadata_content = response.read().decode('utf-8')
finally:
    response.close()
    response.release_conn()

# Parse and pretty-print the metadata content
metadata_json = json.loads(metadata_content)
print(json.dumps(metadata_json, indent=4))