In [1]:
import pandas as pd
import numpy as np
from pandas.core.interchange.dataframe_protocol import DataFrame
from tqdm import tqdm
from pathlib import Path

INPUT_DIRECTORY = Path("../../../data/parquet/00-1-product-batches")
cols_to_load = ["snapshot_times", "delivery_start", "side", "orderbook_level"]
parquet_files = sorted(list(INPUT_DIRECTORY.glob("*.parquet")))

# count lines in all files with tqdm
all_lines = 0
all_lines_nodup = 0
latest_delivery_start = None
for f in tqdm(parquet_files[:10], desc="Zähle Zeilen in Batches"):
    df = pd.read_parquet(f, columns=cols_to_load)
    all_lines += len(df)

print(f"Anzahl Zeilen in allen Batches: {all_lines}")  # 1152795704
print(f"Anzahl Zeilen in allen Batches ohne Duplikate: {all_lines_nodup}")  # 29691286

Zähle Zeilen in Batches: 100%|██████████| 10/10 [00:01<00:00,  7.12it/s]

Anzahl Zeilen in allen Batches: 70157731
Anzahl Zeilen in allen Batches ohne Duplikate: 0





In [None]:
import pandas as pd
import numpy as np
from tqdm import tqdm
from pathlib import Path

INPUT_DIRECTORY = Path("/data/parquet/02-0-product-batches-technical-cleaned")
parquet_files = sorted(list(INPUT_DIRECTORY.glob("*.parquet")))

# count lines per delivery_start

delivery_start_counts = pd.DataFrame(columns=["delivery_start", "count"])
for f in tqdm(parquet_files, desc="Zähle Zeilen pro delivery_start"):
    df = pd.read_parquet(f, columns=["delivery_start"])
    for delivery_start, count in df['delivery_start'].value_counts().items():
        delivery_start_counts = pd.concat(
            [delivery_start_counts, pd.DataFrame({"delivery_start": [delivery_start], "count": [count]})],
            ignore_index=True)

print(f"Anzahl unterschiedlicher delivery_starts: {len(delivery_start_counts)}")  # 29691286
print("Anzahl gesamt:", delivery_start_counts['count'].sum())  # 1152795704

In [None]:
# check if length of batch file matches with sum of counts for delivery_starts in that batch
batch_file_path = Path(
    "/data/parquet/00-1-product-batches/2023-11-21T23-00-00_to_2023-11-26T02-00-00.parquet")

df_batch = pd.read_parquet(batch_file_path, columns=["delivery_start"])
first_delivery_start = df_batch['delivery_start'].min()
last_delivery_start = df_batch['delivery_start'].max()
batch_length = len(df_batch)
print(f"Batch Länge: {batch_length}")

relevant_counts = delivery_start_counts[
    (delivery_start_counts['delivery_start'] >= first_delivery_start) &
    (delivery_start_counts['delivery_start'] <= last_delivery_start)
    ]
sum_relevant_counts = relevant_counts['count'].sum()
print(f"Summe der counts für delivery_starts in diesem Batch: {sum_relevant_counts}")

assert batch_length == sum_relevant_counts, "Länge des Batch stimmt nicht mit Summe der counts überein!"



In [11]:
INPUT_DIRECTORY = Path("../../../data/parquet/02-1-product-batches-crossed-cleaned")
cols_to_load = ["snapshot_times", "delivery_start", "side", "orderbook_level"]
parquet_files = sorted(list(INPUT_DIRECTORY.glob("*.parquet")))

# count lines in all files with tqdm
all_lines = 0
all_lines_nodup = 0
latest_delivery_start = None
#init empty dataf
df = pd.DataFrame()
for f in tqdm(parquet_files[:10], desc="Zähle Zeilen in Batches"):
    file_df = pd.read_parquet(f, columns=cols_to_load)
    # append to df
    df = pd.concat([df, file_df], ignore_index=True)


Zähle Zeilen in Batches: 100%|██████████| 10/10 [00:06<00:00,  1.56it/s]


In [12]:
def analyze_empty_sides_with_neighborhood(df: pd.DataFrame, snapshot_keys: list = ['snapshot_times', 'delivery_start']):
    """
    Analysiert Snapshots mit leeren Seiten und prüft den Zustand ihrer direkten Nachbarn
    innerhalb desselben Produkts.
    """
    print("\n" + "=" * 50)
    print("Analyse leerer Seiten mit Nachbarschaftsprüfung gestartet...")

    if df.empty:
        print("  -> DataFrame ist leer. Keine Analyse möglich.")
        return

    # --- Schritt 1: Erstelle eine Zustandsübersicht für JEDEN Snapshot ---
    tqdm.write("  - Schritt 1/4: Erstelle eine Zustandsübersicht für alle Snapshots...")
    side_counts = df.groupby(snapshot_keys)['side'].value_counts().unstack(fill_value=0)

    # Sicherstellen, dass beide Spalten existieren, auch wenn eine Seite nie vorkommt
    if 'BID' not in side_counts: side_counts['BID'] = 0
    if 'ASK' not in side_counts: side_counts['ASK'] = 0

    snapshot_summary = side_counts.reset_index()
    snapshot_summary['is_bid_empty'] = snapshot_summary['BID'] == 0
    snapshot_summary['is_ask_empty'] = snapshot_summary['ASK'] == 0
    snapshot_summary['has_empty_side'] = snapshot_summary['is_bid_empty'] | snapshot_summary['is_ask_empty']
    snapshot_summary['is_plausible'] = (snapshot_summary['BID'] > 0) & (snapshot_summary['ASK'] > 0)

    total_snapshots = len(snapshot_summary)
    print(f"  Gesamtanzahl einzigartiger Snapshots im Datensatz: {total_snapshots:,}")

    # --- Schritt 2: Isoliere die zu untersuchenden "leeren" Snapshots ---
    tqdm.write("  - Schritt 2/4: Isoliere Snapshots mit mindestens einer leeren Seite...")
    empty_side_snapshots = snapshot_summary[snapshot_summary['has_empty_side']].copy()
    num_empty_side = len(empty_side_snapshots)

    if num_empty_side == 0:
        print("  -> Keine Snapshots mit leeren Seiten gefunden. Analyse beendet.")
        print("=" * 50 + "\n")
        return

    percent_empty = (num_empty_side / total_snapshots) * 100
    print(f"  Anzahl Snapshots mit mind. einer leeren Seite: {num_empty_side:,} ({percent_empty:.2f}%)")

    # --- Schritt 3: Führe die Nachbarschaftsanalyse durch ---
    tqdm.write("  - Schritt 3/4: Analysiere die Nachbarschaft jedes leeren Snapshots...")

    # Sortiere den gesamten Datensatz für eine korrekte Nachbarschaftssuche
    snapshot_summary_sorted = snapshot_summary.sort_values(by=['delivery_start', 'snapshot_times']).reset_index(
        drop=True)

    # Erstelle Spalten für die Nachbar-Zustände
    snapshot_summary_sorted['prev_is_plausible'] = (
            (snapshot_summary_sorted['delivery_start'] == snapshot_summary_sorted['delivery_start'].shift(1)) &
            (snapshot_summary_sorted['is_plausible'].shift(1))
    )
    snapshot_summary_sorted['next_is_plausible'] = (
            (snapshot_summary_sorted['delivery_start'] == snapshot_summary_sorted['delivery_start'].shift(-1)) &
            (snapshot_summary_sorted['is_plausible'].shift(-1))
    )

    # Filtere die Ergebnisse nur für die "leeren" Snapshots
    neighborhood_results = snapshot_summary_sorted[snapshot_summary_sorted['has_empty_side']].copy()

    # --- Schritt 4: Klassifiziere und zähle die Ergebnisse ---
    tqdm.write("  - Schritt 4/4: Klassifiziere und zähle die Ergebnisse...")

    is_sandwiched = neighborhood_results['prev_is_plausible'] & neighborhood_results['next_is_plausible']
    is_leading = ~neighborhood_results['prev_is_plausible'] & neighborhood_results['next_is_plausible']
    is_trailing = neighborhood_results['prev_is_plausible'] & ~neighborhood_results['next_is_plausible']
    is_isolated = ~neighborhood_results['prev_is_plausible'] & ~neighborhood_results['next_is_plausible']

    stats = {
        'Sandwiched': is_sandwiched.sum(),
        'Leading Edge': is_leading.sum(),
        'Trailing Edge': is_trailing.sum(),
        'Isolated': is_isolated.sum()
    }

    # --- Ergebnisse ausgeben ---
    print("\n" + "-" * 50)
    print("--- ERGEBNISSE DER NACHBARSCHAFTSANALYSE ---")
    print("Definition 'Plausibel': Snapshot hat Orders auf BEIDEN (Bid & Ask) Seiten.")
    print(f"Analysierte Snapshots (mit leeren Seiten): {num_empty_side:,}\n")

    for category, count in stats.items():
        percentage = (count / num_empty_side) * 100 if num_empty_side > 0 else 0
        print(f"{category:<15}: {count:>10,} ({percentage:>6.2f}%)")

    print("\nErläuterung:")
    print("  - Sandwiched:    Ein kurzer 'Aussetzer' in einem ansonsten liquiden Markt.")
    print("  - Leading Edge:  Beginn einer liquiden Marktphase.")
    print("  - Trailing Edge: Ende einer liquiden Marktphase.")
    print("  - Isolated:      Teil eines längeren illiquiden Zeitraums.")
    print("=" * 50 + "\n")


analyze_empty_sides_with_neighborhood(df)



Analyse leerer Seiten mit Nachbarschaftsprüfung gestartet...
  - Schritt 1/4: Erstelle eine Zustandsübersicht für alle Snapshots...
  Gesamtanzahl einzigartiger Snapshots im Datensatz: 1,764,515
  - Schritt 2/4: Isoliere Snapshots mit mindestens einer leeren Seite...
  Anzahl Snapshots mit mind. einer leeren Seite: 24,070 (1.36%)
  - Schritt 3/4: Analysiere die Nachbarschaft jedes leeren Snapshots...
  - Schritt 4/4: Klassifiziere und zähle die Ergebnisse...

--------------------------------------------------
--- ERGEBNISSE DER NACHBARSCHAFTSANALYSE ---
Definition 'Plausibel': Snapshot hat Orders auf BEIDEN (Bid & Ask) Seiten.
Analysierte Snapshots (mit leeren Seiten): 24,070

Sandwiched     :        316 (  1.31%)
Leading Edge   :        837 (  3.48%)
Trailing Edge  :        822 (  3.42%)
Isolated       :     22,095 ( 91.79%)

Erläuterung:
  - Sandwiched:    Ein kurzer 'Aussetzer' in einem ansonsten liquiden Markt.
  - Leading Edge:  Beginn einer liquiden Marktphase.
  - Trailing Edge

In [13]:
def analyze_empty_sides_by_time_to_delivery(df: pd.DataFrame,
                                            snapshot_keys: list = ['snapshot_times', 'delivery_start']):
    """
    Analysiert das Vorkommen von leeren Buchseiten in Relation zur Zeit bis zur Lieferung.
    """
    print("\n" + "=" * 60)
    print("Analyse leerer Seiten relativ zur Zeit bis Lieferung gestartet...")

    if df.empty:
        print("  -> DataFrame ist leer. Keine Analyse möglich.")
        return

    # --- Schritt 1: Erforderliche Zeit- und Zustandsspalten erstellen ---
    tqdm.write("  - Schritt 1/4: Erstelle Zustands- und Zeitübersicht für alle Snapshots...")

    # Konvertiere Spalten, falls nötig (robust)
    df['snapshot_times'] = pd.to_datetime(df['snapshot_times'])
    df['delivery_start'] = pd.to_datetime(df['delivery_start'])

    # Erstelle eine Basis-Tabelle mit allen einzigartigen Snapshots
    snapshot_summary = df[snapshot_keys].drop_duplicates().copy()
    total_snapshots = len(snapshot_summary)

    if total_snapshots == 0:
        print("  -> Keine einzigartigen Snapshots im DataFrame gefunden.")
        return

    print(f"  Gesamtanzahl einzigartiger Snapshots im Datensatz: {total_snapshots:,}")

    # Berechne die Zeit bis zur Lieferung für jeden Snapshot
    snapshot_summary['time_to_delivery_min'] = \
        (snapshot_summary['delivery_start'] - snapshot_summary['snapshot_times']).dt.total_seconds() / 60

    # Ermittle, welche Seiten für jeden Snapshot existieren
    side_counts = df.groupby(snapshot_keys)['side'].value_counts().unstack(fill_value=0)
    if 'BID' not in side_counts: side_counts['BID'] = 0
    if 'ASK' not in side_counts: side_counts['ASK'] = 0

    snapshot_summary = snapshot_summary.merge(side_counts, on=snapshot_keys, how='left').fillna(0)
    snapshot_summary['is_bid_empty'] = snapshot_summary['BID'] == 0
    snapshot_summary['is_ask_empty'] = snapshot_summary['ASK'] == 0

    # --- Schritt 2: Gruppiere die Snapshots in Zeitfenster (Bins) ---
    tqdm.write("  - Schritt 2/4: Gruppiere Snapshots in 30-Minuten-Zeitfenster...")
    # Erstelle Bins von 0 bis 300 Minuten (5 Stunden) in 30-Minuten-Schritten
    bins = range(0, 301, 30)
    labels = [f"{i}-{i + 30} min" for i in bins[:-1]]
    snapshot_summary['time_bin'] = pd.cut(snapshot_summary['time_to_delivery_min'], bins=bins, labels=labels,
                                          right=False)

    # Ignoriere Snapshots außerhalb des 5-Stunden-Fensters für eine saubere Analyse
    snapshot_summary.dropna(subset=['time_bin'], inplace=True)

    # --- Schritt 3: Aggregiere die Statistiken pro Zeitfenster ---
    tqdm.write("  - Schritt 3/4: Aggregiere Statistiken pro Zeitfenster...")
    analysis_df = snapshot_summary.groupby('time_bin').agg(
        total_snapshots=('snapshot_times', 'count'),
        empty_bids=('is_bid_empty', 'sum'),
        empty_asks=('is_ask_empty', 'sum')
    ).reset_index()

    # --- Schritt 4: Berechne Raten und gib die finale Tabelle aus ---
    tqdm.write("  - Schritt 4/4: Berechne Raten und formatiere die Ausgabe...")
    analysis_df['empty_bid_rate_%'] = (analysis_df['empty_bids'] / analysis_df['total_snapshots']) * 100
    analysis_df['empty_ask_rate_%'] = (analysis_df['empty_asks'] / analysis_df['total_snapshots']) * 100

    print("\n" + "-" * 60)
    print("--- ERGEBNISSE: ANTEIL LEERER SEITEN PRO ZEITFENSTER BIS LIEFERUNG ---")
    print("-" * 60)
    print(f"{'Time To Delivery':<20} | {'Total Snapshots':>18} | {'Empty Bid Rate':>18} | {'Empty Ask Rate':>18}")
    print(f"{'-' * 20:20} | {'-' * 18:18} | {'-' * 18:18} | {'-' * 18:18}")

    for _, row in analysis_df.iterrows():
        print(
            f"{row['time_bin']:<20} | {int(row['total_snapshots']):>18,} | {row['empty_bid_rate_%']:>15.2f}% | {row['empty_ask_rate_%']:>15.2f}%"
        )
    print("=" * 60 + "\n")


analyze_empty_sides_by_time_to_delivery(df)


Analyse leerer Seiten relativ zur Zeit bis Lieferung gestartet...
  - Schritt 1/4: Erstelle Zustands- und Zeitübersicht für alle Snapshots...
  Gesamtanzahl einzigartiger Snapshots im Datensatz: 1,764,515
  - Schritt 2/4: Gruppiere Snapshots in 30-Minuten-Zeitfenster...
  - Schritt 3/4: Aggregiere Statistiken pro Zeitfenster...
  - Schritt 4/4: Berechne Raten und formatiere die Ausgabe...

------------------------------------------------------------
--- ERGEBNISSE: ANTEIL LEERER SEITEN PRO ZEITFENSTER BIS LIEFERUNG ---
------------------------------------------------------------
Time To Delivery     |    Total Snapshots |     Empty Bid Rate |     Empty Ask Rate
-------------------- | ------------------ | ------------------ | ------------------
0-30 min             |            148,970 |            0.00% |            0.00%
30-60 min            |            178,750 |            0.00% |            0.02%
60-90 min            |            178,992 |            0.14% |            0.25%
90-12

  analysis_df = snapshot_summary.groupby('time_bin').agg(
