# Bitcoin Whale Intelligence - Erklaert

Dieses Notebook erklaert Schritt fuer Schritt, wie wir Bitcoin-"Wale" (grosse Besitzer) identifizieren.

**Das Problem:** Auf der Blockchain sehen wir nur Adressen, nicht Personen. Ein Besitzer kann hunderte Adressen haben.

**Die Loesung:** Wir gruppieren Adressen, die wahrscheinlich derselben Person gehoeren.

---

| Abschnitt | Beschreibung |
|-----------|--------------|
| 1. Das Problem | Warum Whale-Analyse? |
| 2. UTXO-Modell | Bitcoin hat keine Konten |
| 3. Daten laden | JSON zu Spark DataFrames |
| 4. Outputs extrahieren | Nested zu Flach |
| 5. Inputs extrahieren | Nested zu Flach |
| 6. UTXO berechnen | Unspent Outputs finden |
| 7. Common Input Heuristic | Adressen gruppieren |
| 8. Graph bauen | Adressen als Netzwerk |
| 9. Connected Components | Entities finden |
| 10. Whale Detection | Grosse Besitzer finden |
| 11. Visualisierung | Whale-Verteilung |
| **12. Zeitreihen-Analyse** | **Balance ueber Zeit (NEU)** |
| 13. Executive Summary | Zusammenfassung |

---
## 1. Das Problem: Adressen sind nicht Personen

```
┌─────────────────────────────────────────────────────────────────────┐
│                    WAS WIR AUF DER BLOCKCHAIN SEHEN                 │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌───────────────┐   ┌───────────────┐   ┌───────────────┐        │
│   │  Adresse A    │   │  Adresse B    │   │  Adresse C    │        │
│   │   500 BTC     │   │   300 BTC     │   │   200 BTC     │        │
│   └───────────────┘   └───────────────┘   └───────────────┘        │
│                                                                     │
│   Sieht aus wie 3 verschiedene Besitzer mit je 200-500 BTC         │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              ▼ Analyse
┌─────────────────────────────────────────────────────────────────────┐
│                    DIE REALITAET (versteckt)                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│           ┌───────────────────────────────────────┐                 │
│           │         EINE PERSON / ENTITY          │                 │
│           │              1.000 BTC                │                 │
│           │   (kontrolliert A, B und C)           │                 │
│           └───────────────────────────────────────┘                 │
│                                                                     │
│   Das ist ein WAL! Aber ohne Analyse wuerden wir es nicht sehen.   │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```

**Warum ist das wichtig?**
- Boersen haben tausende Adressen
- Privacy-bewusste Nutzer verwenden neue Adressen pro Transaktion
- Echte Vermoegensverteilung ist ohne Clustering unsichtbar

---
## 2. Das UTXO-Modell: Bitcoin hat keine "Konten"

```
┌─────────────────────────────────────────────────────────────────────┐
│                      BANK-MODELL (Nicht Bitcoin!)                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Konto 123:  Kontostand = 1.000 EUR                               │
│               ─────────────────────                                 │
│               Eine Zahl, die aktualisiert wird                      │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────────┐
│                      BITCOIN: UTXO-MODELL                           │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Adresse A besitzt diese "Muenzen" (UTXOs):                       │
│                                                                     │
│   ┌─────────┐  ┌─────────┐  ┌─────────┐                            │
│   │ 0.5 BTC │  │ 0.3 BTC │  │ 0.2 BTC │  = 1.0 BTC Guthaben        │
│   └─────────┘  └─────────┘  └─────────┘                            │
│                                                                     │
│   Wie Bargeld-Scheine: Jeder UTXO ist eine separate "Muenze"       │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```

**Eine Bitcoin-Transaktion funktioniert so:**

```
┌─────────────────┐                         ┌─────────────────┐
│    INPUTS       │                         │    OUTPUTS      │
│  (ausgeben)     │                         │  (neue UTXOs)   │
├─────────────────┤     ┌───────────┐       ├─────────────────┤
│  ┌───────────┐  │     │           │       │  ┌───────────┐  │
│  │  0.5 BTC  │──┼────▶│           │──────▶│  │  0.7 BTC  │  │  Empfaenger
│  └───────────┘  │     │    TX     │       │  └───────────┘  │
│  ┌───────────┐  │     │           │       │  ┌───────────┐  │
│  │  0.3 BTC  │──┼────▶│           │──────▶│  │ 0.09 BTC  │  │  Wechselgeld
│  └───────────┘  │     └───────────┘       │  └───────────┘  │
└─────────────────┘      Gebuehr: 0.01      └─────────────────┘
    0.8 BTC rein             BTC               0.79 BTC raus
```

**Wichtig:** Um einen UTXO auszugeben, braucht man den Private Key der Adresse!

---
## 3. Setup und Konfiguration

```
┌─────────────────────────────────────────────────────────────────────┐
│                        DATENQUELLE                                  │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────┐      ┌──────────────┐      ┌──────────────┐      │
│  │   Bitcoin    │      │  bitcoin-etl │      │    JSON      │      │
│  │  Full Node   │─────▶│   (Export)   │─────▶│   Dateien    │      │
│  └──────────────┘      └──────────────┘      └──────────────┘      │
│                                                                     │
│  Die Blockchain        Konvertiert zu        Unser Input           │
│  (900+ GB)             lesbarem Format       (Blocks + TXs)        │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```

**Warum Spark?** Die Blockchain ist riesig. Spark verteilt die Arbeit auf alle CPU-Kerne.

In [None]:
# ==============================================================================
# IMPORTS
# ==============================================================================
import os
import platform
from pathlib import Path
from itertools import combinations

import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import (
    col, count, sum as spark_sum, avg, desc, when, round as spark_round,
    explode, explode_outer, collect_set, size as spark_size,
    from_unixtime, to_timestamp, element_at
)
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, IntegerType,
    BooleanType, ArrayType
)
from pyspark.sql.functions import udf

# Visualization style
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams.update({
    'figure.figsize': (12, 6),
    'figure.dpi': 100,
    'axes.titlesize': 14,
    'axes.titleweight': 'bold',
    'axes.labelsize': 11,
    'font.size': 10
})
COLORS = {'primary': '#2E86AB', 'secondary': '#A23B72', 'accent': '#F18F01', 'dark': '#1B1B1E'}

In [None]:
# ==============================================================================
# KONFIGURATION - PASSE DIE PFADE AN DEIN SYSTEM AN
# ==============================================================================

SYSTEM = platform.system()

# Pfad zu bitcoin-etl exportierten Daten
# Beispiele:
#   Windows: r"C:\Users\DeinName\blockchain_exports"
#   macOS:   "/Users/DeinName/blockchain_exports"
#   Linux:   "/home/DeinName/blockchain_exports"

if SYSTEM == "Windows":
    BLOCKCHAIN_DATA_PATH = r"C:\Users\YourName\blockchain_exports"  # <-- AENDERN
    OUTPUT_PATH = r"C:\Users\YourName\bitcoin_analysis_output"       # <-- AENDERN
elif SYSTEM == "Darwin":  # macOS
    BLOCKCHAIN_DATA_PATH = "/Users/roman/spark_project/blockchain_exports"  # <-- AENDERN
    OUTPUT_PATH = "/Users/roman/spark_project/bitcoin-whale-intelligence/data"
else:  # Linux
    BLOCKCHAIN_DATA_PATH = "/home/YourName/blockchain_exports"  # <-- AENDERN
    OUTPUT_PATH = "/home/YourName/bitcoin_analysis_output"

DRIVER_MEMORY = "16g"

# Output-Verzeichnis erstellen
Path(OUTPUT_PATH).mkdir(parents=True, exist_ok=True)

print(f"System: {SYSTEM}")
print(f"Datenquelle: {BLOCKCHAIN_DATA_PATH}")
print(f"Output-Pfad: {OUTPUT_PATH}")

---
## Schemas und Helper-Funktionen

```
┌─────────────────────────────────────────────────────────────────────┐
│                    JSON STRUKTUR (bitcoin-etl)                      │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Transaction JSON:                                                  │
│  {                                                                  │
│    "hash": "abc123...",                                            │
│    "block_number": 100000,                                         │
│    "inputs": [                      <-- Array von Inputs           │
│      { "spent_transaction_hash": "xyz", "value": 50000000 }       │
│    ],                                                               │
│    "outputs": [                     <-- Array von Outputs          │
│      { "index": 0, "value": 40000000, "addresses": ["1A..."] }    │
│    ]                                                                │
│  }                                                                  │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```

**Warum Schemas?** Spark kann die Daten schneller laden, wenn es die Struktur vorher kennt.

In [None]:
# ==============================================================================
# SCHEMAS FUER BITCOIN-ETL JSON DATEN
# ==============================================================================

INPUT_SCHEMA = StructType([
    StructField("index", IntegerType(), True),
    StructField("spent_transaction_hash", StringType(), True),
    StructField("spent_output_index", IntegerType(), True),
    StructField("script_asm", StringType(), True),
    StructField("script_hex", StringType(), True),
    StructField("sequence", LongType(), True),
    StructField("required_signatures", IntegerType(), True),
    StructField("type", StringType(), True),
    StructField("addresses", ArrayType(StringType()), True),
    StructField("value", LongType(), True),
])

OUTPUT_SCHEMA = StructType([
    StructField("index", IntegerType(), True),
    StructField("script_asm", StringType(), True),
    StructField("script_hex", StringType(), True),
    StructField("required_signatures", IntegerType(), True),
    StructField("type", StringType(), True),
    StructField("addresses", ArrayType(StringType()), True),
    StructField("value", LongType(), True),
])

TRANSACTION_SCHEMA = StructType([
    StructField("hash", StringType(), False),
    StructField("size", IntegerType(), True),
    StructField("virtual_size", IntegerType(), True),
    StructField("version", IntegerType(), True),
    StructField("lock_time", LongType(), True),
    StructField("block_number", LongType(), True),
    StructField("block_hash", StringType(), True),
    StructField("block_timestamp", LongType(), True),
    StructField("is_coinbase", BooleanType(), True),
    StructField("index", IntegerType(), True),
    StructField("inputs", ArrayType(INPUT_SCHEMA), True),
    StructField("outputs", ArrayType(OUTPUT_SCHEMA), True),
    StructField("input_count", IntegerType(), True),
    StructField("output_count", IntegerType(), True),
    StructField("input_value", LongType(), True),
    StructField("output_value", LongType(), True),
    StructField("fee", LongType(), True),
])

BLOCK_SCHEMA = StructType([
    StructField("hash", StringType(), False),
    StructField("size", IntegerType(), True),
    StructField("stripped_size", IntegerType(), True),
    StructField("weight", IntegerType(), True),
    StructField("number", LongType(), True),
    StructField("version", IntegerType(), True),
    StructField("merkle_root", StringType(), True),
    StructField("timestamp", LongType(), True),
    StructField("nonce", StringType(), True),
    StructField("bits", StringType(), True),
    StructField("coinbase_param", StringType(), True),
    StructField("transaction_count", IntegerType(), True),
])

print("Schemas definiert.")

In [None]:
# ==============================================================================
# HELPER FUNKTIONEN
# ==============================================================================

def create_spark_session(app_name="Bitcoin Whale Intelligence", driver_memory="8g",
                         enable_graphframes=True, suppress_logs=True):
    """
    Erstellt eine optimierte Spark Session fuer Bitcoin-Datenverarbeitung.
    """
    builder = SparkSession.builder \
        .appName(app_name) \
        .master("local[*]") \
        .config("spark.driver.memory", driver_memory) \
        .config("spark.driver.maxResultSize", "4g") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.sql.adaptive.skewJoin.enabled", "true") \
        .config("spark.sql.shuffle.partitions", "200") \
        .config("spark.sql.debug.maxToStringFields", "100") \
        .config("spark.ui.showConsoleProgress", "false") \
        .config("spark.memory.fraction", "0.8") \
        .config("spark.memory.storageFraction", "0.3")

    if enable_graphframes:
        builder = builder.config(
            "spark.jars.packages",
            "graphframes:graphframes:0.8.3-spark3.5-s_2.12"
        )

    if suppress_logs:
        devnull = os.open(os.devnull, os.O_WRONLY)
        old_stdout_fd = os.dup(1)
        old_stderr_fd = os.dup(2)
        os.dup2(devnull, 1)
        os.dup2(devnull, 2)
        try:
            spark = builder.getOrCreate()
            spark.sparkContext.setLogLevel("ERROR")
        finally:
            os.dup2(old_stdout_fd, 1)
            os.dup2(old_stderr_fd, 2)
            os.close(devnull)
            os.close(old_stdout_fd)
            os.close(old_stderr_fd)
    else:
        spark = builder.getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")

    return spark


def load_transactions(spark, base_path, use_schema=True):
    """
    Laedt bitcoin-etl Transaktionsdaten aus Hive-partitionierten JSON-Dateien.
    """
    base = Path(base_path)
    batch_folders = [d for d in base.iterdir() if d.is_dir() and not d.name.startswith('.')]

    if not batch_folders:
        raise ValueError(f"Keine Batch-Ordner gefunden in {base_path}")

    tx_paths = []
    for batch in batch_folders:
        tx_path = batch / "transactions"
        if tx_path.exists():
            tx_paths.append(str(tx_path))

    if not tx_paths:
        raise ValueError("Keine transactions/ Ordner gefunden")

    if use_schema:
        df = spark.read.schema(TRANSACTION_SCHEMA).json(tx_paths)
    else:
        df = spark.read.json(tx_paths)

    df = df.withColumn("block_datetime", to_timestamp(from_unixtime(col("block_timestamp"))))
    return df


def load_blocks(spark, base_path, use_schema=True):
    """
    Laedt bitcoin-etl Block-Daten aus Hive-partitionierten JSON-Dateien.
    """
    base = Path(base_path)
    batch_folders = [d for d in base.iterdir() if d.is_dir() and not d.name.startswith('.')]

    block_paths = []
    for batch in batch_folders:
        block_path = batch / "blocks"
        if block_path.exists():
            block_paths.append(str(block_path))

    if not block_paths:
        raise ValueError("Keine blocks/ Ordner gefunden")

    if use_schema:
        df = spark.read.schema(BLOCK_SCHEMA).json(block_paths)
    else:
        df = spark.read.json(block_paths)

    df = df.withColumn("timestamp_dt", to_timestamp(from_unixtime(col("timestamp"))))
    return df


print("Helper-Funktionen definiert.")

In [None]:
%%time
# Spark initialisieren
spark = create_spark_session(app_name="Bitcoin Whale Analysis", driver_memory=DRIVER_MEMORY, enable_graphframes=True)
spark.sparkContext.setCheckpointDir(str(Path(OUTPUT_PATH) / "checkpoints"))
print(f"Spark {spark.version} initialisiert | UI: {spark.sparkContext.uiWebUrl}")

---
## 4. Daten laden: JSON zu Spark DataFrames

```
┌─────────────────────────────────────────────────────────────────────┐
│                         DATEN LADEN                                 │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  blockchain_exports/                                                │
│  ├── batch_0/                                                       │
│  │   ├── blocks/                                                    │
│  │   │   └── *.json         ──┐                                    │
│  │   └── transactions/         │                                    │
│  │       └── *.json         ───┼──▶  Spark DataFrame               │
│  ├── batch_1/                  │     (verteilt auf alle Kerne)     │
│  │   └── ...                ───┘                                    │
│  └── ...                                                            │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────────┐
│  tx_df:                                                             │
│  ┌────────────┬─────────────┬────────────┬────────────┐            │
│  │    hash    │ block_num   │  inputs[]  │ outputs[]  │            │
│  ├────────────┼─────────────┼────────────┼────────────┤            │
│  │  abc123... │   100000    │   [...]    │   [...]    │            │
│  │  def456... │   100000    │   [...]    │   [...]    │            │
│  │    ...     │    ...      │    ...     │    ...     │            │
│  └────────────┴─────────────┴────────────┴────────────┘            │
└─────────────────────────────────────────────────────────────────────┘
```

**Warum .cache()?** Die Daten werden im RAM gehalten, damit nachfolgende Operationen schneller sind.

In [None]:
%%time
# Transaktionen und Bloecke laden
tx_df = load_transactions(spark, BLOCKCHAIN_DATA_PATH).cache()
blocks_df = load_blocks(spark, BLOCKCHAIN_DATA_PATH).cache()

TX_COUNT = tx_df.count()
BLOCK_COUNT = blocks_df.count()

print(f"Geladen: {TX_COUNT:,} Transaktionen aus {BLOCK_COUNT:,} Bloecken")

---
## 5. Outputs extrahieren: Verschachtelt zu Flach

```
┌─────────────────────────────────────────────────────────────────────┐
│                    VORHER: VERSCHACHTELT (Nested)                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Transaction: "abc123"                                              │
│  outputs: [                                                         │
│    { index: 0, value: 5000000000, addresses: ["1A..."] },          │
│    { index: 1, value: 2000000000, addresses: ["1B..."] }           │
│  ]                                                                  │
│                                                                     │
│  --> 1 Zeile mit Array                                              │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                                 │
                                 │ explode()
                                 ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    NACHHER: FLACH (Flat)                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────┬─────────────┬──────────────┬────────────┐            │
│  │  tx_hash │ output_idx  │    value     │  addresses │            │
│  ├──────────┼─────────────┼──────────────┼────────────┤            │
│  │ abc123   │      0      │  5000000000  │  ["1A..."] │            │
│  │ abc123   │      1      │  2000000000  │  ["1B..."] │            │
│  └──────────┴─────────────┴──────────────┴────────────┘            │
│                                                                     │
│  --> 2 Zeilen, jede fuer einen Output                               │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```

**Warum flach?** So koennen wir jeden Output einzeln analysieren, joinen und aggregieren.

In [None]:
def explode_outputs(tx_df):
    """
    Transformiert verschachtelte Outputs zu einer flachen Tabelle.
    Jede Zeile = 1 Output.
    """
    return tx_df \
        .select(
            col("hash").alias("tx_hash"),
            col("block_number"),
            col("block_timestamp"),
            explode_outer("outputs").alias("output")
        ) \
        .select(
            "tx_hash",
            "block_number",
            "block_timestamp",
            col("output.index").alias("output_index"),
            col("output.value").alias("value"),
            col("output.addresses").alias("addresses"),
            col("output.type").alias("output_type"),
        )

# Outputs extrahieren
outputs_df = explode_outputs(tx_df).cache()
OUTPUT_COUNT = outputs_df.count()
print(f"Outputs extrahiert: {OUTPUT_COUNT:,}")

---
## 6. Inputs extrahieren: Welche Outputs wurden ausgegeben?

```
┌─────────────────────────────────────────────────────────────────────┐
│                    VORHER: VERSCHACHTELT (Nested)                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Transaction: "def456"                                              │
│  inputs: [                                                          │
│    { spent_transaction_hash: "abc123", spent_output_index: 0 },    │
│    { spent_transaction_hash: "xyz789", spent_output_index: 1 }     │
│  ]                                                                  │
│                                                                     │
│  --> Diese TX gibt 2 vorherige Outputs aus                          │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                                 │
                                 │ explode()
                                 ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    NACHHER: FLACH (Flat)                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────┬────────────────┬───────────────────┐                 │
│  │  tx_hash │  spent_tx_hash │ spent_output_idx  │                 │
│  ├──────────┼────────────────┼───────────────────┤                 │
│  │ def456   │     abc123     │         0         │  <-- Referenz   │
│  │ def456   │     xyz789     │         1         │      auf alte   │
│  └──────────┴────────────────┴───────────────────┘      Outputs    │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```

**Wichtig:** Ein Input ist eine REFERENZ auf einen alten Output. Der Input "verbraucht" diesen Output.

In [None]:
def explode_inputs(tx_df):
    """
    Transformiert verschachtelte Inputs zu einer flachen Tabelle.
    Jede Zeile = 1 Input (= 1 ausgegebener Output).
    """
    return tx_df \
        .select(
            col("hash").alias("tx_hash"),
            col("block_number"),
            col("block_timestamp"),
            col("is_coinbase"),
            explode_outer("inputs").alias("input")
        ) \
        .select(
            "tx_hash",
            "block_number",
            "block_timestamp",
            "is_coinbase",
            col("input.index").alias("input_index"),
            col("input.spent_transaction_hash").alias("spent_tx_hash"),
            col("input.spent_output_index").alias("spent_output_index"),
            col("input.addresses").alias("addresses"),
            col("input.value").alias("value"),
        )

# Inputs extrahieren
inputs_df = explode_inputs(tx_df).cache()
INPUT_COUNT = inputs_df.count()
print(f"Inputs extrahiert: {INPUT_COUNT:,}")

---
## 7. UTXO berechnen: Outputs MINUS Spent = Unspent

```
┌──────────────────────────────────────────────────────────────────────┐
│                       ALLE OUTPUTS                                   │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐                 │
│  │ TX-1:0  │  │ TX-1:1  │  │ TX-2:0  │  │ TX-3:0  │                 │
│  │ 5 BTC   │  │ 2 BTC   │  │ 10 BTC  │  │ 1 BTC   │                 │
│  └────┬────┘  └─────────┘  └─────────┘  └─────────┘                 │
│       │                                                              │
│       ▼ wurde ausgegeben                                             │
│  ┌─────────┐                                                         │
│  │ SPENT   │  (wurde als Input in einer anderen TX verwendet)        │
│  └─────────┘                                                         │
└──────────────────────────────────────────────────────────────────────┘
                              │
                              ▼ LEFT ANTI JOIN
┌──────────────────────────────────────────────────────────────────────┐
│                       UTXOs (Unspent)                                │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐                              │
│  │ TX-1:1  │  │ TX-2:0  │  │ TX-3:0  │   = 13 BTC                   │
│  │ 2 BTC   │  │ 10 BTC  │  │ 1 BTC   │   (noch nicht ausgegeben)    │
│  └─────────┘  └─────────┘  └─────────┘                              │
└──────────────────────────────────────────────────────────────────────┘
```

**Warum LEFT ANTI JOIN?**
- Wir behalten alle Outputs, die NICHT in den Inputs vorkommen
- "Anti" bedeutet: Behalte nur Zeilen OHNE Match
- Das Ergebnis sind alle noch nicht ausgegebenen Outputs (UTXOs)

In [None]:
def compute_utxo_set(outputs_df, inputs_df):
    """
    Berechnet das UTXO Set (Unspent Transaction Outputs).
    UTXO = Alle Outputs MINUS die, die schon ausgegeben wurden.
    """
    # Alle ausgegebenen Outputs (Referenzen aus Inputs)
    spent_refs = inputs_df \
        .filter(col("is_coinbase") == False) \
        .select(
            col("spent_tx_hash").alias("ref_tx_hash"),
            col("spent_output_index").alias("ref_output_index")
        ) \
        .distinct()

    # LEFT ANTI JOIN: Behalte nur Outputs die NICHT spent sind
    utxos = outputs_df.join(
        spent_refs,
        on=[
            outputs_df.tx_hash == spent_refs.ref_tx_hash,
            outputs_df.output_index == spent_refs.ref_output_index
        ],
        how="left_anti"
    )
    return utxos

# UTXO Set berechnen
utxo_df = compute_utxo_set(outputs_df, inputs_df).cache()
UTXO_COUNT = utxo_df.count()
SPENT_COUNT = OUTPUT_COUNT - UTXO_COUNT

print(f"Outputs:  {OUTPUT_COUNT:,}")
print(f"Spent:    {SPENT_COUNT:,} ({SPENT_COUNT/OUTPUT_COUNT*100:.1f}%)")
print(f"UTXOs:    {UTXO_COUNT:,} ({UTXO_COUNT/OUTPUT_COUNT*100:.1f}%)")

---
## 8. Common Input Ownership Heuristic

Die wichtigste Annahme fuer Address-Clustering:

```
┌─────────────────────────────────────────────────────────────────────┐
│                    MULTI-INPUT TRANSAKTION                          │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌─────────────────┐                                               │
│   │  Input 1        │                                               │
│   │  Adresse A      │───┐                                           │
│   │  (braucht       │   │                                           │
│   │   Key A)        │   │      ┌────────────────┐                   │
│   └─────────────────┘   │      │                │                   │
│                         ├─────▶│  Transaktion   │────▶ Output       │
│   ┌─────────────────┐   │      │                │                   │
│   │  Input 2        │   │      └────────────────┘                   │
│   │  Adresse B      │───┘                                           │
│   │  (braucht       │                                               │
│   │   Key B)        │                                               │
│   └─────────────────┘                                               │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              ▼ Schlussfolgerung
┌─────────────────────────────────────────────────────────────────────┐
│                                                                     │
│   Um diese Transaktion zu erstellen, brauchte man BEIDE Keys.       │
│                                                                     │
│   ┌─────────────────────────────────────────────────────┐           │
│   │         Adresse A  +  Adresse B                     │           │
│   │              =  SELBER BESITZER                     │           │
│   └─────────────────────────────────────────────────────┘           │
│                                                                     │
│   (mit hoher Wahrscheinlichkeit)                                    │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```

**Wichtig:** Wir nutzen nur INPUTS, nicht Outputs! Bei Outputs wissen wir nicht, wer der Empfaenger ist.

In [None]:
# Analyse: Wie viele Multi-Input Transaktionen gibt es?
input_dist = tx_df.filter(col("is_coinbase") == False) \
    .groupBy("input_count").agg(count("*").alias("tx_count")) \
    .orderBy("input_count").toPandas()

total_non_coinbase = input_dist['tx_count'].sum()
single_input = input_dist[input_dist['input_count'] == 1]['tx_count'].sum()
multi_input = input_dist[input_dist['input_count'] > 1]['tx_count'].sum()

print(f"Transaktions-Input Analyse")
print(f"{'='*50}")
print(f"Total (ohne Coinbase): {total_non_coinbase:,}")
print(f"Single-Input:          {single_input:,} ({single_input/total_non_coinbase*100:.1f}%)")
print(f"Multi-Input:           {multi_input:,} ({multi_input/total_non_coinbase*100:.1f}%) <- nutzbar fuer Clustering")

In [None]:
def enrich_clustering_inputs(tx_df, outputs_df, min_inputs=2, max_inputs=50):
    """
    Bereitet Inputs aus Multi-Input Transaktionen fuer Clustering vor.
    Wir brauchen die Adressen, aber die stehen manchmal nur im Original-Output.
    """
    # Nur Multi-Input Transaktionen (ohne Coinbase)
    multi_input_txs = tx_df \
        .filter(
            (col("input_count") >= min_inputs) &
            (col("input_count") <= max_inputs) &
            (col("is_coinbase") == False)
        )

    # Inputs exploden
    inputs_exploded = multi_input_txs \
        .select(
            col("hash").alias("tx_hash"),
            explode("inputs").alias("input")
        ) \
        .select(
            "tx_hash",
            col("input.spent_transaction_hash").alias("spent_tx_hash"),
            col("input.spent_output_index").alias("spent_output_index"),
            col("input.addresses").alias("raw_addresses"),
        )

    # Lookup-Tabelle: Original-Output -> Adresse
    output_lookup = outputs_df \
        .select(
            col("tx_hash").alias("source_tx_hash"),
            col("output_index").alias("source_output_index"),
            col("addresses").alias("source_addresses"),
        )

    # Join: Input -> Original Output -> Adresse
    enriched = inputs_exploded.join(
        output_lookup,
        on=[
            inputs_exploded.spent_tx_hash == output_lookup.source_tx_hash,
            inputs_exploded.spent_output_index == output_lookup.source_output_index
        ],
        how="left"
    )

    # Adresse extrahieren (erste Adresse, falls Array)
    enriched = enriched.withColumn(
        "address",
        when(
            (col("source_addresses").isNotNull()) & (spark_size(col("source_addresses")) > 0),
            element_at(col("source_addresses"), 1)
        ).otherwise(
            when(
                (col("raw_addresses").isNotNull()) & (spark_size(col("raw_addresses")) > 0),
                element_at(col("raw_addresses"), 1)
            )
        )
    )

    result = enriched \
        .filter(col("address").isNotNull()) \
        .select("tx_hash", "address")

    return result

# Inputs fuer Clustering vorbereiten
clustering_inputs = enrich_clustering_inputs(tx_df, outputs_df, min_inputs=2, max_inputs=50).cache()
print(f"Inputs fuer Clustering vorbereitet: {clustering_inputs.count():,}")

---
## 9. Graph bauen: Adressen als Netzwerk

```
┌─────────────────────────────────────────────────────────────────────┐
│                    TRANSAKTIONEN                                    │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   TX-1: Inputs von [A, B]       TX-2: Inputs von [B, C]            │
│   TX-3: Inputs von [D, E]       TX-4: Inputs von [F]   (nur 1)     │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              ▼ Kanten erstellen
┌─────────────────────────────────────────────────────────────────────┐
│                    GRAPH-KANTEN                                     │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌─────────────────────────────────────────────────────────┐      │
│   │  src  │  dst  │   Bedeutung                             │      │
│   ├───────┼───────┼─────────────────────────────────────────┤      │
│   │   A   │   B   │   A und B waren zusammen Input in TX-1  │      │
│   │   B   │   C   │   B und C waren zusammen Input in TX-2  │      │
│   │   D   │   E   │   D und E waren zusammen Input in TX-3  │      │
│   └───────┴───────┴─────────────────────────────────────────┘      │
│                                                                     │
│   TX-4 erzeugt keine Kante (nur 1 Input)                           │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              ▼ Als Graph
┌─────────────────────────────────────────────────────────────────────┐
│                                                                     │
│        A ─────── B ─────── C              D ─────── E               │
│                                                                     │
│        └─── Entity 1 ───┘                 └ Entity 2┘               │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```

**Transitive Verknuepfung:** A-B (TX-1) + B-C (TX-2) = A-B-C sind EINE Entity!

In [None]:
%%time
# Pro Transaktion: Alle Adressen sammeln
tx_addresses = clustering_inputs.groupBy("tx_hash").agg(collect_set("address").alias("addresses"))
tx_addresses = tx_addresses.filter(spark_size("addresses") >= 2)

# Kanten erstellen: Jedes Paar von Adressen in einer TX
def create_edges_udf(addresses):
    if not addresses or len(addresses) < 2:
        return []
    # Alle Paare bilden (Kombinationen)
    return [(a, b) for a, b in combinations(sorted(addresses), 2)]

edge_schema = ArrayType(StructType([StructField("src", StringType()), StructField("dst", StringType())]))
create_edges = udf(create_edges_udf, edge_schema)

# Kanten DataFrame
edges_df = tx_addresses.withColumn("edges", create_edges("addresses")) \
    .select(explode("edges").alias("edge")) \
    .select(col("edge.src").alias("src"), col("edge.dst").alias("dst")).distinct()

# Speichern (fuer Checkpointing)
edges_path = str(Path(OUTPUT_PATH) / "edges_temp.parquet")
edges_df.write.mode("overwrite").parquet(edges_path)
edges_df = spark.read.parquet(edges_path).cache()

# Knoten (alle Adressen)
vertices_df = edges_df.select(col("src").alias("id")).union(edges_df.select(col("dst").alias("id"))).distinct()
vertices_path = str(Path(OUTPUT_PATH) / "vertices_temp.parquet")
vertices_df.write.mode("overwrite").parquet(vertices_path)
vertices_df = spark.read.parquet(vertices_path).cache()

EDGE_COUNT = edges_df.count()
VERTEX_COUNT = vertices_df.count()
print(f"Graph erstellt: {VERTEX_COUNT:,} Knoten (Adressen) | {EDGE_COUNT:,} Kanten (Co-Input Paare)")

---
## 10. Connected Components: Entities finden

```
┌─────────────────────────────────────────────────────────────────────┐
│                    CONNECTED COMPONENTS ALGORITHMUS                 │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   Graph:                                                            │
│                                                                     │
│       A ─── B ─── C           D ─── E           F                   │
│                                                                     │
│   A und C sind nicht direkt verbunden, aber transitiv ueber B.      │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              ▼ Connected Components
┌─────────────────────────────────────────────────────────────────────┐
│                    ERGEBNIS: ENTITIES                               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌────────────────┐  ┌────────────────┐  ┌────────────────┐       │
│   │   Entity 1     │  │   Entity 2     │  │   Entity 3     │       │
│   │   A, B, C      │  │   D, E         │  │   F            │       │
│   │                │  │                │  │   (isoliert)   │       │
│   └────────────────┘  └────────────────┘  └────────────────┘       │
│                                                                     │
│   Jede zusammenhaengende Komponente = 1 Entity (Besitzer)          │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```

**GraphFrames:** Spark-Bibliothek fuer Graph-Algorithmen auf grossen Datenmengen.

In [None]:
%%time
# GraphFrames importieren und Graph erstellen
from graphframes import GraphFrame

graph = GraphFrame(vertices_df, edges_df)

# Connected Components berechnen
# Jede Adresse bekommt eine "component" ID - alle Adressen mit gleicher ID gehoeren zusammen
entities_df = graph.connectedComponents(algorithm="graphframes", checkpointInterval=1, broadcastThreshold=100000)

# Ergebnis speichern
entities_path = str(Path(OUTPUT_PATH) / "entities_temp.parquet")
entities_df.write.mode("overwrite").parquet(entities_path)
entities_df = spark.read.parquet(entities_path).cache()

ADDRESS_COUNT = entities_df.count()
ENTITY_COUNT = entities_df.select("component").distinct().count()
REDUCTION = (1 - ENTITY_COUNT/ADDRESS_COUNT) * 100

print(f"\nClustering Ergebnis")
print(f"{'='*50}")
print(f"Adressen analysiert:   {ADDRESS_COUNT:,}")
print(f"Entities identifiziert: {ENTITY_COUNT:,}")
print(f"Reduktion:             {REDUCTION:.1f}%")

---
## 11. Whale Detection: Grosse Besitzer finden

```
┌─────────────────────────────────────────────────────────────────────┐
│                    ENTITY BALANCE BERECHNUNG                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌───────────────────┐      ┌───────────────────┐                 │
│   │  entities         │      │  utxos            │                 │
│   ├───────────────────┤      ├───────────────────┤                 │
│   │ Adresse │ Entity  │      │ Adresse │  BTC    │                 │
│   │    A    │   1     │      │    A    │  500    │                 │
│   │    B    │   1     │      │    B    │  300    │                 │
│   │    C    │   1     │      │    C    │  200    │                 │
│   │    D    │   2     │      │    D    │   50    │                 │
│   └───────────────────┘      └───────────────────┘                 │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              ▼ JOIN auf Adresse
┌─────────────────────────────────────────────────────────────────────┐
│                    JOINED                                           │
├─────────────────────────────────────────────────────────────────────┤
│   ┌────────────────────────────────────┐                           │
│   │ Adresse │ Entity │  BTC            │                           │
│   │    A    │   1    │  500            │                           │
│   │    B    │   1    │  300            │                           │
│   │    C    │   1    │  200            │                           │
│   │    D    │   2    │   50            │                           │
│   └────────────────────────────────────┘                           │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              ▼ GROUP BY Entity, SUM(BTC)
┌─────────────────────────────────────────────────────────────────────┐
│                    ENTITY BALANCES                                  │
├─────────────────────────────────────────────────────────────────────┤
│   ┌───────────────────────────────┐                                │
│   │ Entity │  Total BTC           │                                │
│   │   1    │  1.000 BTC   <- WAL! │                                │
│   │   2    │     50 BTC           │                                │
│   └───────────────────────────────┘                                │
└─────────────────────────────────────────────────────────────────────┘
```

**Ergebnis:** Entity 1 kontrolliert 1.000 BTC ueber 3 Adressen - das ist ein Wal!

In [None]:
%%time
# Entity-Mapping: Adresse -> Entity ID
entities_final = entities_df.select(col("id").alias("address"), col("component").alias("entity_id"))

# UTXOs explodieren (eine Zeile pro Adresse)
utxo_exploded = utxo_df.select(col("tx_hash"), col("output_index"), col("value"), explode(col("addresses")).alias("address"))

# Join: UTXO -> Entity
utxo_with_entities = utxo_exploded.join(entities_final, "address", "inner").select("entity_id", "value")

# Aggregation: Summe pro Entity
entity_balances = utxo_with_entities.groupBy("entity_id").agg(
    spark_sum("value").alias("balance_satoshi"),
    count("*").alias("utxo_count")
).withColumn("balance_btc", spark_round(col("balance_satoshi") / 100000000, 8)).orderBy(desc("balance_btc")).cache()

ENTITIES_WITH_BALANCE = entity_balances.count()
TOTAL_BTC = entity_balances.agg(spark_sum("balance_btc")).collect()[0][0]

print(f"Entity Balance Zusammenfassung")
print(f"{'='*50}")
print(f"Entities mit Guthaben: {ENTITIES_WITH_BALANCE:,}")
print(f"Total BTC in UTXOs:    {TOTAL_BTC:,.2f} BTC")
print(f"Durchschnitt/Entity:   {TOTAL_BTC/ENTITIES_WITH_BALANCE:.4f} BTC")

In [None]:
# Top 20 Wale
top_whales = entity_balances.limit(20).toPandas()
TOP_20_BTC = top_whales['balance_btc'].sum()
TOP_20_SHARE = TOP_20_BTC / TOTAL_BTC * 100

print(f"Top 20 Wale")
print(f"{'='*70}")
print(f"{'Rang':<6} {'Entity ID':<20} {'Balance (BTC)':<18} {'UTXOs':<10} {'Anteil'}")
print(f"{'-'*70}")
for i, row in top_whales.iterrows():
    share = row['balance_btc'] / TOTAL_BTC * 100
    print(f"#{i+1:<5} {int(row['entity_id']):<20} {row['balance_btc']:>15,.2f}   {int(row['utxo_count']):>8}   {share:>5.2f}%")
print(f"{'-'*70}")
print(f"{'Top 20 Total:':<27} {TOP_20_BTC:>15,.2f} BTC         {TOP_20_SHARE:>5.2f}%")

In [None]:
# Entity-Kategorisierung
entity_categories = entity_balances.withColumn("category",
    when(col("balance_btc") >= 1000, "Mega Whale (1000+ BTC)")
    .when(col("balance_btc") >= 100, "Whale (100-1000 BTC)")
    .when(col("balance_btc") >= 10, "Large (10-100 BTC)")
    .when(col("balance_btc") >= 1, "Medium (1-10 BTC)")
    .otherwise("Small (<1 BTC)")
)

category_stats = entity_categories.groupBy("category").agg(
    count("*").alias("entity_count"),
    spark_sum("balance_btc").alias("total_btc")
).orderBy(desc("total_btc")).toPandas()

print("Kategorie-Verteilung:")
print(category_stats.to_string(index=False))

---
## 11. Visualisierung: Whale-Verteilung

```
┌─────────────────────────────────────────────────────────────────────┐
│                    REICHTUMSKONZENTRATION                           │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   BTC-Verteilung (wer hat wie viel?):                              │
│                                                                     │
│   Mega Whales  ████████████████████████████████████████  (>60%)    │
│   Whales       ██████████████                            (~25%)    │
│   Large        ██████                                    (~10%)    │
│   Medium/Small ██                                        (~5%)     │
│                                                                     │
│   Entity-Verteilung (wie viele gibt es?):                          │
│                                                                     │
│   Mega Whales  ██                                        (<1%)     │
│   Whales       ████                                      (~5%)     │
│   Large        ████████████                              (~15%)    │
│   Medium/Small ████████████████████████████████████████  (>75%)    │
│                                                                     │
│   Erkenntnis: Wenige Mega Whales halten den Grossteil der BTC!     │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘
```

In [None]:
# ==============================================================================
# VISUALISIERUNG: Whale-Verteilung als Balkendiagramme
# ==============================================================================

fig, axes = plt.subplots(1, 3, figsize=(16, 6))

# Chart 1: Top 10 Wale (bleibt gleich)
top_10 = entity_balances.limit(10).toPandas()
bars = axes[0].barh(range(len(top_10)), top_10['balance_btc'], color=COLORS['primary'])
axes[0].set_xlabel('Balance (BTC)')
axes[0].set_ylabel('Entity Rang')
axes[0].set_title('Top 10 Wale nach Balance')
axes[0].set_yticks(range(len(top_10)))
axes[0].set_yticklabels([f'#{i+1}' for i in range(len(top_10))])
axes[0].invert_yaxis()
for bar, val in zip(bars, top_10['balance_btc']):
    axes[0].text(val + val*0.02, bar.get_y() + bar.get_height()/2, f'{val:,.0f}', va='center', fontsize=9)

# Chart 2: Entity-Verteilung (horizontales Balkendiagramm)
colors_bar = [COLORS['secondary'], COLORS['accent'], COLORS['primary'], '#6B7280', '#D1D5DB']
y_pos = range(len(category_stats))
entity_pct = category_stats['entity_count'] / category_stats['entity_count'].sum() * 100

bars2 = axes[1].barh(y_pos, entity_pct, color=colors_bar[:len(category_stats)])
axes[1].set_xlabel('Anteil (%)')
axes[1].set_title('Entity-Verteilung: Wie viele Entities pro Kategorie?')
axes[1].set_yticks(y_pos)
axes[1].set_yticklabels(category_stats['category'])
axes[1].set_xlim(0, 100)
for bar, pct, cnt in zip(bars2, entity_pct, category_stats['entity_count']):
    axes[1].text(pct + 1, bar.get_y() + bar.get_height()/2, f'{pct:.1f}% ({cnt:,})', va='center', fontsize=9)

# Chart 3: BTC-Verteilung (horizontales Balkendiagramm)
btc_pct = category_stats['total_btc'] / category_stats['total_btc'].sum() * 100

bars3 = axes[2].barh(y_pos, btc_pct, color=colors_bar[:len(category_stats)])
axes[2].set_xlabel('Anteil (%)')
axes[2].set_title('BTC-Verteilung: Wie viel BTC pro Kategorie?')
axes[2].set_yticks(y_pos)
axes[2].set_yticklabels(category_stats['category'])
axes[2].set_xlim(0, 100)
for bar, pct, btc in zip(bars3, btc_pct, category_stats['total_btc']):
    axes[2].text(pct + 1, bar.get_y() + bar.get_height()/2, f'{pct:.1f}% ({btc:,.0f} BTC)', va='center', fontsize=9)

plt.tight_layout()
plt.savefig(str(Path(OUTPUT_PATH) / 'whale_analysis.png'), dpi=150, bbox_inches='tight')
plt.show()

print("")
print("Interpretation:")
print("- Links: Die absoluten Balancen der groessten Wale")
print("- Mitte: Wenige Mega Whales, viele kleine Entities")
print("- Rechts: Mega Whales halten den Grossteil der BTC (extreme Konzentration)")

---
## 12. Zeitreihen-Analyse: Whale-Balance ueber Zeit

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    WARUM ZEITREIHEN-ANALYSE?                                │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   Bisher: Snapshot                        Neu: Historie                     │
│                                                                             │
│   ┌─────────────────────┐                ┌─────────────────────────────┐   │
│   │ Entity 1: 1000 BTC  │                │ Entity 1:                   │   │
│   │ (Stand: heute)      │                │                             │   │
│   └─────────────────────┘                │  1000 ┤        ╭───────     │   │
│                                          │   800 ┤    ╭───╯            │   │
│   Wir wissen:                            │   600 ┤ ╭──╯                │   │
│   - Aktueller Stand                      │   400 ┤╭╯                   │   │
│   - Anzahl UTXOs                         │       └─────────────────▶   │   │
│                                          │        2015   2020   2024   │   │
│   Wir wissen NICHT:                      │                             │   │
│   - Seit wann?                           │   Akkumulation erkennbar!   │   │
│   - Kauft oder verkauft?                 └─────────────────────────────┘   │
│   - Wachstums-Trend?                                                        │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

**Ziel:** Fuer jede Entity rekonstruieren, wie sich das Guthaben ueber Zeit entwickelt hat.

**Ansatz:**
1. Jeden Output mit Erstellungs-Zeitpunkt tracken (+BTC)
2. Jeden Input mit Ausgabe-Zeitpunkt tracken (-BTC)
3. Kumulative Summe = Balance zu jedem Zeitpunkt

### Schritt 12a: UTXO History erstellen

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                         OUTPUTS (mit Zeitstempel)                           │
├─────────────────────────────────────────────────────────────────────────────┤
│  tx_hash │ idx │  value  │ address │ created_at (block_timestamp)          │
│  abc123  │  0  │ 50 BTC  │   1A... │ 2015-01-01                            │
│  abc123  │  1  │ 10 BTC  │   1B... │ 2015-01-01                            │
│  def456  │  0  │ 30 BTC  │   1A... │ 2018-06-15                            │
└─────────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼ LEFT JOIN mit Inputs
┌─────────────────────────────────────────────────────────────────────────────┐
│                         UTXO HISTORY                                        │
├─────────────────────────────────────────────────────────────────────────────┤
│  tx_hash │ idx │  value  │ address │ created_at  │ spent_at               │
│  abc123  │  0  │ 50 BTC  │   1A... │ 2015-01-01  │ 2018-06-15  (spent!)   │
│  abc123  │  1  │ 10 BTC  │   1B... │ 2015-01-01  │ NULL        (UTXO)     │
│  def456  │  0  │ 30 BTC  │   1A... │ 2018-06-15  │ NULL        (UTXO)     │
└─────────────────────────────────────────────────────────────────────────────┘
```

**Warum?** Um Balance ueber Zeit zu berechnen, brauchen wir fuer jeden Output:
- `created_at`: Wann wurde er erstellt? (= Einzahlung auf die Adresse)
- `spent_at`: Wann wurde er ausgegeben? (= Auszahlung, NULL wenn noch UTXO)

In [None]:
%%time
# ==============================================================================
# UTXO HISTORY: Jeden Output mit Erstellungs- und Ausgabe-Zeitpunkt
# ==============================================================================

from pyspark.sql.functions import date_trunc, lit, coalesce
from pyspark.sql.window import Window

# Spent-Referenzen mit Zeitstempel (wann wurde der Output ausgegeben?)
spent_info = inputs_df \
    .filter(col("is_coinbase") == False) \
    .select(
        col("spent_tx_hash").alias("ref_tx_hash"),
        col("spent_output_index").alias("ref_output_index"),
        col("block_timestamp").alias("spent_timestamp")
    )

# Outputs mit Entity-ID anreichern (erste Adresse extrahieren)
outputs_with_addr = outputs_df \
    .withColumn("address", element_at(col("addresses"), 1)) \
    .filter(col("address").isNotNull())

# Join mit Entities
outputs_with_entity = outputs_with_addr.join(
    entities_final,
    "address",
    "inner"
)

# UTXO History: LEFT JOIN mit spent_info
utxo_history = outputs_with_entity.join(
    spent_info,
    (outputs_with_entity.tx_hash == spent_info.ref_tx_hash) & 
    (outputs_with_entity.output_index == spent_info.ref_output_index),
    "left"
).select(
    outputs_with_entity.tx_hash,
    outputs_with_entity.output_index,
    outputs_with_entity.value,
    outputs_with_entity.address,
    outputs_with_entity.entity_id,
    col("block_timestamp").alias("created_timestamp"),
    col("spent_timestamp")  # NULL wenn noch UTXO
).cache()

HISTORY_COUNT = utxo_history.count()
SPENT_IN_HISTORY = utxo_history.filter(col("spent_timestamp").isNotNull()).count()
UNSPENT_IN_HISTORY = HISTORY_COUNT - SPENT_IN_HISTORY

print(f"UTXO History erstellt")
print(f"{'='*50}")
print(f"Total Outputs mit Entity: {HISTORY_COUNT:,}")
print(f"Davon spent:              {SPENT_IN_HISTORY:,}")
print(f"Davon unspent (UTXOs):    {UNSPENT_IN_HISTORY:,}")

### Schritt 12b: Balance-Events erstellen

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                         BALANCE EVENTS                                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   Fuer jede Entity sammeln wir alle "Buchungen":                           │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │ entity_id │   datum    │   delta   │  event_type                    │   │
│   ├───────────┼────────────┼───────────┼────────────────────────────────┤   │
│   │     1     │ 2015-01-01 │  +50 BTC  │  CREATED (Output erstellt)     │   │
│   │     1     │ 2016-03-15 │  +20 BTC  │  CREATED                       │   │
│   │     1     │ 2018-06-15 │  -50 BTC  │  SPENT   (Output ausgegeben)   │   │
│   │     1     │ 2020-01-01 │ +100 BTC  │  CREATED                       │   │
│   └───────────┴────────────┴───────────┴────────────────────────────────┘   │
│                                                                             │
│   Dann: Kumulative Summe = Balance zu jedem Zeitpunkt                       │
│                                                                             │
│   2015-01-01:  +50         =   50 BTC                                       │
│   2016-03-15:  +50 +20     =   70 BTC                                       │
│   2018-06-15:  +50 +20 -50 =   20 BTC                                       │
│   2020-01-01:  ... +100    =  120 BTC                                       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

In [None]:
%%time
# ==============================================================================
# BALANCE EVENTS: +delta fuer Created, -delta fuer Spent
# ==============================================================================

# Event 1: Output erstellt = +value (Einzahlung)
created_events = utxo_history.select(
    col("entity_id"),
    date_trunc("day", from_unixtime(col("created_timestamp"))).alias("event_date"),
    col("value").alias("delta"),
    lit("CREATED").alias("event_type")
)

# Event 2: Output spent = -value (Auszahlung)
spent_events = utxo_history \
    .filter(col("spent_timestamp").isNotNull()) \
    .select(
        col("entity_id"),
        date_trunc("day", from_unixtime(col("spent_timestamp"))).alias("event_date"),
        (-col("value")).alias("delta"),
        lit("SPENT").alias("event_type")
    )

# Alle Events zusammenfuehren
balance_events = created_events.unionAll(spent_events).cache()

EVENT_COUNT = balance_events.count()
CREATED_COUNT = balance_events.filter(col("event_type") == "CREATED").count()
SPENT_EVENT_COUNT = balance_events.filter(col("event_type") == "SPENT").count()

print(f"Balance Events erstellt")
print(f"{'='*50}")
print(f"Total Events:    {EVENT_COUNT:,}")
print(f"CREATED (+BTC):  {CREATED_COUNT:,}")
print(f"SPENT (-BTC):    {SPENT_EVENT_COUNT:,}")

In [None]:
%%time
# ==============================================================================
# DAILY BALANCE TIMELINE: Kumulative Summe pro Entity
# ==============================================================================

# Aggregation: Netto-Veraenderung pro Entity und Tag
daily_changes = balance_events \
    .groupBy("entity_id", "event_date") \
    .agg(spark_sum("delta").alias("daily_delta"))

# Window Function: Kumulative Summe ueber Zeit
window_spec = Window \
    .partitionBy("entity_id") \
    .orderBy("event_date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

entity_balance_timeline = daily_changes \
    .withColumn("cumulative_satoshi", spark_sum("daily_delta").over(window_spec)) \
    .withColumn("cumulative_btc", spark_round(col("cumulative_satoshi") / 100000000, 4)) \
    .orderBy("entity_id", "event_date") \
    .cache()

TIMELINE_ROWS = entity_balance_timeline.count()
TIMELINE_ENTITIES = entity_balance_timeline.select("entity_id").distinct().count()

print(f"Balance Timeline erstellt")
print(f"{'='*50}")
print(f"Timeline-Eintraege: {TIMELINE_ROWS:,}")
print(f"Entities mit Historie: {TIMELINE_ENTITIES:,}")

# Beispiel: Timeline einer Entity anzeigen
print(f"\nBeispiel - Timeline von Top Whale:")
top_entity_id = entity_balances.first()["entity_id"]
entity_balance_timeline.filter(col("entity_id") == top_entity_id) \
    .select("event_date", "daily_delta", "cumulative_btc") \
    .orderBy("event_date") \
    .show(10, truncate=False)

### Schritt 12c: Whale-Trajektorien

Visualisierung der Balance-Entwicklung der Top 5 Whales ueber Zeit.

In [None]:
# ==============================================================================
# VISUALISIERUNG: Top 5 Whale Balance ueber Zeit
# ==============================================================================

# Top 5 Whales nach aktuellem Balance
top_5_whales = entity_balances.limit(5).select("entity_id").toPandas()["entity_id"].tolist()

# Timeline fuer Top 5 extrahieren
top_whale_timeline = entity_balance_timeline \
    .filter(col("entity_id").isin(top_5_whales)) \
    .select("entity_id", "event_date", "cumulative_btc") \
    .toPandas()

# Plot erstellen
fig, ax = plt.subplots(figsize=(14, 7))

colors_line = [COLORS['primary'], COLORS['secondary'], COLORS['accent'], '#6B7280', '#10B981']

for i, entity_id in enumerate(top_5_whales):
    entity_data = top_whale_timeline[top_whale_timeline['entity_id'] == entity_id].sort_values('event_date')
    if len(entity_data) > 0:
        ax.plot(entity_data['event_date'], entity_data['cumulative_btc'], 
                label=f'Entity {entity_id}', color=colors_line[i % len(colors_line)], linewidth=2)

ax.set_xlabel('Datum', fontsize=12)
ax.set_ylabel('Balance (BTC)', fontsize=12)
ax.set_title('Top 5 Whale Balance-Entwicklung ueber Zeit', fontsize=14, fontweight='bold')
ax.legend(loc='upper left')
ax.grid(True, alpha=0.3)

# Y-Achse formatieren
ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'{x:,.0f}'))

plt.tight_layout()
plt.savefig(str(Path(OUTPUT_PATH) / 'whale_timeline.png'), dpi=150, bbox_inches='tight')
plt.show()

print("Top 5 Whale Trajektorien visualisiert.")

### Schritt 12d: Kategorien ueber Zeit

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    BTC-VERTEILUNG NACH KATEGORIE UEBER ZEIT                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   100% ┌────────────────────────────────────────────────────────────┐      │
│        │████████████████████████████████████████████████████████████│      │
│    80% │████████████████████████████ Mega Whales ███████████████████│      │
│        │████████████████████████████████████████████████████████████│      │
│    60% │▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓│      │
│        │▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓ Whales ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓│      │
│    40% │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│      │
│        │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒ Large ▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│      │
│    20% │░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░│      │
│        │░░░░░░░░░░░░░░░░░░░ Medium/Small ░░░░░░░░░░░░░░░░░░░░░░░░░│      │
│     0% └────────────────────────────────────────────────────────────┘      │
│           2010    2014    2018    2022    2024                              │
│                                                                             │
│   Frage: Wie hat sich die Konzentration ueber Zeit veraendert?             │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

In [None]:
# ==============================================================================
# KATEGORIE-BALANCE UEBER ZEIT
# ==============================================================================

# Entity-Kategorien mit Timeline verbinden
# Hinweis: Wir verwenden die AKTUELLE Kategorie (vereinfacht)
entity_with_category = entity_balances.select(
    col("entity_id"),
    when(col("balance_btc") >= 1000, "Mega Whale (1000+ BTC)")
    .when(col("balance_btc") >= 100, "Whale (100-1000 BTC)")
    .when(col("balance_btc") >= 10, "Large (10-100 BTC)")
    .otherwise("Small/Medium (<10 BTC)").alias("category")
)

# Timeline mit Kategorie joinen
timeline_with_category = entity_balance_timeline.join(
    entity_with_category,
    "entity_id",
    "inner"
)

# Aggregation pro Datum und Kategorie (monatlich fuer bessere Uebersicht)
from pyspark.sql.functions import year, month, concat_ws

monthly_category_balance = timeline_with_category \
    .withColumn("year_month", concat_ws("-", year(col("event_date")), month(col("event_date")))) \
    .groupBy("year_month", "category") \
    .agg(spark_sum("cumulative_btc").alias("total_btc")) \
    .orderBy("year_month", "category") \
    .toPandas()

print(f"Monatliche Kategorie-Aggregation: {len(monthly_category_balance)} Eintraege")
print(monthly_category_balance.head(10))

In [None]:
# ==============================================================================
# AKKUMULATION VS DISTRIBUTION: Netto-Veraenderung der Whales
# ==============================================================================

# Nur Whale-Kategorien (>= 100 BTC)
whale_entity_ids = entity_balances \
    .filter(col("balance_btc") >= 100) \
    .select("entity_id") \
    .toPandas()["entity_id"].tolist()

# Monatliche Netto-Veraenderung aller Whales zusammen
whale_monthly_change = balance_events \
    .filter(col("entity_id").isin(whale_entity_ids)) \
    .withColumn("year_month", concat_ws("-", 
        year(col("event_date")), 
        when(month(col("event_date")) < 10, concat_ws("", lit("0"), month(col("event_date"))))
        .otherwise(month(col("event_date")))
    )) \
    .groupBy("year_month") \
    .agg(
        spark_sum("delta").alias("net_change_satoshi"),
        spark_sum(when(col("delta") > 0, col("delta")).otherwise(0)).alias("inflows"),
        spark_sum(when(col("delta") < 0, col("delta")).otherwise(0)).alias("outflows")
    ) \
    .withColumn("net_change_btc", spark_round(col("net_change_satoshi") / 100000000, 2)) \
    .withColumn("inflows_btc", spark_round(col("inflows") / 100000000, 2)) \
    .withColumn("outflows_btc", spark_round(col("outflows") / 100000000, 2)) \
    .orderBy("year_month") \
    .toPandas()

print(f"Whale Akkumulation/Distribution Analyse")
print(f"{'='*60}")
print(f"Zeitraeume analysiert: {len(whale_monthly_change)}")
print(f"\nLetzte 12 Monate:")
print(whale_monthly_change.tail(12)[['year_month', 'net_change_btc', 'inflows_btc', 'outflows_btc']].to_string(index=False))

In [None]:
# ==============================================================================
# VISUALISIERUNG: Whale-Aktivitaet ueber Zeit
# ==============================================================================

fig, axes = plt.subplots(2, 1, figsize=(14, 10))

# Nur die letzten 24 Monate (oder alle, wenn weniger)
recent_data = whale_monthly_change.tail(24).copy()

if len(recent_data) > 0:
    # Chart 1: Kumulative Netto-Veraenderung (zeigt Trend besser)
    recent_data['cumulative_net'] = recent_data['net_change_btc'].cumsum()
    
    ax1 = axes[0]
    ax1.fill_between(range(len(recent_data)), recent_data['cumulative_net'], 
                     where=recent_data['cumulative_net'] >= 0, alpha=0.3, color='green', label='Akkumulation')
    ax1.fill_between(range(len(recent_data)), recent_data['cumulative_net'], 
                     where=recent_data['cumulative_net'] < 0, alpha=0.3, color='red', label='Distribution')
    ax1.plot(range(len(recent_data)), recent_data['cumulative_net'], color='black', linewidth=2)
    ax1.axhline(y=0, color='gray', linestyle='--', linewidth=1)
    ax1.set_xlabel('Monat')
    ax1.set_ylabel('Kumulative Netto-Veraenderung (BTC)')
    ax1.set_title('Whale-Trend: Kaufen oder Verkaufen die Wale insgesamt?', fontsize=14, fontweight='bold')
    ax1.set_xticks(range(0, len(recent_data), max(1, len(recent_data)//6)))
    ax1.set_xticklabels([recent_data['year_month'].iloc[i] for i in range(0, len(recent_data), max(1, len(recent_data)//6))], rotation=45)
    ax1.legend(loc='upper left')
    ax1.grid(True, alpha=0.3)
    
    # Annotation
    final_value = recent_data['cumulative_net'].iloc[-1]
    trend_text = 'AKKUMULATION' if final_value > 0 else 'DISTRIBUTION'
    ax1.annotate(f'Gesamt: {final_value:+,.0f} BTC\n({trend_text})', 
                 xy=(len(recent_data)-1, final_value), fontsize=12, fontweight='bold',
                 color='green' if final_value > 0 else 'red')

    # Chart 2: Monatliche Aktivitaet (Inflows vs Outflows)
    x = range(len(recent_data))
    width = 0.8
    
    # Stacked bar: Inflows oben, Outflows unten
    axes[1].bar(x, recent_data['inflows_btc'], width, label='Empfangen (neue UTXOs)', color='green', alpha=0.7)
    axes[1].bar(x, recent_data['outflows_btc'], width, label='Ausgegeben (UTXOs spent)', color='red', alpha=0.7)
    axes[1].axhline(y=0, color='black', linestyle='-', linewidth=0.5)
    axes[1].set_xlabel('Monat')
    axes[1].set_ylabel('BTC')
    axes[1].set_title('Monatliche Whale-Aktivitaet: Wie viel BTC bewegt?', fontsize=14, fontweight='bold')
    axes[1].set_xticks(range(0, len(recent_data), max(1, len(recent_data)//6)))
    axes[1].set_xticklabels([recent_data['year_month'].iloc[i] for i in range(0, len(recent_data), max(1, len(recent_data)//6))], rotation=45)
    axes[1].legend(loc='upper right')
    axes[1].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.savefig(str(Path(OUTPUT_PATH) / 'whale_activity.png'), dpi=150, bbox_inches='tight')
plt.show()

# Erklaerung
print("")
print("Was zeigen diese Grafiken?")
print("="*60)
print("")
print("Chart 1 - Kumulativer Trend:")
print("  Zeigt ob Whales INSGESAMT mehr kaufen oder verkaufen.")
print("  Gruen = Akkumulation (Whales horten BTC)")
print("  Rot = Distribution (Whales verkaufen BTC)")
print("")
print("Chart 2 - Monatliche Aktivitaet:")
print("  Gruene Balken = BTC die Whales ERHALTEN haben")
print("  Rote Balken = BTC die Whales AUSGEGEBEN haben")
print("  Je hoeher die Balken, desto mehr Aktivitaet.")

### Interpretation der Zeitreihen-Analyse

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    ERKENNTNISSE AUS DER ZEITREIHEN-ANALYSE                  │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  1. WHALE-TRAJEKTORIEN                                                      │
│     - Zeigen unterschiedliche Strategien (Akkumulatoren vs. Trader)        │
│     - Sprunghafte Anstiege = grosse Einzeltransaktionen                    │
│     - Flache Linien = HODLer (halten ohne Aktivitaet)                      │
│                                                                             │
│  2. AKKUMULATION VS DISTRIBUTION                                            │
│     - Gruene Balken = Whales kaufen netto                                  │
│     - Rote Balken = Whales verkaufen netto                                 │
│     - Kann als Sentiment-Indikator verwendet werden                        │
│                                                                             │
│  3. LIMITATIONEN                                                            │
│     - Nur Adressen aus Multi-Input TXs sind geclustert                     │
│     - Exchange-Wallets koennen das Bild verzerren                          │
│     - Kategorie-Zuordnung basiert auf aktuellem Stand                      │
│                                                                             │
│  4. WARUM SPARK HIER NOETIG IST                                            │
│     - Window Functions ueber Millionen von Events                          │
│     - Partitionierung nach Entity fuer parallele Berechnung                │
│     - Bei 620GB Blockchain: nur mit Cluster skalierbar                     │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```

**Naechste Schritte:**
- Korrelation mit Bitcoin-Preis analysieren
- Whale-Bewegungen vor grossen Preisaenderungen untersuchen
- Exchange-Wallets identifizieren und separat behandeln

In [None]:
# Daten speichern
entities_final.write.mode("overwrite").parquet(str(Path(OUTPUT_PATH) / "entities.parquet"))
utxo_df.write.mode("overwrite").parquet(str(Path(OUTPUT_PATH) / "utxos.parquet"))
outputs_df.write.mode("overwrite").parquet(str(Path(OUTPUT_PATH) / "outputs.parquet"))
inputs_df.write.mode("overwrite").parquet(str(Path(OUTPUT_PATH) / "inputs.parquet"))

print(f"Daten exportiert nach {OUTPUT_PATH}")

---
## Executive Summary

In [None]:
# Zusammenfassende Metriken berechnen
mega_whales = category_stats[category_stats['category'] == 'Mega Whale (1000+ BTC)']
MEGA_WHALE_COUNT = int(mega_whales['entity_count'].values[0]) if len(mega_whales) > 0 else 0
MEGA_WHALE_BTC = float(mega_whales['total_btc'].values[0]) if len(mega_whales) > 0 else 0
MEGA_WHALE_SHARE = MEGA_WHALE_BTC / TOTAL_BTC * 100 if TOTAL_BTC > 0 else 0

print(f"""
{'='*70}
                    BITCOIN WHALE INTELLIGENCE REPORT
{'='*70}

DATENUMFANG
{'-'*70}
  Bloecke analysiert:      {BLOCK_COUNT:>15,}
  Transaktionen:           {TX_COUNT:>15,}
  Outputs erstellt:        {OUTPUT_COUNT:>15,}
  UTXOs (unspent):         {UTXO_COUNT:>15,}

CLUSTERING ERGEBNISSE
{'-'*70}
  Adressen geclustert:     {ADDRESS_COUNT:>15,}
  Entities identifiziert:  {ENTITY_COUNT:>15,}
  Clustering-Reduktion:    {REDUCTION:>14.1f}%

WHALE-ANALYSE
{'-'*70}
  Total BTC erfasst:       {TOTAL_BTC:>15,.2f} BTC
  Mega Whales (1000+ BTC): {MEGA_WHALE_COUNT:>15,}
  Mega Whale Holdings:     {MEGA_WHALE_BTC:>15,.2f} BTC ({MEGA_WHALE_SHARE:.1f}%)
  Top 20 Konzentration:    {TOP_20_BTC:>15,.2f} BTC ({TOP_20_SHARE:.1f}%)

WICHTIGE ERKENNTNISSE
{'-'*70}
  1. Hohe Konzentration: Top 20 Entities kontrollieren {TOP_20_SHARE:.1f}% der BTC
  2. Power-Law Verteilung: Wenige grosse Wale, viele kleine Entities
  3. Clustering effektiv: {REDUCTION:.1f}% Adress-Reduktion erreicht

WAHRSCHEINLICHE WAL-IDENTITAETEN
{'-'*70}
  - Mega Whales (1000+ BTC): Early Adopter, Boersen, Institutionen
  - Whales (100-1000 BTC): Mining Pools, grosse Trader, Services
  - Large (10-100 BTC): Aktive Trader, Unternehmen, wohlhabende Privatpersonen

{'='*70}
""")

In [None]:
# Abschluss-Visualisierung: Key Metrics Overview
fig, ax = plt.subplots(figsize=(14, 8))

metrics = [
    f'Transaktionen\n{TX_COUNT:,}',
    f'Adressen\n{ADDRESS_COUNT:,}',
    f'Entities\n{ENTITY_COUNT:,}',
    f'Total BTC\n{TOTAL_BTC:,.0f}',
    f'Mega Whales\n{MEGA_WHALE_COUNT}'
]
values = [TX_COUNT, ADDRESS_COUNT, ENTITY_COUNT, TOTAL_BTC, MEGA_WHALE_COUNT]
normalized = [v / max(values) for v in values]

bars = ax.bar(metrics, normalized, color=[COLORS['primary'], COLORS['secondary'], COLORS['accent'], COLORS['dark'], COLORS['secondary']])
ax.set_ylabel('Relative Skala', fontsize=12)
ax.set_title('Bitcoin Whale Intelligence: Wichtige Metriken', fontsize=16, fontweight='bold', pad=20)
ax.set_ylim(0, 1.15)

for bar, val, norm in zip(bars, values, normalized):
    if val >= 1000:
        label = f'{val:,.0f}'
    else:
        label = str(int(val))
    ax.text(bar.get_x() + bar.get_width()/2, norm + 0.03, label, ha='center', va='bottom', fontsize=11, fontweight='bold')

ax.text(0.5, -0.15, f'Top 20 Whale Konzentration: {TOP_20_SHARE:.1f}% aller BTC | Clustering Reduktion: {REDUCTION:.1f}%',
        transform=ax.transAxes, ha='center', fontsize=12, style='italic', color=COLORS['dark'])

ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.set_yticks([])

plt.tight_layout()
plt.savefig(str(Path(OUTPUT_PATH) / 'executive_summary.png'), dpi=150, bbox_inches='tight')
plt.show()

In [None]:
# Spark Session (optional beenden)
# spark.stop()
print("Analyse abgeschlossen. Spark Session bleibt aktiv fuer weitere Erkundung.")

---
## Zusammenfassung: Die komplette Pipeline

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                    BITCOIN WHALE INTELLIGENCE PIPELINE                      │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌─────────┐                                                               │
│   │  JSON   │  Blockchain-Daten von bitcoin-etl                            │
│   └────┬────┘                                                               │
│        │                                                                    │
│        ▼                                                                    │
│   ┌─────────────────────────────────────────────────────┐                  │
│   │ Schritt 1+2: Outputs & Inputs extrahieren (explode) │                  │
│   └────────────────────────┬────────────────────────────┘                  │
│                            │                                                │
│        ┌───────────────────┼───────────────────┐                           │
│        ▼                   ▼                   ▼                           │
│   ┌─────────┐         ┌─────────┐         ┌─────────┐                      │
│   │ outputs │         │ inputs  │         │ inputs  │                      │
│   └────┬────┘         └────┬────┘         └────┬────┘                      │
│        │                   │                   │                           │
│        └─────────┬─────────┘                   │                           │
│                  ▼                             │                           │
│   ┌──────────────────────────┐                 │                           │
│   │ Schritt 3: UTXO berechnen│                 │                           │
│   │   (LEFT ANTI JOIN)       │                 │                           │
│   └────────────┬─────────────┘                 │                           │
│                │                               │                           │
│                ▼                               ▼                           │
│   ┌─────────┐              ┌───────────────────────────────┐               │
│   │  utxos  │              │ Schritt 4a: Multi-Input Kanten│               │
│   └────┬────┘              │        (Adress-Paare)         │               │
│        │                   └───────────────┬───────────────┘               │
│        │                                   │                               │
│        │                                   ▼                               │
│        │                   ┌───────────────────────────────┐               │
│        │                   │ Schritt 4b: Connected         │               │
│        │                   │   Components (GraphFrames)    │               │
│        │                   └───────────────┬───────────────┘               │
│        │                                   │                               │
│        │                                   ▼                               │
│        │                              ┌─────────┐                          │
│        │                              │entities │                          │
│        │                              └────┬────┘                          │
│        │                                   │                               │
│        └─────────────────┬─────────────────┘                               │
│                          ▼                                                 │
│           ┌──────────────────────────────┐                                 │
│           │ Schritt 5: Whale Detection   │                                 │
│           │   (JOIN + GROUP BY + SUM)    │                                 │
│           └──────────────┬───────────────┘                                 │
│                          │                                                 │
│                          ▼                                                 │
│                     ┌─────────┐                                            │
│                     │  WALE   │  Entity-Balancen (Snapshot)                │
│                     └────┬────┘                                            │
│                          │                                                 │
│   ┌──────────────────────┴───────────────────────────┐                     │
│   │                                                  │                     │
│   ▼                                                  ▼                     │
│   ┌──────────────────────────────┐    ┌──────────────────────────────┐    │
│   │ Schritt 6: Zeitreihen        │    │ Visualisierung               │    │
│   │   - UTXO History             │    │   - Top Whale Balancen       │    │
│   │   - Balance Events (+/-)     │    │   - Verteilung nach Kat.     │    │
│   │   - Window Functions         │    │   - Executive Summary        │    │
│   │   - Akkumulation/Distribution│    │                              │    │
│   └──────────────────────────────┘    └──────────────────────────────┘    │
│                  │                                                         │
│                  ▼                                                         │
│        ┌─────────────────────┐                                             │
│        │ Whale-Trajektorien  │  Balance-Entwicklung ueber Zeit            │
│        └─────────────────────┘                                             │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
```