In [7]:
import logging
import os
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

# Versions of the JVM artifacts listed under spark.jars.packages below.
iceberg_version = "1.5.2"
spark_version = "3.5.1"
hadoop_aws_version = "3.3.4"
jdk_bundle = "1.12.262"  # version of com.amazonaws:aws-java-sdk-bundle

# Build (or reuse) the Spark session. The default catalog is configured as an
# Iceberg SparkSessionCatalog of type "hadoop" with its warehouse on an s3a://
# path, and the S3A filesystem points at an endpoint on localhost:11111.
# NOTE(review): the S3 access/secret keys are hardcoded — presumably throwaway
# values for a local test endpoint; confirm before sharing this notebook.
spark = (
    SparkSession.builder
    .appName("IcebergS3Example")
    .config("spark.jars.repositories", "https://repo1.maven.org/maven2")
    .config(
        "spark.jars.packages",
        # Comma-separated Maven coordinates, one entry per artifact.
        ",".join(
            (
                f"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:{iceberg_version}",
                f"org.apache.spark:spark-avro_2.12:{spark_version}",
                f"org.apache.hadoop:hadoop-aws:{hadoop_aws_version}",
                f"com.amazonaws:aws-java-sdk-bundle:{jdk_bundle}",
            )
        ),
    )
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.iceberg.spark.SparkSessionCatalog",
    )
    .config("spark.sql.catalog.spark_catalog.type", "hadoop")
    .config("spark.sql.catalog.spark_catalog.warehouse", "s3a://test/warehouse")
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:11111")
    .config("spark.hadoop.fs.s3a.access.key", "clickhouse")
    .config("spark.hadoop.fs.s3a.secret.key", "clickhouse")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)


In [9]:
# Name of the table that the Spark SQL cells below create, alter and query.
TABLE_NAME = "test_iceberg_schema_evolution_6"

In [10]:
# Statements executed in order against the Spark session:
#   1. create a two-column (a, b) Iceberg table with format-version 2,
#   2. insert one row,
#   3-5. swap the names of the two columns, using c as the temporary name,
#   6. move column b after column a,
#   7. insert a second row under the evolved schema.
schema_evolution_statements = [
    f"""
    CREATE TABLE {TABLE_NAME} (
        a int NOT NULL,
        b int NOT NULL
    )
    USING iceberg
    OPTIONS ('format-version'='2');
    """,
    f"""
    INSERT INTO {TABLE_NAME} VALUES (0, 1);
    """,
    f"""
    ALTER TABLE {TABLE_NAME} RENAME COLUMN b TO c;
    """,
    f"""
    ALTER TABLE {TABLE_NAME} RENAME COLUMN a TO b;
    """,
    f"""
    ALTER TABLE {TABLE_NAME} RENAME COLUMN c TO a;
    """,
    f"""
    ALTER TABLE {TABLE_NAME} ALTER COLUMN b AFTER a;
    """,
    f"""
    INSERT INTO {TABLE_NAME} VALUES (1, 0);
    """,
]

last_result = None
for statement in schema_evolution_statements:
    last_result = spark.sql(statement)

# Leave the final statement's result as the value of the cell.
last_result


DataFrame[]

In [11]:
# Print up to 100 rows of the table, ordered with ORDER BY ALL.
ordered_rows = spark.sql(f"SELECT * FROM {TABLE_NAME} ORDER BY ALL")
ordered_rows.show(n=100)

+---+---+
|  a|  b|
+---+---+
|  1|  0|
|  1|  0|
+---+---+



In [66]:
import pandas as pd

# Maps a ClickHouse type name, as written in the schema text, to the dtype
# string passed to DataFrame.astype for that column.
clickhouse_to_pandas_types = {
    "Int32": "int",
}

# Characters trimmed from both ends of the raw data text. The tab is left out
# on purpose: it is the field separator, so a leading or trailing tab marks an
# empty field in the first or last row and must not be discarded.
_NON_TAB_WHITESPACE = " \n\r\x0b\x0c"


def convert_schema_and_data_to_pandas_df(schema_raw: str, data_raw: str):
    """Parse tab-separated schema and data text into two pandas DataFrames.

    Args:
        schema_raw: One line per column; the first two tab-separated fields
            of each line are taken as the column name and its ClickHouse
            type name. Empty lines are ignored.
        data_raw: One line per row, fields separated by tabs. Empty lines
            are ignored.

    Returns:
        A ``(schema_df, data_df)`` tuple. ``schema_df`` has a single row
        holding the type name of every column. ``data_df`` holds the data
        rows, with each column cast to the dtype that
        ``clickhouse_to_pandas_types`` gives for its type.

    Raises:
        KeyError: If the schema uses a type that is missing from
            ``clickhouse_to_pandas_types``.
        IndexError: If a schema line has no tab-separated type field.
    """
    # Keep only the first two fields (name, type) of every non-empty line.
    schema_rows = [
        line.split("\t")[:2]
        for line in schema_raw.strip().split("\n")
        if line
    ]
    column_names = [row[0] for row in schema_rows]
    types = [row[1] for row in schema_rows]

    # Report every unsupported type at once instead of a bare KeyError('X').
    unsupported = sorted({t for t in types if t not in clickhouse_to_pandas_types})
    if unsupported:
        raise KeyError(
            "Unsupported ClickHouse type(s): " + ", ".join(unsupported)
        )
    pandas_types = [clickhouse_to_pandas_types[t] for t in types]

    schema_df = pd.DataFrame([types], columns=column_names)

    # Trim surrounding blank space but keep tabs, so that an empty first or
    # last field survives in every row, not only in the middle ones.
    data_rows = [
        line.split("\t")
        for line in data_raw.strip(_NON_TAB_WHITESPACE).split("\n")
        if line
    ]

    if data_rows:
        data_df = pd.DataFrame(data_rows, columns=column_names, dtype="object")
    else:
        # No rows: build an empty frame that still carries the columns.
        data_df = pd.DataFrame(columns=column_names, dtype="object")

    data_df = data_df.astype(dict(zip(column_names, pandas_types)))
    return schema_df, data_df

In [67]:
# Example: a two-column Int32 schema with a single data row.
schema_df, data_df = convert_schema_and_data_to_pandas_df(
    schema_raw="a\tInt32\nb\tInt32",
    data_raw="1\t2",
)
schema_df

Unnamed: 0,a,b
0,Int32,Int32


In [68]:
# Display the data frame returned by convert_schema_and_data_to_pandas_df.
data_df

Unnamed: 0,a,b
0,1,2
