# Fraud Detection - Bronze Layer Ingestion

This notebook ingests raw fraud detection data into the Bronze layer of the medallion architecture.

## Data Sources
* **transactions_data.csv** - Transaction records
* **cards_data.csv** - Card information
* **users_data.csv** - User/customer data
* **mcc_codes.json** - Merchant Category Code descriptions
* **train_fraud_labels.json** - Fraud labels for training data

## Output Tables
* `workspace.fraud.transactions_bronze` - All transactions (train/score split happens in Silver layer)
* `workspace.fraud.cards_bronze` - Card dimension (sensitive fields removed)
* `workspace.fraud.users_bronze` - User dimension
* `workspace.fraud.mcc_dim_bronze` - MCC dimension
* `workspace.fraud.labels_map` - Transaction ID to fraud label mapping

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS workspace.fraud;

In [0]:
%sql
SHOW CATALOGS;

In [0]:
%sql
SELECT current_catalog();

In [0]:
%sql
SHOW SCHEMAS IN workspace;

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS workspace.fraud.raw;

In [0]:
%sql
SHOW VOLUMES IN workspace.fraud;

---
## 1. Setup
Created the `workspace.fraud` schema and the `raw` volume that stores the raw source files.
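Unity Catalog exposes a volume's files under a path of the form `/Volumes/<catalog>/<schema>/<volume>/`. The `workspace.fraud.raw` volume created above therefore maps to the `base_path` used in section 2. The sketch below shows that mapping. The helper name `volume_path` is hypothetical, and the `dbfs:` prefix is kept only to match this notebook's `base_path`.

```python
def volume_path(catalog: str, schema: str, volume: str, scheme: str = "dbfs:") -> str:
    """Build the path of a Unity Catalog volume (hypothetical helper).

    Volumes are addressed as /Volumes/<catalog>/<schema>/<volume>/; the optional
    scheme prefix is prepended unchanged.
    """
    for part in (catalog, schema, volume):
        # Each component is a single name, so it must be non-empty and contain no slash
        if not part or "/" in part:
            raise ValueError(f"invalid name component: {part!r}")
    return f"{scheme}/Volumes/{catalog}/{schema}/{volume}/"


base_path = volume_path("workspace", "fraud", "raw")
print(base_path)  # dbfs:/Volumes/workspace/fraud/raw/
```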

In [0]:
# Workspace files are reached through the local file system ("file:/" scheme), not DBFS
dbutils.fs.ls("file:/Workspace/Users/hiverin@gmail.com/")

---
## 2. Load Raw Data
Load the raw CSV and JSON files from the `workspace.fraud.raw` volume.

In [0]:
# Define base path for raw data
base_path = "dbfs:/Volumes/workspace/fraud/raw/"

# Load CSV files
cards = spark.read.csv(base_path + "cards_data.csv", header=True, inferSchema=True)
users = spark.read.csv(base_path + "users_data.csv", header=True, inferSchema=True)

print(f"Loaded {cards.count():,} cards")
print(f"Loaded {users.count():,} users")

### 2.1 Parse Complex JSON Files
Parse nested JSON structures for fraud labels and MCC codes.
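The Spark cell that follows reads the whole file as one record (`multiLine`), types `target` as a string-to-string map, and turns each map entry into a row. The same reshaping is shown here in plain Python on a small made-up sample. The IDs and the `"Yes"`/`"No"` label values are illustrative assumptions, not values taken from the dataset.

```python
import json
from collections import Counter

# Illustrative sample with the same shape as train_fraud_labels.json:
# {"target": {"<tx_id>": "<label>", ...}}  (IDs and labels are made up)
raw = '{"target": {"1001": "No", "1002": "Yes", "1003": "No"}}'


def explode_labels(text: str) -> list[tuple[str, str]]:
    """Mimic explode(map_entries(target)): one (tx_id, label) row per map entry."""
    target = json.loads(text)["target"]
    # JSON object keys are always strings, so tx_id stays a string here as well
    return [(str(tx_id), str(label)) for tx_id, label in target.items()]


rows = explode_labels(raw)
print(rows)  # [('1001', 'No'), ('1002', 'Yes'), ('1003', 'No')]

# Equivalent of labels_map.groupBy("label").count()
label_counts = Counter(label for _, label in rows)
print(label_counts)  # Counter({'No': 2, 'Yes': 1})
```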

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, StructType, StructField

# Parse train_fraud_labels.json which has structure: {"target": {"tx_id": "label", ...}}
schema = StructType([StructField('target', MapType(StringType(), StringType()), True)])

labels_raw = (
    spark.read
    .option("multiLine", "true")
    .schema(schema)
    .json(base_path + "train_fraud_labels.json")
)

# Explode the map into rows with tx_id and label columns
labels_map = (
    labels_raw
    .select(F.explode(F.map_entries(F.col("target"))).alias("kv"))
    .select(
        F.col("kv.key").alias("tx_id"),
        F.col("kv.value").alias("label")
    )
)

print(f"Loaded {labels_map.count():,} fraud labels")
labels_map.groupBy("label").count().show()

In [0]:
labels_map.write.mode("overwrite").format("delta").saveAsTable("workspace.fraud.labels_map")

In [0]:
from pyspark.sql import functions as F

# Load raw transactions
transactions = spark.read.csv(
    base_path + "transactions_data.csv", 
    header=True, 
    inferSchema=True
)

print(f"Loaded {transactions.count():,} total transactions")

# Add tx_id (string, matching the JSON label keys) and tx_ts; no split, no join with labels
transactions_bronze = (
    transactions
    .withColumn("tx_id", F.col("id").cast("string"))
    .withColumn("tx_ts", F.col("date"))
)

# Save to a single Bronze table
(
    transactions_bronze.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .format("delta")
    .saveAsTable("workspace.fraud.transactions_bronze")
)

print(f"Created transactions_bronze with {transactions_bronze.count():,} rows")
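Casting `id` to string gives `tx_id` the same type as the keys in `labels_map`, because JSON object keys are always strings. The labels are presumably joined on `tx_id` in the Silver layer. Spark can coerce mixed int/string join keys implicitly, but the explicit cast makes the key type unambiguous. The plain-Python sketch below uses made-up IDs and a hypothetical `join_labels` helper. Because Python does no implicit coercion, it shows plainly how mismatched key types lose every match.

```python
# Made-up rows: transaction ids as inferSchema would type them (int),
# and label keys as they come out of JSON (str).
transactions = [{"id": 1001, "amount": 12.5}, {"id": 1002, "amount": 80.0}]
labels = {"1001": "No", "1002": "Yes"}


def join_labels(txs, label_map, cast_to_str: bool):
    """Left-join labels onto transactions by id (hypothetical helper)."""
    out = []
    for tx in txs:
        key = str(tx["id"]) if cast_to_str else tx["id"]
        # dict.get returns None when the key (including its type) has no match
        out.append({**tx, "tx_id": key, "label": label_map.get(key)})
    return out


without_cast = join_labels(transactions, labels, cast_to_str=False)
with_cast = join_labels(transactions, labels, cast_to_str=True)
print([r["label"] for r in without_cast])  # [None, None]
print([r["label"] for r in with_cast])     # ['No', 'Yes']
```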

---
## 3. Create Bronze Layer Tables
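Save the raw data to Delta tables with minimal changes: sensitive card fields are dropped and `transactions_bronze` (written in section 2) gains `tx_id` and `tx_ts` helper columns. No other cleaning or joins are applied.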

In [0]:
from pyspark.sql import functions as F

# Parse mcc_codes.json, which has structure: {"code": "description", ...}
# Read the whole file as a single row (wholetext=True) so the JSON object stays intact;
# re-assembling it from lines with collect_list does not guarantee the original line order
json_df = (
    spark.read.text(base_path + "mcc_codes.json", wholetext=True)
    .select(F.col("value").alias("json_str"))
)

# Parse JSON string into a Map and explode into rows
mcc_map = json_df.select(
    F.from_json("json_str", "map<string,string>").alias("m")
)

mcc_dim = (
    mcc_map
    .select(F.explode(F.map_entries("m")).alias("kv"))
    .select(
        F.col("kv.key").alias("mcc"),
        F.col("kv.value").alias("mcc_description")
    )
)

print(f"Loaded {mcc_dim.count():,} MCC codes")
mcc_dim.show(5, truncate=False)
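`mcc_codes.json` is a flat object mapping each code to its description, so the cell above reduces to "parse one JSON object, emit one row per key". Because the object can span several lines, it has to be parsed as one string rather than line by line. The plain-Python sketch below uses two illustrative sample entries that were not read from the file.

```python
import json

# Illustrative sample shaped like mcc_codes.json: {"<code>": "<description>", ...}
sample = """{
  "5411": "Grocery Stores, Supermarkets",
  "5812": "Eating Places and Restaurants"
}"""


def parse_mcc(text: str) -> list[tuple[str, str]]:
    """Mimic from_json(..., "map<string,string>") + explode: one (mcc, description) row per key."""
    mapping = json.loads(text)  # the whole multi-line document is parsed at once
    if not isinstance(mapping, dict):
        raise ValueError("expected a top-level JSON object")
    return [(code, str(desc)) for code, desc in mapping.items()]


mcc_rows = parse_mcc(sample)
for mcc, description in mcc_rows:
    print(mcc, "->", description)
```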

In [0]:
# Drop sensitive fields (card_number, cvv) before saving to bronze
cards_bronze = cards.drop("card_number", "cvv")

cards_bronze.write.mode("overwrite").format("delta").saveAsTable("workspace.fraud.cards_bronze")
print(f"Created cards_bronze with {cards_bronze.count():,} rows")
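`DataFrame.drop` is a no-op for column names that do not exist. If the CSV header ever spelled a sensitive field differently, the drop above would succeed silently and the field would land in Bronze. The hypothetical guard below runs on any list of column names, such as `cards_bronze.columns`, and matches case-insensitively. The column names in the usage lines are illustrative.

```python
SENSITIVE_COLUMNS = {"card_number", "cvv"}  # fields this notebook drops from cards


def assert_no_sensitive(columns, sensitive=SENSITIVE_COLUMNS):
    """Raise ValueError if any sensitive column survives (case-insensitive match)."""
    lowered = {s.lower() for s in sensitive}
    leaked = sorted({c for c in columns if c.lower() in lowered})
    if leaked:
        raise ValueError(f"sensitive columns still present: {leaked}")


# Illustrative column lists; in the notebook you would pass cards_bronze.columns
assert_no_sensitive(["id", "brand"])  # passes silently

leak_message = None
try:
    assert_no_sensitive(["id", "CVV"])
except ValueError as err:
    leak_message = str(err)
print(leak_message)  # sensitive columns still present: ['CVV']
```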

In [0]:
users_bronze = users

users_bronze.write.mode("overwrite").format("delta").saveAsTable("workspace.fraud.users_bronze")
print(f"Created users_bronze with {users_bronze.count():,} rows")

In [0]:
mcc_dim.write.mode("overwrite").format("delta").saveAsTable("workspace.fraud.mcc_dim_bronze")
print(f"Created mcc_dim_bronze with {mcc_dim.count():,} rows")

---
## 4. Verify Bronze Tables
Confirm all tables were created successfully.

In [0]:
# Verify transactions bronze table
print("Transactions Bronze Table:")
print("=" * 50)

tx_count = spark.sql("SELECT COUNT(*) as count FROM workspace.fraud.transactions_bronze").collect()[0]['count']
print(f"transactions_bronze: {tx_count:>10,} rows")
print()

# Show sample from table
print("Sample from transactions_bronze:")
spark.sql("SELECT * FROM workspace.fraud.transactions_bronze LIMIT 3").show(truncate=False)

# Show schema
print("Schema:")
spark.sql("DESCRIBE TABLE workspace.fraud.transactions_bronze").show(truncate=False)

In [0]:
# Verify MCC dimension table
print("MCC Dimension:")
spark.sql("SELECT COUNT(*) as row_count FROM workspace.fraud.mcc_dim_bronze").show()
spark.sql("SELECT * FROM workspace.fraud.mcc_dim_bronze LIMIT 5").show(truncate=False)

In [0]:
# List all bronze tables in the fraud schema
print("Bronze Layer Tables:")
print("=" * 60)

bronze_tables = [
    "transactions_bronze",
    "cards_bronze",
    "users_bronze",
    "mcc_dim_bronze",
    "labels_map"
]

for table in bronze_tables:
    try:
        count = spark.sql(f"SELECT COUNT(*) as cnt FROM workspace.fraud.{table}").collect()[0]['cnt']
        print(f"{table:25} {count:>15,} rows")
    except Exception as e:
        # Usually a missing table; print the exception type instead of assuming the cause
        print(f"{table:25} ERROR: {type(e).__name__}")

print("=" * 60)
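The loop above confirms that each table exists but does not check whether its row count matches the source. The sketch below shows one way to reconcile the two. It assumes you collect the expected counts (for example from `cards.count()` and `users.count()`) and the table counts into plain dictionaries. The helper `reconcile` and the numbers in the usage lines are hypothetical.

```python
def reconcile(expected: dict[str, int], actual: dict[str, int]) -> list[str]:
    """Return human-readable problems; an empty list means every table matches."""
    problems = []
    for table, want in expected.items():
        got = actual.get(table)
        if got is None:
            problems.append(f"{table}: missing")
        elif got != want:
            problems.append(f"{table}: expected {want:,} rows, found {got:,}")
    return problems


# Made-up numbers for illustration only
expected = {"cards_bronze": 100, "users_bronze": 50}
actual = {"cards_bronze": 100}
print(reconcile(expected, actual))  # ['users_bronze: missing']
```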