In [1]:
import os

import pyarrow as pa

import pandas as pd
import dask.dataframe as dd
from src.fondant.pipeline import Pipeline

from src.fondant.pipeline.runner import DockerRunner
from src.fondant.pipeline.compiler import DockerCompiler

from src.fondant.component import PandasTransformComponent, DaskLoadComponent, DaskTransformComponent
os.environ["DOCKER_DEFAULT_PLATFORM"]="linux/amd64"

In [None]:

# generate some data
data = pd.DataFrame(
        {
            "x": [1, 2, 3],
            "y": [4, 5, 6],
        },
        index=pd.Index(["a", "b", "c"], name="id"),
    )

data.to_parquet("./foobar/sample.parquet")

In [None]:



pipeline = Pipeline(name="foo", description="bar", base_path="/Users/georgeslorre/ML6/internal/fondant-xmas/foobar")


dataset = pipeline.read(
    name_or_path="components/load_from_parquet",
    arguments={
        "dataset_uri": "../../foobar/sample.parquet",
    },
    produces={"x": pa.int32(), "y": pa.int32()},
)

DockerCompiler().compile(pipeline=pipeline)
DockerRunner().run(input="./docker-compose.yml")


In [None]:
class Addition(PandasTransformComponent):
    def __init__(self, *_, **__):
        pass

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        dataframe['z'] = dataframe['x'] + dataframe['y']
        return dataframe

Addition().__class__.__name__

In [None]:
%tb
_ = dataset.execute(component=Addition, produces={"z": pa.int32()}, consumes={"x": pa.int32(), "y": pa.int32()})

In [None]:
df = dd.read_parquet("./foobar/foo/foo-20231228164227/load_from_parquet")

In [8]:
"""This component loads a seed dataset from the hub."""
import logging
import typing as t

import dask
import dask.dataframe as dd
import pandas as pd
from fondant.component import DaskLoadComponent
from fondant.core.schema import Field

logger = logging.getLogger(__name__)

dask.config.set({"dataframe.convert-string": False})


class LoadFromParquet(DaskLoadComponent):
    def __init__(
        self,
        *,
        produces: t.Dict[str, Field],
        dataset_uri: str,
        column_name_mapping: t.Optional[dict],
        n_rows_to_load: t.Optional[int],
        index_column: t.Optional[str],
        **kwargs,
    ) -> None:
        """
        Args:
            produces: The schema the component should produce
            dataset_uri: The remote path to the parquet file/folder containing the dataset
            column_name_mapping: Mapping of the consumed dataset to fondant column names
            n_rows_to_load: optional argument that defines the number of rows to load. Useful for
              testing pipeline runs on a small scale.
            index_column: Column to set index to in the load component, if not specified a default
                globally unique index will be set.
            kwargs: Unhandled keyword arguments passed in by Fondant.
        """
        self.dataset_uri = dataset_uri
        self.column_name_mapping = column_name_mapping
        self.n_rows_to_load = n_rows_to_load
        self.index_column = index_column
        self.produces = produces

    def get_columns_to_keep(self) -> t.List[str]:
        # Only read required columns
        columns = []

        if self.column_name_mapping:
            invert_column_name_mapping = {
                v: k for k, v in self.column_name_mapping.items()
            }
        else:
            invert_column_name_mapping = {}

        for field_name, field in self.produces.items():
            column_name = field_name
            if invert_column_name_mapping and column_name in invert_column_name_mapping:
                columns.append(invert_column_name_mapping[column_name])
            else:
                columns.append(column_name)

        if self.index_column is not None:
            columns.append(self.index_column)

        return columns

    def set_df_index(self, dask_df: dd.DataFrame) -> dd.DataFrame:
        if self.index_column is None:
            logger.info(
                "Index column not specified, setting a globally unique index",
            )

            def _set_unique_index(dataframe: pd.DataFrame, partition_info=None):
                """Function that sets a unique index based on the partition and row number."""
                dataframe["id"] = 1
                dataframe["id"] = (
                    str(partition_info["number"])
                    + "_"
                    + (dataframe.id.cumsum()).astype(str)
                )
                dataframe.index = dataframe.pop("id")
                return dataframe

            def _get_meta_df() -> pd.DataFrame:
                meta_dict = {"id": pd.Series(dtype="object")}
                for field_name, field in self.produces.items():
                    meta_dict[field_name] = pd.Series(
                        dtype=pd.ArrowDtype(field.type.value),
                    )
                return pd.DataFrame(meta_dict).set_index("id")

            meta = _get_meta_df()
            dask_df = dask_df.map_partitions(_set_unique_index, meta=meta)
        else:
            logger.info(f"Setting `{self.index_column}` as index")
            dask_df = dask_df.set_index(self.index_column, drop=True)

        return dask_df

    def return_subset_of_df(self, dask_df: dd.DataFrame) -> dd.DataFrame:
        if self.n_rows_to_load is not None:
            partitions_length = 0
            npartitions = 1
            for npartitions, partition in enumerate(dask_df.partitions, start=1):
                if partitions_length >= self.n_rows_to_load:
                    logger.info(
                        f"""Required number of partitions to load\n
                    {self.n_rows_to_load} is {npartitions}""",
                    )
                    break
                partitions_length += len(partition)
            dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
            dask_df = dd.from_pandas(dask_df, npartitions=npartitions)
        return dask_df

    def load(self) -> dd.DataFrame:
        # 1) Load data, read as Dask dataframe
        logger.info("Loading dataset from the hub...")

        columns = self.get_columns_to_keep()

        logger.debug(f"Columns to keep: {columns}")
        dask_df = dd.read_parquet(self.dataset_uri, columns=columns)

        # 2) Rename columns
        if self.column_name_mapping:
            logger.info("Renaming columns...")
            dask_df = dask_df.rename(columns=self.column_name_mapping)

        # 4) Optional: only return specific amount of rows
        dask_df = self.return_subset_of_df(dask_df)

        # 5) Set the index
        dask_df = self.set_df_index(dask_df)
        return dask_df


In [9]:
pipeline2 = Pipeline(name="foo", description="bar", base_path="/Users/georgeslorre/ML6/internal/fondant-xmas/foobar2")

dataset = pipeline2.read(
    name_or_path="components/load_from_parquet",
    arguments={
        "dataset_uri": "../../foobar/sample.parquet",
    },
    produces={"x": pa.int32(), "y": pa.int32()},
)


class LoadFromParquett(DaskLoadComponent):
    def __init__(self, *_, **__):
        pass
    def load(self) -> dd.DataFrame:
        dask_df = dd.read_parquet("./foobar/sample.parquet", columns=["x", "y"])
        return dask_df


dataset1 = dataset.execute(
    component=LoadFromParquett,
    produces={"x": pa.int32(), "y": pa.int32()},
)


class Addition(PandasTransformComponent):
    def __init__(self, *_, **__):
        pass

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        dataframe['z'] = dataframe['x'] + dataframe['y']
        return dataframe


dataset2 = dataset1.execute(
    component=Addition,
    produces={"z": pa.int32()},
    consumes={"x": pa.int32(), "y": pa.int32()},
)


TypeError: LoadFromParquet.__init__() missing 5 required keyword-only arguments: 'produces', 'dataset_uri', 'column_name_mapping', 'n_rows_to_load', and 'index_column'