# How to use `Source`?
## Synthetic Data

The `tab2seq` package provides a function that generates four synthetic datasets: `health`, `labour`, `income`, and `survey`. Each of them has its own data structure.

In [1]:
from tab2seq.datasets import generate_synthetic_data
import polars as pl

In [2]:
# Generate one parquet file per registry under ./synthetic_data and keep the
# returned {registry name: file path} mapping for the cells below.
registry_names = ["health", "labour", "survey", "income"]
data_paths = generate_synthetic_data(
    output_dir="synthetic_data",
    n_entities=10000,
    seed=742,  # fixed seed so the generated data is reproducible
    registries=registry_names,
    file_format="parquet",
)
print("Generated synthetic data at:", data_paths)

Generated synthetic data at: {'health': PosixPath('synthetic_data/health.parquet'), 'labour': PosixPath('synthetic_data/labour.parquet'), 'survey': PosixPath('synthetic_data/survey.parquet'), 'income': PosixPath('synthetic_data/income.parquet')}


You can use `polars` to load and look at these datasets.

In [3]:
# pl.read_parquet loads the file eagerly into a DataFrame; preview the first
# five rows of the health registry.
health_path = data_paths["health"]
lf_health = pl.read_parquet(health_path)
lf_health.head(5)

entity_id,date,diagnosis,procedure,department,cost,length_of_stay
str,date,str,str,str,f64,i64
"""E00001""",2016-09-15,"""J18.1""","""CABG""","""gastroenterology""",7306.17,2
"""E00001""",2017-05-25,"""E78.0""","""XRAY""","""neurology""",138.65,1
"""E00001""",2018-01-18,"""E78.0""","""MRI""","""general_surgery""",6704.59,10
"""E00001""",2019-11-11,"""C34.1""","""ECHO""","""general_surgery""",910.12,0
"""E00001""",2020-05-20,"""E78.0""","""DIALYSIS""","""neurology""",2266.52,2


In [4]:
# Load the labour registry and preview ten random rows. The sample is seeded
# so that re-running the notebook shows the same rows every time.
lf_labour = pl.read_parquet(data_paths["labour"])
lf_labour.sample(10, seed=742)

entity_id,date,status,occupation,weekly_hours,residence_region,birthday
str,date,str,str,f64,str,date
"""E08275""",2018-10-01,"""employed""","""engineering""",35.2,"""island""",1958-01-03
"""E01592""",2015-09-01,"""unemployed""","""transport""",0.0,"""west""",1989-02-14
"""E03390""",2020-05-01,"""sick_leave""","""finance""",0.0,"""central""",1984-03-30
"""E07769""",2016-11-01,"""sick_leave""","""finance""",0.0,"""north""",1994-06-16
"""E09141""",2019-02-01,"""self_employed""","""transport""",34.7,"""east""",1993-03-08
"""E07700""",2023-08-01,"""employed""","""agriculture""",40.7,"""south""",1987-08-08
"""E04750""",2015-11-01,"""employed""","""manufacturing""",39.5,"""island""",1977-11-16
"""E00464""",2016-04-01,"""unemployed""","""IT""",0.0,"""north""",1990-12-06
"""E01583""",2023-09-01,"""self_employed""","""hospitality""",36.5,"""west""",1998-09-07
"""E08483""",2019-02-01,"""employed""","""retail""",36.7,"""central""",1993-02-02


## Sources
A `Source` represents one or more data tables of a specific event type. This could be a hospital admissions registry,
an income registry, or a labour market record — you name it.

Each `Source` stores the information needed to read and validate that table:
1. where it lives on disk,
2. which column identifies the entity (e.g. a person, firm, or object),
3. which column holds the timestamp, and
4. which columns carry categorical or continuous features.

`Source` relies heavily on `pydantic`-based configs, which makes it straightforward to define new event types simply by writing
a config, without touching any reading or validation logic.

**Note**: `Source` performs the first filtering and preprocessing steps by removing rows with empty `entity_ids`
and rows with empty `timestamp_cols` (if you specified any).

In [5]:
from tab2seq.source import Source, SourceConfig, SourceCollection, CategoricalColConfig, ContinuousColConfig, TimestampColConfig

In [7]:
# Build the health Source with its config defined inline.
# The file path is taken from `data_paths` (returned by
# generate_synthetic_data above) instead of being typed out again, so this
# cell keeps working if the output directory is changed in that cell.
source_H = Source(config=SourceConfig(
    name="health",
    filepath=str(data_paths["health"]),
    id_col="entity_id",
    categorical_cols=[
        CategoricalColConfig(col_name="diagnosis", prefix="DIAG"),
        CategoricalColConfig(col_name="procedure", prefix="PROC"),
        CategoricalColConfig(col_name="department", prefix="DEPT"),
    ],
    continuous_cols=[
        ContinuousColConfig(col_name="cost", prefix="COST", n_bins=20, strategy="quantile"),
        ContinuousColConfig(col_name="length_of_stay", prefix="LOS", n_bins=20, strategy="quantile"),
    ],
    output_format="parquet",
    timestamp_cols=[
        TimestampColConfig(col_name="date", is_primary=True, drop_na=True)
    ]
))

print("Number of unique IDs:", len(source_H.get_entity_ids()))
# Last expression of the cell: display the processed table.
source_H.process(cache=True)

Number of unique IDs: 9797


entity_id,date,diagnosis,procedure,department,cost,length_of_stay
str,date,str,str,str,f64,i64
"""E00001""",2016-09-15,"""DIAG_J18.1""","""PROC_CABG""","""DEPT_gastroenterology""",7306.17,2
"""E00001""",2017-05-25,"""DIAG_E78.0""","""PROC_XRAY""","""DEPT_neurology""",138.65,1
"""E00001""",2018-01-18,"""DIAG_E78.0""","""PROC_MRI""","""DEPT_general_surgery""",6704.59,10
"""E00001""",2019-11-11,"""DIAG_C34.1""","""PROC_ECHO""","""DEPT_general_surgery""",910.12,0
"""E00001""",2020-05-20,"""DIAG_E78.0""","""PROC_DIALYSIS""","""DEPT_neurology""",2266.52,2
…,…,…,…,…,…,…
"""E09999""",2021-11-10,"""DIAG_J96.0""","""PROC_SPIROMETRY""","""DEPT_pulmonology""",3082.13,1
"""E09999""",2024-12-15,"""DIAG_I25.1""","""PROC_DIALYSIS""","""DEPT_general_surgery""",1306.49,1
"""E10000""",2017-03-02,"""DIAG_S72.0""","""PROC_ECHO""","""DEPT_gastroenterology""",72.14,3
"""E10000""",2020-06-10,"""DIAG_E78.0""","""PROC_SPIROMETRY""","""DEPT_cardiology""",75.03,2


In [7]:
# or you could define the Source config separately and then create the Source

# The file path is taken from `data_paths` (returned by
# generate_synthetic_data above) instead of being typed out again, so this
# cell keeps working if the output directory is changed in that cell.
config_L = SourceConfig(
    name="labour",
    filepath=str(data_paths["labour"]),
    id_col="entity_id",
    categorical_cols=[
        CategoricalColConfig(col_name="status", prefix="STATUS"),
        CategoricalColConfig(col_name="occupation", prefix="OCC"),
        CategoricalColConfig(col_name="residence_region", prefix="REGION"),
    ],
    continuous_cols=[
        ContinuousColConfig(col_name="weekly_hours", prefix="WEEKLY_HOURS")
    ],
    output_format="parquet",
    timestamp_cols=[
        # `date` is the primary event timestamp; `birthday` is a secondary one.
        TimestampColConfig(col_name="date", is_primary=True, drop_na=True),
        TimestampColConfig(col_name="birthday", is_primary=False, drop_na=True),
    ],
)
source_L = Source(config=config_L)

print("Number of unique IDs:", len(source_L.get_entity_ids()))

Number of unique IDs: 10000


In [8]:
# A SourceCollection lets you manage several sources together.
collection = SourceCollection(sources=[source_H, source_L])

# Entity IDs gathered across every source in the collection.
all_ids = collection.get_all_entity_ids()
print("All unique entity IDs in collection:", len(all_ids))

# The individual sources are available through the `sources` attribute.
collection.sources

All unique entity IDs in collection: 10000


{'health': Source(name='health', path=PosixPath('synthetic_data/health.parquet')),
 'labour': Source(name='labour', path=PosixPath('synthetic_data/labour.parquet'))}

In [9]:
# Or you can make collections directly from configs
from tab2seq.source import SourceCollection, SourceConfig, CategoricalColConfig, ContinuousColConfig, TimestampColConfig

# (column name, prefix) pairs for the categorical columns of each registry
health_categoricals = [("diagnosis", "DIAG"), ("procedure", "PROC"), ("department", "DEPT")]
labour_categoricals = [("status", "STATUS"), ("occupation", "OCC"), ("residence_region", "REGION")]

# (column name, prefix) pairs for the quantile-binned health columns
health_continuous = [("cost", "COST"), ("length_of_stay", "LOS")]

health_config = SourceConfig(
    name="health",
    filepath="synthetic_data/health.parquet",
    id_col="entity_id",
    categorical_cols=[
        CategoricalColConfig(col_name=column, prefix=prefix)
        for column, prefix in health_categoricals
    ],
    continuous_cols=[
        ContinuousColConfig(col_name=column, prefix=prefix, n_bins=20, strategy="quantile")
        for column, prefix in health_continuous
    ],
    output_format="parquet",
    timestamp_cols=[
        TimestampColConfig(col_name="date", is_primary=True, drop_na=True)
    ],
)

labour_config = SourceConfig(
    name="labour",
    filepath="synthetic_data/labour.parquet",
    id_col="entity_id",
    categorical_cols=[
        CategoricalColConfig(col_name=column, prefix=prefix)
        for column, prefix in labour_categoricals
    ],
    continuous_cols=[
        ContinuousColConfig(col_name="weekly_hours", prefix="WEEKLY_HOURS")
    ],
    output_format="parquet",
    timestamp_cols=[
        TimestampColConfig(col_name="date", is_primary=True, drop_na=True),
        TimestampColConfig(col_name="birthday", is_primary=False, drop_na=True),
    ],
)

# Define your data sources
configs = [health_config, labour_config]

# Create a source collection
collection = SourceCollection.from_configs(configs)

# Access individual sources by name
health = collection["health"]
df = health.read_all()

# Or iterate over all sources
for source in collection:
    print(f"{source.name}: {len(source.get_entity_ids())} entities")

# Cross-source operations
all_entity_ids = collection.get_all_entity_ids()

health: 9797 entities
labour: 10000 entities
