In [1]:
# Tasks Prototypes
# file: prototypes.py
from abc import ABC, abstractmethod


class Task(ABC):
    @property
    def name(self) -> str:
        return self.__class__.__name__

    @abstractmethod
    def run(self, *args, **kwargs):
        pass

In [2]:
# DataTasks gather and runner
# file: data/datatasks.py
# from prototypes import Task


from __future__ import annotations
from queue import PriorityQueue
import pandas as pd


class DataTasks:
    def __init__(self, tasks: PriorityQueue = None) -> DataTasks:
        if tasks is None:
            self.tasks = PriorityQueue()

    def set_task(self, priority: int, task: Task) -> DataTasks:
        self.tasks.put((priority, task))
        return self

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        while not self.tasks.empty():
            _, task = self.tasks.get()
            logger.debug(f"priority: {_} task: {task.name}")
            data = task.run(data)
        return data

In [3]:
# Tasks that can be divided in there own files
# from prototypes import Task

from typing import Literal
import numpy as np
from loguru import logger
from sklearn.compose import make_column_transformer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer, OneHotEncoder

from sklearn import set_config


set_config(transform_output="pandas")


transformer = make_column_transformer(
    (
        FunctionTransformer(np.log),
        [
            "Weight",
            "Length1",
        ],
    ),
    (
        OneHotEncoder(sparse_output=False),
        [
            "Species",
        ],
    ),
    verbose_feature_names_out=False,
    remainder="passthrough",
)


class DropZerosTask(Task):
    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        cleaned_data = data[~data.select_dtypes("number").eq(0).any(axis=1)]
        return cleaned_data


class DropColumnsTask(Task):
    def __init__(self, columns: list[str]):
        self.columns = columns

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        cleaned_data = data.drop(columns=self.columns)
        return cleaned_data


# TODO: decoractor to save and load transformer
class TransformerTask(Task):
    def __init__(self, stage: Literal["train", "predict"] = "train"):
        self.stage = stage

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        if self.stage == "predict":
            cleaned_data = transformer.transform(data)
        else:
            cleaned_data = transformer.fit_transform(data)

        return cleaned_data


class Floats2IntsTask(Task):
    def __init__(self, columns: list[str]):
        self.columns = columns

    def run(self, data: pd.DataFrame) -> pd.DataFrame:
        cleaned_data = data  # .copy()
        cleaned_data[self.columns] = cleaned_data[self.columns].transform(
            pd.to_numeric,
            errors="coerce",
            downcast="integer",
        )
        return cleaned_data

In [4]:
# Implentation
# from data.datatasks import DataTasks
# from data.tasks import ...


def process_data(data: pd.DataFrame) -> pd.DataFrame:
    data_chain = DataTasks()
    (
        data_chain.set_task(priority=2, task=DropZerosTask())
        .set_task(
            priority=1,
            task=DropColumnsTask(
                columns=[
                    "Length2",
                    "Length3",
                ]
            ),
        )
        .set_task(priority=3, task=TransformerTask())
        .set_task(
            priority=4,
            task=Floats2IntsTask(
                columns=[
                    "Species_Bream",
                    "Species_Parkki",
                    "Species_Perch",
                    "Species_Whitefish",
                ]
            ),
        )
        # Add more or remove tasks to the chain as needed
    )

    # Send data through the chain
    return data_chain.run(data)

In [5]:
# Usage:

URI = "https://raw.githubusercontent.com/Ankit152/Fish-Market/main/Fish.csv"
dataf = pd.read_csv(URI)

In [6]:
dataf[dataf.select_dtypes("number").eq(0).any(axis=1)]

Unnamed: 0,Species,Weight,Length1,Length2,Length3,Height,Width
40,Roach,0.0,19.0,20.5,22.8,6.4752,3.3516


In [7]:
processed_data = process_data(dataf)

[32m2023-08-25 11:41:52.032[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mrun[0m:[36m23[0m - [34m[1mpriority: 1 task: DropColumnsTask[0m
[32m2023-08-25 11:41:52.039[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mrun[0m:[36m23[0m - [34m[1mpriority: 2 task: DropZerosTask[0m
[32m2023-08-25 11:41:52.046[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mrun[0m:[36m23[0m - [34m[1mpriority: 3 task: TransformerTask[0m
[32m2023-08-25 11:41:52.059[0m | [34m[1mDEBUG   [0m | [36m__main__[0m:[36mrun[0m:[36m23[0m - [34m[1mpriority: 4 task: Floats2IntsTask[0m


In [8]:
processed_data #.loc[39:41, "Weight"]

Unnamed: 0,Weight,Length1,Species_Bream,Species_Parkki,Species_Perch,Species_Pike,Species_Roach,Species_Smelt,Species_Whitefish,Height,Width
0,5.488938,3.144152,1,0,0,0.0,0.0,0.0,0,11.5200,4.0200
1,5.669881,3.178054,1,0,0,0.0,0.0,0.0,0,12.4800,4.3056
2,5.828946,3.173878,1,0,0,0.0,0.0,0.0,0,12.3778,4.6961
3,5.894403,3.269569,1,0,0,0.0,0.0,0.0,0,12.7300,4.4555
4,6.063785,3.277145,1,0,0,0.0,0.0,0.0,0,12.4440,5.1340
...,...,...,...,...,...,...,...,...,...,...,...
154,2.501436,2.442347,0,0,0,0.0,0.0,1.0,0,2.0904,1.3936
155,2.595255,2.459589,0,0,0,0.0,0.0,1.0,0,2.4300,1.2690
156,2.501436,2.493205,0,0,0,0.0,0.0,1.0,0,2.2770,1.2558
157,2.980619,2.580217,0,0,0,0.0,0.0,1.0,0,2.8728,2.0672
