# Data Importation

In [0]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from datetime import datetime, timedelta
from pyspark.sql.functions import col, lit, to_timestamp, current_timestamp
from pyspark.sql.types import * 
from delta import DeltaTable

## Data Importation

In this step, all extracted data is converted to Delta Lake format. This makes it easier to apply data-handling practices such as data cleansing and schema enforcement.

We also make the data loading incremental and prepare a date filter for downstream reads, using the following strategies:

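- **Dynamic Cutoff Date:** A cutoff date is derived from a configurable variable (`ANALYSIS_WINDOW_MONTHS`), fulfilling the project requirement for an automated, sliding historical window (e.g., "Last 6 months") that moves forward with every execution. The ingestion itself loads all new data; the cutoff is meant to be applied when reading the resulting bronze tables.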

- **Spark Structured Streaming with Auto Loader (cloudFiles):**  
  Instead of standard file reading, we use Databricks **Auto Loader** (`format("cloudFiles")`). This is crucial because:
  1. It incrementally tracks which files have already been ingested (in the stream checkpoint), so each run only processes files that have newly landed in S3.
  2. It natively supports various formats (JSON, Avro, etc.) on Shared Clusters where standard streaming sources might be restricted.
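  3. It can handle **Schema Inference and Evolution**. (In this notebook the schema is inferred with a batch read and passed to the stream explicitly; new columns reach the Delta tables through the `mergeSchema` write option.)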

- **Mixed Source Formats:** The code iterates through a configuration map that defines not just the source path and target table, but also the specific **file format** (JSON or Avro) for each dataset.

- **S3 Checkpointing:** Checkpoints are stored directly in S3 (`s3://.../checkpoints`) rather than in DBFS. This keeps them persistent and secure, and avoids the permission errors common in shared environments.

In [0]:

# Raw extracts in S3: one prefix per dataset under a shared bucket.
_RAW_BUCKET = "s3://aurorapay-studycase"
SOURCE_PATH_1 = f"{_RAW_BUCKET}/customer_profiles"
SOURCE_PATH_2 = f"{_RAW_BUCKET}/device_signals"
SOURCE_PATH_3 = f"{_RAW_BUCKET}/merchant_registry"
SOURCE_PATH_4 = f"{_RAW_BUCKET}/security_logs"
SOURCE_PATH_5 = f"{_RAW_BUCKET}/transaction_events"

# Fully qualified bronze-layer tables receiving each dataset (same order as
# the source paths above).
_BRONZE_NAMESPACE = "fraud_detection_project.bronze_layer"
TARGET_TABLE_1 = f"{_BRONZE_NAMESPACE}.customer_profiles"
TARGET_TABLE_2 = f"{_BRONZE_NAMESPACE}.device_signals"
TARGET_TABLE_3 = f"{_BRONZE_NAMESPACE}.merchant_registry"
TARGET_TABLE_4 = f"{_BRONZE_NAMESPACE}.security_logs"
TARGET_TABLE_5 = f"{_BRONZE_NAMESPACE}.transaction_events"
# -----------------------------------------
# Definition of the analysis window (in months): the historical period used
# for fraud detection. Downstream reads should keep only records dated on or
# after the cutoff computed below.
ANALYSIS_WINDOW_MONTHS = 6

# Automatic calculation of the cutoff date. It is recomputed on every run, so
# the window slides forward with each execution (a run three weeks later
# yields a cutoff three weeks later).
# "Now" is captured once so the cutoff and the logged execution date always
# agree, even if the cell happens to run across midnight.
run_timestamp = datetime.now()
# Subtract whole calendar months instead of approximating a month as 30 days
# (which drifts by several days over a 6-month window). pandas clamps to the
# end of the month when needed (e.g. Aug 31 - 6 months -> Feb 28/29). The
# result is converted back to a plain datetime for downstream consumers.
cutoff_date = (
    pd.Timestamp(run_timestamp) - pd.DateOffset(months=ANALYSIS_WINDOW_MONTHS)
).to_pydatetime()
cutoff_filter_str = cutoff_date.strftime("%Y-%m-%d")

print(f"--- Início do Ciclo de Inferência ---")
print(f"Data Base da Execução: {run_timestamp.strftime('%Y-%m-%d')}")
print(f"Janela configurada: Últimos {ANALYSIS_WINDOW_MONTHS} meses")
print(f"Filtro de Data Calculado: >= {cutoff_filter_str}")
print(f"Nota: A ingestão abaixo traz todos os dados novos (Incremental). O filtro deve ser usado na leitura dessas tabelas.")
# -----------------------------------------

# Route each raw S3 source to its bronze target table, together with the file
# format Auto Loader should parse it as ("json" or "avro").
tables_config = {
    source_path: (target_table, source_format)
    for source_path, target_table, source_format in (
        (SOURCE_PATH_1, TARGET_TABLE_1, "json"),
        (SOURCE_PATH_2, TARGET_TABLE_2, "avro"),
        (SOURCE_PATH_3, TARGET_TABLE_3, "json"),
        (SOURCE_PATH_4, TARGET_TABLE_4, "avro"),
        (SOURCE_PATH_5, TARGET_TABLE_5, "avro"),
    )
}

# Root S3 prefix under which each stream keeps its own checkpoint directory.
CHECKPOINT_BASE_PATH = "s3://aurorapay-studycase/checkpoints"

# Start one Auto Loader stream per configured source. The streams run
# concurrently; with trigger(availableNow=True) each one processes the files
# not yet recorded in its checkpoint and then stops.
active_queries = []  # (target table, StreamingQuery) pairs started below

for source, (target, file_format) in tables_config.items():
    print(f"Lendo de: {source} ({file_format}) -> Gravando em: {target}")

    # One checkpoint directory per target table, named after the last
    # component of the table name (e.g. ".../checkpoints/device_signals").
    table_name = target.split(".")[-1]
    checkpoint_location = f"{CHECKPOINT_BASE_PATH}/{table_name}"

    # Infer the source schema with a batch read and pass it to the stream
    # explicitly. Best-effort: if inference fails (e.g. the path is missing
    # or empty), report it and move on to the next table.
    # NOTE(review): this batch read scans the whole source path on every run
    # (for JSON, schema inference reads the data itself), which works against
    # incremental loading. Auto Loader's own inference with
    # `cloudFiles.schemaLocation` avoids that scan. Consider migrating once
    # it is confirmed that the inferred types match the existing bronze tables.
    try:
        source_schema = spark.read.format(file_format).load(source).schema
    except Exception as e:
        print(f"Erro ao inferir schema para {source}: {e}")
        continue

    # Load only new files using Structured Streaming + Auto Loader.
    query = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", file_format)
        # Auto Loader's rate limit is the `cloudFiles.`-prefixed option; the
        # plain `maxFilesPerTrigger` key is not an Auto Loader option.
        .option("cloudFiles.maxFilesPerTrigger", 1000)
        .schema(source_schema)
        .load(source)
        .writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        # Allow new source columns to be added to the target Delta table.
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .toTable(target)
    )
    active_queries.append((target, query))

# Wait for every stream to drain its backlog. awaitTermination() re-raises
# the error of a failed query. Failures are collected and the cell fails at
# the end, instead of finishing while ingestion is still running or has
# failed unnoticed.
failed_tables = []
for target, query in active_queries:
    try:
        query.awaitTermination()
    except Exception as e:
        print(f"Erro na ingestão de {target}: {e}")
        failed_tables.append(target)

if failed_tables:
    raise RuntimeError(f"Ingestão falhou para: {', '.join(failed_tables)}")