# Bronze層への新規データ追加（Auto Loader）

このノートブックでは、Databricks Auto Loaderを使用して新規CSVファイルを自動的にBronze層に取り込みます。

## 目的
- **増分データ取り込み**: 新規ファイルの自動検出・処理
- **スキーマ管理**: 自動スキーマ推論と進化
- **ストリーミング処理**: リアルタイムデータパイプライン

## Auto Loaderの特徴
- **自動ファイル検出**: 新規ファイルを自動的に発見
- **スキーマ推論**: データ型の自動判定
- **チェックポイント機能**: 処理状況の永続化

## 処理フロー
1. Volume内の新規CSVファイル監視
2. スキーマ推論とバリデーション
3. Bronze層Deltaテーブルへの追加


In [0]:
from pyspark.sql.functions import input_file_name

catalog_name = "users"
schema_name = "yukiteru_koide"
base_path = "/Volumes/users/yukiteru_koide/yukiterumart_etl_newdata"

# CatalogとSchemaを選択
spark.sql(f"USE CATALOG {catalog_name}")
spark.sql(f"USE SCHEMA {schema_name}")


In [0]:
from pyspark.sql.functions import col

# Auto LoaderでCSVファイルを読み込み
bronze_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("cloudFiles.schemaLocation", f"{base_path}/_schema/bronze")  # schema log保存先を指定
    .load(base_path)
    .withColumn("source_file", col("_metadata.file_path"))
)

# BronzeテーブルにDeltaとして書き出し
bronze_stream.writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", f"{base_path}/_checkpoints/bronze") \
    .trigger(once=True) \
    .table(f"{catalog_name}.{schema_name}.bronze_transactions")