# JSON Flattening Toolkit - Comprehensive Guide (Java Edition)

> **A hands-on exploration of JSON flattening techniques, patterns, and real-world applications using Java**

This notebook is organized into **10 self-contained milestones**, each focusing on specific aspects of JSON flattening. You can work through them sequentially or jump to a specific topic, provided the Quick Start setup cells have been run first.

## 📚 Table of Contents

### Foundations
- **[Milestone 1: Foundations & Core Concepts](#milestone-1)** - Basic flattening, list policies, separators
- **[Milestone 2: Array Handling Strategies](#milestone-2)** - Index vs join, explosion, cartesian products

### Advanced Techniques  
- **[Milestone 3: Complex Structures](#milestone-3)** - Deep nesting, mixed types, null handling

### Real-World Use Cases
- **[Milestone 4: E-commerce Data](#milestone-4)** - Orders, products, customers, transactions
- **[Milestone 5: API & Event Data](#milestone-5)** - API responses, webhooks, event logs

### Data Pipelines
- **[Milestone 6: CSV Operations & Pipelines](#milestone-6)** - Read/write, transformations, batch processing

### Database Integration
- **[Milestone 7: MongoDB Integration](#milestone-7)** - Ingestion, querying, type inference
- **[Milestone 8: Snowflake Integration](#milestone-8)** - Schema generation, ingestion, queries

### Production Patterns
- **[Milestone 9: Advanced Patterns & Best Practices](#milestone-9)** - Performance, memory, error handling
- **[Milestone 10: End-to-End Workflows](#milestone-10)** - Complete pipelines, production examples

---

## 🎯 Learning Objectives

By the end of this notebook, you will be able to:
- ✅ Flatten complex nested JSON structures efficiently in Java
- ✅ Choose appropriate array handling strategies for your use case
- ✅ Build data pipelines from JSON to CSV to databases
- ✅ Handle edge cases (nulls, empty arrays, mixed types)
- ✅ Integrate with MongoDB and Snowflake
- ✅ Apply best practices for production systems

## 🚀 Quick Start

Let's set up the environment: load the Maven dependencies, add the imports, and define the helper classes used throughout the notebook.

**Prerequisites:**
- IJava kernel installed (https://github.com/SpencerPark/IJava)
- Java 15 or higher (the examples use text blocks, which became a standard feature in Java 15)

In [None]:
// ============================================================================
// DEPENDENCY MANAGEMENT - Load required libraries
// ============================================================================

// Using IJava's dependency management
%maven com.google.code.gson:gson:2.10.1
%maven org.mongodb:mongodb-driver-sync:4.11.1
%maven com.opencsv:opencsv:5.9
%maven org.apache.commons:commons-lang3:3.14.0

In [None]:
// ============================================================================
// IMPORTS - All imports at the top for clarity
// ============================================================================

import com.google.gson.*;
import com.opencsv.*;
import org.apache.commons.lang3.StringUtils;

import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.stream.*;
import java.time.*;
import java.time.format.*;

// MongoDB imports (optional - may not be available)
import com.mongodb.client.*;
import org.bson.Document;

System.out.println("✅ Imports loaded successfully!");

In [None]:
// ============================================================================
// CORE FLATTENING UTILITIES
// ============================================================================

/**
 * Enum for list handling policies
 */
enum ListPolicy {
    INDEX,  // Creates indexed keys (tags.0, tags.1)
    JOIN    // Joins primitive arrays with commas
}

/**
 * JSON Flattening utility class
 */
class JsonFlattener {
    private final String separator;
    private final ListPolicy listPolicy;
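    // serializeNulls() keeps null-valued keys visible when flattened maps are printed
    private final Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();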
    
    public JsonFlattener() {
        this(".", ListPolicy.INDEX);
    }
    
    public JsonFlattener(String separator, ListPolicy listPolicy) {
        this.separator = separator;
        this.listPolicy = listPolicy;
    }
    
    /**
     * Flatten a JSON object into a flat Map
     */
    public Map<String, Object> flatten(JsonObject json) {
        Map<String, Object> result = new LinkedHashMap<>();
        flattenElement("", json, result);
        return result;
    }
    
    /**
     * Flatten a JSON string into a flat Map
     */
    public Map<String, Object> flatten(String jsonString) {
        JsonObject json = JsonParser.parseString(jsonString).getAsJsonObject();
        return flatten(json);
    }
    
    private void flattenElement(String prefix, JsonElement element, Map<String, Object> result) {
        if (element.isJsonNull()) {
            result.put(prefix, null);
        } else if (element.isJsonPrimitive()) {
            JsonPrimitive prim = element.getAsJsonPrimitive();
            if (prim.isBoolean()) {
                result.put(prefix, prim.getAsBoolean());
            } else if (prim.isNumber()) {
                Number num = prim.getAsNumber();
                if (num.doubleValue() == num.longValue()) {
                    result.put(prefix, num.longValue());
                } else {
                    result.put(prefix, num.doubleValue());
                }
            } else {
                result.put(prefix, prim.getAsString());
            }
        } else if (element.isJsonObject()) {
            JsonObject obj = element.getAsJsonObject();
            for (Map.Entry<String, JsonElement> entry : obj.entrySet()) {
                String newKey = prefix.isEmpty() ? entry.getKey() : prefix + separator + entry.getKey();
                flattenElement(newKey, entry.getValue(), result);
            }
        } else if (element.isJsonArray()) {
            JsonArray arr = element.getAsJsonArray();
            if (listPolicy == ListPolicy.JOIN && isAllPrimitives(arr)) {
                String joined = StreamSupport.stream(arr.spliterator(), false)
                    .map(e -> e.isJsonNull() ? "" : e.getAsString())
                    .collect(Collectors.joining(","));
                result.put(prefix, joined);
            } else {
                for (int i = 0; i < arr.size(); i++) {
                    String newKey = prefix + separator + i;
                    flattenElement(newKey, arr.get(i), result);
                }
            }
        }
    }
    
    private boolean isAllPrimitives(JsonArray arr) {
        return StreamSupport.stream(arr.spliterator(), false)
            .allMatch(e -> e.isJsonPrimitive() || e.isJsonNull());
    }
    
    /**
     * Explode arrays into multiple records (cartesian product)
     */
    public List<Map<String, Object>> flattenRecords(JsonObject json, List<String> explodePaths) {
        List<Map<String, Object>> results = new ArrayList<>();
        results.add(new LinkedHashMap<>());
        
        flattenRecordsHelper("", json, results, new HashSet<>(explodePaths));
        return results;
    }
    
    private void flattenRecordsHelper(String prefix, JsonElement element, 
                                       List<Map<String, Object>> results, Set<String> explodePaths) {
        if (element.isJsonNull()) {
            for (Map<String, Object> record : results) {
                record.put(prefix, null);
            }
        } else if (element.isJsonPrimitive()) {
            JsonPrimitive prim = element.getAsJsonPrimitive();
            Object value;
            if (prim.isBoolean()) {
                value = prim.getAsBoolean();
            } else if (prim.isNumber()) {
                Number num = prim.getAsNumber();
                value = (num.doubleValue() == num.longValue()) ? num.longValue() : num.doubleValue();
            } else {
                value = prim.getAsString();
            }
            for (Map<String, Object> record : results) {
                record.put(prefix, value);
            }
        } else if (element.isJsonObject()) {
            JsonObject obj = element.getAsJsonObject();
            for (Map.Entry<String, JsonElement> entry : obj.entrySet()) {
                String newKey = prefix.isEmpty() ? entry.getKey() : prefix + separator + entry.getKey();
                flattenRecordsHelper(newKey, entry.getValue(), results, explodePaths);
            }
        } else if (element.isJsonArray()) {
            JsonArray arr = element.getAsJsonArray();
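            if (explodePaths.contains(prefix) && arr.size() == 0) {
                // Empty exploded array: keep the existing records and record the path as null
                // (exploding zero elements would otherwise discard every record)
                for (Map<String, Object> record : results) {
                    record.put(prefix, null);
                }
            } else if (explodePaths.contains(prefix)) {
                // Explode: multiply records
                List<Map<String, Object>> newResults = new ArrayList<>();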
                for (int i = 0; i < arr.size(); i++) {
                    for (Map<String, Object> record : results) {
                        Map<String, Object> newRecord = new LinkedHashMap<>(record);
                        List<Map<String, Object>> singleRecordList = new ArrayList<>();
                        singleRecordList.add(newRecord);
                        flattenRecordsHelper(prefix, arr.get(i), singleRecordList, explodePaths);
                        newResults.addAll(singleRecordList);
                    }
                }
                results.clear();
                results.addAll(newResults);
            } else if (listPolicy == ListPolicy.JOIN && isAllPrimitives(arr)) {
                String joined = StreamSupport.stream(arr.spliterator(), false)
                    .map(e -> e.isJsonNull() ? "" : e.getAsString())
                    .collect(Collectors.joining(","));
                for (Map<String, Object> record : results) {
                    record.put(prefix, joined);
                }
            } else {
                for (int i = 0; i < arr.size(); i++) {
                    String newKey = prefix + separator + i;
                    flattenRecordsHelper(newKey, arr.get(i), results, explodePaths);
                }
            }
        }
    }
    
    public String toJson(Object obj) {
        return gson.toJson(obj);
    }
}

System.out.println("✅ JsonFlattener class defined!");

In [None]:
// ============================================================================
// HELPER UTILITIES
// ============================================================================

class Utils {
    // serializeNulls() keeps null fields visible in both the BEFORE and AFTER views
    private static final Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();
    
    public static void printSection(String title) {
        printSection(title, '=');
    }
    
    public static void printSection(String title, char ch) {
        String line = String.valueOf(ch).repeat(60);
        System.out.println("\n" + line);
        System.out.println("  " + title);
        System.out.println(line + "\n");
    }
    
    public static void compareBeforeAfter(String before, Map<String, Object> after, String title) {
        printSection(title);
        System.out.println("BEFORE (Original JSON):");
        System.out.println(formatJson(before));
        System.out.println("\nAFTER (Flattened):");
        System.out.println(gson.toJson(after));
        System.out.println("\n📊 Flattened to " + after.size() + " fields");
    }
    
    public static String formatJson(String json) {
        JsonElement el = JsonParser.parseString(json);
        return gson.toJson(el);
    }
    
    public static String readFile(String path) throws IOException {
        return Files.readString(Path.of(path));
    }
    
    public static void writeFile(String path, String content) throws IOException {
        Files.writeString(Path.of(path), content);
    }
    
    public static long measureTime(Runnable task) {
        long start = System.nanoTime();
        task.run();
        long elapsed = System.nanoTime() - start;
        System.out.println("⏱️  Execution time: " + (elapsed / 1_000_000.0) + " ms");
        return elapsed;
    }
}

// Create output directory
Path outputDir = Path.of("notebook_output");
Files.createDirectories(outputDir);

System.out.println("✅ Environment setup complete!");
System.out.println("📁 Output directory: " + outputDir.toAbsolutePath());

---

<a id="milestone-1"></a>

# Milestone 1: Foundations & Core Concepts

## Learning Objectives
- Understand the fundamental concept of JSON flattening
- Learn how nested structures are converted to flat `Map<String, Object>` instances
- Explore different list handling policies
- Master custom separator usage

## Why Flatten JSON?

Nested JSON is awkward to load into the tools data engineers and data scientists rely on:
- **Tabular formats** (CSV, relational tables) require flat structures
- **Analytics tools** work best with one value per column
- **Schema inference** is simpler when every field is a scalar
- **Database ingestion** requires a consistent set of columns across records

Let's start with the basics!

### 1.1 Understanding Nested Structures

**What is nesting?**  
Nesting occurs when JSON objects contain other objects or arrays inside them. Think of it like Russian dolls - objects within objects.

**Why is this a problem?**  
A CSV cell or a relational column holds a single scalar value, so there is no place to store an object or an array as it is. Each leaf value has to be promoted to a column of its own, with a name that records where it came from.

**How does flattening work?**  
The `JsonFlattener.flatten()` method recursively traverses nested structures and creates dot-delimited keys. For example:
- `user.profile.name` represents the `name` field inside `profile` inside `user`
- The dot (`.`) is the default separator, but you can customize it

Let's see this in action:

In [None]:
// Example 1: Simple nested structure
String json1 = """
    {
        "user": {
            "id": 42,
            "profile": {
                "name": "Alice",
                "active": true
            }
        },
        "score": 9.5
    }
    """;

JsonFlattener flattener = new JsonFlattener();
Map<String, Object> flattened1 = flattener.flatten(json1);

Utils.compareBeforeAfter(json1, flattened1, "Example 1: Simple Nested Structure");
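
The recursion behind `flatten()` does not depend on Gson. The sketch below shows the same walk, assuming the JSON has already been parsed into plain `Map`, `List`, and scalar values. `MapFlattener` is a hypothetical name, not part of the toolkit. Objects extend the key with the separator, arrays extend it with the element index, and everything else is written as a leaf.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: flattens JSON already parsed into Map/List/scalar values.
class MapFlattener {
    static Map<String, Object> flatten(Map<String, ?> root, String separator) {
        Map<String, Object> out = new LinkedHashMap<>();
        walk("", root, separator, out);
        return out;
    }

    private static void walk(String prefix, Object node, String sep, Map<String, Object> out) {
        if (node instanceof Map) {
            // Object: recurse into each field, extending the key path
            for (Map.Entry<?, ?> e : ((Map<?, ?>) node).entrySet()) {
                String key = prefix.isEmpty() ? String.valueOf(e.getKey()) : prefix + sep + e.getKey();
                walk(key, e.getValue(), sep, out);
            }
        } else if (node instanceof List) {
            // Array: recurse into each element, using its index as the key segment
            List<?> list = (List<?>) node;
            for (int i = 0; i < list.size(); i++) {
                walk(prefix + sep + i, list.get(i), sep, out);
            }
        } else {
            // Leaf: string, number, boolean, or null
            out.put(prefix, node);
        }
    }
}
```

For example, `MapFlattener.flatten(Map.of("user", Map.of("id", 42)), ".")` returns a map with the single entry `user.id=42`.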

### 1.2 Custom Separators

**Why use custom separators?**  
Sometimes the default dot (`.`) separator can conflict with your data:
- Field names might contain dots
- You might prefer underscores (`_`) or double underscores (`__`)
- Some systems have naming conventions

**Example use cases:**
- MongoDB uses dots for nested queries, so you might want `_` instead
- Some databases prefer `__` for clarity
- Your organization might have specific naming standards

Let's explore different separators:

In [None]:
// Example 2: Custom separators
String json2 = """
    {
        "user": {
            "name": "Bob",
            "address": {
                "city": "NYC",
                "zip": "10001"
            }
        }
    }
    """;

Utils.printSection("Custom Separators Comparison");

// Default dot separator
JsonFlattener dotFlattener = new JsonFlattener(".", ListPolicy.INDEX);
System.out.println("With dot (.) separator:");
System.out.println(dotFlattener.toJson(dotFlattener.flatten(json2)));

// Underscore separator
JsonFlattener underscoreFlattener = new JsonFlattener("_", ListPolicy.INDEX);
System.out.println("\nWith underscore (_) separator:");
System.out.println(underscoreFlattener.toJson(underscoreFlattener.flatten(json2)));

// Double underscore separator
JsonFlattener doubleUnderscoreFlattener = new JsonFlattener("__", ListPolicy.INDEX);
System.out.println("\nWith double underscore (__) separator:");
System.out.println(doubleUnderscoreFlattener.toJson(doubleUnderscoreFlattener.flatten(json2)));
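
A custom separator can make two distinct paths produce the same flat key. With `_`, a top-level field `user_name` and the nested path `user` → `name` both become `user_name`. Because `flatten()` writes into a single `LinkedHashMap`, whichever path is visited later silently overwrites the earlier value.

The sketch below checks for this by grouping paths, given as lists of key segments, by the key they would produce. The `SeparatorCheck` class is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical check: which distinct paths collapse onto the same flat key?
class SeparatorCheck {
    static Map<String, List<List<String>>> findCollisions(List<List<String>> paths, String separator) {
        Map<String, List<List<String>>> byKey = new LinkedHashMap<>();
        for (List<String> path : paths) {
            String key = String.join(separator, path);
            byKey.computeIfAbsent(key, k -> new ArrayList<>()).add(path);
        }
        // Keep only keys produced by more than one path
        byKey.values().removeIf(group -> group.size() < 2);
        return byKey;
    }
}
```

An empty result means the separator is safe for that set of paths.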

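### 1.3 Array Handling

Arrays can be handled in three ways:
- **Index policy** (`ListPolicy.INDEX`): creates one key per element (e.g., `tags.0`, `tags.1`)
- **Join policy** (`ListPolicy.JOIN`): joins arrays of primitives into one comma-separated string; arrays containing objects or nested arrays fall back to indexing
- **Explosion** (`flattenRecords`): emits one record per array element, copying the parent fields into each record

The example below uses explosion; Milestone 2 compares the index and join policies side by side.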

In [None]:
// Example: Array explosion - creating multiple records
String json5 = """
    {
        "order_id": 1001,
        "customer": "Alice",
        "items": [
            {"sku": "A1", "qty": 2, "price": 10.50},
            {"sku": "B2", "qty": 1, "price": 5.25},
            {"sku": "C3", "qty": 3, "price": 8.00}
        ]
    }
    """;

JsonFlattener flattener = new JsonFlattener();
JsonObject jsonObj = JsonParser.parseString(json5).getAsJsonObject();
List<Map<String, Object>> records = flattener.flattenRecords(jsonObj, List.of("items"));

System.out.println("Created " + records.size() + " records from array explosion:");
for (int i = 0; i < records.size(); i++) {
    System.out.println("\nRecord " + (i + 1) + ":");
    System.out.println(flattener.toJson(records.get(i)));
}

---

<a id="milestone-2"></a>

# Milestone 2: Array Handling Strategies

## Learning Objectives
- Compare index vs join list policies
- Understand array explosion into multiple records
- Create cartesian products across multiple array paths

Arrays are where flattening decisions have the biggest downstream impact. We'll compare policies and then explode arrays into multiple records.

In [None]:
Utils.printSection("Index vs Join list policies");

String arrayData = """
    {
        "tags": ["alpha", "beta", "gamma"],
        "metrics": {"scores": [10, 20, null]},
        "meta": {"ids": [1, 2, 3]}
    }
    """;

JsonFlattener indexFlattener = new JsonFlattener(".", ListPolicy.INDEX);
JsonFlattener joinFlattener = new JsonFlattener(".", ListPolicy.JOIN);

System.out.println("Index policy output:");
System.out.println(indexFlattener.toJson(indexFlattener.flatten(arrayData)));

System.out.println("\nJoin policy output:");
System.out.println(joinFlattener.toJson(joinFlattener.flatten(arrayData)));
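
The join policy is lossy. `["a,b"]` and `["a", "b"]` both become `a,b`, and a JSON `null` becomes an empty field that cannot be told apart from `""`. If a joined column must be split again later, one option is to escape the delimiter before joining.

This is a minimal sketch under an illustrative backslash escape scheme; `JsonFlattener` does not do this, and `EscapedJoin` is a hypothetical name. Null elements are not handled. An empty list and a list holding one empty string still share the encoding `""`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative escape scheme: "\" -> "\\" and "," -> "\," before joining.
class EscapedJoin {
    static String join(List<String> values) {
        return values.stream()
            .map(v -> v.replace("\\", "\\\\").replace(",", "\\,"))
            .collect(Collectors.joining(","));
    }

    static List<String> split(String joined) {
        List<String> out = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (int i = 0; i < joined.length(); i++) {
            char c = joined.charAt(i);
            if (c == '\\' && i + 1 < joined.length()) {
                current.append(joined.charAt(++i)); // escaped character: keep it literally
            } else if (c == ',') {
                out.add(current.toString());        // unescaped comma: field boundary
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        out.add(current.toString());
        return out;
    }
}
```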

In [None]:
Utils.printSection("Array explosion and cartesian products");

String multiPathData = """
    {
        "order_id": 1,
        "items": [
            {"sku": "A"},
            {"sku": "B"}
        ],
        "discounts": [
            {"code": "SAVE10"},
            {"code": "BONUS"}
        ]
    }
    """;

JsonFlattener flattener = new JsonFlattener();
JsonObject jsonObj = JsonParser.parseString(multiPathData).getAsJsonObject();
List<Map<String, Object>> records = flattener.flattenRecords(jsonObj, List.of("items", "discounts"));

System.out.println("Exploded to " + records.size() + " records (cartesian product):");
for (Map<String, Object> record : records) {
    System.out.println(flattener.toJson(record));
}
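
With several explode paths, `flattenRecords` emits every combination, so the record count is the product of the array lengths (2 items × 2 discounts = 4 above). The sketch below shows the combination step independently of Gson, over lists of already-flattened partial records. `CrossJoin` is a hypothetical name.

Rows here come out parent-major, which may differ from the order `flattenRecords` prints; the set of combinations is the same.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: combine a parent record with one element from each exploded array.
class CrossJoin {
    static List<Map<String, Object>> expand(Map<String, Object> parent,
                                            List<List<Map<String, Object>>> arrays) {
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(new LinkedHashMap<>(parent));
        for (List<Map<String, Object>> array : arrays) {
            List<Map<String, Object>> next = new ArrayList<>();
            for (Map<String, Object> row : rows) {
                for (Map<String, Object> element : array) {
                    Map<String, Object> combined = new LinkedHashMap<>(row);
                    combined.putAll(element);  // element keys are assumed to be prefixed already
                    next.add(combined);
                }
            }
            rows = next;  // an empty array yields zero rows, as in a SQL cross join
        }
        return rows;
    }
}
```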

---

<a id="milestone-3"></a>

# Milestone 3: Complex Structures

## Learning Objectives
- Handle deep nesting and mixed types
- Process nulls, empty arrays, and optional fields
- Work with nested arrays inside arrays

These scenarios mirror real data engineering edge cases.

In [None]:
// Deep nesting example
Utils.printSection("Deep Nesting");

String deepNestingJson = """
    {
        "level1": {
            "level2": {
                "level3": {
                    "level4": {
                        "value": "deeply nested"
                    }
                }
            }
        }
    }
    """;

JsonFlattener flattener = new JsonFlattener();
Map<String, Object> deepFlattened = flattener.flatten(deepNestingJson);
Utils.compareBeforeAfter(deepNestingJson, deepFlattened, "Deep Nesting");
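
Every nesting level adds a segment to the key, so deeply nested documents produce long column names such as `level1.level2.level3.level4.value`. A common mitigation, similar in spirit to the `max_level` option of pandas' `json_normalize`, is to stop flattening at a fixed depth and keep anything deeper as a single value.

The sketch below assumes parsed `Map` values and leaves arrays as leaves. The class name and the choice to keep the deeper subtree as-is, rather than serializing it to a JSON string, are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: expand nested maps only for the first maxDepth key levels.
class DepthLimitedFlattener {
    static Map<String, Object> flatten(Map<String, ?> root, int maxDepth) {
        Map<String, Object> out = new LinkedHashMap<>();
        walk("", root, 0, maxDepth, out);
        return out;
    }

    private static void walk(String prefix, Object node, int depth, int maxDepth, Map<String, Object> out) {
        if (node instanceof Map && depth < maxDepth) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) node).entrySet()) {
                String key = prefix.isEmpty() ? String.valueOf(e.getKey()) : prefix + "." + e.getKey();
                walk(key, e.getValue(), depth + 1, maxDepth, out);
            }
        } else {
            // Depth limit reached (or not a map): keep the whole subtree as one value
            out.put(prefix, node);
        }
    }
}
```

With `maxDepth = 2`, the example above would yield a single key `level1.level2` whose value is the remaining `level3` subtree.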

In [None]:
// Mixed types example
Utils.printSection("Mixed Types");

String mixedTypesJson = """
    {
        "string_field": "hello",
        "int_field": 42,
        "float_field": 3.14,
        "bool_field": true,
        "null_field": null,
        "array_field": [1, "two", 3.0, true, null]
    }
    """;

Map<String, Object> mixedFlattened = flattener.flatten(mixedTypesJson);
Utils.compareBeforeAfter(mixedTypesJson, mixedFlattened, "Mixed Types");

// Show types
System.out.println("\nField types:");
for (Map.Entry<String, Object> entry : mixedFlattened.entrySet()) {
    String type = entry.getValue() == null ? "null" : entry.getValue().getClass().getSimpleName();
    System.out.println("  " + entry.getKey() + ": " + type);
}
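
`flatten()` classifies numbers by value (`doubleValue() == longValue()`). As a result, the literal `3.0` in `array_field` comes back as a `Long`, and a field such as a price can switch between `Long` and `Double` from record to record.

An alternative is to classify by the literal's text. For numbers parsed from a string, Gson's `JsonPrimitive.getAsString()` returns the original literal (for example `"3.0"`). The sketch below applies that rule. `NumberClassifier` is hypothetical, and the `BigInteger` fallback for integers that do not fit in a `long` is one possible choice.

```java
import java.math.BigInteger;

// Hypothetical rule: literals with '.', 'e' or 'E' are decimals; all others are integers.
class NumberClassifier {
    static Number classify(String literal) {
        boolean decimal = literal.indexOf('.') >= 0
            || literal.indexOf('e') >= 0
            || literal.indexOf('E') >= 0;
        if (!decimal) {
            BigInteger big = new BigInteger(literal);
            if (big.bitLength() < 64) {
                return big.longValue();   // fits in a long (boxed to Long)
            }
            return big;                   // too large for long: keep full precision
        }
        return Double.parseDouble(literal);
    }
}
```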

In [None]:
// Empty and null handling
Utils.printSection("Empty and Null Handling");

String nullHandlingJson = """
    {
        "present": "value",
        "empty_string": "",
        "null_value": null,
        "empty_array": [],
        "empty_object": {},
        "nested": {
            "null_inside": null,
            "value_inside": 123
        }
    }
    """;

Map<String, Object> nullFlattened = flattener.flatten(nullHandlingJson);
Utils.compareBeforeAfter(nullHandlingJson, nullFlattened, "Empty and Null Handling");
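
In this example `empty_array` and `empty_object` do not appear in the flattened output at all: `flattenElement` recurses into their zero children and never writes a key. With `ListPolicy.JOIN`, an empty array instead becomes an empty string, because `allMatch` is vacuously true on an empty stream.

If downstream schemas must keep those columns, one option is to emit an explicit marker for empty containers. The sketch below works over parsed `Map`/`List` values and assumes `null` is an acceptable marker; note that it cannot be told apart from a JSON `null`. `EmptyAwareFlattener` is a hypothetical name. The sketch also shows how nested arrays inside arrays receive one index segment per level.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical variant of the flattening walk that keeps empty containers as null-valued keys.
class EmptyAwareFlattener {
    static Map<String, Object> flatten(Map<String, ?> root) {
        Map<String, Object> out = new LinkedHashMap<>();
        walk("", root, out);
        return out;
    }

    private static void walk(String prefix, Object node, Map<String, Object> out) {
        if (node instanceof Map && !((Map<?, ?>) node).isEmpty()) {
            for (Map.Entry<?, ?> e : ((Map<?, ?>) node).entrySet()) {
                String key = prefix.isEmpty() ? String.valueOf(e.getKey()) : prefix + "." + e.getKey();
                walk(key, e.getValue(), out);
            }
        } else if (node instanceof List && !((List<?>) node).isEmpty()) {
            List<?> list = (List<?>) node;
            for (int i = 0; i < list.size(); i++) {
                walk(prefix + "." + i, list.get(i), out); // nested arrays add one index per level
            }
        } else if (node instanceof Map || node instanceof List) {
            out.put(prefix, null);  // empty object or array: keep the column, mark it null
        } else {
            out.put(prefix, node);  // scalar leaf
        }
    }
}
```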

---

<a id="milestone-4"></a>

# Milestone 4: E-commerce Data

## Learning Objectives
- Flatten orders with line items
- Create cartesian combinations across items and discounts
- Preserve customer metadata

In [None]:
Utils.printSection("E-commerce Order Processing");

String orderJson = """
    {
        "order_id": "ORD-2024-001",
        "customer": {
            "id": "cust_001",
            "name": "Ada Lovelace",
            "segment": "enterprise"
        },
        "items": [
            {"sku": "LAPTOP-001", "name": "MacBook Pro", "qty": 1, "price": 2499.00},
            {"sku": "MOUSE-002", "name": "Magic Mouse", "qty": 2, "price": 99.00}
        ],
        "discounts": [
            {"code": "ENTERPRISE20", "percent": 20},
            {"code": "FREESHIP", "percent": 0}
        ],
        "shipping": {
            "address": {
                "street": "123 Tech Lane",
                "city": "San Francisco",
                "state": "CA",
                "zip": "94105"
            },
            "method": "express"
        }
    }
    """;

JsonFlattener flattener = new JsonFlattener();
JsonObject orderObj = JsonParser.parseString(orderJson).getAsJsonObject();
List<Map<String, Object>> orderRecords = flattener.flattenRecords(orderObj, List.of("items", "discounts"));

System.out.println("Created " + orderRecords.size() + " order records (items x discounts):");
for (int i = 0; i < orderRecords.size(); i++) {
    System.out.println("\nRecord " + (i + 1) + ":");
    Map<String, Object> record = orderRecords.get(i);
    // Show key fields only
    System.out.println("  order_id: " + record.get("order_id"));
    System.out.println("  items.sku: " + record.get("items.sku"));
    System.out.println("  items.name: " + record.get("items.name"));
    System.out.println("  discounts.code: " + record.get("discounts.code"));
}
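
Exploding `items` and `discounts` together yields 2 × 2 = 4 records, so each line item appears once per discount. Summing `items.qty * items.price` across these records would count every item twice.

When the arrays are independent of each other, an alternative is one child table per array. Each row carries the parent key, so the tables can be joined back deliberately. The sketch below works over parsed values; the `ChildTables` helper and the `position` column name are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: one child row per array element, keyed by the parent id.
class ChildTables {
    static List<Map<String, Object>> childRows(String parentKey, Object parentId,
                                               List<Map<String, Object>> elements) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (int i = 0; i < elements.size(); i++) {
            Map<String, Object> row = new LinkedHashMap<>();
            row.put(parentKey, parentId);  // foreign key back to the parent record
            row.put("position", i);        // original array index, to preserve ordering
            row.putAll(elements.get(i));
            rows.add(row);
        }
        return rows;
    }
}
```

For the order above, an items table built this way has 2 rows, and revenue (1 × 2499.00 + 2 × 99.00 = 2697.00) can be summed directly.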

---

<a id="milestone-5"></a>

# Milestone 5: API & Event Data

## Learning Objectives
- Flatten nested API responses
- Handle event log arrays
- Normalize timestamps for analytics

In [None]:
String apiResponse = """
    {
        "request_id": "req_123",
        "status": "ok",
        "data": {
            "user": {"id": 7, "name": "Grace"},
            "roles": ["admin", "editor"],
            "metadata": {"source": "web", "region": "us-east-1"}
        }
    }
    """;

JsonFlattener joinFlattener = new JsonFlattener(".", ListPolicy.JOIN);
Map<String, Object> flattenedApi = joinFlattener.flatten(apiResponse);
Utils.compareBeforeAfter(apiResponse, flattenedApi, "API Response Flattening");

In [None]:
Utils.printSection("Event log normalization");

String eventPayload = """
    {
        "service": "billing",
        "events": [
            {"type": "created", "timestamp": "2024-01-15T10:30:00Z", "amount": 45.5},
            {"type": "captured", "timestamp": "2024-01-15T10:31:05Z", "amount": 45.5}
        ]
    }
    """;

JsonFlattener flattener = new JsonFlattener();
JsonObject eventObj = JsonParser.parseString(eventPayload).getAsJsonObject();
List<Map<String, Object>> eventRecords = flattener.flattenRecords(eventObj, List.of("events"));

System.out.println("Created " + eventRecords.size() + " event records:");
for (Map<String, Object> record : eventRecords) {
    System.out.println(flattener.toJson(record));
}
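
The objectives for this milestone mention normalizing timestamps, but the exploded event records above still carry ISO-8601 strings. The sketch below uses `java.time` to add a sibling field holding UTC epoch milliseconds, which sort and subtract cheaply.

The `Timestamps` helper and the `_epoch_ms` suffix are hypothetical. The sketch assumes every timestamp carries `Z` or an explicit offset.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: add a UTC epoch-millisecond copy of an ISO-8601 timestamp field.
class Timestamps {
    static Map<String, Object> normalize(Map<String, Object> record, String field) {
        Map<String, Object> out = new LinkedHashMap<>(record);
        Object raw = record.get(field);
        if (raw instanceof String) {
            // OffsetDateTime.parse accepts both "Z" and offsets such as "+02:00"
            Instant instant = OffsetDateTime.parse((String) raw).toInstant();
            out.put(field + "_epoch_ms", instant.toEpochMilli());
        }
        return out;
    }
}
```

Applied to the two billing events, the `events.timestamp_epoch_ms` values differ by 65,000 ms (10:30:00Z to 10:31:05Z).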

---

<a id="milestone-6"></a>

# Milestone 6: CSV Operations & Pipelines

## Learning Objectives
- Write flattened records to CSV
- Read CSV back into Java
- Build repeatable batch pipelines

In [None]:
// CSV Writer utility
class CsvUtils {
    
    public static void writeCsv(List<Map<String, Object>> records, Path path) throws IOException {
        if (records.isEmpty()) {
            throw new IllegalArgumentException("No records to write");
        }
        
        // Collect all unique headers
        Set<String> headerSet = new LinkedHashSet<>();
        for (Map<String, Object> record : records) {
            headerSet.addAll(record.keySet());
        }
        List<String> headers = new ArrayList<>(headerSet);
        
        // Files.newBufferedWriter encodes UTF-8 regardless of the platform default charset
        try (CSVWriter writer = new CSVWriter(Files.newBufferedWriter(path))) {
            // Write header
            writer.writeNext(headers.toArray(new String[0]));
            
            // Write data rows
            for (Map<String, Object> record : records) {
                String[] row = headers.stream()
                    .map(h -> {
                        Object val = record.get(h);
                        return val == null ? "" : String.valueOf(val);
                    })
                    .toArray(String[]::new);
                writer.writeNext(row);
            }
        }
    }
    
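    // opencsv 5.x: readNext() also declares the checked CsvValidationException
    public static List<Map<String, String>> readCsv(Path path)
            throws IOException, com.opencsv.exceptions.CsvValidationException {
        List<Map<String, String>> records = new ArrayList<>();
        
        // Files.newBufferedReader decodes UTF-8, matching writeCsv
        try (CSVReader reader = new CSVReader(Files.newBufferedReader(path))) {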
            String[] headers = reader.readNext();
            if (headers == null) return records;
            
            String[] row;
            while ((row = reader.readNext()) != null) {
                Map<String, String> record = new LinkedHashMap<>();
                for (int i = 0; i < headers.length && i < row.length; i++) {
                    record.put(headers[i], row[i]);
                }
                records.add(record);
            }
        }
        
        return records;
    }
}

System.out.println("✅ CsvUtils class defined!");

In [None]:
// Write flattened records to CSV
Utils.printSection("CSV Pipeline");

String sampleOrder = """
    {
        "order_id": 1001,
        "customer": {"name": "Alice", "email": "alice@example.com"},
        "items": [
            {"sku": "A1", "qty": 2},
            {"sku": "B2", "qty": 1}
        ]
    }
    """;

JsonFlattener flattener = new JsonFlattener();
JsonObject orderObj = JsonParser.parseString(sampleOrder).getAsJsonObject();
List<Map<String, Object>> records = flattener.flattenRecords(orderObj, List.of("items"));

Path csvPath = outputDir.resolve("orders.csv");
CsvUtils.writeCsv(records, csvPath);

System.out.println("✓ Written " + records.size() + " records to " + csvPath);
System.out.println("\nCSV content:");
System.out.println(Files.readString(csvPath));

In [None]:
// Read CSV back (round-trip)
System.out.println("\nRound-trip read:");
List<Map<String, String>> roundTrip = CsvUtils.readCsv(csvPath);
for (Map<String, String> row : roundTrip) {
    System.out.println(row);
}
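
`writeCsv` writes `null` as an empty cell, and `readCsv` returns every value as a `String`. The round trip therefore loses both the types and the distinction between `null` and `""`.

The re-typing sketch below assumes booleans, integers, and decimals should come back as Java types and that empty cells mean `null`. `CsvRetyper` is hypothetical. Its patterns follow the JSON number grammar, so a value with a leading zero such as a ZIP code `"02134"` stays text instead of becoming `2134`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical re-typing of CSV cells using JSON-style number patterns.
class CsvRetyper {
    private static final Pattern INTEGER = Pattern.compile("-?(0|[1-9]\\d*)");
    private static final Pattern DECIMAL = Pattern.compile("-?(0|[1-9]\\d*)(\\.\\d+)?([eE][+-]?\\d+)?");

    static Object retype(String cell) {
        if (cell == null || cell.isEmpty()) return null;              // empty cell -> null
        if (cell.equals("true") || cell.equals("false")) return Boolean.valueOf(cell);
        if (INTEGER.matcher(cell).matches()) {
            try {
                return Long.parseLong(cell);
            } catch (NumberFormatException tooLarge) {
                return cell;                                            // out-of-range integer: keep text
            }
        }
        if (DECIMAL.matcher(cell).matches()) return Double.parseDouble(cell);
        return cell;                                                    // anything else stays a String
    }

    static Map<String, Object> retypeRow(Map<String, String> row) {
        Map<String, Object> out = new LinkedHashMap<>();
        row.forEach((k, v) -> out.put(k, retype(v)));
        return out;
    }
}
```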

---

<a id="milestone-7"></a>

# Milestone 7: MongoDB Integration

## Learning Objectives
- Ingest flattened records into MongoDB
- Query collections for analytics
- Understand type inference behavior

In [None]:
// MongoDB integration utilities
class MongoUtils {
    
    public static int ingestToMongo(List<Map<String, Object>> records, 
                                    String uri, String database, String collection) {
        try (MongoClient client = MongoClients.create(uri)) {
            MongoDatabase db = client.getDatabase(database);
            MongoCollection<Document> coll = db.getCollection(collection);
            
            List<Document> documents = records.stream()
                .map(Document::new)
                .collect(Collectors.toList());
            
            coll.insertMany(documents);
            return documents.size();
        }
    }
    
    public static List<Document> queryMongo(String uri, String database, 
                                            String collection, int limit) {
        try (MongoClient client = MongoClients.create(uri)) {
            MongoDatabase db = client.getDatabase(database);
            MongoCollection<Document> coll = db.getCollection(collection);
            
            return coll.find().limit(limit).into(new ArrayList<>());
        }
    }
}

System.out.println("✅ MongoUtils class defined!");

In [None]:
Utils.printSection("MongoDB Integration");

String mongoUri = System.getenv().getOrDefault("MONGO_URI", "mongodb://localhost:27017");
String databaseName = System.getenv().getOrDefault("MONGO_DB", "json_flatten_demo");
String collectionName = System.getenv().getOrDefault("MONGO_COLLECTION", "orders_java");

try {
    // Prepare sample records
    String sampleData = """
        {
            "order_id": 2001,
            "customer": "Bob",
            "items": [
                {"sku": "X1", "qty": 3},
                {"sku": "Y2", "qty": 1}
            ]
        }
        """;
    
    // Use "_" so flattened keys are not mistaken for MongoDB dot-notation paths
    JsonFlattener flattener = new JsonFlattener("_", ListPolicy.INDEX);
    JsonObject jsonObj = JsonParser.parseString(sampleData).getAsJsonObject();
    List<Map<String, Object>> records = flattener.flattenRecords(jsonObj, List.of("items"));
    
    int inserted = MongoUtils.ingestToMongo(records, mongoUri, databaseName, collectionName);
    System.out.println("Inserted " + inserted + " documents into " + databaseName + "." + collectionName);
    
    // Query back
    List<Document> docs = MongoUtils.queryMongo(mongoUri, databaseName, collectionName, 3);
    System.out.println("\nSample documents:");
    for (Document doc : docs) {
        System.out.println(doc.toJson());
    }
} catch (Exception e) {
    System.out.println("MongoDB integration skipped: " + e.getMessage());
    System.out.println("(Make sure MongoDB is running on " + mongoUri + ")");
}
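
MongoDB query paths treat `.` as a nesting operator. A stored top-level field literally named `items.sku` is awkward to filter on, because a query on `items.sku` looks for a `sku` field inside an `items` subdocument. Field names starting with `$` are also restricted.

The sketch below sanitizes keys before each `Document` is built. `MongoKeys` is hypothetical, and the replacement characters (`_` for dots, `_` for a leading `$`) are arbitrary choices. It fails fast if two keys collapse onto the same name rather than overwriting one of them.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sanitizer: make flattened keys safer to use as MongoDB field names.
class MongoKeys {
    static String sanitize(String key) {
        String safe = key.replace('.', '_');   // '.' would be read as a path separator in queries
        // A leading '$' is restricted in MongoDB field names; replace it with '_'
        return safe.startsWith("$") ? "_" + safe.substring(1) : safe;
    }

    static Map<String, Object> sanitizeKeys(Map<String, Object> record) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : record.entrySet()) {
            String safe = sanitize(e.getKey());
            if (out.containsKey(safe)) {
                // e.g. "a.b" and "a_b" both map to "a_b": fail instead of overwriting
                throw new IllegalStateException("Keys collide after sanitizing: " + safe);
            }
            out.put(safe, e.getValue());
        }
        return out;
    }
}
```

In `MongoUtils.ingestToMongo`, the mapping step would become `.map(r -> new Document(MongoKeys.sanitizeKeys(r)))`.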

---

<a id="milestone-8"></a>

# Milestone 8: Snowflake Integration

## Learning Objectives
- Generate Snowflake table schemas
- Understand type mapping from Java to Snowflake
- Prepare data for Snowflake ingestion

In [None]:
// Snowflake schema generation utility
class SnowflakeUtils {
    
    public static String inferSnowflakeType(Object value) {
        if (value == null) return "VARCHAR";
        if (value instanceof Boolean) return "BOOLEAN";
        if (value instanceof Long || value instanceof Integer) return "INTEGER";
        if (value instanceof Double || value instanceof Float) return "FLOAT";
        return "VARCHAR";
    }
    
    public static String generateCreateTable(List<Map<String, Object>> records, 
                                              String tableName, String schema) {
        if (records.isEmpty()) {
            throw new IllegalArgumentException("No records to infer schema");
        }
        
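        // Collect all columns and infer one type per column, widening when records disagree:
        // INTEGER + FLOAT -> FLOAT, any other conflict -> VARCHAR, null in every record -> VARCHAR
        Map<String, String> columnTypes = new LinkedHashMap<>();
        
        for (Map<String, Object> record : records) {
            for (Map.Entry<String, Object> entry : record.entrySet()) {
                String col = entry.getKey();
                if (entry.getValue() == null) {
                    columnTypes.putIfAbsent(col, null);  // column seen, type still unknown
                    continue;
                }
                String current = inferSnowflakeType(entry.getValue());
                String previous = columnTypes.get(col);
                if (previous == null || previous.equals(current)) {
                    columnTypes.put(col, current);
                } else if (Set.of(previous, current).equals(Set.of("INTEGER", "FLOAT"))) {
                    columnTypes.put(col, "FLOAT");
                } else {
                    columnTypes.put(col, "VARCHAR");
                }
            }
        }
        columnTypes.replaceAll((col, type) -> type == null ? "VARCHAR" : type);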
        
        StringBuilder sb = new StringBuilder();
        sb.append("CREATE TABLE IF NOT EXISTS ").append(schema).append(".").append(tableName).append(" (\n");
        
        List<String> columnDefs = new ArrayList<>();
        for (Map.Entry<String, String> entry : columnTypes.entrySet()) {
            // Sanitize column name (replace dots with underscores)
            String colName = entry.getKey().replace(".", "_").toUpperCase();
            columnDefs.add("    " + colName + " " + entry.getValue());
        }
        
        sb.append(String.join(",\n", columnDefs));
        sb.append("\n);");
        
        return sb.toString();
    }
}

System.out.println("✅ SnowflakeUtils class defined!");

In [None]:
Utils.printSection("Snowflake Schema Generation");

// Sample order mixing integer, decimal, boolean, and nested fields
String orderData = """
    {
        "order_id": 3001,
        "total": 156.75,
        "paid": true,
        "customer": {"name": "Charlie", "tier": "gold"},
        "items": [
            {"sku": "P1", "qty": 2, "price": 50.00},
            {"sku": "P2", "qty": 1, "price": 56.75}
        ]
    }
    """;

JsonFlattener flattener = new JsonFlattener();
JsonObject orderObj = JsonParser.parseString(orderData).getAsJsonObject();
List<Map<String, Object>> records = flattener.flattenRecords(orderObj, List.of("items"));

String schemaSql = SnowflakeUtils.generateCreateTable(records, "ORDERS_FLAT", "PUBLIC");
System.out.println(schemaSql);

System.out.println("\nNote: Actual Snowflake ingestion requires the snowflake-jdbc driver and valid credentials.");
System.out.println("Set the SNOWFLAKE_* environment variables and add snowflake-jdbc to the classpath.");
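For a handful of rows, one way to push flattened records through JDBC is a parameterized `INSERT`. A minimal sketch, assuming a hypothetical `InsertSql` helper and the same dot-to-underscore column naming as `generateCreateTable`: it builds one `INSERT ... VALUES (?, ...)` statement over the union of keys and returns each record's values in column order, with absent keys as `null`. Binding those rows with `PreparedStatement.setObject` and `addBatch`/`executeBatch` is left out because it needs a live connection.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: builds a parameterized INSERT for flattened records. */
public class InsertSql {

    /** Union of keys across all records, in first-seen order. */
    public static List<String> columns(List<Map<String, Object>> records) {
        Set<String> cols = new LinkedHashSet<>();
        for (Map<String, Object> r : records) {
            cols.addAll(r.keySet());
        }
        return new ArrayList<>(cols);
    }

    /** Produces: INSERT INTO schema.table (COL_A, COL_B) VALUES (?, ?) */
    public static String insertStatement(String schema, String table, List<String> columns) {
        List<String> names = new ArrayList<>();
        for (String c : columns) {
            names.add(c.replace(".", "_").toUpperCase()); // same rule as generateCreateTable
        }
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + schema + "." + table
                + " (" + String.join(", ", names) + ") VALUES (" + placeholders + ")";
    }

    /** Each record's values in column order; absent keys become null. */
    public static List<List<Object>> parameterRows(List<Map<String, Object>> records, List<String> columns) {
        List<List<Object>> rows = new ArrayList<>();
        for (Map<String, Object> r : records) {
            List<Object> row = new ArrayList<>();
            for (String c : columns) {
                row.add(r.get(c));
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, Object> first = new LinkedHashMap<>();
        first.put("order_id", 3001L);
        first.put("items.sku", "P1");
        Map<String, Object> second = new LinkedHashMap<>();
        second.put("order_id", 3001L);
        second.put("items.sku", "P2");
        second.put("items.qty", 1L);
        List<Map<String, Object>> recs = List.of(first, second);

        List<String> cols = columns(recs);
        System.out.println(insertStatement("PUBLIC", "ORDERS_FLAT", cols));
        System.out.println(parameterRows(recs, cols));
    }
}
```

With the notebook's data this would be called as `InsertSql.insertStatement("PUBLIC", "ORDERS_FLAT", InsertSql.columns(records))`. Taking the union of keys matters because exploded records do not always share the same key set.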

---

<a id="milestone-9"></a>

# Milestone 9: Advanced Patterns & Best Practices

## Learning Objectives
- Measure performance for large workloads
- Avoid unintended cartesian explosions
- Understand memory considerations

In [None]:
Utils.printSection("Performance Measurement");

// A document whose three exploded arrays multiply into a 10 x 5 x 3 cartesian product
String largeCartesian = """
    {
        "id": 1,
        "items": [
            {"sku": "A"}, {"sku": "B"}, {"sku": "C"}, {"sku": "D"}, {"sku": "E"},
            {"sku": "F"}, {"sku": "G"}, {"sku": "H"}, {"sku": "I"}, {"sku": "J"}
        ],
        "promos": [
            {"code": "P1"}, {"code": "P2"}, {"code": "P3"}, {"code": "P4"}, {"code": "P5"}
        ],
        "regions": [
            {"name": "R1"}, {"name": "R2"}, {"name": "R3"}
        ]
    }
    """;

JsonFlattener flattener = new JsonFlattener();
JsonObject largeObj = JsonParser.parseString(largeCartesian).getAsJsonObject();

// Measure wall-clock time for a single run (includes JVM warm-up, so treat it as a rough figure)
long start = System.nanoTime();
List<Map<String, Object>> largeRecords = flattener.flattenRecords(
    largeObj, List.of("items", "promos", "regions")
);
long elapsed = System.nanoTime() - start;

System.out.println("Generated " + largeRecords.size() + " records from cartesian explosion");
System.out.println("Expected: 10 items x 5 promos x 3 regions = 150 records");
System.out.println("⏱️  Execution time: " + (elapsed / 1_000_000.0) + " ms");
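Because the row count of a multi-path explosion is the product of the array lengths, it can be checked before flattening. A minimal sketch, assuming a hypothetical `ExplosionGuard` helper and the same convention as the puzzle simulation at the end of this notebook (an empty array still yields one row, so it counts as 1). It takes the array lengths, which could be read with Gson's `JsonArray.size()`, and refuses to explode when the product exceeds a limit. `Math.multiplyExact` makes an absurd product fail loudly instead of silently overflowing.

```java
import java.util.List;

/** Hypothetical helper: estimates and caps the row count of a cartesian explosion. */
public class ExplosionGuard {

    /**
     * Product of the array lengths. Assumption: an empty array still yields one row
     * (the record is kept), so it counts as 1 rather than 0.
     */
    public static long estimateRows(List<Integer> arrayLengths) {
        long rows = 1;
        for (int len : arrayLengths) {
            rows = Math.multiplyExact(rows, (long) Math.max(1, len)); // throws ArithmeticException on overflow
        }
        return rows;
    }

    /** Throws if exploding these arrays would produce more than maxRows rows. */
    public static void check(List<Integer> arrayLengths, long maxRows) {
        long rows = estimateRows(arrayLengths);
        if (rows > maxRows) {
            throw new IllegalStateException(
                    "Explosion would produce " + rows + " rows (limit " + maxRows + ")");
        }
    }

    public static void main(String[] args) {
        // items, promos, regions lengths from the cell above
        System.out.println(estimateRows(List.of(10, 5, 3)));
        check(List.of(10, 5, 3), 1_000);          // 150 rows: allowed
        try {
            check(List.of(100, 50, 30), 10_000);  // 150,000 rows: rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the flattener instead drops a record whose exploded array is empty, count an empty array as 0 rather than 1.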

In [None]:
Utils.printSection("Batch Processing Pattern");

// Simulate batch processing of multiple JSON documents
List<String> jsonBatch = List.of(
    "{\"id\": 1, \"items\": [{\"sku\": \"A\"}]}",
    "{\"id\": 2, \"items\": [{\"sku\": \"B\"}, {\"sku\": \"C\"}]}",
    "{\"id\": 3, \"items\": [{\"sku\": \"D\"}]}"
);

JsonFlattener flattener = new JsonFlattener();
List<Map<String, Object>> allRecords = new ArrayList<>();

long batchStart = System.nanoTime();
for (String json : jsonBatch) {
    JsonObject obj = JsonParser.parseString(json).getAsJsonObject();
    List<Map<String, Object>> records = flattener.flattenRecords(obj, List.of("items"));
    allRecords.addAll(records);
}
long batchElapsed = System.nanoTime() - batchStart;

System.out.println("Processed " + jsonBatch.size() + " documents");
System.out.println("Total flattened records: " + allRecords.size());
System.out.println("⏱️  Batch execution time: " + (batchElapsed / 1_000_000.0) + " ms");
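The loop above keeps every flattened record in `allRecords`, which is fine for three documents but grows without bound on large inputs. A minimal sketch of a chunked alternative, assuming a hypothetical `ChunkedBatch.forEachChunk` helper: it hands records downstream (for example to a CSV writer or a JDBC batch) in fixed-size chunks and clears its buffer, so the records it holds at any moment are bounded by the chunk size rather than the total row count.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: streams items to a sink in fixed-size chunks. */
public class ChunkedBatch {

    /** Calls sink with at most chunkSize items at a time; returns the number of chunks emitted. */
    public static <T> int forEachChunk(Iterator<T> source, int chunkSize, Consumer<List<T>> sink) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<T> buffer = new ArrayList<>(chunkSize);
        int chunks = 0;
        while (source.hasNext()) {
            buffer.add(source.next());
            if (buffer.size() == chunkSize) {
                sink.accept(new ArrayList<>(buffer)); // hand over a copy, then reuse the buffer
                buffer.clear();
                chunks++;
            }
        }
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));     // final partial chunk
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            records.add(i);
        }
        int chunks = forEachChunk(records.iterator(), 3, chunk -> System.out.println("Writing chunk " + chunk));
        System.out.println(chunks + " chunks");
    }
}
```

In the batch cell, the source could be an iterator over a stream that flattens one document at a time (`jsonBatch.stream().flatMap(...)`), so only one document's records plus one chunk are in memory at once, provided the sink does not keep the chunks it receives.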

---

<a id="milestone-10"></a>

# Milestone 10: End-to-End Workflows

## Learning Objectives
- Build a complete JSON → CSV pipeline
- Validate round-trip data integrity
- Prepare data for database ingestion

In [None]:
Utils.printSection("End-to-End Pipeline");

// Step 1: Source JSON
String sourceJson = """
    {
        "transaction_id": "TXN-2024-001",
        "timestamp": "2024-01-15T14:30:00Z",
        "merchant": {
            "id": "M001",
            "name": "TechStore",
            "category": "electronics"
        },
        "customer": {
            "id": "C001",
            "email": "user@example.com",
            "tier": "premium"
        },
        "items": [
            {"sku": "PHONE-001", "name": "Smartphone", "qty": 1, "price": 999.00},
            {"sku": "CASE-002", "name": "Phone Case", "qty": 2, "price": 29.99}
        ],
        "payment": {
            "method": "credit_card",
            "status": "completed",
            "total": 1058.98
        }
    }
    """;

System.out.println("Step 1: Source JSON loaded");
System.out.println(Utils.formatJson(sourceJson));

In [None]:
// Step 2: Flatten with array explosion
JsonFlattener flattener = new JsonFlattener();
JsonObject sourceObj = JsonParser.parseString(sourceJson).getAsJsonObject();
List<Map<String, Object>> flatRecords = flattener.flattenRecords(sourceObj, List.of("items"));

System.out.println("\nStep 2: Flattened to " + flatRecords.size() + " records");
for (int i = 0; i < flatRecords.size(); i++) {
    System.out.println("\nRecord " + (i + 1) + ":");
    System.out.println(flattener.toJson(flatRecords.get(i)));
}

In [None]:
// Step 3: Write to CSV
Path pipelineCsvPath = outputDir.resolve("transaction_flat.csv");
CsvUtils.writeCsv(flatRecords, pipelineCsvPath);

System.out.println("\nStep 3: Written to CSV");
System.out.println("File: " + pipelineCsvPath.toAbsolutePath());
System.out.println("\nCSV Preview:");
String csvContent = Files.readString(pipelineCsvPath);
System.out.println(csvContent);

In [None]:
// Step 4: Round-trip validation
List<Map<String, String>> roundTripRecords = CsvUtils.readCsv(pipelineCsvPath);

System.out.println("\nStep 4: Round-trip validation");
System.out.println("Original record count: " + flatRecords.size());
System.out.println("Round-trip record count: " + roundTripRecords.size());
boolean countsMatch = flatRecords.size() == roundTripRecords.size();
System.out.println((countsMatch ? "✓" : "✗") + " Record counts match: " + countsMatch);

// Verify key fields
System.out.println("\nKey field verification:");
for (int i = 0; i < roundTripRecords.size(); i++) {
    Map<String, String> record = roundTripRecords.get(i);
    System.out.println("Record " + (i + 1) + ": " + 
        "transaction_id=" + record.get("transaction_id") + ", " +
        "items.sku=" + record.get("items.sku") + ", " +
        "items.name=" + record.get("items.name"));
}
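Matching record counts does not prove that the values survived the trip. A minimal sketch of a cell-by-cell comparison, assuming a hypothetical `RoundTrip.diff` helper and that the CSV writer renders non-null values with `String.valueOf` and nulls as empty strings. Check how `CsvUtils.writeCsv` actually formats numbers such as `999.00` before relying on exact string equality. The helper returns one message per mismatching or missing cell.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: compares flattened records with rows read back from a CSV file. */
public class RoundTrip {

    /** Assumption: the CSV writer emits String.valueOf(value) for non-null values and "" for null. */
    static String expectedCell(Object value) {
        return value == null ? "" : String.valueOf(value);
    }

    /** Returns one message per mismatch; an empty list means every cell matched. */
    public static List<String> diff(List<Map<String, Object>> original,
                                    List<Map<String, String>> roundTrip) {
        List<String> problems = new ArrayList<>();
        if (original.size() != roundTrip.size()) {
            problems.add("Row count differs: " + original.size() + " vs " + roundTrip.size());
            return problems;
        }
        for (int i = 0; i < original.size(); i++) {
            Map<String, String> row = roundTrip.get(i);
            for (Map.Entry<String, Object> e : original.get(i).entrySet()) {
                if (!row.containsKey(e.getKey())) {
                    problems.add("Row " + (i + 1) + ": column " + e.getKey() + " missing from CSV");
                    continue;
                }
                String expected = expectedCell(e.getValue());
                String actual = row.get(e.getKey()) == null ? "" : row.get(e.getKey());
                if (!expected.equals(actual)) {
                    problems.add("Row " + (i + 1) + ", column " + e.getKey()
                            + ": expected '" + expected + "' but read '" + actual + "'");
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> original = List.of(Map.of("transaction_id", "TXN-2024-001", "items.qty", 2L));
        List<Map<String, String>> readBack = List.of(Map.of("transaction_id", "TXN-2024-001", "items.qty", "2.0"));
        // Reports the numeric formatting difference for items.qty
        diff(original, readBack).forEach(System.out::println);
    }
}
```

In the pipeline above it would be called as `RoundTrip.diff(flatRecords, roundTripRecords)`; an empty result means every original cell was read back unchanged under the stated formatting assumption.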

In [None]:
// Step 5: Generate Snowflake schema
String snowflakeSchema = SnowflakeUtils.generateCreateTable(flatRecords, "TRANSACTIONS_FLAT", "ANALYTICS");

System.out.println("\nStep 5: Snowflake schema generated");
System.out.println(snowflakeSchema);

System.out.println("\n✅ End-to-end pipeline complete!");
System.out.println("Data is ready for:");
System.out.println("  - MongoDB ingestion (use MongoUtils.ingestToMongo())");
System.out.println("  - Snowflake ingestion (create the table with the generated DDL, then load the CSV with COPY INTO)");
System.out.println("  - Analytics (CSV file ready for import)");
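Loading the CSV into the table created from the generated DDL typically takes two Snowflake statements: `PUT` uploads the local file to a stage, and `COPY INTO` loads it. A minimal sketch that only builds the SQL text, assuming a hypothetical `CopyIntoSql` helper, the user stage `@~` with a hypothetical `staged/` folder, and a Unix-style absolute path. Running the statements still requires the Snowflake JDBC driver (or SnowSQL) and credentials. `PUT` gzip-compresses uploads by default, so the `COPY INTO` path is written as a prefix that also matches the `.gz` file. CSV columns are matched to table columns by position, so the CSV header order must match the `CREATE TABLE` column order.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: builds PUT + COPY INTO statements for a local CSV file. */
public class CopyIntoSql {

    /** Uploads the file to the user stage under a "staged/" folder (gzip-compressed by default). */
    public static String putStatement(Path csvFile) {
        // Assumes a Unix-style absolute path: /tmp/out/x.csv -> file:///tmp/out/x.csv
        return "PUT 'file://" + csvFile.toAbsolutePath() + "' @~/staged";
    }

    /** Loads staged files whose names start with the CSV file name (this also matches the .gz upload). */
    public static String copyStatement(String schema, String table, Path csvFile) {
        return "COPY INTO " + schema + "." + table + "\n"
                + "  FROM @~/staged/" + csvFile.getFileName() + "\n"
                + "  FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '\"')";
    }

    public static void main(String[] args) {
        Path csv = Paths.get("/tmp/output/transaction_flat.csv");
        System.out.println(putStatement(csv) + ";");
        System.out.println(copyStatement("ANALYTICS", "TRANSACTIONS_FLAT", csv) + ";");
    }
}
```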

---

## Troubleshooting & Tips

If a cell fails, try these first:
- **Imports fail**: Ensure the IJava kernel is installed correctly, then restart the kernel
- **Maven dependencies**: Run the dependency cell first and wait for the downloads to finish
- **MongoDB errors**: Confirm that a local MongoDB instance is running and that `MONGO_URI` points to it
- **Snowflake errors**: Verify the `SNOWFLAKE_*` environment variables and add the `snowflake-jdbc` driver to the classpath
- **Memory issues**: For large datasets, raise the kernel JVM's maximum heap (`-Xmx`) or process records in chunks instead of holding them all in memory

Tip: Restart the kernel and re-run all cells if class or variable definitions seem stale, for example after editing a utility class.
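To confirm how much heap the kernel's JVM can use before and after raising `-Xmx`, `Runtime.maxMemory()` reports the limit. A small sketch (the `HeapInfo` class name is only for illustration):

```java
/** Illustrative helper: prints the JVM's heap limit and current usage in MiB. */
public class HeapInfo {

    public static long toMiB(long bytes) {
        return bytes / (1024 * 1024);
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        long max = rt.maxMemory();                       // heap limit (Long.MAX_VALUE if unbounded)
        long used = rt.totalMemory() - rt.freeMemory();  // heap currently in use
        System.out.println("Max heap:  " + toMiB(max) + " MiB");
        System.out.println("Used heap: " + toMiB(used) + " MiB");
    }
}
```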

---

## Puzzle for Data Scientists

You receive 1,000 JSON records with **three array fields**: `items` (avg 4), `promos` (avg 2), and `regions` (avg 3). You flatten by exploding all three paths to create a cartesian product.

**Riddle:**
- How many records do you expect on average after explosion?
- If the `regions` list is empty in 10% of records, how does that change the expected total? (Decide first whether an empty array keeps the record as a single row or drops it.)

Write your answer and then validate by simulating a small sample in code.

In [None]:
import java.util.Random;

Random random = new Random(42);

int numRecords = 1000;
double avgItems = 4.0;
double avgPromos = 2.0;
double avgRegions = 3.0;
double missingRegionRate = 0.10;

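// Monte Carlo simulation
// Draws an array length >= 1 whose mean is exactly `mean`: a geometric distribution
// on {1, 2, ...} with success probability p = 1 / mean.
int sampleLength(Random rnd, double mean) {
    double p = 1.0 / mean;
    double u = 1.0 - rnd.nextDouble();   // u in (0, 1], so Math.log(u) is finite
    return 1 + (int) Math.floor(Math.log(u) / Math.log(1.0 - p));
}

long simulatedTotal = 0;
for (int i = 0; i < numRecords; i++) {
    int items = sampleLength(random, avgItems);
    int promos = sampleLength(random, avgPromos);
    
    // In a fraction missingRegionRate of the records, the regions list is empty
    int regions = (random.nextDouble() < missingRegionRate) ? 0 : sampleLength(random, avgRegions);
    
    // Assumption: an empty list still yields one row (the record is kept), hence max(1, regions)
    simulatedTotal += (long) items * promos * Math.max(1, regions);
}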

System.out.println("Simulated total records: " + simulatedTotal);
System.out.println("Average per input record: " + (simulatedTotal / (double) numRecords));

double expectedNoMissing = avgItems * avgPromos * avgRegions;
double expectedWithMissing = avgItems * avgPromos * ((1 - missingRegionRate) * avgRegions + missingRegionRate * 1);

System.out.println("\nExpected (no missing regions): " + expectedNoMissing);
System.out.println("Expected (10% missing regions): " + expectedWithMissing);
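The expected values printed above follow from independence. Let $I$, $P$, and $R$ be the lengths of `items`, `promos`, and `regions` in one record. If the three lengths are independent, the expected rows per input record equal the product of the expected lengths. The second line follows the simulation's assumption that a record with an empty `regions` list still yields one row. The third line shows the alternative in which such records are dropped:

$$
\begin{aligned}
E[\text{rows}] &= E[I]\,E[P]\,E[R] = 4 \cdot 2 \cdot 3 = 24 \\
E[\text{rows}]_{\text{10\% empty, kept}} &= 4 \cdot 2 \cdot (0.9 \cdot 3 + 0.1 \cdot 1) = 8 \cdot 2.8 = 22.4 \\
E[\text{rows}]_{\text{10\% empty, dropped}} &= 4 \cdot 2 \cdot (0.9 \cdot 3 + 0.1 \cdot 0) = 21.6
\end{aligned}
$$

Over 1,000 input records that is roughly 24,000, 22,400, and 21,600 rows respectively. A 1,000-record simulation will land near, but not exactly on, these values.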

---

## Summary

This Java notebook demonstrated:

1. **Core JSON Flattening** - Converting nested structures to flat maps
2. **Array Policies** - INDEX vs JOIN approaches
3. **Cartesian Products** - Exploding multiple array paths
4. **CSV Operations** - Read/write with OpenCSV
5. **MongoDB Integration** - Document ingestion and querying
6. **Snowflake Schema** - DDL generation for data warehousing
7. **Performance** - Timing measurements for batch operations
8. **End-to-End Pipelines** - Complete JSON → CSV → Database workflows

### Key Java Libraries Used:
- **Gson** - JSON parsing and serialization
- **OpenCSV** - CSV read/write operations
- **MongoDB Java Driver** - Database integration
- **Apache Commons Lang** - String utilities

### Next Steps:
- Add Spark integration for large-scale processing
- Implement streaming JSON processing
- Add schema evolution handling
- Build production-grade error handling