In [None]:
!pip install pyiceberg[s3fs,sql-sqlite]

In [None]:
from pyiceberg.catalog import load_catalog

# Nessie REST Catalog に接続
catalog = load_catalog(
    "nessie",
    **{
        "uri": "http://nessie:19120/iceberg/main/",
    }
)

# 名前空間を作成
namespace = "demo"
catalog.create_namespace(namespace)
print(f"✅ 名前空間「{namespace}」が作成されました")


<img src="名前空間作成.png">

In [None]:
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, IntegerType, StringType, TimestampType
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.table.sorting import SortOrder, SortField
from pyiceberg.transforms import IdentityTransform

# スキーマ定義
user_schema = Schema(
    NestedField(1, "id", IntegerType(), required=True),
    NestedField(2, "name", StringType(), required=False),
    NestedField(3, "created_at", TimestampType(), required=True)
)

# パーティション定義
user_partition_spec = PartitionSpec(
    fields=[
        PartitionField(
            source_id=3,  
            field_id=1000,  
            transform="day",  
            name="created_at_day" 
        )
    ]
)

# ソートオーダー定義
user_sort_order = SortOrder(
    fields=[
        SortField(
            source_id=user_schema.find_field("created_at").field_id,
            transform=IdentityTransform(),
        )
    ]
)

# テーブル作成
catalog.create_table(
    identifier="demo.user_table_with_date_partition",
    schema=user_schema,
    partition_spec=user_partition_spec,
    sort_order=user_sort_order
)

# 詳細
print("user_table_with_date_partition(   # テーブル名")
print("  1: id: required int,            # フィールドID 1、カラム名 id、整数型、必須フィールド")
print("  2: name: optional string,       # フィールドID 2、カラム名 name、文字列型、任意フィールド")
print("  3: created_at: required timestamp  # フィールドID 3、カラム名 created_at、タイムスタンプ型、必須フィールド")
print("),")
print("partition by: [created_at_day],   # パーティショニング設定: created_atフィールドの日付部分でパーティション分割")
print("sort order: [3 ASC NULLS FIRST],  # ソート順設定: フィールドID 3(created_at)で昇順、NULL値を先頭に配置")
print("snapshot: null                    # スナップショット情報: データがまだないのでnull")

<img src="Icebergテーブルの作成.png">

In [None]:
import pyarrow as pa
import datetime

# 正しいスキーマを設定 (created_at フィールドを追加)
arrow_schema = pa.schema([
    pa.field("id", pa.int32(), nullable=False),  # 必須フィールド
    pa.field("name", pa.string()),  # 任意フィールド
    pa.field("created_at", pa.timestamp('us'), nullable=False)  # 必須のタイムスタンプフィールド
])

# パーティションに対応したデータ作成
data_dict = {
    "id": [1, 2, 3, 4, 5],  
    "name": ["Ameri", "Bob", "Caden", "Diana", "Eve"],  
    "created_at": [
        datetime.datetime(2025, 4, 28, 10, 0, 0),
        datetime.datetime(2025, 4, 28, 14, 30, 0),
        datetime.datetime(2025, 4, 28, 9, 15, 0),
        datetime.datetime(2025, 4, 28, 16, 45, 0),
        datetime.datetime(2025, 4, 28, 11, 20, 0)
    ]  
}

# PyArrow Tableの作成
table_data = pa.Table.from_pydict(data_dict, schema=arrow_schema)

#Icebergテーブルをロード
table = catalog.load_table("demo.user_table_with_date_partition")

# データの追加
table.append(table_data)
print("✅ データ書き込み完了")

<img src="dataの追加.png">

In [None]:
# テーブルの現在の状態をスキャン
result = table.scan().to_arrow()

# PyArrow TableをDataFrameに変換
import pandas as pd
df = result.to_pandas()

# DataFrameの内容を確認
print("データの参照結果")
print(df)

In [None]:
import pyarrow as pa
import datetime

# スキーマを設定
arrow_schema = pa.schema([
    pa.field("id", pa.int32(), nullable=False),  
    pa.field("name", pa.string()), 
    pa.field("created_at", pa.timestamp('us'), nullable=False)  
])

# パーティション
data_dict = {
    "id": [6, 7, 8, 9, 10],  # 必須フィールド
    "name": ["Fabiano", "Gem", "Helen", "Iacob", "Jan"],  # 任意フィールド
    "created_at": [
        datetime.datetime(2025, 5, 2, 11, 0, 0),
        datetime.datetime(2025, 5, 3, 13, 30, 0),
        datetime.datetime(2025, 5, 4, 10, 15, 0),
        datetime.datetime(2025, 5, 5, 4, 45, 0),
        datetime.datetime(2025, 5, 6, 1, 20, 0)
    ]  
}

# PyArrow Tableの作成
table_data = pa.Table.from_pydict(data_dict, schema=arrow_schema)

# 正しいテーブル名でIcebergテーブルをロード
table = catalog.load_table("demo.user_table_with_date_partition")

# データの追加
table.append(table_data)
print("✅ データ書き込み完了")


<img src="パーティション反映確認.png">

In [None]:
# テーブルの現在の状態をスキャン
result = table.scan().to_arrow()

# PyArrow TableをPandas DataFrameに変換
import pandas as pd
df = result.to_pandas()
df = df.sort_values(by="id", ascending=True)

# DataFrameの内容を確認
print("Existing Data (sorted by id):")
print("データの参照結果")
print(df)

In [None]:

# テーブルプロパティを取得
properties = table.properties
print("Table Properties:", properties)

# 各プロパティの注釈定義
property_notes = {
    'nessie.catalog.content-id': 'NessieのコンテンツID',
    'created-at': 'テーブル作成日時',
    'nessie.commit.id': 'NessieのコミットID',
    'gc.enabled': 'ガベージコレクション無効フラグ',
    'nessie.commit.ref': 'Nessieのブランチ参照'
}

#注釈を付けて出力
print("\n# プロパティ")
for key, value in properties.items():
    note = property_notes.get(key, '説明なし')
    print(f"  '{key}': '{value}',  # {note}")



In [None]:
# パーティション情報の表示
partition_spec = table.spec()
print("Partition Spec:", partition_spec)

# 注釈
print("""
# 説明:
# テーブル内のデータは「created_at」フィールドの日付部分（年月日）でパーティション分割され、
# 同じ日付のデータは同じパーティションに格納されます。これは時系列データに対して効率的なクエリを
# 可能にする適切なパーティション戦略です。
""")

<img src="パーティション反映確認.png">

In [None]:
# スナップショットの取得
snapshots = table.snapshots()
print("Table Snapshots:", snapshots)

# スナップショット情報をより読みやすく表示
if snapshots:
    snapshot = snapshots[0]  # 最初のスナップショット
    
    # スナップショットの主要パラメータを取得
    snapshot_id = snapshot.snapshot_id
    parent_id = snapshot.parent_snapshot_id
    seq_num = snapshot.sequence_number
    timestamp = snapshot.timestamp_ms
    manifest_path = snapshot.manifest_list
    schema_id = snapshot.schema_id
    
    # サマリー情報を取得
    summary = snapshot.summary
    operation_type = summary.operation
    
    # 注釈
    print("\n# スナップショット情報（注釈付き）")
    print(f"snapshot_id={snapshot_id}                # スナップショットの一意識別子")
    print(f"parent_snapshot_id={parent_id}                        # 親スナップショットID（Noneは最初のスナップショットを意味する）")
    print(f"sequence_number={seq_num}                            # このスナップショットのシーケンス番号")
    print(f"timestamp_ms={timestamp}                    # スナップショット作成時刻（UNIX時間・ミリ秒）")
    print(f"manifest_list='{manifest_path}'  # マニフェストファイルリストのS3パス")
    print("summary=Summary(")
    print(f"  {operation_type},                           # 操作タイプ（APPEND=データ追加）")
    print("  **{")
    


In [None]:
# テーブルのマニフェスト情報を取得
manifests = table.inspect.manifests()

import pandas as pd
manifests_df = manifests.to_pandas()

### 出力結果用
print("=== テーブルのマニフェスト情報 ===")
print(f"マニフェストファイル数: {len(manifests_df)}")

for i, row in manifests_df.iterrows():
    print(f"\nマニフェスト #{i+1}:")
    print(f"  パス: {row['path']}")
    print(f"  スナップショットID: {row['added_snapshot_id']}")
    print(f"  データファイル数: {row['added_data_files_count']}")
    print(f"  既存データファイル数: {row['existing_data_files_count']}")
    print(f"  削除データファイル数: {row['deleted_data_files_count']}")
    print(f"  追加削除ファイル数: {row['added_delete_files_count']}")
    print(f"  既存削除ファイル数: {row['existing_delete_files_count']}")
    
# マニフェストの要約情報
print("\n=== マニフェスト要約 ===")
total_data_files = manifests_df['added_data_files_count'].sum()
print(f"合計データファイル数: {total_data_files}")
print(f"すべてのデータファイルは同じスナップショット(ID: {manifests_df['added_snapshot_id'].iloc[0]})で追加されました")
if manifests_df['deleted_data_files_count'].sum() == 0 and manifests_df['added_delete_files_count'].sum() == 0:
    print("削除されたファイルや削除ファイルは存在しません")

# Nessie Catalog の commit を確認しましょう
<img src="NessieCatalog_Branch.png">
<img src="main_Branch_commit.png">

In [None]:
#スキーマの進化
## 既存のテーブルを読み込む
table = catalog.load_table("demo.user_table_with_date_partition")

## スキーマを更新
with table.update_schema() as update:
    # 新しいカラムを追加（IntegerType型の「age」カラム）
    update.add_column("age", IntegerType(), "ユーザーの年齢")
    
    # 別の新しいカラム（StringType型の「email」カラム）
    update.add_column("email", StringType(), "ユーザーのメールアドレス")

In [None]:
# テーブルの現在の状態をスキャン
result = table.scan().to_arrow()

# PyArrow TableをPandas DataFrameに変換
import pandas as pd
df = result.to_pandas()
df = df.sort_values(by="id", ascending=True)

# DataFrameの内容を確認
print("Existing Data (sorted by id):")
print("データの参照結果")
print(df)

In [None]:
# テーブル内のすべてのデータを削除
from pyiceberg.expressions import AlwaysTrue
table.delete(AlwaysTrue())

print("✅ すべてのデータを削除しました")

# テーブルの現在の状態をスキャン
result = table.scan().to_arrow()

# PyArrow TableをPandas DataFrameに変換
import pandas as pd
df = result.to_pandas()
df = df.sort_values(by="id", ascending=True)

# DataFrameの内容を確認
print("Existing Data (sorted by id):")
print("データの参照結果")
print(df)


## ここからローカル上から Nessie-CLI の実行

In [None]:
from pyiceberg.catalog import load_catalog

# Nessie REST Catalog に接続
catalog = load_catalog(
    "nessie",
    **{
        "uri": "http://nessie:19120/iceberg/feature_branch/",
    }
)


In [None]:
# Feture_branch の情報を確認
print(catalog.name)  # => nessie
print(catalog.uri)   # => http://nessie:19120/iceberg/feature_branch/
print(catalog.list_namespaces())

In [None]:
import pyarrow as pa
import datetime

# 正しいスキーマを設定 (created_at フィールドを追加)
arrow_schema = pa.schema([
    pa.field("id", pa.int32(), nullable=False),  # 必須フィールド
    pa.field("name", pa.string()),  # 任意フィールド
    pa.field("created_at", pa.timestamp('us'), nullable=False)  # 必須のタイムスタンプフィールド
])

# パーティションに対応したデータ作成
data_dict = {
    "id": [11, 12, 13, 14, 15],  
    "name": ["Kon", "Luis", "Mary", "Nakamura", "Oda"],  
    "created_at": [
        datetime.datetime(2025, 5, 28, 10, 0, 0),
        datetime.datetime(2025, 5, 28, 14, 30, 0),
        datetime.datetime(2025, 5, 28, 9, 15, 0),
        datetime.datetime(2025, 5, 28, 16, 45, 0),
        datetime.datetime(2025, 5, 28, 11, 20, 0)
    ]  
}

# PyArrow Tableの作成
table_data = pa.Table.from_pydict(data_dict, schema=arrow_schema)

#Icebergテーブルをロード
table = catalog.load_table("demo.user_table_with_date_partition")

# データの追加
table.append(table_data)
print("✅ データ書き込み完了")

In [None]:
# テーブルの現在の状態をスキャン
result = table.scan().to_arrow()

# PyArrow TableをPandas DataFrameに変換
import pandas as pd
df = result.to_pandas()
df = df.sort_values(by="id", ascending=True)

# DataFrameの内容を確認
print("Existing Data (sorted by id):")
print("データの参照結果")
print(df)

In [None]:
from pyiceberg.catalog import load_catalog

# Nessie REST Catalog に接続
catalog = load_catalog(
    "nessie",
    **{
        "uri": "http://nessie:19120/iceberg/main/",
    }
)


In [None]:
# Feture_branch の情報を確認
print(catalog.name)  # => nessie
print(catalog.uri)   # => http://nessie:19120/iceberg/feature_branch/
print(catalog.list_namespaces())

In [None]:
# テーブルの現在の状態をスキャン
result = table.scan().to_arrow()

# PyArrow TableをPandas DataFrameに変換
import pandas as pd
df = result.to_pandas()
df = df.sort_values(by="id", ascending=True)

# DataFrameの内容を確認
print("Existing Data (sorted by id):")
print("データの参照結果")
print(df)