# Data to Metadata to Dummy Data

In [2]:
import numpy as np
import pandas as pd
import yaml

from utils import make_random_unique_id

In [3]:
df = pd.read_csv("penguins.csv")
df.head(3)

Unnamed: 0,species,island,bill_length_mm,bill_depth_mm,flipper_length_mm,body_mass_g,sex
0,Adelie,Torgersen,39.1,18.7,181.0,3750.0,MALE
1,Adelie,Torgersen,39.5,17.4,186.0,3800.0,FEMALE
2,Adelie,Torgersen,40.3,18.0,195.0,3250.0,FEMALE


## Augment Penguin Example

### Create an id

In [36]:
# Maximum 3 contribution per penguin.
# A same penguin "species", "island", "sex" are the same through their life.
df = make_random_unique_id(
    df,
    id_column = "penguin_id",
    fixed_fields = ["species", "island", "sex"],
    max_contributions = 3
)

  df.groupby(fixed_fields, dropna=False, group_keys=False).apply(random_merge)


### Sex as boolean

In [38]:
df["sex"] = df["sex"].map({"MALE": 1, "FEMALE": 0}).astype(bool)

### Add a timestamp (datetime)

In [39]:
np.random.seed(42)
start = pd.Timestamp("2025-01-01")
end = pd.Timestamp("2025-12-31")

df["timestamp"] = start + pd.to_timedelta(
    np.random.randint(0, (end - start).days, size=len(df)),
    unit="D"
)

### Add a favourite number between 0 and 10 (categorical int)

In [44]:
df["favourite_number"] = np.random.randint(0, 11, size=len(df))

### Result

In [48]:
df.head(2)

Unnamed: 0,species,island,bill_length_mm,bill_depth_mm,flipper_length_mm,body_mass_g,sex,penguin_id,timestamp,favourite_number
0,Adelie,Torgersen,39.1,18.7,181.0,3750.0,True,0,2025-04-13,2
1,Adelie,Torgersen,39.5,17.4,186.0,3800.0,False,1,2025-12-15,6


## Generate metadata

In [49]:
def is_categorical_int(col, max_unique=20):
    if not pd.api.types.is_numeric_dtype(col):
        return False

    non_null = col.dropna()
    if len(non_null) == 0:
        return False

    is_int = (non_null % 1 == 0).all()
    return is_int and non_null.nunique() <= max_unique

In [50]:
def csvw_dtype(col):
    if pd.api.types.is_bool_dtype(col):
        return "boolean"
    if pd.api.types.is_datetime64_any_dtype(col):
        return "dateTime"
    if pd.api.types.is_numeric_dtype(col):
        return "double"
    return "string"

In [51]:
def compute_max_partition_length(col_data) -> int:
    # An upper bound on the number of records in any one partition.
    # If you don’t know how many records are in the data, you can specify a very loose upper bound,
    # for example, the size of the total population you are sampling from.
    return int(col_data.value_counts().max())


def compute_max_num_partitions(df, col) -> int:
    # An upper bound on the number of distinct partitions.
    return int(df[col].nunique())


def compute_max_influenced_partitions(df, id_col, col) -> int:
    # The greatest number of partitions any one individual can contribute to.
    # = Max number of different partitions (col values) an individual (id_col) appears in
    return int(df.groupby(id_col)[col].nunique().max())


def compute_max_partition_contributions(df, id_col, col) -> int:
    # The greatest number of records an individual may contribute to any one partition.
    # = For each (id, col_value) pair, count number of records and take the max over all such pairs
    return int(df.groupby([id_col, col], observed=True).size().max())


def compute_margins(df, individual_col, col) -> dict:
    # https://docs.opendp.org/en/stable/api/python/opendp.extras.polars.html#opendp.extras.polars.Margin
    margin_col_info = {}

    col_data = df[col].dropna()
    margin_col_info["max_partition_length"] = compute_max_partition_length(col_data)

    margin_col_info["max_num_partitions"] = compute_max_num_partitions(df, col)
    margin_col_info["max_influenced_partitions"] = compute_max_influenced_partitions(df, individual_col, col)
    margin_col_info["max_partition_contributions"] = compute_max_partition_contributions(df, individual_col, col)

    return margin_col_info

In [52]:
def generate_csvw_dp_metadata(
    df: pd.DataFrame,
    csv_url: str,
    individual_col: str,
    max_contributions: int = 2,
):
    CSVW_DP_CONTEXT = [
        "http://www.w3.org/ns/csvw",
        {"dp": "http://example.org/ns/csvw-dp-ext#"}
    ]
    meta = {
        "@context": CSVW_DP_CONTEXT,
        "url": csv_url,
        "tableSchema": {
            "dp:maxContributions": int(max_contributions),
            "dp:maxTableLength": int(len(df)),
            "dp:tableLength": int(len(df)),
            "columns": []
        }
    }

    for col in df.columns:
        col_data = df[col]
        nullable_prop = float(col_data.isna().mean())

        col_info = {
            "name": col,
            "datatype": csvw_dtype(col_data),
            "dp:privacyId": col == individual_col,
            "required": nullable_prop == 0,
            "dp:nullableProportion": round(nullable_prop, 2),
        }

        non_null = col_data.dropna()

        # ---------- Numeric ----------
        if pd.api.types.is_numeric_dtype(non_null):

            if is_categorical_int(non_null):
                # treat as categorical
                categories = sorted(non_null.astype(int).unique().tolist())
                col_info["datatype"] = "integer"
                col_info["dp:publicPartitions"] = categories
        
                margins = compute_margins(df, individual_col, col)
        
                col_info.update({
                    "dp:maxNumPartitions": margins["max_num_partitions"],
                    "dp:maxPartitionLength": margins["max_partition_length"],
                    "dp:maxInfluencedPartitions": margins["max_influenced_partitions"],
                    "dp:maxPartitionContribution": margins["max_partition_contributions"],
                })
        
            else:
                # continuous numeric
                col_info["datatype"] = "double"
                col_info["minimum"] = float(np.floor(non_null.min()))
                col_info["maximum"] = float(np.ceil(non_null.max()))

        # ---------- Categorical ----------
        elif col_info["datatype"] in ("string", "boolean"):
            categories = sorted(
                non_null.dropna().astype(str).unique().tolist()
            )
        
            if col_info["datatype"] == "boolean":
                col_info["dp:publicPartitions"] = [True, False]
            else:
                col_info["dp:publicPartitions"] = categories

                margins = compute_margins(df, individual_col, col)

                col_info.update({
                    "dp:maxNumPartitions": margins["max_num_partitions"],
                    "dp:maxPartitionLength": margins["max_partition_length"],
                    "dp:maxInfluencedPartitions": margins["max_influenced_partitions"],
                    "dp:maxPartitionContribution": margins["max_partition_contributions"],
                })

        # ---------- Datetime ----------
        elif pd.api.types.is_datetime64_any_dtype(col_data):
            col_info["datatype"] = "dateTime"
            col_info["minimum"] = str(non_null.min())
            col_info["maximum"] = str(non_null.max())
    
        meta["tableSchema"]["columns"].append(col_info)

    return meta

In [53]:
metadata = generate_csvw_dp_metadata(
    df,
    csv_url="penguins.csv",
    individual_col="penguin_id",
    max_contributions=3
)

In [54]:
metadata

{'@context': ['http://www.w3.org/ns/csvw',
  {'dp': 'http://example.org/ns/csvw-dp-ext#'}],
 'url': 'penguins.csv',
 'tableSchema': {'dp:maxContributions': 3,
  'dp:maxTableLength': 344,
  'dp:tableLength': 344,
  'columns': [{'name': 'species',
    'datatype': 'string',
    'dp:privacyId': False,
    'required': True,
    'dp:nullableProportion': 0.0,
    'dp:publicPartitions': ['Adelie', 'Chinstrap', 'Gentoo'],
    'dp:maxNumPartitions': 3,
    'dp:maxPartitionLength': 152,
    'dp:maxInfluencedPartitions': 1,
    'dp:maxPartitionContribution': 3},
   {'name': 'island',
    'datatype': 'string',
    'dp:privacyId': False,
    'required': True,
    'dp:nullableProportion': 0.0,
    'dp:publicPartitions': ['Biscoe', 'Dream', 'Torgersen'],
    'dp:maxNumPartitions': 3,
    'dp:maxPartitionLength': 168,
    'dp:maxInfluencedPartitions': 1,
    'dp:maxPartitionContribution': 3},
   {'name': 'bill_length_mm',
    'datatype': 'double',
    'dp:privacyId': False,
    'required': False,
    '

In [55]:
metadata_path = "automated_penguin_metadata.yaml"
with open(metadata_path, "w") as f:
    yaml.dump(metadata, f, sort_keys=False)
with open(metadata_path, "r") as f: # ensure readable format
    metadata = yaml.safe_load(f)

In [56]:
def make_dummy_dataset_csvw_dp(metadata: dict, nb_rows: int = 100, seed: int = 0) -> pd.DataFrame:
    """
    Create a dummy dataset from CSVW-DP metadata.
    """
    rng = np.random.default_rng(seed)
    data_dict = {}

    columns = metadata["tableSchema"]["columns"]

    for col in columns:
        name = col["name"]
        dtype = col["datatype"]
        nullable_prop = col.get("dp:nullableProportion", 0)

        # ---------- STRING ----------
        if dtype == "string":
            categories = col.get("dp:publicPartitions", [])
            if not categories:
                raise ValueError(f"No categories for string column {name}")

            serie = pd.Series(rng.choice(categories, size=nb_rows), dtype="string")

        # ---------- BOOLEAN ----------
        elif dtype == "boolean":
            serie = pd.Series(rng.choice([True, False], size=nb_rows), dtype="boolean")

        # ---------- INTEGER (categorical or continuous) ----------
        elif dtype == "integer":
            if "dp:publicPartitions" in col:
                # categorical integer
                categories = col["dp:publicPartitions"]
                serie = pd.Series(rng.choice(categories, size=nb_rows), dtype="Int64")
            else:
                low = int(col["minimum"])
                high = int(col["maximum"])
                serie = pd.Series(
                    rng.integers(low, high + 1, size=nb_rows),
                    dtype="Int64"
                )

        # ---------- FLOAT ----------
        elif dtype == "double":
            low = float(col["minimum"])
            high = float(col["maximum"])
            serie = pd.Series(
                low + (high - low) * rng.random(size=nb_rows),
                dtype="float64"
            )

        # ---------- DATETIME ----------
        elif dtype == "dateTime":
            dates = pd.date_range(start=col["minimum"], end=col["maximum"])
            serie = pd.Series(rng.choice(dates, size=nb_rows))

        else:
            raise ValueError(f"Unsupported datatype {dtype} for column {name}")

        # ---------- INSERT NULLS ----------
        if nullable_prop > 0:
            n_null = int(nb_rows * nullable_prop)
            if n_null > 0:
                idx = rng.choice(serie.index, size=n_null, replace=False)
                if dtype == "dateTime":
                    serie.loc[idx] = pd.NaT
                else:
                    serie.loc[idx] = pd.NA

        data_dict[name] = serie

    return pd.DataFrame(data_dict)

In [57]:
dummy_df = make_dummy_dataset_csvw_dp(metadata, nb_rows = 100, seed = 0)

In [58]:
dummy_df

Unnamed: 0,species,island,bill_length_mm,bill_depth_mm,flipper_length_mm,body_mass_g,sex,penguin_id,timestamp,favourite_number
0,Gentoo,Dream,45.439662,14.687569,206.415883,3041.196042,0,105.131976,2025-11-18,9
1,Chinstrap,Torgersen,38.506442,19.052740,174.372875,2717.637894,1,79.360792,2025-10-27,7
2,Chinstrap,Biscoe,54.452656,14.755967,213.977723,3862.514893,0,223.212853,2025-07-16,10
3,Adelie,Biscoe,57.858844,18.199191,205.572525,6266.680986,0,90.787299,2025-03-05,4
4,Adelie,Torgersen,39.451648,18.420153,220.731476,3652.902462,1,295.760504,2025-05-05,2
...,...,...,...,...,...,...,...,...,...,...
95,Chinstrap,Torgersen,47.999077,14.104216,216.145232,6248.770687,1,64.279323,2025-06-06,2
96,Adelie,Dream,,21.403920,177.114198,4240.888269,1,108.464101,2025-11-14,8
97,Adelie,Torgersen,53.634178,19.156454,197.125518,5734.852973,0,129.210176,2025-11-01,2
98,Chinstrap,Dream,59.391440,20.414032,195.408361,2992.765289,1,169.510328,2025-02-16,2
