# はじめに

本ノートブックでは **プロビジョニング汎用コンピューティング** をアタッチしてください。
学習観点でのわかりやすさのため、本ノートブックでは一部で hive_metastore や DBFS を利用しますが、これらの機能がサーバレスクラスタを通じて利用できないためです。

**注意**  
hive_metastore や DBFS はいずれもセキュリティ観点で利用が非推奨となっており、現在はそれぞれ後継機能（Unity Catalog、外部ローケーション）が提供されいます。そうした背景もあり、hive_metastore や DBFS はサーバレスコンピューティングではサポートされていません。  
よって本ノートブックではプロビジョニング汎用コンピューティングを使用しますが、hive_metastore や DBFS へのオペレーション自体は後継機能（Unity Catalog、外部ローケーション）でも同様であるためそれぞれ後継機能で置き換えることで動作します。

In [0]:
%run .././include/handson.h

In [0]:
%python
# SQL コンテキスト用パラメータ定義（spark.conf.set はサーバレス未サポートのため Widgets で環境変数定義）
dbutils.widgets.text("sample_dataset_path", sample_dataset_path)
dbutils.widgets.text("your_catalog", your_catalog)
dbutils.widgets.text("your_schema", your_schema)

# spark.conf.set(f"sample_dataset_path", sample_dataset_path) # for SQL Context
# spark.conf.set(f"your_catalog", your_catalog) # for SQL Context
# spark.conf.set(f"your_schema", your_schema) # for SQL Context

# 1. Delta Lake の構造

**はじめに**：学習観点のわかりやすさのため hive_metastore を利用するためにコンテキストを切り替えます

In [0]:
USE CATALOG hive_metastore;
CREATE SCHEMA IF NOT EXISTS ${your_schema};
USE SCHEMA ${your_schema};

## Delta Table の作成とトランザクションの実行

ここでは構造や挙動の確認を通じて Delta Table の理解を深めます。

さっそく Delta Table を定義しいくつかのオペレーション（トランザクション）を実行してみましょう。

In [0]:
DROP TABLE IF EXISTS students;

CREATE OR REPLACE TABLE students  (id INT, name STRING, value DOUBLE);

INSERT INTO students VALUES (1, "Yve", 1.0);
INSERT INTO students VALUES (2, "Omar", 2.5);
INSERT INTO students VALUES (3, "Elia", 3.3);

INSERT INTO students
VALUES 
  (4, "Ted", 4.7),
  (5, "Tiffany", 5.5),
  (6, "Vini", 6.3);

UPDATE students 
SET value = value + 1
WHERE name LIKE "T%";

DELETE FROM students 
WHERE value > 6;

CREATE OR REPLACE TEMP VIEW updates(id, name, value, type) AS VALUES
  (2, "Omar", 15.2, "update"),
  (3, "", null, "delete"),
  (7, "Blue", 7.7, "insert"),
  (11, "Diya", 8.8, "update");
  
MERGE INTO students b
USING updates u
ON b.id=u.id
WHEN MATCHED AND u.type = "update"
  THEN UPDATE SET *
WHEN MATCHED AND u.type = "delete"
  THEN DELETE
WHEN NOT MATCHED AND u.type = "insert"
  THEN INSERT *;

## テーブルのメタデータの確認

**`DESCRIBE EXTENDED`** を使用すると、テーブルに関するメタデータを表示できます。

In [0]:
DESCRIBE EXTENDED students;

**`Location`** フィールドを見ると Delta Table はクラウドストレージに格納されていることが確認できます。

## Delta Table を構成するファイルの探索

Databricksのユーティリティ関数を使用して Delta Table を構成するファイル構造を探索してみましょう。

**補足** ：利用者にとって本来はファイル構造を意識する必要はありませんが技術がどのように実装されているかを理解を深めるのに役立ちます。

**注意** ：同様に利用者によるストレージへの直接アクセスは本来は制限されるべきですが、レガシー（利用非推奨）である hive_metastore ではストレージパスのリストが可能です（後継の Unity Catalog ではサポートされない操作です）。

In [0]:
%python
tbl_location = spark.sql("DESCRIBE DETAIL students").first().location
data_files = dbutils.fs.ls(tbl_location)
display(data_files)



ディレクトリにはいくつかの Parquet ファイルと  **`_delta_log`** ディレクトリが含まれていることが確認できます。

- Delta Table のデータは Parquet ファイルとして保存されます。

- Delta Table へのトランザクションは **`_delta_log`** ディレクトリ内のトランザクションログとして記録されます。

次に **`_delta_log`** ディレクトリ内のトランザクションログを探索してみましょう。

In [0]:
%python
log_files = dbutils.fs.ls(tbl_location + "/_delta_log")
display(log_files)

JSON ファイル がトランザクションログファイルに相当します。

単一 トランザクションごとに単一 JSON ファイルを書き込みます。よって JSON ファイルの数からこのテーブルに対するトランザクション数がわかります。  

## トランザクション履歴の確認

トランザクションの履歴を見てみましょう。

In [0]:
DESCRIBE HISTORY students;

ユーザー操作以外の内部的なシステムトランザクション（OPTIMIZE）を含むすべてのトランザクションが確認できます。

**補足**: Databricks Runtime 10.4 LTS 以降では MERGE、UPDATE、DELETE の各操作に対して OPTIMIZE が常に有効になります。

**参考**：[データ ファイル サイズを制御するように Delta Lake を構成する](https://learn.microsoft.com/ja-jp/azure/databricks/delta/tune-file-size)

## データファイルの深堀
**`DESCRIBE DETAIL`** は **`DESCRIBE EXTENDED`** と同様にテーブルのメタデータを参照するための別のコマンドです。

**`DESCRIBE DETAIL`** の **`numFiles`** フィールドにより該当テーブルにおける最新バージョンで有効なデータファイル数が確認できます。

In [0]:
DESCRIBE DETAIL students;

Delta Table のディレクトリ配下に多くのデータファイルが確認できましたが最新バージョンとして有効なデータファイルは１つであることがわかります。

次に Delta Table の各レコードが格納されているデータファイルを確認してみましょう。

Delta Table は内部的にメタデータを保持しており _metadata.* 列としてアクセスが可能で **`_metadata.file_name`** 列によりレコードが格納されているデータファイルを確認できます。

**参考**：[ファイル メタデータ列](https://learn.microsoft.com/ja-jp/azure/databricks/ingestion/file-metadata-column)

In [0]:
SELECT _metadata.file_name, * FROM students;

GROUP BY により現在有効なファイルを確認できます。

In [0]:
SELECT _metadata.file_name FROM students GROUP BY _metadata.file_name;

**`DESCRIBE DETAIL`** コマンドの結果と同じく現在有効なファイルは１つであることが確認できます。

## データファイルの最適化によるクエリパフォーマンス向上（OPTIMIZE）

データファイルはさまざまな理由で作成されます。

例えば３つの INSERT トランザクションを実行し作成されるデータファイルを確認してみましょう。

In [0]:
INSERT INTO students VALUES (11, "oota", 1.1);
INSERT INTO students VALUES (12, "tomo", 2.2);
INSERT INTO students VALUES (13, "yuki", 3.3);

SELECT _metadata.file_name, * FROM students;

GROUP BY で現在有効なファイルを確認すると３つのデータファイルが追加されたことが確認できます。

In [0]:
SELECT _metadata.file_name FROM students GROUP BY _metadata.file_name;

このように小さなファイルが大量にある状況は Spark エンジンでの性能において望ましくない状況です。

この状況をメンテナンスするのが **`OPTIMIZE`** コマンドです。

**`OPTIMIZE`** コマンドは対象の Delta Table 内の大量の小さいなファイルを少量の大きなファイルへパッキングすることでのちのクエリパフォーマンスを向上させます。


In [0]:
OPTIMIZE students;

現在のバージョンで有効なデータファイル数を確認してみましょう。

４つのデータファイルが１つのデータファイルにパッキングされていることがわかるはずです。

In [0]:
SELECT _metadata.file_name FROM students GROUP BY _metadata.file_name;

最後にあらためてトランザクション履歴を見てみましょう。

**`OPTIMIZE`** コマンド自体もひとつのトランザクションであることがわかります。

In [0]:
DESCRIBE HISTORY students;

## トランザクションログファイルの深堀
データファイルの深堀で確認した通り最新バージョンで有効なデータファイル合計数に比較して Delta Table のディレクトリ配下のデータファイル合計数は大きくなっていました。

それは Delta Table は変更されたデータを含むファイルは直ちに削除することはせず履歴として保持されるためです。これによりトランザクションログを使用したテーブルの任意時点のスナップショット参照を実現しています。

例として **`MERGE`** ステートメントに対応するトランザクションログを見てみましょう。

In [0]:
%python
display(spark.sql(f"SELECT * FROM json.`{tbl_location}/_delta_log/00000000000000000008.json`"))

**`add`** 列には、テーブルに書き込まれたすべての新しいファイルのリストが含まれています。

**`remove`** 列はこの時点でテーブルに含まれるべきでなくなったファイルを示しています。

Delta Table クエリするとクエリエンジンはトランザクションログを使用し現在のバージョンで有効なすべてのファイルを識別し他のすべてのデータファイルを無視して結果を返却します。

## データのタイムトラベル（TIMESTAMP/VERSION AS OF）

繰り返しになりますが、最新バージョンで有効なデータファイルではない古いデータファイルが残っている理由は履歴として保持するためでトランザクションログを使用することでテーブルの任意時点のスナップショットを参照することができます。

任意時点のスナップショットを参照をタイムトラベルとよび **`TIMESTAMP AS OF`** または **`VERSION AS OF`** によってクエリします。

ここでは例として バージョン３ 時点のスナップショットを参照してみます。

In [0]:
SELECT * FROM students VERSION AS OF 3 ORDER BY id;

## データの復元（RESTORE）

タイムトラベルを利用することでテーブルの任意時点のスナップショットを最新バージョンとして復元することも可能です。

例えばオペレーションミスによって全レコードを削除してしまったことを仮定してみましょう。

In [0]:
DELETE FROM students;

SELECT * FROM students;

意図せず全レコードが削除され最新バージョンは空のテーブルになってしまいました。

これをテーブルの任意時点のスナップショットで復元するには **`RESTORE`** コマンドを利用します。

In [0]:
DESCRIBE HISTORY students;

トランザクション履歴を確認し DELETE の直前のバージョンに戻します。

In [0]:
RESTORE TABLE students TO VERSION AS OF 13;

SELECT * FROM students ORDER BY id;

**`RESTORE`** 自体もトランザクションとして記録されます。

In [0]:
DESCRIBE HISTORY students;

テーブルのすべてのレコードを誤って削除した事実は残しつつ操作を元に戻し、テーブルを望ましい状態に戻すことができました。

## 古いデータファイルのクリーンアップ（VACUUM）

古いデータファイルは履歴としていつまで保持されるのでしょうか？

古いデータファイルの削除は **`VACUUM`** コマンドによって処理します。

**`VACUUM`** コマンドの実行は明示的に実行する もしくは 予測最適化機能を利用した自動実行 の選択肢があります。

**古さ（保持期間）** は `delta.deletedFileRetentionDuration` で設定する閾値で判定されます。既定値は７日でそれよりも古いファイルが削除の対象になります。

この保持期間はVACUUM 実行時のオプションで保持期間をオーバーライド可能ですが `delta.deletedFileRetentionDuration` よりも下回る指定がなされた場合はガードレールによりエラーとなります。

以下のコメントアウトを外し実行しガードレールが機能することを確認してみましょう。

In [0]:
-- VACUUM students RETAIN 0 HOURS;

ここではデモとして保持期間なしを意味する **`0 HOURS`** を `delta.deletedFileRetentionDuration` に設定することでガードレールを無効化します。本番運用では必ずガードレールを構成してください。

In [0]:
ALTER TABLE students SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = '0 HOURS');

VACUUM は DRY RUN オプションにより実際に削除されるファイルを事前に確認することが可能です（実際の削除は実行されません）。

In [0]:
VACUUM students RETAIN 0 HOURS DRY RUN;

現在のデータファイルも確認しておきましょう。

In [0]:
%python
tbl_location = spark.sql("DESCRIBE DETAIL students").first().location
data_files = dbutils.fs.ls(tbl_location)
display(data_files)

それでは **`VACUUM`** を実行し古いデータファイルをクリーンアップしましょう。 

In [0]:
VACUUM students RETAIN 0 HOURS;

あらためて現在のデータファイルを確認します。最新バージョンとして有効な１つのデータファイルだけが残っていることが確認できるはずです。

In [0]:
%python
tbl_location = spark.sql("DESCRIBE DETAIL students").first().location
data_files = dbutils.fs.ls(tbl_location)
display(data_files)

トランザクション履歴を確認すると VACUUM 自体もトランザクションとして管理されていることがわかります。

In [0]:
DESCRIBE HISTORY students;

トランザクション履歴は残っていますが VACUUM によって削除された古いファイルを参照するバージョンを復元することはできなくなります。

**留意** ：結果が返ってくる場合はキャッシュが効いている可能性があります。時間に余裕があるときにクラスタを再起動したのちに再度確認してみてください。 

In [0]:
-- SELECT * FROM students VERSION AS OF 9 ORDER BY id;

In [0]:
-- SELECT * FROM parquet.`dbfs:/user/hive/warehouse/handson_tico_demo01_schema.db/students/_delta_log/00000000000000000016.checkpoint.parquet`;

## 非 Delta Table

非 Delta Tableについても同様に構造や挙動を簡単に見て Delta Lake との違いを確認してみましょう。



テーブル作成時の **`USING`** により以下のフォーマットを選択できます。既定は Delta です。

- AVRO
- BINARYFILE
- CSV
- DELTA（既定）
- JSON
- ORC
- PARQUET
- TEXT

**参考** ：[CREATE TABLE [USING]](https://learn.microsoft.com/ja-jp/azure/databricks/sql/language-manual/sql-ref-syntax-ddl-create-table-using)

ここでは CSV と Parquet で作成してみましょう。

In [0]:
DROP TABLE IF EXISTS csv_table;
CREATE TABLE csv_table (id int, name string) USING csv;
INSERT INTO csv_table VALUES (1, "LAND CRUISER 70"), (2, "LAND CRUISER 250");
SELECT * FROM csv_table ORDER BY id;

In [0]:
DROP TABLE IF EXISTS parquet_table;
CREATE TABLE parquet_table (id int, name string) USING parquet;
INSERT INTO parquet_table VALUES (1, "1VD-FTV"), (2, "1GD-FTV");
SELECT * FROM parquet_table ORDER BY id;

テーブルのメタデータからテーブルのフォーマットを確認してみましょう。

In [0]:
DESCRIBE EXTENDED csv_table;

In [0]:
DESCRIBE EXTENDED parquet_table;

テーブルのファイル構造も確認してみましょう。

Delta Lake と異なりトランザクションログ（ **`_delta_log`** ディレクトリ）が存在しないことがわかります。

In [0]:
%python
tbl_location = spark.sql("DESCRIBE DETAIL csv_table").first().location
data_files = dbutils.fs.ls(tbl_location)
display(data_files)

In [0]:
%python
tbl_location = spark.sql("DESCRIBE DETAIL parquet_table").first().location
data_files = dbutils.fs.ls(tbl_location)
display(data_files)

非 Delta Table に対しては追記（INSERT）のみ可能です。

In [0]:
INSERT INTO csv_table VALUES (3, "LAND CRUISER 300");
SELECT * FROM csv_table ORDER BY id;

In [0]:
INSERT INTO parquet_table VALUES (3, "F33A-FTV");
SELECT * FROM parquet_table ORDER BY id;

UPDATE や DELETE など変更操作はサポートされません。

In [0]:
-- UPDATE parquet_table SET name = "F33A-FTV" WHERE id = 2;
-- DELETE FROM parquet_table WHERE id = 3;

トランザクションログも存在しないため履歴の確認やタイムトラベル、リストアもサポートされません。

In [0]:
-- DESCRIBE HISTORY parquet_table;
-- SELECT * FROM parquet_table VERSION AS OF 1 ORDER BY id;

# 2. テーブルの分類（マネージド or アンマネージド）

**はじめに**：学習観点のわかりやすさのため hive_metastore を利用するためにコンテキストを切り替えます

In [0]:
USE CATALOG hive_metastore;
CREATE SCHEMA IF NOT EXISTS ${your_schema};
USE SCHEMA ${your_schema};

Databricks のテーブル種類を管理方法の視点で分類すると **マネージドテーブル** と **アンマネージドテーブル** に分類されます。

**[マネージドテーブル](https://learn.microsoft.com/ja-jp/azure/databricks/tables/managed)**  
- データの保存先が Databricks によって管理されるテーブルです。
- 利用者はデータの保存先のストレージやパスを意識する必要はありません。
- ストレージは管理者によって構成されますがパスは Databricks によって決められておりカスタムはできません。
- マネージドテーブルを削除すると同時にデータも削除されます（[実際の削除はシステム側で30日以内に処理されます](https://learn.microsoft.com/ja-jp/azure/databricks/tables/managed#drop-a-managed-table)）。

**[アンマネージドテーブル（外部テーブル）](https://learn.microsoft.com/ja-jp/azure/databricks/tables/external)**  
- データの保存先のストレージやパスはテーブル定義時にユーザーが指定します。
- Databricks 上でのアンマネージドテーブルの削除はメタデータの削除のみを指しストレージ上のデータ実体はそのまま残ります（削除する場合はストレージに対する直接操作として削除する必要があります）。

**マネージドテーブル or アンマネージドテーブル（外部テーブル）**  
- Databricks としての推奨かつ既定はマネージドテーブルです。
- Databricks の利用者は本来 ストレージの場所 と パス を意識する必要がありませんが、アンマネージドテーブルはテーブルを作成する利用者に対してストレージの場所とパスを公開する必要があります。また該当のストレージに対しては Databricks（Unity Catalog）でのアクセス制御の管理外となるためガバナンス観点で望ましくありません。
- 機能がフルサポートされるマネージドテーブルに対してアンマネージドテーブルについては一部機能がサポートされないもしくはサポートが遅れるケースがある点に留意が必要です。

## マネージドテーブル

マネージドテーブルを作成します。

特にオプションを指定することなくテーブルを作成すると Delta フォーマットのマネージドテーブルが作成されます。

In [0]:
CREATE OR REPLACE TABLE managed_table (width INT, length INT, height INT);

INSERT INTO managed_table VALUES (3, 2, 1);

SELECT * FROM managed_table;

テーブルのメタデータを確認してみましょう。

- **`Type`** ：テーブルの種類が確認できます
- **`Provider`** ：テーブルのフォーマットが確認できます
- **`Location`** ：テーブルデータが格納されているストレージパスを確認できます（Databricks によって管理されたパス）

In [0]:
DESCRIBE EXTENDED managed_table;

テーブルデータが格納されているストレージパスをリストしてみましょう。

**注意** ：本来はストレージパスを意識する必要はなく直接アクセスは制限されるべきですが、レガシー（利用非推奨）である hive_metastore ではストレージパスのリストが可能です（後継の Unity Catalog ではサポートされない操作です）。

In [0]:
%python 
tbl_location = spark.sql("DESCRIBE DETAIL managed_table").first().location

# マネージドディレクトリに対するリストコマンドの実行は Unity Catalog では未サポート
files = dbutils.fs.ls(tbl_location)
display(files)

次にテーブルを削除し、ストレージパス上からテーブルに関連するデータも削除されることを確認してみましょう。

In [0]:
DROP TABLE managed_table;

In [0]:
%python 

# マネージドディレクトリに対するリストコマンドの実行は Unity Catalog では未サポート
files = dbutils.fs.ls(f"dbfs:/user/hive/warehouse/{your_schema}.db/")
display(files)

該当テーブルが格納されていた managed_table ディレクトリが削除されていることがわかります。 

なお、Unity Catalog でのデータ削除のタイミングはテーブル削除から 30 日以内にシステム側で処理されます。  

## アンマネージドテーブル（外部テーブル）

次に外部テーブルを作成し同じ確認をしてみましょう。

In [0]:
CREATE OR REPLACE TABLE unmanaged_table (width INT, length INT, height INT)
   LOCATION "dbfs:/user/hive/warehouse/${your_schema}.db/user_managed_path/unmanaged_table";

INSERT INTO unmanaged_table VALUES (3, 2, 1);

SELECT * FROM unmanaged_table;

テーブルのメタデータを確認してみましょう。

- **`Type`** ：テーブルの種類が確認できます
- **`Provider`** ：テーブルのフォーマットが確認できます
- **`Location`** ：テーブルデータが格納されているストレージパスを確認できます（テーブル作成時にユーザーが指定した自由なパス）

In [0]:
DESCRIBE EXTENDED unmanaged_table;

テーブルデータが格納されているストレージパスをリストしてみましょう。

**注意** ：本来はストレージパスを意識する必要はなく直接アクセスは制限されるべきですが、レガシー（利用非推奨）である hive_metastore ではストレージパスのリストが可能です（後継の Unity Catalog ではサポートされない操作です）。

In [0]:
%python 
tbl_location = spark.sql("DESCRIBE DETAIL unmanaged_table").first().location

# マネージドディレクトリに対するリストコマンドの実行は Unity Catalog では未サポート
files = dbutils.fs.ls(tbl_location)
display(files)

次にテーブルを削除し、ストレージパス上からテーブルに関連するデータも削除されることを確認してみましょう。

In [0]:
DROP TABLE unmanaged_table;

In [0]:
%python 

# マネージドディレクトリに対するリストコマンドの実行は Unity Catalog では未サポート
files = dbutils.fs.ls(f"dbfs:/user/hive/warehouse/{your_schema}.db/user_managed_path/unmanaged_table/")
display(files)

該当テーブルが格納されていた unmanaged_table ディレクトリがそのまま残っていることがわかります。 

テーブルに関連する実データを削除するにはユーザーが明示的にストレージに対して直接操作する必要があります。

In [0]:
%python
dbutils.fs.rm(f"dbfs:/user/hive/warehouse/{your_schema}.db/user_managed_path/", True)
files = dbutils.fs.ls(f"dbfs:/user/hive/warehouse/{your_schema}.db/")
display(files)

## テーブルの種類

Databricks のテーブルは管理視点からマネージドテーブルとアンマネージドテーブルに分類されることを解説しましたが、用途視点での分類として以下があります。

詳細は後述としここでは紹介のみとします。

- テーブル
  - 汎用 Delta Table
  - ストリーミング用 Delta Table
  - 非 Delta Table
- ビュー
  - ビュー
  - マテリアライズドビュー
  - 一時ビュー

![](https://learn.microsoft.com/ja-jp/azure/databricks/_static/images/unity-catalog/object-model-table-and-view.png)

**参考** ：[テーブルとビュー](https://learn.microsoft.com/ja-jp/azure/databricks/tables/)  
**参考** ：[差分テーブル、ストリーミング テーブル、具体化されたビューの違い](https://learn.microsoft.com/ja-jp/azure/databricks/tables/#differences-between-delta-tables-streaming-tables-and-materialized-views)

# [Option] 3. Delta Lakeへのデータの読み込み

## 完全な上書き

テーブル内のすべてのデータを原子的に置き換えるために上書きを使用できます。テーブルを削除して再作成する代わりにテーブルを上書きすることには、複数の利点があります：
- テーブルを上書きする方がはるかに高速です。ディレクトリを再帰的にリストアップしたり、ファイルを削除したりする必要がないためです。
- テーブルの古いバージョンはまだ存在し、タイムトラベルを使用して簡単に古いデータを取得できます。
- CTAS はトランザクションであるためターゲットテーブルに対する同時クエリはブロックされることなくコミット済み最新データを読み取ることができます（削除の場合は再作成するまで元データが一時的に存在しない期間が発生します）。
- ACIDトランザクションの保証により、テーブルの上書きに失敗した場合、テーブルは以前の状態になります。

Spark SQLは完全な上書きを実行するための2つの簡単な方法を提供しています。

- CREATE TABLE AS SELECT（いわゆる CTAS）
- INSERT OVERWRITE

In [0]:
DROP TABLE IF EXISTS events;

CREATE OR REPLACE TABLE events AS SELECT * FROM parquet.`${sample_dataset_path}/ecommerce/raw/events-historical`;

SELECT * FROM events LIMIT 10;

**`INSERT OVERWRITE`** は **`CTAS`** と同様にターゲットテーブル内のデータはクエリからのデータで置き換えられます。
ただし同一のスキーマ定義をもつデータソースからの上書きのみに対応し CTAS のように新しいテーブルを作成することはできません（スキーマが変更されないため下流のコンシューマに対しては「安全」と言えます。

In [0]:
INSERT OVERWRITE events SELECT * FROM parquet.`${sample_dataset_path}/ecommerce/raw/events-historical`;

SELECT * FROM events LIMIT 10;

テーブルのトランザクション履歴を確認してみましょう。

In [0]:
DESCRIBE HISTORY events

CTAS も INSERT OVERWRITE も単一のトランザクションであることがわかります。

## 行の追加

**`INSERT INTO`** を使用して、既存のDeltaテーブルに新しい行を原子的に追加できます。これにより、毎回上書きするよりも効率的な既存のテーブルへの増分更新が可能になります。

In [0]:
DROP TABLE IF EXISTS sales;

CREATE OR REPLACE TABLE sales AS
  SELECT * FROM parquet.`${sample_dataset_path}/ecommerce/raw/sales-historical`;

SELECT * FROM sales LIMIT 10;

このテーブルを構成するデータファイルを確認してみましょう。

In [0]:
SELECT count(*), _metadata.file_path FROM sales GROUP BY _metadata.file_path;

このテーブルに新しいデータを追加しあらためてデータファイルを確認してみましょう。

In [0]:
INSERT INTO sales SELECT * FROM parquet.`${sample_dataset_path}/ecommerce/raw/sales-30m`;
SELECT count(*), _metadata.file_path FROM sales GROUP BY _metadata.file_path;

**`INSERT INTO`** には、同じレコードを複数回挿入するのを防ぐための組み込みの保証がないことに注意してください。上記セルと同じコマンドを再実行すると同じレコードがターゲットテーブルに書き込まれ重複したレコードが生成されます。

In [0]:
INSERT INTO sales SELECT * FROM parquet.`${sample_dataset_path}/ecommerce/raw/sales-30m`;
SELECT count(*), _metadata.file_path FROM sales GROUP BY _metadata.file_path;

## 増分的な読み込み

**`COPY INTO`** は冪等性をサポートする増分読み込み機能です。

In [0]:
CREATE OR REPLACE TABLE sales AS
  SELECT * FROM parquet.`${sample_dataset_path}/ecommerce/raw/sales-historical`;

SELECT count(*), _metadata.file_path FROM sales GROUP BY _metadata.file_path;

このテーブルに新しいデータを追加しあらためてデータファイルを確認してみましょう。

In [0]:
COPY INTO sales FROM "${sample_dataset_path}/ecommerce/raw/sales-30m" FILEFORMAT = PARQUET;

SELECT count(*), _metadata.file_path FROM sales GROUP BY _metadata.file_path;

**`COPY INTO`** は冪等性をサポートするため同じデータファイルが重複して読み込まれることがないことを保証しています。

例として上記セルと同じコマンドを再実行してみましょう。

In [0]:
COPY INTO sales FROM "${sample_dataset_path}/ecommerce/raw/sales-30m" FILEFORMAT = PARQUET;

SELECT count(*), _metadata.file_path FROM sales GROUP BY _metadata.file_path;


同じデータファイルが重複して読み込まれることがないことが確認できました。

この冪等性は、以下の方法で保証されています。

**ファイルの追跡**: COPY INTO コマンドは、入力ファイルの詳細を Delta テーブルのログディレクトリ内に保存します。この情報は RocksDB というキー・バリュー・ストアに格納されます。  
**重複除去ロジック**: 次回 COPY INTO コマンドが同じテーブルに対して実行されると、まず RocksDB からデータが読み込まれ、入力ファイルと比較されます。この比較により、既にロードされたファイルはスキップされます。  
**強制オプション**: COPY_OPTIONS の force パラメータを true に設定すると、冪等性が無効化され、以前にロードされたファイルも再度ロードされます。  

**参考** ：[How is Idempotency ensured for COPY INTO command](https://community.databricks.com/t5/machine-learning/how-is-idempotency-ensured-for-copy-into-command/td-p/19795)

## マージ更新

**`MERGE`** 使用して、ソーステーブル、ビュー、またはDataFrameからターゲットテーブルにデータをアップサートできます。 

<strong><code>
MERGE INTO target a<br/>
USING source b<br/>
ON {merge_condition}<br/>
WHEN MATCHED THEN {matched_action}<br/>
WHEN NOT MATCHED THEN {not_matched_action}<br/>
</code></strong>

例として 新しいユーザーもしくはメールアドレスが更新された既存ユーザーの情報を元のマスターテーブルに対して **`MERGE`** を利用し更新します。


マスターテーブルを定義します。このテーブルは MERGE におけるターゲットテーブルとなります。

In [0]:
DROP TABLE IF EXISTS users_pii;
CREATE OR REPLACE TABLE users_pii
COMMENT "Contains PII"
AS
  SELECT
    *, 
    cast(cast(user_first_touch_timestamp/1e6 AS TIMESTAMP) AS DATE) first_touch_date, 
    current_timestamp() updated,
    input_file_name() source_file
  FROM parquet.`${sample_dataset_path}/ecommerce/raw/users-historical/`;
  
SELECT * FROM users_pii LIMIT 10;

ターゲットテーブルを構成するデータファイルとデータファイルごと格納されているレコード件数を確認してみます。

In [0]:
SELECT COUNT(*) AS count, source_file FROM users_pii GROUP BY source_file

次に 新しいユーザー情報 もしくは メールアドレスが更新された既存ユーザー情報 をもつテーブルを作成します。このテーブルは MERGE におけるソーステーブルとなります。

In [0]:
CREATE OR REPLACE TEMP VIEW users_update 
AS 
  SELECT 
    *,
    cast(cast(user_first_touch_timestamp/1e6 AS TIMESTAMP) AS DATE) first_touch_date, 
    current_timestamp() AS updated,
    input_file_name() source_file
  FROM parquet.`${sample_dataset_path}/ecommerce/raw/users-30m`;

SELECT * FROM users_update LIMIT 10;

同様にソーステーブルを構成するデータファイルとデータファイルごと格納されているレコード件数を確認してみます。

In [0]:
SELECT COUNT(*) AS count, source_file FROM users_update GROUP BY source_file

**`MERGE`** の主な利点：
* 更新、挿入、削除が1つのトランザクションとして完了します
* マッチングフィールドに加えて複数の条件を追加できます
* カスタムロジックを実装するための幅広いオプションを提供します

以下の MERGE は ターゲットテーブル と ソーステーブル のマージ処理を行います。
- user_id が一致する場合：ターゲットテーブル内の email が登録されていない かつ ソーステーブル内の email が登録 されている場合はソーステーブル内の email をターゲットテーブル内の email として登録します。
- user_id が一致しない場合：ソーステーブル内ユーザー情報（全列）をターゲットテーブルに登録します。

In [0]:
MERGE INTO users_pii a
USING users_update b
ON a.user_id = b.user_id
WHEN MATCHED AND a.email IS NULL AND b.email IS NOT NULL THEN
  UPDATE SET email = b.email, updated = b.updated
WHEN NOT MATCHED THEN
  INSERT *

MERGE 後のソーステーブルのデータファイルとデータファイルごと格納されているレコード件数を確認してみましょう。
最初の結果に比較して新たなソースファイルが追加されていることと、各ソースファイル内の有効なデータ件数が変わっていることに注目してください。

In [0]:
SELECT COUNT(*) AS count, source_file FROM users_pii GROUP BY source_file

この関数の動作を **`MATCHED`** および **`NOT MATCHED`** の両条件に対して明示的に指定しています。ここで示されている例は適用できるロジックの一例であり、 **`MERGE`** のすべての動作を示すものではありません。

**参考** ：[マージを使用した Delta Lake テーブルへの upsert](https://learn.microsoft.com/ja-jp/azure/databricks/delta/merge)  
**参考** ：[MERGE INTO](https://learn.microsoft.com/ja-jp/azure/databricks/sql/language-manual/delta-merge-into)

In [0]:
-- SELECT count(*), _metadata.file_path, source_file FROM users_pii GROUP BY _metadata.file_path, source_file;

# [Option] 4. Delta テーブルのテクニック

## CTAS によるテーブル作成

In [0]:
USE CATALOG ${your_catalog};
USE SCHEMA ${your_schema};

**`CREATE TABLE AS SELECT`** ステートメントは、入力クエリから取得したデータを使用してDeltaテーブルを作成し、データを追加します。

In [0]:
DROP TABLE IF EXISTS sales;
CREATE OR REPLACE TABLE sales AS
SELECT * FROM parquet.`${sample_dataset_path}/ecommerce/raw/sales-historical`;

SELECT * FROM sales LIMIT 10;

列名を変更したり対象列を省略したりするような変換はテーブルの作成中に簡単に行うことができます。

次の CTAS ステートメントは、 **`sales`** テーブルからの列のサブセットを含む新しいテーブルを作成します。
ここでは、意図的にユーザーを特定する可能性のある情報や個別の購入詳細を提供する情報を省略していると仮定します。また、下流のシステムがソースデータと異なる命名規則を持っていると仮定し、フィールド名を変更します。

In [0]:
DROP TABLE IF EXISTS purchases;
CREATE OR REPLACE TABLE purchases AS
SELECT order_id AS id, transaction_timestamp, purchase_revenue_in_usd AS price
FROM sales;

SELECT * FROM purchases LIMIT 10;

要件に応じてビューを使用することでも似たことを達成できます。

In [0]:
DROP VIEW IF EXISTS purchases_vw;
CREATE OR REPLACE VIEW purchases_vw AS
SELECT order_id AS id, transaction_timestamp, purchase_revenue_in_usd AS price
FROM sales;

SELECT * FROM purchases_vw LIMIT 10;

このように便利な CTAS ですが、スキーマ推論はサポートされていないためスキーマが明示的に定義されたソース（Parquet や テーブル）からの外部データ取り込みに適しています。

また追加のファイルオプション指定もサポートされないため CSV ファイルなどスキーマが明示されていないデータの取り込みには不向きです。

In [0]:
DROP TABLE IF EXISTS sales_unparsed;
CREATE OR REPLACE TABLE sales_unparsed AS
SELECT * FROM csv.`${sample_dataset_path}/ecommerce/raw/sales-csv`;

SELECT * FROM sales_unparsed LIMIT 10;

上記のように CSV に対する CTAS は期待通りには動作しません。

CSV ファイルなどスキーマが明示されていないデータを取り込む際には別の手段を利用します。

In [0]:
CREATE OR REPLACE TEMP VIEW sales_tmp_vw
  (order_id LONG, email STRING, transactions_timestamp LONG, total_item_quantity INTEGER, purchase_revenue_in_usd DOUBLE, unique_items INTEGER, items STRING)
USING CSV
OPTIONS (
  path = "${sample_dataset_path}/ecommerce/raw/sales-csv",
  header = "true",
  delimiter = "|"
);

DROP TABLE IF EXISTS sales_delta;
CREATE OR REPLACE TABLE sales_delta AS
  SELECT * FROM sales_tmp_vw;
  
SELECT * FROM sales_delta LIMIT 10;

## 生成列を使用したスキーマ定義

生成列は、Deltaテーブル内の他の列に基づいて自動的に生成される値を持つ列です。

例えば上で作成した Sales テーブルの transactions_timestamp 列は UNIX time として格納されているため扱いづらいため生成列として日付列を追加します。

**参考** ：[Delta Lake で生成された列](https://learn.microsoft.com/ja-jp/azure/databricks/delta/generated-columns)

In [0]:
DROP TABLE IF EXISTS purchase_dates;
CREATE OR REPLACE TABLE purchase_dates (
  id STRING, 
  transaction_timestamp STRING, 
  price STRING,
  date DATE GENERATED ALWAYS AS (
    cast(cast(transaction_timestamp/1e6 AS TIMESTAMP) AS DATE))
    COMMENT "generated based on `transactions_timestamp` column")

生成列をもつ新しいテーブル（ **`purchase_dates`** ）に対して既にデータを保持する元テーブル（**`purchase`**）をロードします。

その手段として MERGE を利用できます。INSERT INTO はスキーマが一致していることが条件ですが MERGE には SCHEMA EVOLUTION という機能があり、これによりデータだけではなくスキーマについてもマージを行うことができます。

**参考** ：[スキーマの展開を有効にする](https://learn.microsoft.com/ja-jp/azure/databricks/delta/update-schema#enable-schema-evo)

In [0]:
-- INSERT INTO はロード先とロード元のスキーマが一致する必要があり SCHEMA EVOLUTION も未サポートです。
-- INSERT INTO purchase_dates SELECT * FROM purchases;

In [0]:
MERGE WITH SCHEMA EVOLUTION INTO purchase_dates a
  USING purchases b ON a.id = b.id
  WHEN NOT MATCHED THEN
    INSERT *
;

SELECT * FROM purchase_dates LIMIT 10;

もちろんデータを個別に INSERT することも可能です。

In [0]:
INSERT INTO purchase_dates (id, transaction_timestamp, price )VALUES (1, 1592237293397767, 1.0);

SELECT * FROM purchase_dates WHERE id = 1;

ただし、生成列の値が指定されている場合は生成列定義で導出される値と完全に一致しない場合エラーとなります。

以下のセルをコメント解除して実行することで、このエラーを確認できます。

In [0]:
-- INSERT INTO purchase_dates VALUES (2, 1592237293397767, 2.0, "2020-06-18");

In [0]:
INSERT INTO purchase_dates VALUES (2, 1592237293397767, 2.0, cast(cast(1592237293397767/1e6 AS TIMESTAMP) AS DATE));

SELECT * FROM purchase_dates WHERE id = 2;

## テーブル制約の追加

テーブルへの追加されるデータに対する制約を定義することでデータの品質と整合性を確保することができます。

Databricks は現時点で２つのタイプの制約をサポートしています:
* <a href="https://learn.microsoft.com/ja-jp/azure/databricks/tables/constraints#not-null-constraint" target="_blank">**`NOT NULL`** 制約</a>
* <a href="https://learn.microsoft.com/ja-jp/azure/databricks/tables/constraints#check-constraint" target="_blank">**`CHECK`** 制約</a>

どちらの場合も、制約を定義する際に既にデータが存在する場合は既存データに対する違反チェックがなされ違反する場合は制約定義エラーとなります。


In [0]:
-- ALTER TABLE purchase_dates ADD CONSTRAINT valid_date CHECK (date > '2025-03-01');


以下では、テーブルの **`date`** 列に **`CHECK`** 制約を追加します。 **`CHECK`** 制約は、データセットをフィルタリングするために使用するかもしれない標準の **`WHERE`** 句のように見えます。

In [0]:
ALTER TABLE purchase_dates ADD CONSTRAINT valid_date CHECK (date > '2020-01-01');

定義した制約はメタデータの **`TBLPROPERTIES`** フィールドから確認可能です。

In [0]:
DESCRIBE EXTENDED purchase_dates

In [0]:
-- INSERT INTO purchase_dates (id, transaction_timestamp, price )VALUES (3, 1572237293397767, 3.0);

## テーブルへのメタデータの追加

**`SELECT`** 句では、ファイルの取り込みに役立つ2つの組み込み関数が提供されています。
* **`current_timestamp()`** ：ロジックが実行されたタイムスタンプを記録します
* **`input_file_name()`** ：テーブル内の各レコードのソースデータファイルを記録します

これらのメタデータは「レコードがいつ挿入されたか」、「どこから来たか」を理解するのに役立つ情報を提供します。これは、ソースデータの問題をトラブルシューティングする必要がある場合に特に役立ちます。

またコメントを付加することもできます。

In [0]:
DROP TABLE IF EXISTS users_pii;
CREATE OR REPLACE TABLE users_pii
COMMENT "Contains PII"
AS
  SELECT *, 
    cast(cast(user_first_touch_timestamp/1e6 AS TIMESTAMP) AS DATE) first_touch_date, 
    current_timestamp() updated,
    input_file_name() source_file
  FROM parquet.`${sample_dataset_path}/ecommerce/raw/users-historical/`;
  
SELECT * FROM users_pii LIMIT 10;

メタデータの利用例としてソースファイルごと現在有効なレコード件数を確認してみましょう。

In [0]:
SELECT COUNT(*) AS count, source_file FROM users_pii GROUP BY source_file

## Delta Lakeテーブルのクローン

**はじめに**：学習観点のわかりやすさのため hive_metastore を利用するためにコンテキストを切り替えます

In [0]:
USE CATALOG hive_metastore;
CREATE SCHEMA IF NOT EXISTS ${your_schema};
USE SCHEMA ${your_schema};

Delta LakeにはDelta Lakeテーブルを効率的にコピーするための2つのオプションがあります。

- **`DEEP CLONE`** ：ソーステーブルからターゲットテーブルにデータとメタデータを完全にコピーします。 このコピーは段階的に行われるので、このコマンドを再度実行するとソースからターゲットの場所に変更を同期できます。  

- **`SHALLOW CLONE`** ：ソーステーブルからターゲットテーブルにトランザクションログのみコピーしデータは移動しません。`DEEP CLONE` はすべてのデータファイルをコピーする必要があるため、大きなデータセットの場合、この処理は、長時間がかかる場合があります。現在のテーブルを変更してしまうリスクを冒さずに変更の適用を試し、簡単にテーブルのコピーを作成したいときは `SHALLOW CLONE` が便利です。

いずれも、クローンに対するデータ変更はソースとは分岐して別に履歴が管理されます。

**参考** ：[Azure Databricks でテーブルを複製する](https://learn.microsoft.com/ja-jp/azure/databricks/delta/clone)  
**参考** ：[CREATE TABLE CLONE](https://learn.microsoft.com/ja-jp/azure/databricks/sql/language-manual/delta-clone)

それでは実際に動作を見てみましょう。まずはデモ用のテーブルを作成します。

In [0]:
DROP TABLE IF EXISTS source_table;
CREATE OR REPLACE TABLE source_table (id INT, name STRING, engine STRING);
INSERT INTO source_table VALUES (1, 'LAND CRUISER 70',  '1VD-FTV');
INSERT INTO source_table VALUES (2, 'LAND CRUISER 250', '1GD-FTV');
SELECT * FROM source_table ORDER BY id;

In [0]:
%python
tbl_location = spark.sql("DESCRIBE DETAIL source_table").first().location
display(dbutils.fs.ls(tbl_location))

データファイルは２つ確認できます。

In [0]:
%python
display(dbutils.fs.ls(tbl_location + '/_delta_log'))

In [0]:
DESCRIBE HISTORY source_table;

トランザクションは３つ確認できます。

### DEEP CLONE
`DEEP CLONE` は、ソーステーブルからターゲットテーブルにデータとメタデータを完全にコピーします。 このコピーは段階的に行われるので、このコマンドを再度実行するとソースからターゲットの場所に変更を同期できます。

In [0]:
DROP TABLE IF EXISTS deep_clone_table;
CREATE OR REPLACE TABLE deep_clone_table DEEP CLONE source_table;

In [0]:
%python
tbl_location = spark.sql("DESCRIBE DETAIL deep_clone_table").first().location
display(dbutils.fs.ls(tbl_location))

DEEP CLONE は完全なコピーであるためデータファイルは同じく３つです。

In [0]:
%python
display(dbutils.fs.ls(tbl_location + '/_delta_log'))

In [0]:
DESCRIBE HISTORY deep_clone_table;

バージョンはクローン元とは別に新たに管理されていることがわかります。

In [0]:
INSERT INTO deep_clone_table VALUES (3, 'LAND CRUISER 300', 'F33A-FTV');
SELECT * FROM deep_clone_table ORDER BY id;

In [0]:
SELECT * FROM source_table ORDER BY id;

### SHALLOW CLONE
ソーステーブルからターゲットテーブルにトランザクションログのみコピーしデータは移動しません。`DEEP CLONE` はすべてのデータファイルをコピーする必要があるため、大きなデータセットの場合、この処理は、長時間がかかる場合があります。現在のテーブルを変更してしまうリスクを冒さずに変更の適用を試し、簡単にテーブルのコピーを作成したいときは `SHALLOW CLONE` が便利です。

In [0]:
DROP TABLE IF EXISTS shallow_clone_table;
CREATE OR REPLACE TABLE shallow_clone_table SHALLOW CLONE source_table;

In [0]:
%python
tbl_location = spark.sql("DESCRIBE DETAIL shallow_clone_table").first().location
display(dbutils.fs.ls(tbl_location))

DEEP CLONE とは異なり SHALLOW CLONE の場合はデータファイルがコピーされていないことがわかります。

In [0]:
%python
display(dbutils.fs.ls(tbl_location + '/_delta_log'))

In [0]:
DESCRIBE HISTORY shallow_clone_table;

DEEP CLONE と同様に SHALLOW CLONE においてもバージョンはクローン元とは別に新たに管理されていることがわかります。

In [0]:
INSERT INTO shallow_clone_table VALUES (3, 'LAND CRUISER 300', 'F33A-FTV');
SELECT * FROM shallow_clone_table ORDER BY id;

In [0]:
%python
tbl_location = spark.sql("DESCRIBE DETAIL shallow_clone_table").first().location
display(dbutils.fs.ls(tbl_location))

シャロークローンに対して追加された分のみデータファイルが作成されています。

### クローン元テーブルの削除とその影響

In [0]:
DROP TABLE source_table;

DEEP CLONE はクローン元から独立するためクローン元が削除されても影響がありません。

In [0]:
SELECT * FROM deep_clone_table ORDER BY id;

一方で SHALLOW CLONE はクローン元に依存するためクローン元が削除されると機能しません。

**留意** ：結果が返ってくる場合はキャッシュが効いている可能性があります。時間に余裕があるときにクラスタを再起動したのちに再度確認してみてください。 

In [0]:
-- SELECT * FROM shallow_clone_table ORDER BY id;

# クリーンアップ

In [0]:
USE CATALOG hive_metastore;
DROP SCHEMA IF EXISTS ${your_schema} CASCADE;

In [0]:
%python
dbutils.widgets.remove("sample_dataset_path")
dbutils.widgets.remove("your_catalog")
dbutils.widgets.remove("your_schema")