In [1]:
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pandas as pd
import requests

# Endpoints used by this notebook: the catalog URL is handed to Spark,
# the management URL is used for warehouse administration requests.
CATALOG_URL = "http://server:8080/catalog"
MANAGEMENT_URL = "http://server:8080/management"

# Version of the installed pyspark package, plus its first two
# dot-separated components (used to select the matching Iceberg runtime artifact).
SPARK_VERSION = pyspark.__version__
SPARK_MINOR_VERSION = ".".join(SPARK_VERSION.split(".", 2)[:2])

# Iceberg release used for the Spark runtime and AWS bundle packages.
ICEBERG_VERSION = "1.5.2"

# Create a new Catalog

In [2]:
# Register a new warehouse through the management API.
# The last expression displays the HTTP status code of the response.
response = requests.post(
    f"{MANAGEMENT_URL}/v1/warehouse",
    json={
        # Name of the new warehouse
        "warehouse-name": "new_warehouse",
        # Project to create the warehouse in.
        # Projects do not need to exist before creating a warehouse in them.
        # For simple deployments that do not require multiple projects with
        # multiple warehouses each, we recommend using a single project
        # with the null UUID
        "project-id": "12300000-0000-0000-0000-000000000000",
        # Physical location of this warehouse
        "storage-profile": {
            "type": "s3",
            "bucket": "examples",
            "key-prefix": "path/to/new-warehouse/",
            "assume-role-arn": None,
            "endpoint": "http://minio:9000",
            "region": "local-01",
            "path-style-access": True
        },
        # Storage Credentials for the profile specified above.
        # These credentials are used to grant clients access to specific files in the storage.
        # Clients do not need to know those credentials and will never obtain them directly.
        # NOTE(review): these look like demo credentials for the MinIO endpoint above;
        # never commit real secrets to a notebook.
        "storage-credential": {
            "type": "s3",
            "credential-type": "access-key",
            "aws-access-key-id": "minio-root-user",
            "aws-secret-access-key": "minio-root-password"
        }
    },
    # requests has no default timeout: without one this cell would block
    # forever if the server accepts the connection but never responds.
    timeout=30,
)
response.status_code

201

In [None]:
# Warehouse names must be unique within a project, so running the request above a second time for the same warehouse would fail.

# Connect with Spark

In [3]:
# Spark settings that register an Iceberg REST catalog named "demo-catalog"
# and make it the default catalog of the session.
catalog_key = "spark.sql.catalog.demo-catalog"
config = {
    catalog_key: "org.apache.iceberg.spark.SparkCatalog",
    f"{catalog_key}.type": "rest",
    f"{catalog_key}.uri": CATALOG_URL,
    # When the warehouse does not live in the default project, its name
    # must be prefixed with "<project-id>/".
    f"{catalog_key}.warehouse": "12300000-0000-0000-0000-000000000000/new_warehouse",
    f"{catalog_key}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.defaultCatalog": "demo-catalog",
    # Comma-separated Maven coordinates: the Iceberg Spark runtime for the
    # installed Spark minor version, and the Iceberg AWS bundle.
    "spark.jars.packages": ",".join([
        f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_MINOR_VERSION}_2.12:{ICEBERG_VERSION}",
        f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}",
    ]),
}

In [4]:
# Assemble the Spark configuration in one chain: local master, application
# name, then every key/value pair from `config`.
spark_config = (
    SparkConf()
    .setMaster('local')
    .setAppName("Iceberg-REST")
    .setAll(list(config.items()))
)

# getOrCreate() returns the active session if one exists, otherwise it
# starts a new one with the configuration above.
spark = SparkSession.builder.config(conf=spark_config).getOrCreate()

In [5]:
# Create the demo namespace; IF NOT EXISTS keeps the cell safe to re-run.
spark.sql("CREATE NAMESPACE IF NOT EXISTS spark_demo")

# Display the namespaces of the current catalog as a pandas DataFrame.
(
    spark
    .sql("SHOW NAMESPACES")
    .toPandas()
)

Unnamed: 0,namespace
0,spark_demo


In [7]:
# A one-row pandas frame with an integer, a string and a float column.
data = pd.DataFrame({
    'id': [1],
    'strings': ['a-string'],
    'floats': [2.2],
})

# Convert the pandas frame into a Spark DataFrame.
sdf = spark.createDataFrame(data)

In [8]:
# Write the Spark DataFrame to spark_demo.my_table, creating the table or
# replacing it if it already exists.
(
    sdf
    .writeTo("spark_demo.my_table")
    .createOrReplace()
)

In [9]:
# Read the table back through Spark SQL and display it as a pandas DataFrame.
(
    spark
    .sql("SELECT * FROM spark_demo.my_table")
    .toPandas()
)

Unnamed: 0,id,strings,floats
0,1,a-string,2.2
