In [0]:
# notebooks/01_data_ingestion.ipynb
# Databricks notebook source
# MAGIC %md
# MAGIC # Ingestão de dados para Delta Lake (Managed Table)

# COMMAND ----------

import pyspark.pandas as ps   # Pandas-on-Spark se precisar
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
)
from pyspark.sql import functions as F

# Location of the raw CSV inside the volume, and the fully qualified name
# of the Delta table this notebook writes to.
RAW_CSV_PATH = "dbfs:/Volumes/workspace/default/fraud_data/transactions.csv"
TABLE_NAME = "workspace.default.transactions_delta"

# Explicit schema, declared as (column name, Spark type class) pairs.
# Every column is created as nullable.
_COLUMN_TYPES = [
    ("transaction_id", StringType),
    ("user_id", StringType),
    ("account_age_days", IntegerType),
    ("total_transactions_user", IntegerType),
    ("avg_amount_user", DoubleType),
    ("amount", DoubleType),
    ("country", StringType),
    ("bin_country", StringType),
    ("channel", StringType),
    ("merchant_category", StringType),
    ("promo_used", IntegerType),
    ("avs_match", StringType),
    ("cvv_result", StringType),
    ("three_ds_flag", IntegerType),
    ("transaction_time", TimestampType),
    ("shipping_distance_km", DoubleType),
    ("is_fraud", IntegerType),
]

schema = StructType(
    [
        StructField(column_name, column_type(), True)
        for column_name, column_type in _COLUMN_TYPES
    ]
)

# COMMAND ----------

# Read the CSV: the first line is treated as a header row and column types
# come from the explicit schema instead of being inferred.
df = spark.read.csv(RAW_CSV_PATH, schema=schema, header=True)

# Show a 10-row preview, then print the row and column counts.
preview = df.limit(10)
display(preview)
print(f"Linhas: {df.count()} | Colunas: {len(df.columns)}")

# COMMAND ----------

# Cleaning and partition column: drop rows where any of the listed columns
# is null, then derive a calendar "date" column from the transaction
# timestamp to partition the table by.
required_columns = ["transaction_id", "user_id", "amount", "transaction_time", "is_fraud"]
df = (
    df.na.drop(subset=required_columns)
    .withColumn("date", F.to_date(F.col("transaction_time")))
)

# Save directly as a managed table (Unity Catalog) in Delta format,
# replacing any existing contents and partitioning by "date".
(
    df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("date")
    .saveAsTable(TABLE_NAME)
)

# COMMAND ----------

# Table maintenance: run OPTIMIZE with Z-ordering on the listed columns,
# then VACUUM with a 168-hour retention window.
optimize_statement = f"OPTIMIZE {TABLE_NAME} ZORDER BY (user_id, amount, merchant_category)"
vacuum_statement = f"VACUUM {TABLE_NAME} RETAIN 168 HOURS"

spark.sql(optimize_statement)
# Kept as the final bare expression so the notebook cell still displays
# the result returned by the VACUUM command.
spark.sql(vacuum_statement)

transaction_id,user_id,account_age_days,total_transactions_user,avg_amount_user,amount,country,bin_country,channel,merchant_category,promo_used,avs_match,cvv_result,three_ds_flag,transaction_time,shipping_distance_km,is_fraud
1,1,141,47,147.93,84.75,FR,FR,web,travel,0,1,1,1,2024-01-06T04:09:39.000Z,370.95,0
2,1,141,47,147.93,107.9,FR,FR,web,travel,0,0,0,0,2024-01-09T20:13:47.000Z,149.62,0
3,1,141,47,147.93,92.36,FR,FR,app,travel,1,1,1,1,2024-01-12T06:20:11.000Z,164.08,0
4,1,141,47,147.93,112.47,FR,FR,web,fashion,0,1,1,1,2024-01-15T17:00:04.000Z,397.4,0
5,1,141,47,147.93,132.91,FR,US,web,electronics,0,1,1,1,2024-01-17T01:27:31.000Z,935.28,0
6,1,141,47,147.93,224.82,FR,FR,web,travel,0,1,1,1,2024-01-26T22:05:08.000Z,289.06,0
7,1,141,47,147.93,125.98,FR,FR,app,electronics,0,1,1,1,2024-01-30T00:51:41.000Z,443.75,0
8,1,141,47,147.93,66.95,FR,RO,web,travel,0,1,1,1,2024-02-11T15:33:30.000Z,1390.59,0
9,1,141,47,147.93,261.58,FR,FR,app,grocery,0,0,0,0,2024-02-22T01:29:55.000Z,110.51,0
10,1,141,47,147.93,97.34,FR,FR,web,electronics,0,0,1,1,2024-03-09T11:13:19.000Z,232.34,0


Linhas: 299695 | Colunas: 17


DataFrame[path: string]