Este notebook faz a ingestão dos dados com CREATE TABLE AS 

## Volumes a serem criados:
- `claims` - Volume para dados e imagens de sinistros (CSV, imagens, metadata)
- `sql_server` - Volume para dados extraídos do SQL Server (CSV: claims, customers, policies)
- `telematics` - Volume para dados de telemetria veicular (arquivos Parquet)
- `training_imgs` - Volume para imagens de treinamento de modelos de Machine Learning (PNG)

## O que são Volumes no Unity Catalog?
- **Volumes** são containers de arquivos no Unity Catalog
- Permitem armazenar arquivos não estruturados ou semi-estruturados (CSV, JSON, Parquet, imagens, etc.)
- Fornecem governança e controle de acesso granular
- São a alternativa moderna ao DBFS para armazenamento de arquivos
- Permitem organizar dados por tipo ou origem antes do processamento

## Nota sobre Upload
Após criar os volumes neste notebook, você pode fazer upload dos arquivos diretamente via **UI do Databricks**:
1. Navegue até: Catalog → smart_claims_dev → 00_landing
2. Clique no volume desejado
3. Clique em **Upload** ou **Add files**
4. Selecione os arquivos/pastas do seu sistema local



## Setup

Primeiro, vamos garantir que estamos usando o catálogo correto e que o schema 00_landing existe.



In [0]:
SELECT current_catalog(), current_schema()


## EXPLORE THE DATA SOURCE FILES



In [0]:
LIST /Volumes/smart_claims_dev.00_landing/sql_server

Query os arquivos parquet usando o path do volume

In [None]:
SELECT *
FROM parquet.`/Volumes/smart_claims_dev.00_landing/sql_server/claims.csv`

## Batch Data Ingestion with CTAs

CTAs with CREATE TABLE AS é uma maneira eficiente de ingerir dados em massa.

Dentro do Unity Catalog, podemos criar tabelas Delta a partir de arquivos em volumes.




In [0]:
SELECT *
FROM read_files(
    'dbfs:/Volumes/smart_claims_dev.00_landing/sql_server/claims.csv',
    format = 'csv'
)






In [None]:
--drop the table if it exists for demonstration purposes
DROP TABLE IF EXISTS smart_claims_dev.01_bronze.claims;

--create the table using the CREATE TABLE AS statement
CREATE TABLE smart_claims_dev.01_bronze.claims
AS SELECT *
FROM read_files(
    'dbfs:/Volumes/smart_claims_dev.00_landing/sql_server/claims.csv',
    format = 'csv'
)

-- Preview the data
SELECT *
FROM smart_claims_dev.01_bronze.claims
LIMIT 10






In [None]:
DESCRIBE TABLE EXTENDED smart_claims_dev.01_bronze.claims;

## Python Ingestion

In [None]:
%python
df = (spark.
        read.
        format("csv").
        option("header", True).
        load("dbfs:/Volumes/smart_claims_dev.00_landing/sql_server/claims.csv")
)

df.write.mode("overwrite").saveAsTable("smart_claims_dev.01_bronze.claims")

claims_table = spark.table("smart_claims_dev.01_bronze.claims")

claims_table.display()






## COPY INTO

In [None]:
DROP TABLE IF EXISTS smart_claims_dev.01_bronze.claims;

CREATE TABLE smart_claims_dev.01_bronze.claims (
    claim_no STRING,
    policy_no STRING,
    claim_date DATE,
    months_as_customer INT,
    injury STRING,
    property STRING,
    vehicle STRING,
)

COPY INTO smart_claims_dev.01_bronze.claims
FROM 'dbfs:/Volumes/smart_claims_dev.00_landing/sql_server/claims.csv'
FILEFORMAT = CSV
COPY_OPTIONS (header = true, infer_schema = true)