# Merge Crunchbase CSV Datasets

This notebook merges multiple Crunchbase CSV exports into a single organization-level table using a **YAML configuration** that specifies

- which file is the *base* table,
- which columns to keep (to reduce memory and I/O),
- optional renames and dtypes (to align keys across exports), and
- a sequence of explicit joins.

## Imports and Global Settings

- `pandas` is used for the in-memory merge pipeline.
- `yaml` provides a compact and auditable way to describe merge steps.
- `duckdb` is imported lazily, inside the aggregate function, because the aggregate step is optional and the rest of the notebook should run without it.

In [1]:
# Standard library
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import csv
import sys

# Third-party
import pandas as pd
import yaml

# Pandas safety setting for interactive work: avoids many chained-assignment pitfalls.
pd.options.mode.copy_on_write = True
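The effect of that setting is easiest to see with a column selection, the pattern used in the pandas copy-on-write guide. With copy-on-write enabled, writing into the selected Series leaves the parent DataFrame unchanged; in pandas 2.x without the option, the same write would also change `df`. A minimal sketch on toy data:

```python
import pandas as pd

# Enable copy-on-write (the same option the notebook sets above).
pd.options.mode.copy_on_write = True

df = pd.DataFrame({"foo": [1, 2, 3], "bar": [4, 5, 6]})

# Under CoW, the selected column behaves like an independent copy,
# so writing into it does not propagate back to `df`.
subset = df["foo"]
subset.iloc[0] = 100

print(df["foo"].tolist())   # [1, 2, 3]
print(subset.tolist())      # [100, 2, 3]
```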


## CSV Reading and Join Utilities

These helper functions implement a small, explicit interface for the merge pipeline:

- **`read_csv_select`** centralizes `pd.read_csv(...)` so all tables are read consistently (column selection, renaming, and dtypes).
- **`_apply_presort_dedupe`** optionally sorts and deduplicates the *right-hand* table before joining. This is useful when a join key is not strictly unique in the raw exports but we want a single, deterministic match.
- **`apply_exact_join`** performs a standard pandas merge, but first ensures the join-key columns exist on both sides. This keeps the pipeline robust to partial exports where a key column might be absent.
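The ordering inside `read_csv_select` matters: `usecols` and `dtype` are handled by `pd.read_csv` and therefore use the raw column names, while `renames` is applied afterwards. A minimal sketch of the same sequence on a made-up, in-memory CSV (the data and the `extra` column are purely illustrative):

```python
import io

import pandas as pd

# Hypothetical three-column export; `extra` is not needed downstream.
raw = io.StringIO("uuid,name,extra\n007,Acme,x\n042,Beta,y\n")

df = pd.read_csv(
    raw,
    usecols=["uuid", "name"],      # raw CSV names, filtered at parse time
    dtype={"uuid": "string"},      # keep keys as text instead of integers
    low_memory=False,
).rename(columns={"uuid": "org_uuid"})  # renaming happens after reading

print(list(df.columns))          # ['org_uuid', 'name']
print(df["org_uuid"].tolist())   # ['007', '042']
```

Without the `dtype` entry, pandas would parse `uuid` as integers and drop the leading zeros, which is the kind of key mismatch the `dtype` parameter is meant to prevent.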

In [2]:
def read_csv_select(path: Path,
                    usecols: Optional[List[str]] = None,
                    renames: Optional[Dict[str, str]] = None,
                    dtype: Optional[Dict[str, str]] = None) -> pd.DataFrame:
    """Read a CSV table with optional schema controls.

    I keep all `pd.read_csv(...)` calls behind this helper so that the merge pipeline
    reads files consistently.

    Parameters
    ----------
    path:
        File path to the CSV export.
    usecols:
        Optional list of columns to load (reduces I/O + memory for large exports).
    renames:
        Optional mapping applied after reading (used to align join keys, e.g. `uuid` → `org_uuid`).
    dtype:
        Optional dtype mapping forwarded to pandas (used to prevent mixed-type key columns).

    Returns
    -------
    pd.DataFrame
        Loaded (optionally column-filtered and renamed) DataFrame.
    """
    # `low_memory=False` parses each column in full instead of in internal chunks, so
    # pandas infers a single dtype per column; this avoids mixed-type surprises on wide /
    # heterogeneous Crunchbase exports.
    kwargs: Dict[str, Any] = dict(low_memory=False)
    if usecols is not None:
        kwargs["usecols"] = usecols
    if dtype is not None:
        kwargs["dtype"] = dtype

    df = pd.read_csv(path, **kwargs)

    # Column normalization happens after reading so we can refer to consistent names downstream.
    if renames:
        df = df.rename(columns=renames)

    return df


def _apply_presort_dedupe(df: pd.DataFrame, join_cfg: Dict[str, Any]) -> pd.DataFrame:
    """Apply optional sort + drop-duplicates rules defined in a join configuration.

    This is mainly a *data quality / determinism* guard:
    - sorting with a stable algorithm (`mergesort`) makes the chosen "first" row reproducible,
    - deduplication can enforce one-row-per-key behavior on the right-hand table.
    """
    if "sort_by" in join_cfg:
        ascending: Union[bool, List[bool]] = join_cfg.get("ascending", False)
        df = df.sort_values(by=join_cfg["sort_by"], ascending=ascending, kind="mergesort")
    if "dedupe_on" in join_cfg:
        df = df.drop_duplicates(subset=join_cfg["dedupe_on"], keep="first")
    return df


def apply_exact_join(base: pd.DataFrame,
                     right: pd.DataFrame,
                     how: str,
                     left_on: List[str],
                     right_on: List[str],
                     suffix: str) -> pd.DataFrame:
    """Perform a pandas merge with explicit key handling.

    Before merging, the function ensures that all key columns exist on both sides.
    In practice, Crunchbase exports can be incomplete or filtered; creating missing
    key columns with `pd.NA` keeps the pipeline robust and avoids `KeyError`s.

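    Notes
    -----
    - The join semantics are entirely governed by `how`, `left_on`, and `right_on`.
    - `suffix` is appended to right-hand columns when a name collision occurs.
    - Unlike SQL, pandas matches null keys to each other, so left rows with a missing
      key match every right-hand row whose key is also missing (including a key column
      that was just filled with `pd.NA` here).
    """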
    for col in left_on:
        if col not in base.columns:
            base[col] = pd.NA
    for col in right_on:
        if col not in right.columns:
            right[col] = pd.NA

    return base.merge(right, how=how, left_on=left_on, right_on=right_on, suffixes=("", suffix))
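Filling missing key columns with `pd.NA` has one consequence worth checking: pandas matches null join keys against each other, which differs from SQL. The toy sketch below (not Crunchbase data) shows a base row with a missing key picking up every right-hand row whose key column was absent and therefore all-null:

```python
import pandas as pd

# Object-dtype keys, as produced when a key column is filled with pd.NA.
base = pd.DataFrame({
    "org_uuid": pd.Series(["a", pd.NA], dtype="object"),
    "name": ["Acme", "Unknown"],
})
# Right-hand table whose key column was absent and has been filled with pd.NA,
# mirroring what `apply_exact_join` does for missing key columns.
right = pd.DataFrame({
    "org_uuid": pd.Series([pd.NA, pd.NA, pd.NA], dtype="object"),
    "amount": [10, 20, 30],
})

merged = base.merge(right, how="left", on="org_uuid")
print(len(merged))  # 4: "a" finds no match, the null-key row matches all 3 right rows
```

If that behavior is unwanted, one option is to drop rows with null keys on either side (for example `right.dropna(subset=right_on)`) before merging.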


## YAML-Driven Merge Pipeline

The core merge logic is defined by a YAML configuration. Conceptually, the configuration has three parts:

1. **`base`**: the primary table (typically an organizations table), including optional `select`, `rename`, and `dtypes`.
2. **`joins`**: an ordered list of joins to apply to the growing base table.
3. **`final_order`** (optional): a preferred column ordering for readability.

A minimal (illustrative) configuration looks like this:

```yaml
base:
  path: /path/to/organizations.csv
  select: [uuid, name, country_code, founded_on]
  rename: {uuid: org_uuid}
  dtypes: {uuid: string}

joins:
  - type: exact
    path: /path/to/funding_rounds.csv
    select: [org_uuid, raised_amount_usd, announced_on]
    how: left
    left_on: org_uuid
    right_on: org_uuid
    suffix: _fr
    # Optional quality controls for non-unique right-hand keys:
    sort_by: [org_uuid, announced_on]
    ascending: [true, true]
    dedupe_on: [org_uuid]

final_order: [org_uuid, name, founded_on]
```

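The join order is intentional: later joins can depend on columns produced by earlier steps. Note that `select` and `dtypes` refer to the column names as they appear in the CSV (both are applied while reading), whereas `rename` is applied afterwards; this is why the example uses `uuid` in `select`/`dtypes` but `org_uuid` in `final_order`.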
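The optional `sort_by` / `ascending` / `dedupe_on` keys in the example translate into a stable sort followed by `drop_duplicates(keep="first")`. With `ascending: [true, true]` this keeps the earliest round per organization. A minimal sketch on toy data, repeating the two pandas calls `_apply_presort_dedupe` makes:

```python
import pandas as pd

# Toy funding rounds: org "a" has two rounds, org "b" has one.
rounds = pd.DataFrame({
    "org_uuid": ["a", "b", "a"],
    "announced_on": ["2020-05-01", "2019-01-15", "2018-03-10"],  # ISO dates sort as text
    "raised_amount_usd": [5_000_000, 250_000, 1_000_000],
})

# Same calls as `_apply_presort_dedupe` with the YAML example's settings.
earliest = (
    rounds.sort_values(by=["org_uuid", "announced_on"],
                       ascending=[True, True], kind="mergesort")
          .drop_duplicates(subset=["org_uuid"], keep="first")
)
print(earliest["announced_on"].tolist())  # ['2018-03-10', '2019-01-15']
```

If `ascending` were left at the function's default (`False`), the same calls would keep the most recent round per organization instead.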

In [3]:
def run_yaml_pipeline(config: Dict[str, Any]) -> pd.DataFrame:
    """Execute the YAML-driven merge pipeline and return the merged DataFrame."""
    base_cfg = config["base"]

    # 1) Load the base table (the left-most table for all subsequent joins).
    base = read_csv_select(
        Path(base_cfg["path"]),
        base_cfg.get("select"),
        base_cfg.get("rename"),
        base_cfg.get("dtypes"),
    )

    # 2) Apply each configured join in order.
    for join_cfg in config.get("joins", []):
        if join_cfg["type"] != "exact":
            raise ValueError(f"Unsupported join type {join_cfg['type']} in this script")

        right_df = read_csv_select(
            Path(join_cfg["path"]),
            join_cfg.get("select"),
            join_cfg.get("rename"),
            join_cfg.get("dtypes"),
        )

        # Optional deterministic cleaning of the right-hand side.
        right_df = _apply_presort_dedupe(right_df, join_cfg)

        # Normalize YAML values so the merge always receives lists of key columns.
        left_on = join_cfg["left_on"] if isinstance(join_cfg["left_on"], list) else [join_cfg["left_on"]]
        right_on = join_cfg["right_on"] if isinstance(join_cfg["right_on"], list) else [join_cfg["right_on"]]

        base = apply_exact_join(
            base,
            right_df,
            join_cfg.get("how", "left"),
            left_on,
            right_on,
            join_cfg.get("suffix", "_r"),
        )

    # 3) Optional: place key columns first for readability (without dropping any columns).
    if "final_order" in config:
        ordered_cols = [col for col in config["final_order"] if col in base.columns]
        remaining = [col for col in base.columns if col not in ordered_cols]
        base = base[ordered_cols + remaining]

    return base
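To make the join-ordering point concrete without touching the filesystem, here is a hypothetical in-memory variant of the pipeline. `run_in_memory_pipeline`, the `table` config key, and the toy tables are illustrative assumptions, not part of the notebook. The second join keys on `lead_investor_uuid`, a column that only exists after the first join has run, and the `name` collision is resolved by the configured suffix:

```python
from typing import Any, Dict

import pandas as pd


def _as_list(value: Any) -> list:
    """Wrap a scalar key name in a list, as `run_yaml_pipeline` does."""
    return value if isinstance(value, list) else [value]


def run_in_memory_pipeline(config: Dict[str, Any],
                           tables: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """Hypothetical variant of `run_yaml_pipeline`: each step names an in-memory
    DataFrame under a `table` key instead of a CSV `path`."""
    base_cfg = config["base"]
    base = tables[base_cfg["table"]].rename(columns=base_cfg.get("rename", {}))

    # Joins run strictly in list order, so a later step may key on a column
    # that an earlier step added to `base`.
    for join_cfg in config.get("joins", []):
        right = tables[join_cfg["table"]].rename(columns=join_cfg.get("rename", {}))
        base = base.merge(
            right,
            how=join_cfg.get("how", "left"),
            left_on=_as_list(join_cfg["left_on"]),
            right_on=_as_list(join_cfg["right_on"]),
            suffixes=("", join_cfg.get("suffix", "_r")),
        )

    # Same rule as `final_order`: listed columns first, nothing dropped.
    if "final_order" in config:
        ordered = [c for c in config["final_order"] if c in base.columns]
        base = base[ordered + [c for c in base.columns if c not in ordered]]
    return base


tables = {
    "orgs": pd.DataFrame({"uuid": ["a", "b"], "name": ["Acme", "Beta"]}),
    "first_rounds": pd.DataFrame({"org_uuid": ["a", "b"],
                                  "lead_investor_uuid": ["i1", "i2"]}),
    "investors": pd.DataFrame({"uuid": ["i1", "i2"],
                               "name": ["Fund One", "Fund Two"]}),
}

config = {
    "base": {"table": "orgs", "rename": {"uuid": "org_uuid"}},
    "joins": [
        {"table": "first_rounds", "left_on": "org_uuid", "right_on": "org_uuid"},
        # Depends on `lead_investor_uuid`, which only exists after the join above.
        {"table": "investors", "left_on": "lead_investor_uuid",
         "right_on": "uuid", "suffix": "_inv"},
    ],
    "final_order": ["name", "org_uuid"],
}

result = run_in_memory_pipeline(config, tables)
print(result.columns.tolist())
# ['name', 'org_uuid', 'lead_investor_uuid', 'uuid', 'name_inv']
```

Swapping the two joins would fail with a `KeyError`, because `lead_investor_uuid` would not yet exist on the base table.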


## DuckDB Aggregate Enrichment

Beyond the direct joins, I optionally compute a set of *derived* organization-level features from the raw Crunchbase tables using DuckDB.
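Two of these features are easier to read as plain Python than as window functions. The sketch below is my own re-expression of the `weighted_time` and `funding_25pct` CTEs in the query further down; it operates on a hypothetical in-memory list of `(announced_on, raised_amount_usd)` tuples for one organization, and the SQL remains the reference definition:

```python
from datetime import date
from typing import List, Optional, Tuple

# One organization's rounds as (announced_on, raised_amount_usd); a hypothetical
# stand-in for rows of funding_rounds.csv with both fields non-null.
Round = Tuple[date, float]


def months_between(start: date, end: date) -> int:
    """Month boundaries crossed, matching DuckDB's DATE_DIFF('month', start, end)."""
    return (end.year - start.year) * 12 + (end.month - start.month)


def weighted_time(founded_on: date, rounds: List[Round]) -> Optional[float]:
    """Funding-weighted mean of months from founding to each round."""
    total = sum(amount for _, amount in rounds)
    if total == 0:
        return None  # mirrors NULLIF(SUM(...), 0) in the SQL
    weighted = sum(amount * months_between(founded_on, d) for d, amount in rounds)
    return weighted / total


def round_reaching_share(rounds: List[Round], total_funding: float,
                         share: float = 0.25) -> Optional[Tuple[int, date]]:
    """(1-based round number, date) at which cumulative funding first reaches
    `share` of `total_funding`. `total_funding` plays the role of the org-level
    total_funding_usd column, not the sum of the rounds."""
    if total_funding <= 0:
        return None
    cumulative = 0.0
    # Rounds ordered by date; ties are broken by amount here (the SQL uses the round uuid).
    for number, (announced_on, amount) in enumerate(sorted(rounds), start=1):
        cumulative += amount
        if cumulative >= share * total_funding:
            return number, announced_on
    return None


rounds = [(date(2020, 1, 15), 3_000_000.0), (date(2018, 7, 1), 1_000_000.0)]
print(weighted_time(date(2018, 1, 1), rounds))     # 19.5
print(round_reaching_share(rounds, 4_000_000.0))   # (1, datetime.date(2018, 7, 1))
```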

Why DuckDB here:
- It can scan multiple CSVs via `read_csv_auto(...)` without loading everything into pandas.
- The aggregation logic stays in SQL, which makes the feature definitions auditable.

Implementation details:
- The function checks that the required raw CSVs exist in `data_dir` (typically the directory that holds the base table) and skips the step with a warning if any are missing.
- Aggregates are computed **in batches of `org_uuid`** to reduce peak memory usage.
- The result is merged back onto the base DataFrame via a left join on `org_uuid`.
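The batching step reduces to slicing the de-duplicated, non-null `org_uuid` values into fixed-size chunks. As long as the query returns one row per organization, every organization lands in exactly one chunk, so concatenating the per-batch results cannot duplicate rows. A stdlib-only sketch of that slicing (the helper names are hypothetical):

```python
from typing import Iterable, Iterator, List, Optional, Sequence


def unique_non_null(ids: Iterable[Optional[str]]) -> List[str]:
    """Drop None and duplicates while keeping first-seen order
    (the role of .dropna().drop_duplicates() in the notebook)."""
    return list(dict.fromkeys(x for x in ids if x is not None))


def batches(ids: Sequence[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of at most `batch_size` ids."""
    for start in range(0, len(ids), batch_size):
        yield list(ids[start:start + batch_size])


unique_ids = unique_non_null(["a", "b", None, "a", "c", "d", "e"])
for batch in batches(unique_ids, batch_size=2):
    print(batch)
# ['a', 'b']
# ['c', 'd']
# ['e']
```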

For convenience, the query also builds a per-round list of formatted funding strings (`detailed_rounds_list`), which is then expanded into columns (`funding_round_1`, `funding_round_2`, …) for downstream modeling.
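The list-to-columns expansion can be sketched in pandas on toy data. This is one possible implementation under my own assumptions (shorter rows padded with missing values, `funding_round_N` numbered from 1), not necessarily the notebook's exact code:

```python
from typing import Any, List

import pandas as pd


def to_list_safe(x: Any) -> List[Any]:
    """Return `x` as a Python list: lists pass through, NumPy arrays are
    converted via .tolist(), anything else (None/NaN) becomes []."""
    if isinstance(x, list):
        return x
    if hasattr(x, "tolist"):
        converted = x.tolist()
        return converted if isinstance(converted, list) else []
    return []


# Toy aggregates; strings follow the "investment_type (date) <amount>USD investor_uuids" format.
aggregates = pd.DataFrame({
    "org_uuid": ["a", "b", "c"],
    "detailed_rounds_list": [
        ["seed (2019-01-01) 500000USD inv1", "series_a (2020-06-01) 5000000USD inv2"],
        ["seed (2021-03-01) 250000USD inv3"],
        None,  # organization without funding rounds
    ],
})

rounds = [to_list_safe(x) for x in aggregates["detailed_rounds_list"]]

# Ragged lists become a rectangular frame; shorter rows are padded with missing values.
expanded = pd.DataFrame(rounds, index=aggregates.index)
expanded.columns = [f"funding_round_{i + 1}" for i in range(expanded.shape[1])]

result = pd.concat([aggregates.drop(columns="detailed_rounds_list"), expanded], axis=1)
print(result.columns.tolist())
# ['org_uuid', 'funding_round_1', 'funding_round_2']
```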

In [4]:
# DuckDB aggregate enrichment: the SQL is intentionally kept intact for reproducibility.
# I only add comments/documentation in this notebook version.

def _quote_path(path: Path) -> str:
    """Escape a file path so it can be safely embedded inside a DuckDB SQL string."""
    text = str(path)
    return "'" + text.replace("'", "''") + "'"

def add_org_aggregates(
    df: pd.DataFrame,
    data_dir: Path,
    verbose: bool = True,
    batch_size: int = 500_000,
) -> pd.DataFrame:
    """Add organization-level aggregates via DuckDB (optional enrichment step).

    This mirrors the scripted pipeline: rather than joining these wide, relational tables
    in pandas, I compute organization-level features in SQL (DuckDB) and merge the results
    back onto the base DataFrame.

    Implementation notes
    --------------------
    - Uses `read_csv_auto(...)` to scan CSV exports directly from disk.
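    - Restricts the output to the `org_uuid` values present in `df`: some CTEs (e.g. `orgs_filtered`,
      `funding_rounds_ordered`) join `base_orgs` directly, and the rest are filtered by the final
      `LEFT JOIN` from `base_orgs`.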
    - Runs in batches to control memory usage on large exports.
    """
    try:
        import duckdb
    except ImportError:
        print("WARNING: duckdb not installed; skipping aggregates. Run: pip install duckdb", file=sys.stderr)
        return df

    required = [
        "funding_rounds.csv",
        "investments.csv",
        "investors.csv",
        "funds.csv",
        "jobs.csv",
        "people.csv",
        "degrees.csv",
        "people_descriptions.csv",
        "acquisitions.csv",
        "ipos.csv",
        "organizations.csv",
        "category_groups.csv",
    ]
    missing = [name for name in required if not (data_dir / name).exists()]
    if missing:
        print(f"WARNING: aggregate step skipped; missing files: {missing}", file=sys.stderr)
        return df

    paths = {name: data_dir / name for name in required}

    # Temp directory for DuckDB spills
    temp_dir = (data_dir / ".duckdbtmp").resolve()
    temp_dir.mkdir(parents=True, exist_ok=True)
    safe_temp = str(temp_dir).replace("'", "''")

    # Build the SQL once.
    #
    # The SQL is long by design: it defines all aggregates in a single audited block
    # (funding, investors, founders/education, acquisitions/IPOs, categories, and per-round formatting).
    SQL = f"""
WITH leads AS (
  SELECT i.funding_round_uuid,
         string_agg(inv.name, ', ' ORDER BY inv.name) AS lead_investors
  FROM read_csv_auto({_quote_path(paths['investments.csv'])}) i
  JOIN read_csv_auto({_quote_path(paths['investors.csv'])}) inv ON inv.uuid = i.investor_uuid
  WHERE lower(cast(i.is_lead_investor AS varchar)) IN ('true','1')
  GROUP BY i.funding_round_uuid
),

-- Full funding rounds timeline (readable)
fr AS (
  SELECT fr.org_uuid,
         string_agg(
           printf('%s: %s (type=%s / %s) raised=%s pmv=%s investors=%s leads=%s',
             coalesce(CAST(fr.announced_on AS TEXT), ''),
             coalesce(fr.name, ''),
             coalesce(fr.type, ''),
             coalesce(fr.investment_type, ''),
             coalesce(CAST(fr.raised_amount_usd AS TEXT), ''),
             coalesce(CAST(fr.post_money_valuation_usd AS TEXT), ''),
             coalesce(CAST(fr.investor_count AS TEXT), ''),
             coalesce(leads.lead_investors, '')
           ),
           ' | ' ORDER BY fr.announced_on NULLS LAST
         ) AS funding_rounds
  FROM read_csv_auto({_quote_path(paths['funding_rounds.csv'])}) fr
  LEFT JOIN leads ON leads.funding_round_uuid = fr.uuid
  GROUP BY fr.org_uuid
),

-- Earliest funding round (any type) for context
fr_ranked_all AS (
  SELECT fr.*,
         ROW_NUMBER() OVER (
           PARTITION BY fr.org_uuid
           ORDER BY fr.announced_on ASC NULLS LAST
         ) AS rn
  FROM read_csv_auto({_quote_path(paths['funding_rounds.csv'])}) fr
),
fr_technical_first AS (
  SELECT org_uuid,
         printf('%s (%s)',
           coalesce(NULLIF(fr.investment_type,''), coalesce(NULLIF(fr.type,''), 'unknown')),
           coalesce(CAST(fr.announced_on AS TEXT), '')
         ) AS first_technicaly_funding_type
  FROM fr_ranked_all fr
  WHERE fr.rn = 1
),

-- First (earliest dated) funding round per org, with scalar features.
-- Only rounds whose type is NOT in the exclusion list below are ranked.
fr_ranked AS (
  SELECT fr.*,
         ROW_NUMBER() OVER (
           PARTITION BY fr.org_uuid
           ORDER BY fr.announced_on ASC NULLS LAST
         ) AS rn
  FROM read_csv_auto({_quote_path(paths['funding_rounds.csv'])}) fr
  WHERE lower(coalesce(fr.investment_type, fr.type, '')) NOT IN (
      'grant',
      'debt_financing',
      'post_ipo_equity',
      'post_ipo_debt',
      'secondary_market',
      'non_equity_assistance',
      'undisclosed',
      -- Additional exclusions:
      'convertible_note',
      'equity_crowdfunding',
      'private_equity',
      'corporate_round'
  )
),
fr_investor_types AS (
  SELECT fr.org_uuid,
         fr.uuid AS funding_round_uuid,
         string_agg(DISTINCT inv.investor_types, ' | ' ORDER BY inv.investor_types) AS investor_types_first_round
  FROM read_csv_auto({_quote_path(paths['funding_rounds.csv'])}) fr
  JOIN read_csv_auto({_quote_path(paths['investments.csv'])}) i  ON i.funding_round_uuid = fr.uuid
  JOIN read_csv_auto({_quote_path(paths['investors.csv'])})  inv ON inv.uuid = i.investor_uuid
  GROUP BY fr.org_uuid, fr.uuid
),
fr_first AS (
  SELECT r.org_uuid, r.uuid AS first_round_uuid,
         r.announced_on  AS first_funding_date,
         r.name          AS first_funding_name,
         r.type          AS first_funding_type,
         r.investment_type AS first_funding_investment_type,
         r.raised_amount_usd        AS first_funding_raised_usd,
         r.post_money_valuation_usd AS first_funding_post_money_usd,
         r.investor_count           AS first_funding_investor_count,
         l.lead_investors           AS first_funding_leads,
         it_round.investor_types_first_round AS first_funding_investor_type
  FROM fr_ranked r
  LEFT JOIN leads l ON l.funding_round_uuid = r.uuid
  LEFT JOIN fr_investor_types it_round ON it_round.funding_round_uuid = r.uuid
  WHERE r.rn = 1
),
fr_first_investor AS (
  SELECT fr.org_uuid,
         fr.first_round_uuid,
         string_agg(DISTINCT inv.uuid, ' | ' ORDER BY inv.uuid) AS first_round_investor_uuid
  FROM fr_first fr
  JOIN read_csv_auto({_quote_path(paths['investments.csv'])}) i ON i.funding_round_uuid = fr.first_round_uuid
  JOIN read_csv_auto({_quote_path(paths['investors.csv'])}) inv ON inv.uuid = i.investor_uuid
  GROUP BY fr.org_uuid, fr.first_round_uuid
),

-- Funds invested with investor_types included (dedupe first, then aggregate)
funds_vals AS (
  SELECT DISTINCT fr.org_uuid,
         f.name AS fund_name,
         printf('%s (types=%s)',
           coalesce(f.name,''), coalesce(inv.investor_types,'')
         ) AS val,
         inv.investor_types
  FROM read_csv_auto({_quote_path(paths['funding_rounds.csv'])}) fr
  JOIN read_csv_auto({_quote_path(paths['investments.csv'])}) i  ON i.funding_round_uuid = fr.uuid
  JOIN read_csv_auto({_quote_path(paths['investors.csv'])})  inv ON inv.uuid = i.investor_uuid
  JOIN read_csv_auto({_quote_path(paths['funds.csv'])})      f   ON f.entity_uuid = inv.uuid
),
funds_invested AS (
  SELECT org_uuid, string_agg(val, ' | ' ORDER BY fund_name) AS funds_invested
  FROM funds_vals
  GROUP BY org_uuid
),
investor_types_all AS (
  SELECT org_uuid,
         string_agg(DISTINCT investor_types, ' | ' ORDER BY investor_types) AS investor_types_all
  FROM funds_vals
  GROUP BY org_uuid
),

-- Organizations filtered to the base orgs (reuse for funding features and categories)
orgs_filtered AS (
  SELECT o.uuid AS org_uuid,
         o.founded_on,
         o.total_funding_usd,
         o.category_list,
         o.homepage_url,
         o.created_at,
         o.updated_at
  FROM read_csv_auto({_quote_path(paths['organizations.csv'])}) o
  JOIN base_orgs b ON b.org_uuid = o.uuid
),

-- Weighted average time (months) at which funding was raised
funding_with_foundation AS (
  SELECT fr.org_uuid,
         fr.raised_amount_usd,
         fr.announced_on,
         DATE_DIFF('month', of.founded_on, fr.announced_on) AS months_from_founding
  FROM read_csv_auto({_quote_path(paths['funding_rounds.csv'])}) fr
  JOIN orgs_filtered of ON of.org_uuid = fr.org_uuid
  WHERE fr.raised_amount_usd IS NOT NULL
    AND fr.announced_on IS NOT NULL
    AND of.founded_on IS NOT NULL
),
weighted_time AS (
  SELECT org_uuid,
         SUM(CAST(raised_amount_usd AS DOUBLE) * months_from_founding) / NULLIF(SUM(CAST(raised_amount_usd AS DOUBLE)), 0) AS weighted_time
  FROM funding_with_foundation
  GROUP BY org_uuid
),

-- Funding progression to 25% of total funding
funding_rounds_ordered AS (
  SELECT fr.org_uuid,
         fr.announced_on,
         fr.raised_amount_usd,
         ROW_NUMBER() OVER (
           PARTITION BY fr.org_uuid
           ORDER BY fr.announced_on ASC NULLS LAST, fr.uuid
         ) AS round_number,
         SUM(CAST(fr.raised_amount_usd AS DOUBLE)) OVER (
           PARTITION BY fr.org_uuid
           ORDER BY fr.announced_on ASC NULLS LAST, fr.uuid
           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
         ) AS cumulative_raised
  FROM read_csv_auto({_quote_path(paths['funding_rounds.csv'])}) fr
  JOIN base_orgs b ON b.org_uuid = fr.org_uuid
  WHERE fr.raised_amount_usd IS NOT NULL
),
funding_25pct_ranked AS (
  SELECT fro.org_uuid,
         fro.announced_on,
         fro.round_number,
         ROW_NUMBER() OVER (
           PARTITION BY fro.org_uuid
           ORDER BY fro.round_number
         ) AS rn
  FROM funding_rounds_ordered fro
  JOIN orgs_filtered of ON of.org_uuid = fro.org_uuid
  WHERE of.total_funding_usd IS NOT NULL
    AND CAST(of.total_funding_usd AS DOUBLE) > 0
    AND fro.cumulative_raised >= 0.25 * CAST(of.total_funding_usd AS DOUBLE)
),
funding_25pct AS (
  SELECT org_uuid,
         announced_on AS funding_25pct_date,
         round_number AS funding_25pct_round_number
  FROM funding_25pct_ranked
  WHERE rn = 1
),


-- Funding date when cumulative raised crosses 1 million USD
funding_1m_ranked AS (
  SELECT fro.org_uuid,
         fro.announced_on,
         ROW_NUMBER() OVER (
           PARTITION BY fro.org_uuid
           ORDER BY fro.round_number
         ) AS rn
  FROM funding_rounds_ordered fro
  WHERE fro.cumulative_raised >= 1000000
),
funding_1m AS (
  SELECT org_uuid,
         announced_on AS date_of_1_million
  FROM funding_1m_ranked
  WHERE rn = 1
),

-- Founders: detect by job title / job_type; then build readable strings & features
founders_base AS (
  SELECT DISTINCT j.org_uuid, j.person_uuid
  FROM read_csv_auto({_quote_path(paths['jobs.csv'])}) j
  WHERE lower(coalesce(j.title,'')) LIKE '%%founder%%' OR lower(coalesce(j.job_type,'')) = 'founder'
),
founders_vals AS (
  SELECT DISTINCT fb.org_uuid,
         printf('%s (%s, %s/%s/%s) – %s – %s',
           coalesce(p.name,''), coalesce(p.gender,''), coalesce(p.country_code,''), coalesce(p.state_code,''), coalesce(p.city,''),
           coalesce(p.featured_job_title,''), coalesce(p.linkedin_url,'')
         ) AS val
  FROM founders_base fb
  JOIN read_csv_auto({_quote_path(paths['people.csv'])}) p ON p.uuid = fb.person_uuid
),
founders AS (
  SELECT org_uuid, string_agg(val, ' | ' ORDER BY val) AS founders
  FROM founders_vals
  GROUP BY org_uuid
),
founders_counts AS (
  SELECT fb.org_uuid,
         COUNT(DISTINCT fb.person_uuid) AS founders_count,
         SUM(CASE WHEN lower(p.gender)='female' THEN 1 ELSE 0 END) AS founders_female_count,
         SUM(CASE WHEN lower(p.gender)='male'   THEN 1 ELSE 0 END) AS founders_male_count,
         SUM(CASE WHEN coalesce(p.linkedin_url,'') <> '' THEN 1 ELSE 0 END) AS founders_linkedin_count,
         string_agg(DISTINCT p.country_code, ' | ' ORDER BY p.country_code) AS founders_countries
  FROM founders_base fb
  JOIN read_csv_auto({_quote_path(paths['people.csv'])}) p ON p.uuid = fb.person_uuid
  GROUP BY fb.org_uuid
),

-- Best degree per founder (completed first, then latest), then degree flags
founder_degrees_ranked AS (
  SELECT fb.org_uuid,
         d.person_uuid, p.name AS person_name,
         d.name AS degree_name, d.institution_name, d.degree_type, d.subject,
         d.is_completed, d.completed_on,
         ROW_NUMBER() OVER (
           PARTITION BY fb.org_uuid, d.person_uuid
           ORDER BY (CASE WHEN lower(cast(d.is_completed AS varchar)) IN ('true','1') THEN 1 ELSE 0 END) DESC,
                    d.completed_on DESC NULLS LAST
         ) AS rn
  FROM founders_base fb
  JOIN read_csv_auto({_quote_path(paths['degrees.csv'])}) d ON d.person_uuid = fb.person_uuid
  JOIN read_csv_auto({_quote_path(paths['people.csv'])})   p ON p.uuid = d.person_uuid
),
founder_degrees_vals AS (
  SELECT fdr.org_uuid,
         fdr.person_name,
         fdr.rn,
         printf('%s: %s (%s, %s, %s) completed_on=%s',
           coalesce(fdr.person_name,''),
           coalesce(fdr.degree_name,''),
           coalesce(fdr.institution_name,''),
           coalesce(fdr.degree_type,''),
           coalesce(fdr.subject,''),
           coalesce(CAST(fdr.completed_on AS TEXT),'')
         ) AS val
  FROM founder_degrees_ranked fdr
),
founders_degrees AS (
  SELECT org_uuid,
         string_agg(val, ' | ' ORDER BY person_name, rn) AS founders_degrees
  FROM founder_degrees_vals
  GROUP BY org_uuid
),
degree_flags AS (
  SELECT org_uuid,
         MAX(CASE WHEN lower(coalesce(degree_type,'')) LIKE '%%phd%%' OR lower(coalesce(degree_name,'')) LIKE '%%phd%%'
                   OR lower(coalesce(degree_type,'')) LIKE '%%doctor%%' OR lower(coalesce(degree_name,'')) LIKE '%%doctor%%'
                   OR lower(coalesce(degree_name,'')) LIKE '%%doctorate%%'
             THEN 1 ELSE 0 END) AS founders_has_phd,
         MAX(CASE WHEN lower(coalesce(degree_type,'')) LIKE '%%mba%%' OR lower(coalesce(degree_name,'')) LIKE '%%mba%%' THEN 1 ELSE 0 END) AS founders_has_mba,
         MAX(CASE WHEN lower(coalesce(degree_type,'')) LIKE '%%master%%' OR lower(coalesce(degree_type,'')) IN ('ms','msc','m.s.','ma','m.a','meng','m.eng')
                       OR lower(coalesce(degree_name,'')) LIKE '%%master%%'
                  THEN 1 ELSE 0 END) AS founders_has_masters,
         MAX(CASE WHEN lower(coalesce(degree_type,'')) LIKE '%%bachelor%%' OR lower(coalesce(degree_type,'')) IN ('ba','b.a.','bs','b.s.','bsc','b.eng','beng')
                       OR lower(coalesce(degree_name,'')) LIKE '%%bachelor%%'
                  THEN 1 ELSE 0 END) AS founders_has_bachelors,
         MAX(CASE WHEN lower(coalesce(degree_type,'')) LIKE '%%juris doctor%%'
                       OR lower(coalesce(degree_name,'')) LIKE '%%juris doctor%%'
                       OR lower(coalesce(degree_type,'')) = 'jd'
                       OR lower(coalesce(degree_name,'')) = 'jd'
                  THEN 1 ELSE 0 END) AS founders_has_jd
  FROM founder_degrees_ranked
  WHERE rn = 1
  GROUP BY org_uuid
),

-- Founder descriptions (dedupe first)
founders_desc_vals AS (
  SELECT DISTINCT fb.org_uuid,
         printf('%s: %s', p.name, pd.description) AS val
  FROM founders_base fb
  JOIN read_csv_auto({_quote_path(paths['people.csv'])}) p ON p.uuid = fb.person_uuid
  JOIN read_csv_auto({_quote_path(paths['people_descriptions.csv'])}) pd ON pd.uuid = p.uuid
),
founders_descriptions AS (
  SELECT org_uuid, string_agg(val, ' | ' ORDER BY val) AS founders_descriptions
  FROM founders_desc_vals
  GROUP BY org_uuid
),

-- Acquisitions (both directions)
acq_base AS (
  SELECT *
  FROM read_csv_auto({_quote_path(paths['acquisitions.csv'])})
),
acq_by AS (
  SELECT a.acquiree_uuid AS org_uuid,
         string_agg(
           printf('%s (%s USD; %s)',
             coalesce(a.acquirer_name,''), coalesce(CAST(a.price_usd AS TEXT),''), coalesce(CAST(a.acquired_on AS TEXT),'')
           ),
           ' | ' ORDER BY a.acquired_on NULLS LAST
         ) AS acquired_by
  FROM acq_base a
  GROUP BY a.acquiree_uuid
),
acq_by_dates AS (
  SELECT a.acquiree_uuid AS org_uuid,
         MIN(a.acquired_on) AS acquired_on_first,
         MAX(a.acquired_on) AS acquired_on_last
  FROM acq_base a
  WHERE a.acquired_on IS NOT NULL
  GROUP BY a.acquiree_uuid
),
acq_has AS (
  SELECT a.acquirer_uuid AS org_uuid,
         string_agg(
           printf('%s (%s USD; %s)',
             coalesce(a.acquiree_name,''), coalesce(CAST(a.price_usd AS TEXT),''), coalesce(CAST(a.acquired_on AS TEXT),'')
           ),
           ' | ' ORDER BY a.acquired_on NULLS LAST
         ) AS has_acquired
  FROM acq_base a
  GROUP BY a.acquirer_uuid
),
acq_has_dates AS (
  SELECT a.acquirer_uuid AS org_uuid,
         MIN(a.acquired_on) AS first_acquired_company_on,
         MAX(a.acquired_on) AS last_acquired_company_on
  FROM acq_base a
  WHERE a.acquired_on IS NOT NULL
  GROUP BY a.acquirer_uuid
),

-- IPO information per organization
ipo_ranked AS (
  SELECT i.*,
         ROW_NUMBER() OVER (
           PARTITION BY i.org_uuid
           ORDER BY i.updated_at DESC NULLS LAST,
                    i.created_at DESC NULLS LAST
         ) AS rn
  FROM read_csv_auto({_quote_path(paths['ipos.csv'])}) i
),
ipo_info AS (
  SELECT org_uuid,
         went_public_on AS ipo_went_public_on,
         created_at     AS ipo_created_at,
         updated_at     AS ipo_updated_at
  FROM ipo_ranked
  WHERE rn = 1
),

-- Categories: expand org.category_list using the category catalog (category_groups.csv)
org_core AS (
  SELECT o.org_uuid,
         o.category_list,
         o.homepage_url,
         o.created_at AS org_created_at,
         o.updated_at AS org_updated_at
  FROM orgs_filtered o
),
org_with_cats AS (
  SELECT org_uuid, category_list
  FROM org_core
),
-- explode org's comma-separated categories; alias UNNEST column as c
org_cat_names AS (
  SELECT owc.org_uuid, trim(c) AS category_name
  FROM org_with_cats owc,
       UNNEST(str_split(coalesce(owc.category_list,''), ',')) AS t(c)
),
org_cat_names_distinct AS (
  SELECT DISTINCT org_uuid, category_name FROM org_cat_names
),
cat_catalog AS (
  SELECT name, category_groups_list
  FROM read_csv_auto({_quote_path(paths['category_groups.csv'])})
),
categories_canonical AS (
  SELECT ocdn.org_uuid,
         string_agg(DISTINCT c.name, ' | ' ORDER BY c.name) AS category_names_from_catalog
  FROM org_cat_names_distinct ocdn
  JOIN cat_catalog c ON lower(trim(c.name)) = lower(trim(ocdn.category_name))
  GROUP BY ocdn.org_uuid
),
-- explode each matched category's groups; alias UNNEST column as g
cat_groups_vals AS (
  SELECT ocdn.org_uuid, trim(g) AS grp
  FROM org_cat_names_distinct ocdn
  JOIN cat_catalog c ON lower(trim(c.name)) = lower(trim(ocdn.category_name)),
       UNNEST(str_split(coalesce(c.category_groups_list,''), ',')) AS t(g)
),
category_groups_from_catalog AS (
  SELECT org_uuid,
         string_agg(DISTINCT grp, ' | ' ORDER BY grp) AS category_groups_from_catalog
  FROM cat_groups_vals
  GROUP BY org_uuid
),

-- Per-round detail strings (expanded into one column per round after the query)
-- 1. Gather all investor UUIDs per round
round_investors_uuids AS (
    SELECT i.funding_round_uuid,
           string_agg(i.investor_uuid, ', ' ORDER BY i.investor_uuid) AS inv_uuids
    FROM read_csv_auto({_quote_path(paths['investments.csv'])}) i
    GROUP BY i.funding_round_uuid
),
-- 2. Format each round as: "investment_type (date) <amount>USD investor_uuids"
formatted_rounds_per_org AS (
    SELECT fr.org_uuid,
           fr.announced_on,
           printf('%s (%s) %sUSD %s',
               coalesce(fr.investment_type, fr.type, 'unknown'),
               coalesce(CAST(fr.announced_on AS TEXT), 'unknown'),
               coalesce(CAST(fr.raised_amount_usd AS TEXT), '0'),
               coalesce(riu.inv_uuids, '')
           ) AS round_fmt
    FROM read_csv_auto({_quote_path(paths['funding_rounds.csv'])}) fr
    LEFT JOIN round_investors_uuids riu ON riu.funding_round_uuid = fr.uuid
),
-- 3. Aggregate rounds into a LIST, ordered by date
rounds_list_agg AS (
    SELECT org_uuid,
           list(round_fmt ORDER BY announced_on ASC NULLS LAST) AS detailed_rounds_list
    FROM formatted_rounds_per_org
    GROUP BY org_uuid
),


aggs AS (
  SELECT o.org_uuid,
         fr.funding_rounds,
         ffirst.first_funding_date,
         ffirst.first_funding_name,
         ffirst.first_funding_type,
         ffirst.first_funding_investment_type,
         ft.first_technicaly_funding_type,
         ffirst.first_funding_raised_usd,
         ffirst.first_funding_post_money_usd,
         ffirst.first_funding_investor_count,
         fri.first_round_investor_uuid,
         ffirst.first_funding_leads,
         ffirst.first_funding_investor_type,
         fi.funds_invested,
         it.investor_types_all,
         wt.weighted_time,
         f25.funding_25pct_date,
         f25.funding_25pct_round_number,
         f1m.date_of_1_million,
         f.founders,
         fc.founders_count,
         fc.founders_female_count,
         fc.founders_male_count,
         fc.founders_linkedin_count,
         fc.founders_countries,
         dflags.founders_has_phd,
         dflags.founders_has_mba,
         dflags.founders_has_jd,
         dflags.founders_has_masters,
         dflags.founders_has_bachelors,
         fd.founders_degrees,
         fdesc.founders_descriptions,
         ab.acquired_by,
         abdates.acquired_on_first,
         abdates.acquired_on_last,
         ah.has_acquired,
         ahdates.first_acquired_company_on,
         ahdates.last_acquired_company_on,
         ipo.ipo_went_public_on,
         ipo.ipo_created_at,
         ipo.ipo_updated_at,
         org.homepage_url,
         org.org_created_at,
         org.org_updated_at,
         cc.category_names_from_catalog,
         cg.category_groups_from_catalog,
         rl.detailed_rounds_list -- Include the list column
  FROM base_orgs o
  LEFT JOIN fr               ON fr.org_uuid = o.org_uuid
  LEFT JOIN fr_first ffirst  ON ffirst.org_uuid = o.org_uuid
  LEFT JOIN fr_technical_first ft ON ft.org_uuid = o.org_uuid
  LEFT JOIN fr_first_investor fri ON fri.org_uuid = o.org_uuid
  LEFT JOIN funds_invested fi ON fi.org_uuid = o.org_uuid
  LEFT JOIN investor_types_all it ON it.org_uuid = o.org_uuid
  LEFT JOIN weighted_time wt ON wt.org_uuid = o.org_uuid
  LEFT JOIN funding_25pct f25 ON f25.org_uuid = o.org_uuid
  LEFT JOIN funding_1m f1m ON f1m.org_uuid = o.org_uuid
  LEFT JOIN founders f       ON f.org_uuid  = o.org_uuid
  LEFT JOIN founders_counts fc ON fc.org_uuid = o.org_uuid
  LEFT JOIN degree_flags dflags ON dflags.org_uuid = o.org_uuid
  LEFT JOIN founders_degrees fd  ON fd.org_uuid = o.org_uuid
  LEFT JOIN founders_descriptions fdesc ON fdesc.org_uuid = o.org_uuid
  LEFT JOIN acq_by ab        ON ab.org_uuid = o.org_uuid
  LEFT JOIN acq_by_dates abdates ON abdates.org_uuid = o.org_uuid
  LEFT JOIN acq_has ah       ON ah.org_uuid = o.org_uuid
  LEFT JOIN acq_has_dates ahdates ON ahdates.org_uuid = o.org_uuid
  LEFT JOIN ipo_info ipo     ON ipo.org_uuid = o.org_uuid
  LEFT JOIN org_core org     ON org.org_uuid = o.org_uuid
  LEFT JOIN categories_canonical cc ON cc.org_uuid = o.org_uuid
  LEFT JOIN category_groups_from_catalog cg ON cg.org_uuid = o.org_uuid
  LEFT JOIN rounds_list_agg rl ON rl.org_uuid = o.org_uuid
)
SELECT * FROM aggs
"""

    # --- Batching logic starts here ---
    # We batch over distinct org IDs to keep peak memory stable on very large datasets.
    org_ids = df["org_uuid"].dropna().drop_duplicates().to_numpy()
    if len(org_ids) == 0:
        if verbose:
            print("No org_uuid values found; skipping aggregates.", file=sys.stderr)
        return df

    if verbose:
        print(f"Running DuckDB aggregates in batches of {batch_size:,}…", file=sys.stderr)

    all_aggs: List[pd.DataFrame] = []
    total = len(org_ids)

    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        batch_orgs = org_ids[start:end]

        if verbose:
            print(f"  Batch {start // batch_size + 1}: orgs {start}–{end - 1}", file=sys.stderr)

        # New connection per batch to avoid accumulation of state/memory
        con = duckdb.connect(
            database=":memory:",
            config={
                "temp_directory": str(temp_dir),
                "memory_limit": "8GB",
                "threads": 1,
                "preserve_insertion_order": "false",
            },
        )
        con.execute(f"PRAGMA temp_directory='{safe_temp}'")
        con.execute("PRAGMA memory_limit='8GB'")
        con.execute("PRAGMA threads=1")
        con.execute("PRAGMA preserve_insertion_order=false")

        # Register only this batch's orgs
        base_orgs_batch = pd.DataFrame({"org_uuid": batch_orgs})
        con.register("base_orgs", base_orgs_batch)

        batch_aggs = con.execute(SQL).df()
        all_aggs.append(batch_aggs)

        con.close()

    aggregates = pd.concat(all_aggs, ignore_index=True)

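    # --- Expand the list column into one column per funding round ---
    if "detailed_rounds_list" in aggregates.columns:
        # 1. Normalize each cell to a Python list: DuckDB often returns LIST values
        #    as NumPy arrays, and orgs without rounds come back as None/NaN.
        def to_list_safe(x):
            if isinstance(x, list):
                return x
            if hasattr(x, "tolist"):  # NumPy arrays, detected without importing NumPy
                converted = x.tolist()
                # NumPy scalars (e.g. a float64 NaN) also have .tolist() but return a scalar.
                return converted if isinstance(converted, list) else []
            return []  # Fallback for None/NaN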

        cleaned_rounds = [to_list_safe(x) for x in aggregates["detailed_rounds_list"]]

        # 2. Create the expanded DataFrame
        rounds_expanded = pd.DataFrame(cleaned_rounds, index=aggregates.index)
        
        # 3. Rename columns to funding_round_1, funding_round_2, etc.
        rounds_expanded.columns = [f"funding_round_{i+1}" for i in range(rounds_expanded.shape[1])]
        
        # 4. Concatenate back to aggregates and drop the temporary list column
        aggregates = pd.concat([aggregates, rounds_expanded], axis=1)
        aggregates = aggregates.drop(columns=["detailed_rounds_list"])

    if verbose:
        print(f"Aggregates rows: {len(aggregates):,}", file=sys.stderr)

    return df.merge(aggregates, on="org_uuid", how="left")
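
The batch-then-expand flow of this cell can be sketched without DuckDB. In this minimal sketch a pandas `groupby` stands in for the SQL query (a swapped-in technique, not the notebook's aggregation), and the helpers `batch_bounds`, `aggregate_batch`, `expand_rounds` and the toy `rounds` table are hypothetical. It illustrates two properties the cell relies on: batching over deduplicated IDs yields exactly one aggregate row per organization, and ragged per-organization lists become `funding_round_1`, `funding_round_2`, etc., padded with missing values.

```python
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd


def batch_bounds(total: int, batch_size: int) -> List[Tuple[int, int]]:
    """Half-open (start, end) slices, like the range(0, total, batch_size) loop above."""
    return [(start, min(start + batch_size, total)) for start in range(0, total, batch_size)]


def aggregate_batch(rounds: pd.DataFrame, batch_orgs: np.ndarray) -> pd.DataFrame:
    """Hypothetical stand-in for the DuckDB query: exactly one row per org in the batch."""
    subset = rounds[rounds["org_uuid"].isin(batch_orgs)].sort_values(["org_uuid", "announced_on"])
    grouped: Dict[str, list] = {
        org: grp["round_label"].tolist() for org, grp in subset.groupby("org_uuid")
    }
    base = pd.DataFrame({"org_uuid": batch_orgs})
    # LEFT JOIN semantics: orgs without rounds get None. Lists are wrapped in NumPy
    # arrays to mimic how DuckDB often returns LIST columns.
    cells = [np.array(grouped[o], dtype=object) if o in grouped else None for o in batch_orgs]
    base["detailed_rounds_list"] = pd.Series(cells, index=base.index, dtype=object)
    return base


def to_list_safe(x) -> list:
    """Lists pass through, arrays become lists, anything else (None/NaN) becomes []."""
    if isinstance(x, list):
        return x
    if isinstance(x, np.ndarray):
        return x.tolist()
    return []


def expand_rounds(aggregates: pd.DataFrame) -> pd.DataFrame:
    """Turn the ragged list column into funding_round_1..N columns (None-padded)."""
    cleaned = [to_list_safe(x) for x in aggregates["detailed_rounds_list"]]
    expanded = pd.DataFrame(cleaned, index=aggregates.index)
    expanded.columns = [f"funding_round_{i + 1}" for i in range(expanded.shape[1])]
    return pd.concat([aggregates.drop(columns=["detailed_rounds_list"]), expanded], axis=1)


# Toy data (hypothetical): org "c" has no rounds; the raw ID column has a repeat and a gap.
rounds = pd.DataFrame({
    "org_uuid": ["a", "a", "b", "a"],
    "announced_on": ["2019-01-01", "2020-06-01", "2021-03-01", "2022-09-01"],
    "round_label": ["seed", "series_a", "seed", "series_b"],
})
org_ids = pd.Series(["a", "b", "a", None, "c"]).dropna().drop_duplicates().to_numpy()

parts = [aggregate_batch(rounds, org_ids[s:e]) for s, e in batch_bounds(len(org_ids), 2)]
aggregates = expand_rounds(pd.concat(parts, ignore_index=True))
print(aggregates)
```

Because IDs are deduplicated before slicing, no organization can land in two batches, so concatenating the per-batch results cannot create duplicate keys. With the notebook's own numbers, `batch_bounds(3_992_262, 500_000)` yields eight slices, the last being `(3_500_000, 3_992_262)`, consistent with the final batch line in the example output later in this notebook (which prints the inclusive end index).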

## High-Level Merge Wrapper

`merge_crunchbase_data(...)`:

- reads the YAML config from disk,
- runs the YAML-driven merge pipeline,
- optionally runs the DuckDB aggregate enrichment,
- optionally reorders columns according to the config's `final_order` list,
- optionally writes the merged table to a CSV file.

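When both input columns are present, I also compute a derived ratio, `first_round_size_to_total_funding` = `first_funding_raised_usd` / `total_funding_usd`. Both inputs are coerced to numeric (unparseable values become NaN), and the ratio is left missing wherever total funding is not positive.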
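
To see why the `where(total_funding > 0)` mask matters, here is a small sketch of the same computation on made-up values (the toy DataFrame is hypothetical). pandas float division by zero returns `inf` rather than raising, so without the mask a zero total would produce an infinite ratio.

```python
import pandas as pd

# Toy input (hypothetical values); strings and blanks mimic raw CSV cells.
df = pd.DataFrame({
    "first_funding_raised_usd": ["1000000", "500000", None, "250000"],
    "total_funding_usd": ["4000000", "0", "2000000", "not available"],
})

# errors="coerce" turns None and unparseable strings into NaN instead of raising.
first_round = pd.to_numeric(df["first_funding_raised_usd"], errors="coerce")
total_funding = pd.to_numeric(df["total_funding_usd"], errors="coerce")

# 500000 / 0 gives inf; the mask replaces it (and any NaN-denominator rows) with NaN.
ratio = first_round / total_funding
df["first_round_size_to_total_funding"] = ratio.where(total_funding > 0)
print(df)
```

Only the first row keeps a value (0.25); the zero, missing, and unparseable cases all end up as NaN.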

In [5]:
def merge_crunchbase_data(config_path: Union[str, Path],
                          output_path: Optional[Union[str, Path]] = None,
                          sep: str = ",",
                          skip_aggs: bool = False,
                          verbose: bool = True) -> pd.DataFrame:
    """Merge Crunchbase CSV exports using the YAML config and optionally write the merged CSV."""
    config_path = Path(config_path)

    # Load merge specification from disk (explicit, versionable, and auditable).
    with config_path.open("r", encoding="utf-8") as handle:
        config = yaml.safe_load(handle)

    # Apply the declarative join pipeline.
    df = run_yaml_pipeline(config)

    # Optional feature enrichment using DuckDB.
    if not skip_aggs:
        data_dir = Path(config["base"]["path"]).resolve().parent
        df = add_org_aggregates(df, data_dir, verbose=verbose)

    # Derived ratio: scale of the first funding round relative to total funding.
    if {"first_funding_raised_usd", "total_funding_usd"}.issubset(df.columns):
        first_round = pd.to_numeric(df["first_funding_raised_usd"], errors="coerce")
        total_funding = pd.to_numeric(df["total_funding_usd"], errors="coerce")
        ratio = first_round / total_funding
        df["first_round_size_to_total_funding"] = ratio.where(total_funding > 0)

    # Optional: apply a stable, readable column ordering.
    if "final_order" in config:
        ordered_cols = [col for col in config["final_order"] if col in df.columns]
        remaining = [col for col in df.columns if col not in ordered_cols]
        df = df[ordered_cols + remaining]

    # Optional output writing (kept separate so the function can be used interactively).
    if output_path is not None:
        output_path = Path(output_path)
        df.to_csv(output_path, index=False, quoting=csv.QUOTE_MINIMAL, sep=sep)
        if verbose:
            print(
                f"Wrote {output_path} with {len(df):,} rows and {len(df.columns)} columns.",
                file=sys.stderr,
            )

    return df
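
The wrapper reads only two keys directly: `config["base"]["path"]` (to derive the directory passed to the aggregate step) and the optional `final_order` list. The sketch below mirrors that logic on a plain dictionary standing in for the parsed YAML; the dictionary contents, the file path, and the helper name `order_columns` are hypothetical, and the real config carries further keys for the join pipeline.

```python
from pathlib import Path
from typing import Any, Dict, List


def order_columns(columns: List[str], config: Dict[str, Any]) -> List[str]:
    """Listed columns first (in config order, skipping absent ones), then the rest unchanged."""
    if "final_order" not in config:
        return list(columns)
    ordered = [col for col in config["final_order"] if col in columns]
    remaining = [col for col in columns if col not in ordered]
    return ordered + remaining


# Hypothetical stand-in for yaml.safe_load(...) output.
config = {
    "base": {"path": "data/organizations.csv"},
    "final_order": ["org_name", "org_uuid", "column_not_in_data"],
}

# Same derivation as the wrapper: relative paths resolve against the current working directory.
data_dir = Path(config["base"]["path"]).resolve().parent
columns = ["org_uuid", "status", "org_name", "founded_on"]
print(order_columns(columns, config))
# → ['org_name', 'org_uuid', 'status', 'founded_on']
```

Names in `final_order` that are absent from the data are skipped silently, and unlisted columns keep their original relative order at the end, so the ordering never drops data.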



## Example Usage

The cell below shows the typical workflow:

1. Point `config_path` to the YAML configuration you want to run.
2. Set `output_csv` to a filename (or `None` to skip writing).
3. Run the merge and inspect the head of the resulting DataFrame.

If you want a quick test without DuckDB, set `skip_aggs=True`.

In [6]:
from pathlib import Path

# Point this to the YAML configuration that defines your merge (base table + joins).
config_path = Path("/Users/stefan/Desktop/Thesis/v4/Crunchbase Data/Merging CB Datasets/org_big_config.yaml")  # update as needed

# Output filename is relative to the current working directory of the notebook.
# Set to None if you only want an in-memory DataFrame.
output_csv = Path("merged_big_17_12.csv")      # set to None to skip writing

# Main entry point.
merged_df = merge_crunchbase_data(config_path, output_path=output_csv, sep=",", skip_aggs=False)
merged_df.head()


Running DuckDB aggregates in batches of 500,000…
  Batch 1: orgs 0–499999
  Batch 2: orgs 500000–999999
  Batch 3: orgs 1000000–1499999
  Batch 4: orgs 1500000–1999999
  Batch 5: orgs 2000000–2499999
  Batch 6: orgs 2500000–2999999
  Batch 7: orgs 3000000–3499999
  Batch 8: orgs 3500000–3992261
Aggregates rows: 3,992,262
Wrote merged_big_17_12.csv with 3,994,580 rows and 118 columns.


| Unnamed: 0 | org_uuid | org_name | legal_name | status | founded_on | first_funding_date | last_funding_on | closed_on | went_public_on | acquired_on_first | ... | funding_round_34 | funding_round_35 | funding_round_36 | funding_round_37 | funding_round_38 | funding_round_39 | funding_round_40 | funding_round_41 | funding_round_42 | funding_round_43 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | e1393508-30ea-8a36-3f96-dd3226033abd | Wetpaint | wetpaint.com, inc. | acquired | 2005-06-01 | 2005-10-01 | 2008-05-19 |  |  | 2013-12-16 | ... |  |  |  |  |  |  |  |  |  |  |
| 1 | bf4d7b0e-b34d-2fd8-d292-6049c4f7efc7 | Zoho | Zoho Corporation Pvt. Ltd. | operating | 1996-03-17 | NaT |  |  |  | NaT | ... |  |  |  |  |  |  |  |  |  |  |
| 2 | 5f2b40b8-d1b3-d323-d81a-b7a8e89553d0 | Digg | Digg Holdings, LLC | acquired | 2004-10-11 | 2005-10-28 | 2016-09-13 |  |  | 2012-07-12 | ... |  |  |  |  |  |  |  |  |  |  |
| 3 | f4d5ab44-058b-298b-ea81-380e6e9a8eec | Omidyar Network | Omidyar Network Services LLC | operating | 2004-01-01 | NaT |  |  |  | NaT | ... |  |  |  |  |  |  |  |  |  |  |
| 4 | df662812-7f97-0b43-9d3e-12f64f504fbb | Meta | Meta Platforms, Inc. | ipo | 2004-02-04 | 2004-09-01 | 2024-11-12 |  | 2012-05-18 | NaT | ... |  |  |  |  |  |  |  |  |  |  |
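
The log reports 3,992,262 aggregate rows but 3,994,580 merged rows. Since the aggregate row count equals the number of distinct IDs (the last batch ends at index 3,992,261), each organization receives exactly one aggregate row, and a left merge on unique right-hand keys preserves the left row count. The 2,318 extra rows therefore come from the pre-aggregate table itself: rows whose `org_uuid` is missing or repeated. A quick way to check this, sketched on a hypothetical toy frame, is to count those cases and let pandas enforce the expected cardinality with `merge(..., validate="many_to_one")`, which raises `pandas.errors.MergeError` if the right-hand keys are not unique.

```python
import pandas as pd

# Hypothetical toy frame: one repeated ID and one missing ID.
df = pd.DataFrame({"org_uuid": ["a", "b", "b", None], "org_name": ["A", "B", "B (dup)", "?"]})
org_ids = df["org_uuid"].dropna().drop_duplicates()
aggregates = pd.DataFrame({"org_uuid": org_ids.to_numpy(), "num_rounds": [3, 1]})

n_missing = int(df["org_uuid"].isna().sum())
n_repeated = int(df["org_uuid"].dropna().duplicated().sum())

# validate="many_to_one" raises pandas.errors.MergeError if aggregates has repeated keys.
merged = df.merge(aggregates, on="org_uuid", how="left", validate="many_to_one")

print(len(aggregates), len(merged), n_missing, n_repeated)
# → 2 4 1 1
```

On the real table, `n_missing + n_repeated` computed on the pre-aggregate DataFrame should equal the 2,318-row gap if the reasoning above holds.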
