## MagnusWeb Panel Construction: Intuition & Methods

This notebook cell transforms our raw MagnusWeb exports into a **firm–year panel** ready for panel regressions (e.g. fixed-effects, VARs). Below is a summary of the key design choices and processing steps:

---

### 1. Why a Firm–Year "Wide" Panel?

- **Model-ready**: Most econometric libraries (`linearmodels.PanelOLS`, `statsmodels`, etc.) expect one row per firm × year, with each variable in its own column.  
- **Efficient storage**: 185 k firms × ~24 years ≈ 4–5 million rows, ~25 variables → ~200 MB.  
- **No repeated melts**: Once built, we can `read_parquet()` in ~2 s and immediately start modeling.

---

### 2. Reading & Concatenating Exports Lazily

- We scan all `export-*.csv` files with **Polars' `scan_csv`**, which builds a **lazy execution graph** instead of loading data immediately.  
- This keeps memory usage low and lets Polars fuse operations for maximum speed.

---

### 3. Static vs. Time-coded Columns

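- **Static columns** (`STATIC_COLS`): firm identifiers, location, NACE/OKEČ codes, audit flags, dates, plus the undated snapshot columns (`Rok`, `Čtvrtletí`, and un-prefixed metric headers such as `Náklady`).  
- **Time columns**: every remaining header, each carrying a year and optionally a quarter (e.g. `"2022/4Q Náklady"`, `"2019 Obrat Výnosy"`).  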

Separating them lets us **melt only the time columns** into a long format without touching firm metadata.
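
The notebook splits the columns with an explicit `STATIC_COLS` list. As an illustrative alternative, the sketch below classifies headers by their prefix instead. The header sample, the `TIME_PREFIX` pattern, and the `split_columns` helper are all assumptions made for this example.

```python
import re

# Hypothetical header sample; the real exports have many more columns.
headers = [
    "IČO", "Kraj", "Náklady",
    "2022/4Q Náklady", "2019 Obrat Výnosy", "4Q/2020 Aktiva celkem",
]

# A header counts as time-coded when it starts with 'YYYY', 'YYYY/nQ' or 'nQ/YYYY'.
TIME_PREFIX = re.compile(r"^(?:\d{4}(?:/[1-4]Q)?|[1-4]Q/\d{4})(?:\s|$)")


def split_columns(names):
    """Return (static_cols, time_cols), preserving the input order."""
    time_cols = [n for n in names if TIME_PREFIX.match(n)]
    static_cols = [n for n in names if not TIME_PREFIX.match(n)]
    return static_cols, time_cols


static_cols, time_cols = split_columns(headers)
print(static_cols)
print(time_cols)
```

An explicit list is stricter and fails loudly on unexpected headers, while a pattern adapts to new years without edits. Which trade-off suits the project is a judgment call.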

---

### 4. Building the Long Table

1. **Melt** time columns into two new fields:  
   - `raw` (original header)  
   - `val` (numeric value)  
2. **Extract** `year` (4-digit) and `quarter` (`1Q`–`4Q`) via regex.  
3. **Derive** `metric_cs` by stripping the leading "YYYY/4Q " or "4Q/YYYY " or "YYYY ".  
4. **Map** Czech metric names (e.g. `"Náklady"`) to canonical English slugs (`costs`, `profit_pre_tax`, etc.).  
5. **Filter** to keep **only annual snapshots** or **4Q observations** (we treat `quarter == 4` as year-end).

This yields a **long** table with columns `[IČO, year, quarter, metric, val, ...other_static_cols]`.
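
The parsing rules in steps 2 to 5 can be sketched in plain Python on individual headers. This is only an illustration of the logic, not the Polars expressions the notebook runs. `HEADER`, `parse_header`, `keep_header`, and the three-entry `METRIC_MAP` are hypothetical names introduced here.

```python
import re

# Prefix is 'YYYY', 'YYYY/nQ' or 'nQ/YYYY'; the rest is the Czech metric name.
HEADER = re.compile(
    r"^(?:(?P<y1>\d{4})(?:/(?P<q1>[1-4])Q)?|(?P<q2>[1-4])Q/(?P<y2>\d{4}))"
    r"\s*(?P<metric>.*)$"
)

# Small excerpt of the Czech-to-English mapping, for illustration only.
METRIC_MAP = {
    "Náklady": "costs",
    "Obrat Výnosy": "turnover",
    "Aktiva celkem": "total_assets",
}


def parse_header(raw):
    """Return (year, quarter or None, metric_cs), or None if there is no time prefix."""
    m = HEADER.match(raw)
    if m is None:
        return None
    year = int(m.group("y1") or m.group("y2"))
    quarter = m.group("q1") or m.group("q2")
    return year, (int(quarter) if quarter else None), m.group("metric")


def keep_header(raw):
    """Return (year, slug) for annual or 4Q headers with a known metric, else None."""
    parsed = parse_header(raw)
    if parsed is None:
        return None
    year, quarter, metric_cs = parsed
    slug = METRIC_MAP.get(metric_cs)
    if slug is None or quarter not in (None, 4):
        return None
    return year, slug


for h in ["2022/4Q Náklady", "2019 Obrat Výnosy", "4Q/2020 Aktiva celkem", "2022/2Q Náklady"]:
    print(h, "->", parse_header(h), "->", keep_header(h))
```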

---

### 5. Pivoting to the Final Wide Panel

- The long table is **pivoted** on `IČO` and `year`, with each unique `metric` slug becoming its own column.
- Static firm metadata is deduplicated and joined back to create the final dataset.
- **Result**: A wide firm-year panel where each row represents a unique firm-year observation, with all financial metrics as separate columns (e.g., `profit_pre_tax`, `total_assets`, `sales_revenue`).

This structure is optimized for econometric modeling and can be directly used in panel regression analyses.

Note: Before saving the final panel to Parquet, we drop the leftover original Czech metric columns and the raw `Rok` and `Čtvrtletí` columns, which hold a single snapshot value per IČO rather than a yearly series. Static firm attributes such as name, region, and legal form stay in the panel. See the code section before the Parquet write for details.

In [13]:
import os, re, polars as pl

In [14]:
# ------------------------------------------------------------------
# 1. folders
# ------------------------------------------------------------------
project_root  = os.path.abspath(os.path.join(os.getcwd(), ".."))
in_dir        = os.path.join(project_root, "data", "source_raw",  "magnusweb")
out_dir       = os.path.join(project_root, "data", "source_cleaned")
os.makedirs(out_dir, exist_ok=True)


In [15]:
# ------------------------------------------------------------------
# 2. read all exports lazily
# ------------------------------------------------------------------
csv_files = sorted(
    f for f in os.listdir(in_dir) if f.startswith("export-") and f.endswith(".csv")
)
if not csv_files:
    raise FileNotFoundError("No export-*.csv files found!")

lazy_frames = [
    pl.scan_csv(
        os.path.join(in_dir, f),
        separator=";",
        quote_char='"',                # handle quoted fields correctly
        encoding="utf8",
        try_parse_dates=False,         # faster; date columns stay as strings
        infer_schema_length=0,         # 0 = skip inference and read every column as String
    )
    for f in csv_files
]

raw = pl.concat(lazy_frames, how="vertical")        # still lazy
print(f"Found  {len(csv_files)}  CSV parts ➜ concatenated lazily")

Found  8  CSV parts ➜ concatenated lazily


In [16]:
# ------------------------------------------------------------------
# 3. identify static-vs-time columns
# ------------------------------------------------------------------
STATIC_COLS = [
    "IČO", "Název subjektu",
    "Hlavní NACE", "Hlavní NACE - kód",
    "Vedlejší NACE CZ", "Vedlejší NACE CZ - kód",
    "Hlavní OKEČ", "Hlavní OKEČ - kód",
    "Vedlejší OKEČ", "Vedlejší OKEČ - kód",
    "Institucionální sektory (ESA 2010)", "Institucionální sektory (ESA 95)",
    "Lokalita", "Kraj", "Počet zaměstnanců", "Kategorie počtu zaměstnanců CZ", "Kategorie obratu",
    "Audit", "Konsolidace", "Měna",
    "Datum vzniku", "Datum zrušení", "Rok", "Čtvrtletí", "Stav subjektu", "Právní forma", "Typ subjektu",
    'Hospodářský výsledek před zdaněním',
    'Hospodářský výsledek za účetní období',
    'Provozní hospodářský výsledek',
    'Náklady',
    'Obrat, Výnosy',
    'Tržby, Výkony',
    'Aktiva celkem',
    'Stálá aktiva',
    'Oběžná aktiva',
    'Ostatní aktiva',
    'Pasiva celkem',
    'Vlastní kapitál',
    'Cizí zdroje',
    'Ostatní pasiva',
    ]


time_cols = [c for c in raw.collect_schema().names() if c not in STATIC_COLS]

In [17]:
# ------------------------------------------------------------------
# 4. melt ➜ parse (year, quarter, metric) ➜ keep 4Q/annual rows
# ------------------------------------------------------------------
metric_map = {
    "Hospodářský výsledek před zdaněním":   "profit_pre_tax",
    "Hospodářský výsledek za účetní období":"profit_net",
    "Provozní hospodářský výsledek":        "oper_profit",
    "Náklady":                              "costs",
    "Obrat, Výnosy":                        "turnover",
    "Obrat Výnosy":                         "turnover",
    "Tržby, Výkony":                        "sales_revenue",
    "Tržby Výkony":                         "sales_revenue",
    "Aktiva celkem":                        "total_assets",
    "Stálá aktiva":                         "fixed_assets",
    "Oběžná aktiva":                        "current_assets",
    "Ostatní aktiva":                       "other_assets",
    "Pasiva celkem":                        "total_liabilities_and_equity",
    "Vlastní kapitál":                      "equity",
    "Cizí zdroje":                          "total_liabilities",
    "Ostatní pasiva":                       "other_liabilities",
}


def build_long(lf: pl.LazyFrame) -> pl.LazyFrame:
    """melt & parse year/quarter/metric, return long table"""
    long = (
        # `unpivot` is the current name for the deprecated `melt` (Polars >= 1.0)
        lf.unpivot(on=time_cols, index=STATIC_COLS,
                   variable_name="raw", value_name="val")
          .drop_nulls("val")
          # --- extract pieces with regex ----------------------------------
          .with_columns([
              # 4-digit year anywhere in the string
              pl.col("raw").str.extract(r"(\d{4})", 1).cast(pl.Int32).alias("year"),
              # quarter pattern like '4Q'
              pl.col("raw").str.extract(r"([1-4]Q)", 1).alias("q_raw"),
          ])
          # metric = raw minus leading 'YYYY/4Q ' or '4Q/YYYY ' or 'YYYY '
          .with_columns(
              pl.when(pl.col("raw").str.contains(r"^\d{4}/[1-4]Q"))
                 .then(pl.col("raw").str.replace(r"^\d{4}/[1-4]Q\s*", ""))
               .when(pl.col("raw").str.contains(r"^[1-4]Q/\d{4}"))
                 .then(pl.col("raw").str.replace(r"^[1-4]Q/\d{4}\s*", ""))
               .when(pl.col("raw").str.contains(r"^\d{4}$"))
                 .then(pl.col("raw").str.replace(r"^\d{4}$", ""))
               .when(pl.col("raw").str.contains(r"^\d{4}\s"))
                 .then(pl.col("raw").str.replace(r"^\d{4}\s*", ""))
               .otherwise(pl.col("raw"))
               .alias("metric_cs")
          )
          .with_columns([
              # map Czech metric name to English slug; drop rows we don't recognise
              pl.col("metric_cs").map_elements(lambda x: metric_map.get(x), return_dtype=pl.String).alias("metric")
          ])
          .filter(pl.col("metric").is_not_null())
          # quarter as an integer: '4Q' -> 4; null when the header has no quarter
          .with_columns(
              pl.col("q_raw").str.replace("Q", "").cast(pl.Int8, strict=False).alias("quarter")
          )
          .drop("q_raw", "raw", "metric_cs")
          # keep only annual (no quarter) or 4Q values
          .filter(pl.col("quarter").is_null() | (pl.col("quarter") == 4))
    )
    return long

# Build the lazy long frame; nothing is executed until collect().
long = build_long(raw)


In [18]:
# ------------------------------------------------------------------
# 5. build the wide panel with a group_by-agg pivot
# ------------------------------------------------------------------
# Metric slugs that will become columns; set() collapses the duplicate
# slugs produced by alternative header spellings.
final_metric_columns = list(set(metric_map.values()))

# Construct the pivot manually, selecting only the columns it needs.
panel_lazy = (
    long.select(["IČO", "year", "metric", "val"])
        .group_by(["IČO", "year"], maintain_order=True)
        .agg(
            [
                pl.col("val").filter(pl.col("metric") == metric).first().alias(metric)
                for metric in final_metric_columns
            ]
        )
)

In [19]:
# ------------------------------------------------------------------
# 6. attach (static) firm attributes
# ------------------------------------------------------------------
# Keep one row of static attributes per IČO (the first occurrence wins).
other_static_cols = [c for c in STATIC_COLS if c != "IČO"]
static_unique_lazy = (
    raw.select(STATIC_COLS)
       .group_by("IČO", maintain_order=False)
       .agg([pl.col(c).first() for c in other_static_cols])
)

# Join the two lazy frames; the pivot kept only IČO, year and the metric slugs, so no column names clash.
final_lazy_panel = panel_lazy.join(
    static_unique_lazy, on="IČO", how="left"
)

# Execute the final plan.
print("✔  Lazy plan built. Now executing and collecting the final panel...")
panel = final_lazy_panel.collect(streaming=True)
print("✔  Collection complete.")



✔  Lazy plan built. Now executing and collecting the final panel...




✔  Collection complete.


In [20]:
# ------------------------------------------------------------------
# 7. light dtype + English column names
# ------------------------------------------------------------------
rename_static = {
    "IČO":"ico", "Název subjektu":"name",
    "Hlavní NACE":"main_nace", "Hlavní NACE - kód":"main_nace_code",
    "Vedlejší NACE CZ":"sub_nace_cz", "Vedlejší NACE CZ - kód":"sub_nace_cz_code",
    "Hlavní OKEČ":"main_okec", "Hlavní OKEČ - kód":"main_okec_code",
    "Vedlejší OKEČ":"sub_okec", "Vedlejší OKEČ - kód":"sub_okec_code",
    "Institucionální sektory (ESA 2010)":"esa2010",
    "Institucionální sektory (ESA 95)":"esa95",
    "Lokalita":"locality", "Kraj":"region",
    "Počet zaměstnanců":"num_employees",
    "Kategorie počtu zaměstnanců CZ":"num_employees_cat",
    "Kategorie obratu":"turnover_cat",
    "Audit":"audit", "Konsolidace":"consolidation",
    "Měna":"currency",
    "Datum vzniku":"date_founded", "Datum zrušení":"date_dissolved", 
    "Stav subjektu":"status",
    "Právní forma":"legal_form", 
    "Typ subjektu":"entity_type",
}
# Rename only the columns that are actually present in the panel
current_cols = set(panel.columns)
rename_map_filtered = {k: v for k, v in rename_static.items() if k in current_cols}
panel = panel.rename(rename_map_filtered)


# Convert metric columns to Float64
metric_cols = list(set(metric_map.values()))
panel = panel.with_columns([
    pl.col(c).cast(pl.Float64, strict=False) for c in metric_cols if c in panel.columns
])


# simple dtype tweaks (polars is already memory-efficient)
panel = panel.with_columns([
    pl.col("year").cast(pl.Int16),
    # strict=False turns unparseable employee counts into null instead of raising
    pl.col("num_employees").cast(pl.Int32, strict=False),
    pl.col([c for c in ["audit","consolidation","currency","esa2010","esa95",
                        "main_nace","sub_nace_cz","main_okec","sub_okec",
                        "locality","region","turnover_cat"]
           if c in panel.columns]).cast(pl.Categorical),
])

In [21]:
# ------------------------------------------------------------------
# 8. write once – Parquet
# ------------------------------------------------------------------
# Remove specified columns before saving to Parquet
cols_to_remove = [
    'Hospodářský výsledek před zdaněním', 'Hospodářský výsledek za účetní období',
    'Provozní hospodářský výsledek', 'Náklady', 'Obrat, Výnosy', 'Tržby, Výkony',
    'Aktiva celkem', 'Stálá aktiva', 'Oběžná aktiva', 'Ostatní aktiva',
    'Pasiva celkem', 'Vlastní kapitál', 'Cizí zdroje', 'Ostatní pasiva', 'Rok', 'Čtvrtletí',
]
cols_to_drop = [c for c in cols_to_remove if c in panel.columns]
panel = panel.drop(cols_to_drop)

out_path = os.path.join(out_dir, "magnusweb_panel.parquet")
panel.write_parquet(out_path, compression="snappy")
print("✔  firm-year panel saved   ➜", out_path)


✔  firm-year panel saved   ➜ /Users/adam/Library/Mobile Documents/com~apple~CloudDocs/School/Master's Thesis/Analysis/profit-margins-inflation/data/source_cleaned/magnusweb_panel.parquet


In [22]:
# ------------------------------------------------------------------
# 9. quick sanity check (optional)
# ------------------------------------------------------------------
preview = (
    panel.select(["ico","year","profit_pre_tax","sales_revenue","num_employees"])
          .filter(pl.col("year") >= 2021)
          .limit(5)
)
print(preview)

shape: (5, 5)
┌──────────┬──────┬────────────────┬───────────────┬───────────────┐
│ ico      ┆ year ┆ profit_pre_tax ┆ sales_revenue ┆ num_employees │
│ ---      ┆ ---  ┆ ---            ┆ ---           ┆ ---           │
│ str      ┆ i16  ┆ f64            ┆ f64           ┆ i32           │
╞══════════╪══════╪════════════════╪═══════════════╪═══════════════╡
│ 62302418 ┆ 2022 ┆ 9.299e6        ┆ 5.314e7       ┆ 21            │
│ 18055532 ┆ 2022 ┆ 0.0            ┆ 0.0           ┆ 1             │
│ 25071939 ┆ 2022 ┆ 363000.0       ┆ 2.27631e8     ┆ 14            │
│ 62361163 ┆ 2022 ┆ null           ┆ null          ┆ 10            │
│ 14614561 ┆ 2022 ┆ 5.971e6        ┆ 4.3258e7      ┆ 28            │
└──────────┴──────┴────────────────┴───────────────┴───────────────┘


In [23]:
print("Type of panel:", type(panel))

def find_static_columns(df, id_col="ico"):
    import pandas as pd
    # Convert to pandas DataFrame for groupby compatibility
    pdf = df.to_pandas() if hasattr(df, "to_pandas") else pd.DataFrame(df)
    static_cols = []
    for col in pdf.columns:
        if col == id_col:
            continue
        unique_counts = pdf.groupby(id_col)[col].nunique()
        if unique_counts.max() == 1:
            static_cols.append(col)
    return static_cols

static_columns = find_static_columns(panel)
print("Static columns (do not change across years for each ICO):", static_columns)

Type of panel: <class 'polars.dataframe.frame.DataFrame'>
Static columns (do not change across years for each ICO): ['name', 'main_nace', 'main_nace_code', 'sub_nace_cz', 'sub_nace_cz_code', 'main_okec', 'main_okec_code', 'sub_okec', 'sub_okec_code', 'esa2010', 'esa95', 'locality', 'region', 'num_employees', 'num_employees_cat', 'turnover_cat', 'audit', 'consolidation', 'currency', 'date_founded', 'date_dissolved', 'status', 'legal_form', 'entity_type']


In [24]:
print("Type of panel:", type(panel))

def find_time_series_columns(df, id_col="ico"):
    import pandas as pd
    # Convert to pandas DataFrame for groupby compatibility
    pdf = df.to_pandas() if hasattr(df, "to_pandas") else pd.DataFrame(df)
    time_series_cols = []
    for col in pdf.columns:
        if col == id_col:
            continue
        unique_counts = pdf.groupby(id_col)[col].nunique()
        if unique_counts.max() > 1:
            time_series_cols.append(col)
    return time_series_cols

time_series_columns = find_time_series_columns(panel)
print("Time-series columns (change across years for each ICO):", time_series_columns)

Type of panel: <class 'polars.dataframe.frame.DataFrame'>
Time-series columns (change across years for each ICO): ['year', 'costs', 'sales_revenue', 'equity', 'profit_net', 'turnover', 'oper_profit', 'total_assets', 'total_liabilities_and_equity', 'profit_pre_tax']
