## COPY INTO


🟢 Best for: Batch ingestion of semi-static files that don’t change frequently
🟠 Limitation: No built-in detection of new files — you re-run COPY INTO manually or on a schedule (files that were already loaded are skipped)

In [0]:
%sql
-- Databricks SQL: COPY INTO Example
-- ====================================
-- COPY INTO loads files from cloud storage into a Delta table (batch ingestion).
-- Supported formats: CSV, JSON, PARQUET, AVRO, ORC, TEXT, BINARYFILE
-- COPY INTO is idempotent: files already loaded into the target table are skipped on re-runs.

-- Example: Load CSV sales data from ADLS Gen2 (abfss://); use s3://... for AWS S3 or gs://... for GCS
-- NOTE: Replace 'your_container', 'your_storage_account', and the path appropriately

-- COPY INTO requires the target table to exist. An empty, schema-less placeholder table is
-- enough (supported on recent runtimes, DBR 11.3 LTS+): 'mergeSchema' = 'true' below lets the
-- first load define its columns. If `sales` already exists, this statement is a no-op.
CREATE TABLE IF NOT EXISTS sales;

COPY INTO sales
FROM 'abfss://your_container@your_storage_account.dfs.core.windows.net/bookstore-data/sales/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

-- Key Options Explained:
-- ======================
-- FILEFORMAT = CSV | JSON | PARQUET | AVRO | ORC | TEXT | BINARYFILE  --> Specifies file type
-- FORMAT_OPTIONS: header, inferSchema, delimiter, etc.  --> Options for the chosen FILEFORMAT (these ones are CSV options)
-- COPY_OPTIONS('mergeSchema'='true')  --> Evolve the target table's schema when new columns appear
-- COPY_OPTIONS('force'='true')  --> Reload files even if they were loaded before (turns off the idempotent skip)
-- COPY INTO records which files it has loaded in the target Delta table (transaction log), so re-running does not duplicate rows



## Auto Loader (Spark Structured Streaming)

🟢 Best for: Streaming ingestion of continuously arriving files
🟠 Advantage: Auto Loader automatically detects new files — no need to trigger manually
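🔵 Supports schema evolution: when Auto Loader infers the schema (with `cloudFiles.schemaLocation` set), new columns are handled according to `cloudFiles.schemaEvolutionMode`; add `.option("mergeSchema", "true")` on the Delta write so the target table can gain those columns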



In [0]:
# Databricks PySpark: Auto Loader Example
# =========================================
# Auto Loader automatically detects and ingests new files arriving in cloud storage.
# Supported formats: CSV, JSON, PARQUET, AVRO, BINARYFILE, ORC, TEXT
# New files are discovered via file notifications (cloud native) or directory listing.

# Example: Load JSON sales data from an S3 bucket (or ADLS / Azure Blob / GCS)

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

# 1️⃣ Define the schema explicitly (recommended): Auto Loader then does not need to sample
#    files to infer a schema, and column types are pinned instead of guessed.
sales_schema = StructType([
    StructField("sale_id", IntegerType(), True),
    StructField("book_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("sale_date", TimestampType(), True),
])

# 2️⃣ Use Auto Loader to read new JSON files incrementally.
#    Because an explicit .schema() is supplied, schema-inference options such as
#    cloudFiles.inferColumnTypes do not apply and are deliberately omitted.
sales_stream_df = (
    spark.readStream.format("cloudFiles")    # cloudFiles = Auto Loader
        .option("cloudFiles.format", "json")  # File format is JSON
        .schema(sales_schema)
        .load("s3://your_bucket/bookstore-data/sales/")  # Adjust path for your cloud provider
)

# 3️⃣ Write the stream into a Delta table (append mode).
#    Keep the query handle so the stream can be inspected or stopped later,
#    e.g. sales_query.status, sales_query.lastProgress, sales_query.stop().
sales_query = (
    sales_stream_df.writeStream
        .format("delta")
        .option("checkpointLocation", "/mnt/checkpoints/sales_autoloader/")
        .outputMode("append")
        .toTable("sales")  # Target Delta table; DataStreamWriter exposes toTable(), not table()
)

# Key Options Explained:
# ======================
# cloudFiles.format = csv | json | parquet | avro | ...  --> Specifies file type
# cloudFiles.inferColumnTypes = true  --> Only relevant when Auto Loader infers the schema
#                                         (no .schema() given; then cloudFiles.schemaLocation is also needed)
# checkpointLocation  --> Required to enable fault-tolerance & progress tracking
# outputMode = append  --> Append only (typical for file ingestion)


In [0]:
# Auto Loader single-command version to ingest JSON into a Delta table.
# NOTE: this cell is an alternative to the previous one, not an addition: it reads the same
# path and reuses the same checkpoint and target table. Run only one of the two — a
# checkpoint location should belong to exactly one streaming query.

(
  spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema("""
      sale_id INT,
      book_id INT,
      customer_id INT,
      quantity INT,
      sale_date TIMESTAMP
    """)
    .load("s3://your_bucket/bookstore-data/sales/")  # Adjust path (S3 / ADLS / Blob)
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/sales_autoloader/")
    .toTable("sales")  # DataStreamWriter exposes toTable(), not table()
)
