In [None]:
pip install --upgrade pip

In [None]:
pip install databricks-labs-dqx

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when
import random
import json

from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.col_functions import *

In [None]:
# Session and workspace handles shared by the cells below.
spark = SparkSession.builder.appName('StartingDQX').getOrCreate()
ws_client = WorkspaceClient()

# Candidate values for the synthetic rows; the None entry in genders
# produces missing gender values.
names = ['Alice', 'Bob', 'Esther']
genders = ['F', 'M', None]

# 90 rows with ids 1..90. The name is blanked when the draw is <= 0.1 and
# the age when the draw is <= 0.5, so both columns contain missing values.
data = [
    (
        row_id,
        random.choice(names) if random.random() > 0.1 else None,
        random.randint(18, 60) if random.random() > 0.5 else None,
        random.choice(genders),
    )
    for row_id in range(1, 91)
]

# Append 10 randomly chosen existing rows so the dataset contains duplicates.
for _ in range(10):
    data.append(random.choice(data))

# Build the DataFrame and null out every id that is a multiple of 15.
df = spark.createDataFrame(data, ['id', 'name', 'age', 'gender']).withColumn(
    'id',
    when(col('id') % 15 == 0, None).otherwise(col('id')),
)

In [None]:
# Show the generated DataFrame in the notebook output.
df.display()

In [None]:
def data_profile(data_input):
    """Profile a DataFrame with the DQX profiler.

    Args:
        data_input: DataFrame handed to ``DQProfiler.profile``.

    Returns:
        The ``(summary_stats, profiles)`` pair produced by the profiler.

    Raises:
        Any exception raised while building or running the profiler
        propagates to the caller unchanged.
    """
    # Uses the notebook-level ``ws_client`` workspace client.
    profiler = DQProfiler(ws_client)
    summary_stats, profiles = profiler.profile(data_input)
    return summary_stats, profiles

In [None]:
# Profile the synthetic DataFrame; the results are printed in the next cells.
summary_stats, profiles = data_profile(df)

In [None]:
# Print the summary statistics serialized as JSON.
# NOTE(review): json.dumps raises TypeError if the stats hold values that are
# not JSON-serializable — confirm against the profiler's output types.
print('stats', json.dumps(summary_stats))

In [None]:
# Print the profiles returned by the profiler.
print('profile', profiles)

In [None]:
import dlt
from pyspark.sql.functions import col

# 1. Table holding the raw data
@dlt.table(comment="Dados brutos de usuários extraídos de um sistema externo")
def raw_users():
    # Batch-read the JSON files found under the raw users mount point.
    json_reader = spark.read.format("json")
    return json_reader.load("/mnt/raw_data/users/")

# 2. Table with the data-quality rules applied
# Three expectations with increasing severity: plain expect, expect_or_drop,
# expect_or_fail.
@dlt.table(comment="Dados de usuários limpos com validações de qualidade")
@dlt.expect("id_not_null", "id IS NOT NULL")
@dlt.expect_or_drop("age_positive", "age > 0")
@dlt.expect_or_fail("email_format_valid", "email LIKE '%@%.%'")
def cleaned_users():
    # The expectations above are evaluated against the rows returned here.
    raw = dlt.read("raw_users")
    return raw

# 3. Table of active users
@dlt.table(comment="Usuários ativos com idade entre 18 e 60")
def active_users():
    cleaned = dlt.read("cleaned_users")
    # between() is inclusive on both bounds, i.e. 18 <= age <= 60.
    return cleaned.filter(col("age").between(18, 60))

In [None]:
from pathlib import Path
import zipfile

# Project layout: maps "dlt_project/<relative path>" to the file's content.
project_files = {
    "dlt_project/pipelines/dlt_pipeline.py": '''import dlt
from pyspark.sql.functions import col, to_date

@dlt.table(name="bronze_orders")
def bronze_orders():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/mnt/raw/orders")
    )

@dlt.table(name="silver_orders")
@dlt.expect("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("valid_total_amount", "total_amount >= 0")
@dlt.expect("valid_order_date", "order_date IS NOT NULL")
def silver_orders():
    df = dlt.read("bronze_orders")
    return df.withColumn("order_date", to_date(col("order_date"), "yyyy-MM-dd"))

@dlt.table(name="gold_sales_summary")
def gold_sales_summary():
    df = dlt.read("silver_orders")
    return df.groupBy("order_date").sum("total_amount").withColumnRenamed("sum(total_amount)", "daily_sales")
''',

    "dlt_project/config/table_config.json": '''{
  "source_path": "/mnt/raw/orders",
  "source_format": "json",
  "expectations": {
    "order_id": "order_id IS NOT NULL",
    "total_amount": "total_amount >= 0",
    "order_date": "order_date IS NOT NULL"
  }
}
''',

    "dlt_project/README.md": '''# Delta Live Tables - Pipeline com Qualidade de Dados

Este projeto demonstra um pipeline com Delta Live Tables (DLT) utilizando DQX (Data Quality Expectations) para garantir integridade dos dados.

## Camadas

- **Bronze**: ingestão dos dados brutos
- **Silver**: limpeza e validação com DQX
- **Gold**: agregações e métricas

## Qualidade dos Dados

- `order_id` não pode ser nulo
- `total_amount` deve ser ≥ 0 (linhas inválidas são descartadas)
- `order_date` não pode ser nulo

## Execução

1. Carregue arquivos JSON em `/mnt/raw/orders`
2. Crie o pipeline no Databricks com o notebook `dlt_pipeline.py`
3. Execute e monitore os resultados e falhas no UI do DLT
'''
}

# Create the directories and write the files.
# Only the leading "dlt_project/" component is stripped, so sub-directories
# (pipelines/, config/) are kept and README.md resolves to a real file path
# instead of to base_path itself.
base_path = Path("/mnt/data/dlt_project")
for file_path, content in project_files.items():
    relative = file_path.split("/", 1)[1]
    path = base_path / relative
    path.parent.mkdir(parents=True, exist_ok=True)
    # Explicit UTF-8: the contents include non-ASCII characters.
    path.write_text(content, encoding="utf-8")

# Pack the written files into a zip archive, using the same relative paths
# as archive member names.
zip_path = Path("/mnt/data/dlt_project.zip")
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zipf:
    for file_path in project_files:
        relative = file_path.split("/", 1)[1]
        zipf.write(base_path / relative, arcname=relative)

# Last expression of the cell: shows the archive path in the notebook output.
zip_path
