# Auto Loader (Blob -> Bronze Delta)

이 노트북은 **Storage Account Key**로 Blob 경로를 읽고, Auto Loader로 **bronze Delta 테이블**에 적재합니다.

- Serverless/Unity Catalog 환경에서는 `spark.conf.set(fs.azure.account.key...)`가 막힐 수 있습니다.
- 이 방식은 보통 **클래식(All-purpose) 클러스터**에서 동작합니다.


In [0]:
# =========================
# 0) Storage authentication setup (Account Key method)
# =========================
# - Assumes the Azure Storage Account Key is already stored as a Databricks Secret.
# - Never print the key.

storage_account = "team2blobstorage"
container = "functoblob-data"

# Keep the secret scope/key names in line with your environment.
account_key = dbutils.secrets.get(scope="blob-scope", key="storage-key")

# Host name of the ADLS (DFS endpoint) for this account; built once so the
# config keys and the data path cannot drift apart.
dfs_host = f"{storage_account}.dfs.core.windows.net"

# Settings for the ADLS (DFS endpoint).
# - One of the two lines below is often sufficient, but because of
#   runtime/library differences it is safer to set both.
spark.conf.set(f"fs.azure.account.key.{dfs_host}", account_key)
spark.conf.set(f"spark.hadoop.fs.azure.account.key.{dfs_host}", account_key)

# Data root path (read target), derived from the variables above instead of
# repeating the account/container names as a literal.
raw_path = f"abfss://{container}@{dfs_host}/fall-detection/"

print("Configured storage auth. raw_path =", raw_path)

Configured storage auth. raw_path = abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/


In [0]:
# =========================
# 1) Connectivity check (ls)
# =========================
# A file/folder listing must show up here for the following steps to be meaningful.
dbutils.fs.ls(raw_path)

[FileInfo(path='abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/', name='2025/', size=0, modificationTime=0)]

In [0]:
# test_1 =========================
# Auto Loader -> Bronze Delta Table
# (JSON Raw Ingestion)
# =========================

from pyspark.sql.functions import current_timestamp, col

# -------------------------------------------------
# Storage locations
# -------------------------------------------------
container = "functoblob-data"
storage_account = "team2blobstorage"      # e.g. team2blobstorage

# Bronze table name
target_table = "bronze_fall_detection_raw"

# Location where the raw JSON files actually land (important)
raw_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/fall-detection/"

# Where Auto Loader keeps its metadata (stream checkpoint and schema location)
checkpoint_location = f"abfss://{container}@{storage_account}.dfs.core.windows.net/_checkpoints/fall_detection/"
schema_location = f"abfss://{container}@{storage_account}.dfs.core.windows.net/_schemas/fall_detection/"

# -------------------------------------------------
# Auto Loader read (Bronze)
# -------------------------------------------------
# All cloudFiles reader options gathered in one place.
autoloader_options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": schema_location,
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
    "cloudFiles.inferColumnTypes": "false",
}

bronze_df = (
    spark.readStream
        .format("cloudFiles")
        .options(**autoloader_options)
        .load(raw_path)
)

# Metadata columns (for operations/debugging): ingestion time and source file path.
bronze_df = bronze_df.withColumn("_ingest_time", current_timestamp())
bronze_df = bronze_df.withColumn("_source_file", col("_metadata.file_path"))

# -------------------------------------------------
# Write to Delta (Bronze)
# -------------------------------------------------
stream_writer = bronze_df.writeStream.format("delta").outputMode("append")
stream_writer = stream_writer.option("checkpointLocation", checkpoint_location)

# toTable() starts the streaming query and returns its handle.
query = stream_writer.toTable(target_table)

query


<pyspark.sql.streaming.query.StreamingQuery at 0x7fcca47a5b50>

In [0]:
%sql
-- Show the Delta table details (format, location, file count, properties, ...) of the bronze table.
DESCRIBE DETAIL bronze_fall_detection_raw;

format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics,clusterByAuto
delta,91e9019b-2494-4ab6-bd62-de141c3ced35,team2_databricks.default.bronze_fall_detection_raw,,abfss://unity-catalog-storage@dbstoraged5ovj3boifkq2.dfs.core.windows.net/2608286707928203/__unitystorage/catalogs/e6462f53-3ba6-4c6a-992e-3d8b8590a0d9/tables/bc8f3c16-bb8a-4382-92c8-272d6f34d580,2025-12-22T01:36:25.835Z,2025-12-22T01:36:28Z,List(),List(),1,4386,"Map(delta.parquet.compression.codec -> zstd, delta.enableDeletionVectors -> true, delta.writePartitionColumnsToParquet -> true, delta.enableRowTracking -> true, delta.rowTracking.materializedRowCommitVersionColumnName -> _row-commit-version-col-58928af2-f038-4161-97fe-764be2ac0d59, delta.rowTracking.materializedRowIdColumnName -> _row-id-col-ecd157af-28e6-454e-93fc-10a8f5dc08a4)",3,7,"List(appendOnly, deletionVectors, domainMetadata, invariants, rowTracking)","Map(numRowsDeletedByDeletionVectors -> 0, numDeletionVectors -> 0)",False


In [0]:
# test_2 =========================
# 2) Auto Loader -> Bronze Delta Table
# =========================
from pyspark.sql.functions import current_timestamp

# Table to load into.
# - If no catalog/schema is specified, the table is created in the
#   current catalog/schema.
target_table = "bronze_fall_detection"

# Where Auto Loader stores the checkpoint and the schema (kept inside the Blob container).
# NOTE: `container`, `storage_account` and `raw_path` are not assigned in this
# cell; they must already be defined by an earlier cell.
checkpoint_location = f"abfss://{container}@{storage_account}.dfs.core.windows.net/checkpoints/fall_detection/"
schema_location = f"abfss://{container}@{storage_account}.dfs.core.windows.net/schemas/fall_detection/"

# cloudFiles reader options gathered in one place.
autoloader_options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": schema_location,
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
}

source_stream = (
    spark.readStream
        .format("cloudFiles")
        .options(**autoloader_options)
        .load(raw_path)
)

# Tag every row with the time it was ingested.
bronze_df = source_stream.withColumn("ingest_time", current_timestamp())

stream_writer = bronze_df.writeStream.format("delta")
stream_writer = stream_writer.option("checkpointLocation", checkpoint_location)
stream_writer = stream_writer.outputMode("append")

# toTable() starts the streaming query and returns its handle.
query = stream_writer.toTable(target_table)

query

<pyspark.sql.streaming.query.StreamingQuery at 0x7f3dd9f6ddc0>

In [0]:
# =========================
# 3) Verify table creation / location
# =========================
check_statements = (
    # Current catalog/schema
    "SELECT current_catalog(), current_schema()",
    # Table details (storage location, format, ...)
    "DESCRIBE DETAIL bronze_fall_detection_raw",
)

# Run each statement and print its full, untruncated result.
for statement in check_statements:
    spark.sql(statement).show(truncate=False)

+-----------------+----------------+
|current_catalog()|current_schema()|
+-----------------+----------------+
|team2_databricks |default         |
+-----------------+----------------+

+------+------------------------------------+--------------------------------------------------+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+-------------------+----------------+-----------------+--------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+--------------

In [0]:
# =========================
# 4) Batch read (sample)
# =========================
# Read the bronze table as a regular (non-streaming) DataFrame.
df = spark.table("bronze_fall_detection_raw")

# Show at most 50 rows.
sample_rows = df.limit(50)
display(sample_rows)

_ingestedAt,device,sensor,timestamp,type,_rescued_data,_ingest_time,_source_file
2025-12-22T01:20:17.743020+00:00,"{""deviceId"":""watch-001"",""model"":""galaxy-watch"",""os"":""wearos"",""appVersion"":""1.0.0""}","[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-22T03:00:00.000Z,user_cancelled,,2025-12-22T01:23:11.814Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/22/291bed60-eb1f-4307-9656-7b1d14944d97.json
2025-12-22T01:20:21.865737+00:00,"{""deviceId"":""watch-001"",""model"":""galaxy-watch"",""os"":""wearos"",""appVersion"":""1.0.0""}","[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-22T03:00:00.000Z,user_cancelled,,2025-12-22T01:23:11.814Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/22/543fd960-6949-49c5-b144-2f760d2927af.json
2025-12-22T01:12:03.067210+00:00,"{""deviceId"":""watch-001"",""model"":""galaxy-watch"",""os"":""wearos"",""appVersion"":""1.0.0""}","[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-20T03:00:00.000Z,fall_detected,,2025-12-22T01:23:11.814Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/20/60323313-19a2-49c2-89bd-f6c6d7e79064.json
2025-12-22T01:20:32.885812+00:00,"{""deviceId"":""watch-001"",""model"":""galaxy-watch"",""os"":""wearos"",""appVersion"":""1.0.0""}","[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-22T03:00:00.000Z,auto_reported,,2025-12-22T01:23:11.814Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/22/17c9b71f-7217-4616-98f4-b642f1982784.json
2025-12-22T01:18:13.778975+00:00,"{""deviceId"":""watch-001"",""model"":""galaxy-watch"",""os"":""wearos"",""appVersion"":""1.0.0""}","[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-22T03:00:00.000Z,fall_detected,,2025-12-22T01:23:11.814Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/22/3369a13f-2206-4204-92a3-5bc4668583b6.json
2025-12-22T01:20:36.915198+00:00,"{""deviceId"":""watch-001"",""model"":""galaxy-watch"",""os"":""wearos"",""appVersion"":""1.0.0""}","[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-22T03:00:00.000Z,auto_reported,,2025-12-22T01:23:11.814Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/22/5353d928-51c8-4e78-b60e-a3838693ecdb.json
2025-12-22T01:20:35.014668+00:00,"{""deviceId"":""watch-001"",""model"":""galaxy-watch"",""os"":""wearos"",""appVersion"":""1.0.0""}","[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-22T03:00:00.000Z,auto_reported,,2025-12-22T01:23:11.814Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/22/a495c2d2-dc2d-4c0a-a202-ede95563ff27.json
2025-12-22T01:20:34.067094+00:00,"{""deviceId"":""watch-001"",""model"":""galaxy-watch"",""os"":""wearos"",""appVersion"":""1.0.0""}","[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-22T03:00:00.000Z,auto_reported,,2025-12-22T01:23:11.814Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/22/c5bdd4f3-9964-42cd-bdc4-1df1f130bd97.json
2025-12-22T01:20:35.985370+00:00,"{""deviceId"":""watch-001"",""model"":""galaxy-watch"",""os"":""wearos"",""appVersion"":""1.0.0""}","[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-22T03:00:00.000Z,auto_reported,,2025-12-22T01:23:11.814Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/22/e43fff17-d9fa-4d6d-a40e-31667e941afc.json


In [0]:

# Required: run the code below to stop Auto Loader, and also shut down the compute.
running_streams = spark.streams.active
for stream in running_streams:
    print("Stopping:", stream.name, stream.id)
    stream.stop()

# Number of streaming queries still active after stopping.
len(spark.streams.active)

Stopping: None 95a6be44-6ce5-4295-8071-06dc6eaa6c21


0

In [0]:
%sql
-- Delete the sample data

DROP TABLE IF EXISTS bronze_fall_detection_raw;


In [0]:
# If data is not accumulating, check which streaming queries are currently active

spark.streams.active


[<pyspark.sql.streaming.query.StreamingQuery at 0x7f0bb8620c80>]

In [0]:
# List the contents of raw_path.
dbutils.fs.ls(raw_path)

[FileInfo(path='abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/', name='2025/', size=0, modificationTime=0)]

In [0]:
%sql
-- Check the data: the 2 rows with the most recent _ingest_time

SELECT *
FROM bronze_fall_detection_raw
ORDER BY _ingest_time DESC
LIMIT 2;



_ingestedAt,device,sensor,timestamp,type,_rescued_data,_ingest_time,_source_file
2025-12-22T01:42:25.684231+00:00,"{""deviceId"":""watch-001"",""model"":""galaxy-watch"",""os"":""wearos"",""appVersion"":""1.0.0""}","[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-22T03:00:00.000Z,auto_reported,,2025-12-22T01:42:26.593Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/22/a8418112-dbf1-4f3d-8e90-d4973db8be6d.json
2025-12-22T01:39:24.831848+00:00,"{""deviceId"":""watch-001"",""model"":""galaxy-watch"",""os"":""wearos"",""appVersion"":""1.0.0""}","[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-22T03:00:00.000Z,auto_reported,,2025-12-22T01:39:25.648Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/22/18560509-ae56-44cf-8fb8-fa8d159e9565.json


In [0]:
%sql
-- Show the catalog and schema currently in effect.
SELECT current_catalog(), current_schema();

current_catalog(),current_schema()
team2_databricks,default


In [0]:
# Storage account and container that hold the raw data.
storage_account, container = "team2blobstorage", "functoblob-data"

# abfss:// root of the fall-detection folder inside that container.
raw_path = f"abfss://{container}@{storage_account}.dfs.core.windows.net/fall-detection/"

In [0]:
# Display the listing of raw_path.
display(dbutils.fs.ls(raw_path))

path,name,size,modificationTime
abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/,2025/,0,0


In [0]:
from pyspark.sql.functions import current_timestamp, col

# Tag appended to the table name and to the schema/checkpoint folders below.
# Change it to e.g. "soyun", "jh", "hy".
# NOTE(review): presumably this keeps each member's Auto Loader state and
# table separate - confirm.
member_tag = "memberA"

# Fully qualified target table for this member.
target_table = f"team2_databricks.default.bronze_fall_detection_raw_{member_tag}"

# Per-member checkpoint and schema locations inside the container.
checkpoint_location = f"abfss://{container}@{storage_account}.dfs.core.windows.net/_checkpoints/fall_detection_{member_tag}/"
schema_location = f"abfss://{container}@{storage_account}.dfs.core.windows.net/_schemas/fall_detection_{member_tag}/"

In [0]:
# cloudFiles reader options; column types are inferred here ("true").
autoloader_options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaLocation": schema_location,
    "cloudFiles.schemaEvolutionMode": "addNewColumns",
    "cloudFiles.inferColumnTypes": "true",
}

source_stream = (
    spark.readStream
        .format("cloudFiles")
        .options(**autoloader_options)
        .load(raw_path)
)

# Add the ingestion time and the source file path to every row.
bronze_df = source_stream.withColumn("_ingest_time", current_timestamp())
bronze_df = bronze_df.withColumn("_source_file", col("_metadata.file_path"))

# Append to the target Delta table, tracking progress in checkpoint_location.
stream_writer = bronze_df.writeStream.format("delta").outputMode("append")
stream_writer = stream_writer.option("checkpointLocation", checkpoint_location)

# toTable() starts the streaming query and returns its handle.
query = stream_writer.toTable(target_table)

query

In [0]:
%sql
-- Count the rows loaded into the memberA bronze table.
SELECT COUNT(*) AS cnt
FROM team2_databricks.default.bronze_fall_detection_raw_memberA;

cnt
36


In [0]:
%sql
-- Show the 5 rows with the most recent _ingest_time from the memberA bronze table.
SELECT *
FROM team2_databricks.default.bronze_fall_detection_raw_memberA
ORDER BY _ingest_time DESC
LIMIT 5;

_ingestedAt,device,isFall,sensor,timestamp,type,_rescued_data,_ingest_time,_source_file
2025-12-20T08:52:25.204267+00:00,"List(watch-001, galaxy-watch, wearos)",,"[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-20T03:00:00.000Z,fall,,2025-12-20T08:52:26.115Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/20/7b78ecb2-a008-4369-9e8b-ab09e4d4731b.json
2025-12-20T07:43:44.901786+00:00,"List(watch-001, galaxy-watch, wearos)",,"[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-20T03:00:00.000Z,fall,,2025-12-20T08:38:15.959Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/20/4a5372a0-fe4c-435a-b0f4-fbaad967228a.json
2025-12-20T07:43:43.900512+00:00,"List(watch-001, galaxy-watch, wearos)",,"[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-20T03:00:00.000Z,fall,,2025-12-20T08:38:15.959Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/20/58eb3ee5-3a71-4a70-b7ed-0e5c4c2067ca.json
2025-12-20T07:43:46.047422+00:00,"List(watch-001, galaxy-watch, wearos)",,"[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-20T03:00:00.000Z,fall,,2025-12-20T08:38:15.959Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/20/57074d0f-d497-4086-8cd1-40a756ea688b.json
2025-12-20T07:04:36.713509+00:00,"List(watch-001, galaxy-watch, wearos)",,"[{""ax"":0.08,""ay"":-0.2,""az"":9.7,""gx"":0.02,""gy"":0.01,""gz"":-0.03}]",2025-12-20T03:00:00.000Z,fall,,2025-12-20T08:38:15.959Z,abfss://functoblob-data@team2blobstorage.dfs.core.windows.net/fall-detection/2025/12/20/159d2df2-1c28-4d3d-a6e3-0bdf52d9838b.json
