# Python OOP

**Goal:** Use Object-Oriented Programming (OOP) to model reusable components such as **connectors**, **pipeline steps**, **validators**, and **AI workflow building blocks**.

**Prerequisites:** You already know basic Python syntax (functions, loops, dictionaries, lists).


## Learning objectives

By the end of this lesson, you should be able to:

- Explain **class vs object (instance)** and what `self` means.
- Create classes with **attributes** and **methods** using `__init__`.
- Distinguish **instance attributes** vs **class attributes**.
- Use `@classmethod` and `@staticmethod` appropriately.
- Apply **inheritance** + `super()` to reuse behavior.
- Use **polymorphism** to build pluggable pipeline steps.
- Use **encapsulation** with `@property` to enforce invariants (validation).


## Why OOP matters in Data Engineering / AI

In real projects you rarely write one-off scripts. You build **reusable components**:

- **Extractors**: read from files, APIs, databases.
- **Transformers**: clean, normalize, enrich, feature-engineer.
- **Validators**: schema checks, null checks, quality gates.
- **Loaders**: write to a warehouse, data lake, vector DB.
- **AI steps**: chunking, embedding, reranking, prompt formatting.

OOP gives you a clean way to package:

- **State** (configuration, credentials, batch sizes, paths)
- **Behavior** (read/transform/write/run)


## Quick mental model: class vs instance

- A **class** is a **blueprint** (definition).
- An **instance/object** is a **concrete thing** built from that blueprint.

Think of a pipeline step:

- Class: `FilterColumns` (the idea of filtering columns)
- Instance: `FilterColumns(columns=["id", "amount"])` (a specific configuration)

**Key takeaway:** A common source of bugs is confusing *shared state* (class level) with *per-object state* (instance level).


In [7]:
# A tiny pipeline-step interface we'll reuse throughout the notebook

class PipelineStep:
    '''A minimal interface for a pipeline step.

    Each step receives `data` and returns transformed data.
    '''
    def run(self, data):
        raise NotImplementedError("Subclasses must implement run(data)")


class AddConstantColumn(PipelineStep):
    def __init__(self, column_name, value):
        self.column_name = column_name
        self.value = value

    def run(self, rows):
        # rows is a list[dict], a common "row-like" representation in DE demos
        out = []
        for r in rows:
            r2 = dict(r)
            r2[self.column_name] = self.value
            out.append(r2)
        return out


rows = [{"id": 1, "amount": 10}, {"id": 2, "amount": 15}]
step = AddConstantColumn(column_name="source", value="web")
print(step.run(rows))


[{'id': 1, 'amount': 10, 'source': 'web'}, {'id': 2, 'amount': 15, 'source': 'web'}]


In [9]:
class MissingRun(PipelineStep):
    pass

In [10]:
MR = MissingRun()

In [13]:
s = MR.run({})

NotImplementedError: Subclasses must implement run(data)

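In [15]:
# `assert` is the wrong tool here: MR.run({}) raises before assert can check anything.
# Catch the exception to inspect its message instead.
try:
    MR.run({})
except NotImplementedError as exc:
    print(exc)

Subclasses must implement run(data)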
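With the plain base class above, a subclass that forgets `run` only fails once `run` is called. One possible alternative, sketched here with the standard library's `abc` module (which the lesson itself does not use), is to declare `run` abstract so the mistake surfaces at instantiation. The names `AbstractPipelineStep`, `IncompleteStep`, and `PassThrough` are hypothetical.

```python
from abc import ABC, abstractmethod


class AbstractPipelineStep(ABC):
    """Same interface as PipelineStep, but `run` is declared abstract."""

    @abstractmethod
    def run(self, data):
        """Receive data and return transformed data."""


class IncompleteStep(AbstractPipelineStep):
    pass  # forgot to implement run()


class PassThrough(AbstractPipelineStep):
    def run(self, data):
        return data


instantiation_error = None
try:
    IncompleteStep()  # fails here, before run() is ever called
except TypeError as exc:
    instantiation_error = exc
    print("TypeError:", exc)

print(PassThrough().run([{"id": 1}]))  # [{'id': 1}]
```

The trade-off: every step must now inherit from the abstract base, which is stricter than the duck-typed style used later in this lesson.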

---

## 1) Defining a class

Basic syntax:

```python
class MyClass:
    ...
```

- Use **CapWords** for class names: `CSVReader`, `VectorIndexer`.
- Keep behavior close to the data/config it uses.


In [2]:
class Empty:
    pass


e = Empty()
print(type(e))


<class '__main__.Empty'>


## 2) `__init__`, `self`, attributes, and methods

- `__init__` runs when you create the object.
- `self` is the *current instance*.
- Attributes store the instance state (configuration).
- Methods implement behavior.

We'll model a **CSV extraction config**.


In [8]:
class CSVExtractor:
    def __init__(self, path, delimiter=",", encoding="utf-8"):
        self.path = path
        self.delimiter = delimiter
        self.encoding = encoding

    def describe(self):
        return f"CSVExtractor(path={self.path}, delimiter={self.delimiter}, encoding={self.encoding})"


extractor = CSVExtractor("data/sales.csv", delimiter=";")
print(extractor.describe())
print(extractor.path)


CSVExtractor(path=data/sales.csv, delimiter=;, encoding=utf-8)
data/sales.csv
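The class above only stores configuration. As a minimal sketch of pairing that state with behavior, the variant below adds a hypothetical `read_rows` method built on the standard library's `csv.DictReader`. The class name `CSVExtractorSketch` and the temporary sample file are assumptions made for the demo.

```python
import csv
import tempfile
from pathlib import Path


class CSVExtractorSketch:
    def __init__(self, path, delimiter=",", encoding="utf-8"):
        self.path = path
        self.delimiter = delimiter
        self.encoding = encoding

    def read_rows(self):
        # csv.DictReader yields one dict per data row; every value is a string
        with open(self.path, newline="", encoding=self.encoding) as f:
            return list(csv.DictReader(f, delimiter=self.delimiter))


# Write a small sample file so the example runs anywhere
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "sales.csv"
    sample.write_text("id;amount\n1;10\n2;15\n", encoding="utf-8")
    sample_rows = CSVExtractorSketch(sample, delimiter=";").read_rows()

print(sample_rows)  # [{'id': '1', 'amount': '10'}, {'id': '2', 'amount': '15'}]
```

The method reads `self.path`, `self.delimiter`, and `self.encoding`, so each instance carries everything it needs to do its job.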


### Common beginner mistakes

- Forgetting parentheses: `obj.method` vs `obj.method()`
- Forgetting `self` in method signature
- Assigning to a local variable instead of the attribute (e.g., writing `path = ...` instead of `self.path = ...`), so the value is never stored on the instance


In [4]:
# obj.method vs obj.method()

print(extractor.describe)    # this is a bound method object
print(extractor.describe())  # this executes the method


<bound method CSVExtractor.describe of <__main__.CSVExtractor object at 0x7777e0200510>>
CSVExtractor(path=data/sales.csv, delimiter=;, encoding=utf-8)
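The other two mistakes from the list fail in different ways. The sketch below triggers both on purpose; the class names `ForgotSelf` and `LocalInsteadOfAttribute` are hypothetical.

```python
class ForgotSelf:
    def describe():  # missing `self` in the signature
        return "extractor"


class LocalInsteadOfAttribute:
    def __init__(self, path):
        path = path.strip()  # local variable only; should be self.path = ...


forgot_self_error = None
try:
    ForgotSelf().describe()  # Python still passes the instance as first argument
except TypeError as exc:
    forgot_self_error = exc
    print("TypeError:", exc)

missing_attribute_error = None
try:
    LocalInsteadOfAttribute(" data/sales.csv ").path
except AttributeError as exc:
    missing_attribute_error = exc
    print("AttributeError:", exc)
```

The first error appears at call time, the second only when the attribute is read, which can be far from the constructor that caused it.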


---

## 3) Instance attributes vs class attributes

- **Instance attribute**: stored on each object (different per instance).
- **Class attribute**: stored on the class (shared across instances).

In DE/AI, class attributes are often used for **defaults** (timeouts, retries).
Be careful: if it’s mutable, you can accidentally share state across objects.


In [9]:
class APIClient:
    default_timeout_seconds = 10  # class attribute (shared default)

    def __init__(self, base_url, timeout_seconds=None):
        self.base_url = base_url
        self.timeout_seconds = timeout_seconds if timeout_seconds is not None else APIClient.default_timeout_seconds


c1 = APIClient("https://api.example.com", timeout_seconds=5)
c2 = APIClient("https://api.example.com")

print(c1.timeout_seconds)  # 5 (instance override)
print(c2.timeout_seconds)  # 10 (class default)


5
10
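It can help to see how attribute lookup falls back from the instance to the class. In this sketch (the class `RetryPolicy` is hypothetical), assigning through an instance creates an instance attribute that hides the class attribute for that object only.

```python
class RetryPolicy:
    max_retries = 3  # class attribute


p1 = RetryPolicy()
p2 = RetryPolicy()

p1.max_retries = 5  # creates an instance attribute on p1 only
print(p1.max_retries, p2.max_retries, RetryPolicy.max_retries)  # 5 3 3

RetryPolicy.max_retries = 10  # changes the shared default
print(p1.max_retries, p2.max_retries)  # 5 10

print(vars(p1), vars(p2))  # {'max_retries': 5} {}
```

`vars(obj)` shows only what is stored on the instance, which makes it a quick way to check where a value actually lives.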


### Pitfall: mutable class attributes (shared state)

If you use a mutable object (like a list/dict) as a class attribute, **all instances share it**.
This is a very common bug.


In [10]:
class BadConfig:
    tags = []  # shared across all instances (danger!)

    def __init__(self, name):
        self.name = name


a = BadConfig("job_a")
b = BadConfig("job_b")

a.tags.append("critical")
print("a.tags:", a.tags)
print("b.tags:", b.tags)  # surprise: it changed too


a.tags: ['critical']
b.tags: ['critical']


**Fix:** use an instance attribute instead.


In [11]:
class GoodConfig:
    def __init__(self, name):
        self.name = name
        self.tags = []  # unique per instance


a = GoodConfig("job_a")
b = GoodConfig("job_b")

a.tags.append("critical")
print("a.tags:", a.tags)
print("b.tags:", b.tags)


a.tags: ['critical']
b.tags: []


---

## 4) `@classmethod`: alternative constructors and class-wide config

A **class method**:

- Receives the class as `cls` (not the instance).
- Is commonly used for **alternative constructors** like:
  - `from_config(dict)`
  - `from_env()`
  - `from_uri(uri)`

This pattern lets callers build an object from whatever source they have (a dict, environment variables, a URI) without overloading `__init__`.


In [12]:
import os

class WarehouseConnector:
    default_schema = "public"

    def __init__(self, host, user, password, schema=None):
        self.host = host
        self.user = user
        self.password = password
        self.schema = schema if schema is not None else WarehouseConnector.default_schema

    @classmethod
    def from_config(cls, cfg: dict):
        return cls(
            host=cfg["host"],
            user=cfg["user"],
            password=cfg["password"],
            schema=cfg.get("schema")
        )

    @classmethod
    def from_env(cls):
        # Example env vars: WAREHOUSE_HOST, WAREHOUSE_USER, WAREHOUSE_PASSWORD
        return cls(
            host=os.environ.get("WAREHOUSE_HOST", "localhost"),
            user=os.environ.get("WAREHOUSE_USER", "admin"),
            password=os.environ.get("WAREHOUSE_PASSWORD", "admin"),
        )


cfg = {"host": "db.company.net", "user": "etl", "password": "secret", "schema": "analytics"}
conn = WarehouseConnector.from_config(cfg)
print(conn.host, conn.user, conn.schema)


db.company.net etl analytics


In [24]:
os.environ["WAREHOUSE_PASSWORD"] = "not_admin"

In [25]:
os.environ.get("WAREHOUSE_PASSWORD")

'not_admin'

In [26]:
conn = WarehouseConnector.from_env()
print(conn.host, conn.user, conn.schema)

localhost admin public
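The list of alternative constructors mentions `from_uri(uri)`, which the connector above does not implement. A minimal sketch follows, assuming a URI of the form `scheme://user:password@host/schema` and using `urllib.parse.urlparse` from the standard library. The classes `ConnectorSketch` and `AnalyticsConnector` are hypothetical.

```python
from urllib.parse import urlparse


class ConnectorSketch:
    default_schema = "public"

    def __init__(self, host, user, password, schema=None):
        self.host = host
        self.user = user
        self.password = password
        self.schema = schema if schema is not None else self.default_schema

    @classmethod
    def from_uri(cls, uri: str):
        # Assumed URI shape: scheme://user:password@host/schema
        parts = urlparse(uri)
        schema = parts.path.lstrip("/") or None
        return cls(
            host=parts.hostname,
            user=parts.username,
            password=parts.password,
            schema=schema,
        )


class AnalyticsConnector(ConnectorSketch):
    default_schema = "analytics"


conn_from_uri = ConnectorSketch.from_uri("postgresql://etl:secret@db.company.net/sales")
print(conn_from_uri.host, conn_from_uri.user, conn_from_uri.schema)  # db.company.net etl sales

sub = AnalyticsConnector.from_uri("postgresql://etl:secret@db.company.net")
print(type(sub).__name__, sub.schema)  # AnalyticsConnector analytics
```

Because `from_uri` calls `cls(...)` rather than a hard-coded class name, the subclass gets an instance of itself along with its own default schema.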


---

## 5) `@staticmethod`: helper functions in a class namespace

A **static method**:

- Receives **no** `self` and **no** `cls`.
- Is a function that conceptually belongs to the class domain.
- Typical uses: parsing, validation, small conversions.


In [1]:
class PathUtils:
    @staticmethod
    def normalize_slashes(path: str) -> str:
        return path.replace("\\", "/")

    @staticmethod
    def is_parquet(path: str) -> bool:
        return PathUtils.normalize_slashes(path).lower().endswith(".parquet")


print(PathUtils.normalize_slashes(r"data\raw\events.parquet"))
print(PathUtils.is_parquet("data/raw/events.parquet"))


data/raw/events.parquet
True


In [5]:
r"data\raw\events.parquet"

'data\\raw\\events.parquet'

In [4]:
print(r"data\raw\events.parquet")

data\raw\events.parquet
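Validation is another typical use for a static method. In the sketch below (the class `RowValidator` and its methods are hypothetical), the helper depends only on its arguments, while the instance method combines it with per-instance configuration.

```python
class RowValidator:
    def __init__(self, required_keys):
        self.required_keys = list(required_keys)

    @staticmethod
    def missing_keys(row: dict, required_keys) -> list:
        # Pure function of its arguments: needs neither self nor cls
        return [k for k in required_keys if k not in row]

    def is_valid(self, row: dict) -> bool:
        # Instance method: supplies the configured keys to the static helper
        return not self.missing_keys(row, self.required_keys)


print(RowValidator.missing_keys({"id": 1}, ["id", "amount"]))  # ['amount']

validator = RowValidator(["id", "amount"])
print(validator.is_valid({"id": 1, "amount": 10}))  # True
print(validator.is_valid({"id": 2}))                # False
```

A static method can be called on the class or on an instance; either way, nothing is passed implicitly.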


---

## 6) Inheritance + `super()`: reuse and extend behavior

Inheritance models **"is-a"** relationships:

- `ExtractStep` is-a `PipelineStep`
- `TransformStep` is-a `PipelineStep`

We'll build a small hierarchy:
- `PipelineStep` (base)
- `NamedStep` (adds step naming + logging)
- concrete steps (extract/transform)

`super()` calls the parent class's implementation without naming the parent explicitly, so the subclass keeps working if the hierarchy changes.


In [8]:
class NamedStep(PipelineStep):
    def __init__(self, name):
        self.name = name

    def _log(self, message):
        print(f"[{self.name}] {message}")


class FilterColumns(NamedStep):
    def __init__(self, name, keep_columns):
        super().__init__(name=name)  # call parent init
        self.keep_columns = set(keep_columns)

    def run(self, rows):
        self._log(f"Keeping columns: {sorted(self.keep_columns)}")
        out = []
        for r in rows:
            out.append({k: v for k, v in r.items() if k in self.keep_columns})
        return out


rows = [{"id": 1, "amount": 10, "country": "MX"}, {"id": 2, "amount": 15, "country": "US"}]
step = FilterColumns(name="filter_cols", keep_columns=["id", "amount"])
print(step.run(rows))


[filter_cols] Keeping columns: ['amount', 'id']
[{'id': 1, 'amount': 10}, {'id': 2, 'amount': 15}]
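A related pitfall is overriding `__init__` and forgetting to call `super().__init__()`. In that case the parent's setup never runs. A minimal sketch with hypothetical classes (`BaseStep`, `ForgetsSuper`, `CallsSuper`):

```python
class BaseStep:
    def __init__(self, name):
        self.name = name

    def label(self):
        return f"[{self.name}]"


class ForgetsSuper(BaseStep):
    def __init__(self, name, factor):
        self.factor = factor  # BaseStep.__init__ never runs, so self.name is not set


class CallsSuper(BaseStep):
    def __init__(self, name, factor):
        super().__init__(name=name)  # parent sets self.name
        self.factor = factor


print(CallsSuper("mul", 2).label())  # [mul]

super_error = None
try:
    ForgetsSuper("mul", 2).label()
except AttributeError as exc:
    super_error = exc
    print("AttributeError:", exc)
```

The object is created without complaint; the failure shows up later, when a parent method reads the attribute that was never set.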


---

## 7) Polymorphism: same interface, different implementations

Polymorphism is the foundation of **pluggable pipelines**.

If every step implements:

```python
step.run(data) -> data
```

Then we can build a generic pipeline runner that does not care about the concrete class.


In [16]:
def run_pipeline(steps, data):
    for step in steps:
        data = step.run(data)
    return data


class MultiplyAmount(NamedStep):
    def __init__(self, name, factor):
        super().__init__(name=name)
        self.factor = factor

    def run(self, rows):
        self._log(f"Multiplying amount by {self.factor}")
        out = []
        for r in rows:
            r2 = dict(r)
            r2["amount"] = r2["amount"] * self.factor
            out.append(r2)
        return out


pipeline = [
    FilterColumns(name="cols", keep_columns=["id", "amount"]),
    MultiplyAmount(name="mul", factor=2),
    AddConstantColumn(column_name="source", value="batch_2026_01_27"),
]

rows = [{"id": 1, "amount": 10, "country": "MX"}, {"id": 2, "amount": 15, "country": "US"}]
print(run_pipeline(pipeline, rows))


[cols] Keeping columns: ['amount', 'id']
[mul] Multiplying amount by 2
[{'id': 1, 'amount': 20, 'source': 'batch_2026_01_27'}, {'id': 2, 'amount': 30, 'source': 'batch_2026_01_27'}]


### Duck typing (Pythonic)

In Python we often rely on duck typing:

> “If it has a `run(data)` method, we can run it.”

That’s why you can mix different step classes in the same pipeline.
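To illustrate, the steps in this sketch share no base class at all, yet the runner accepts them because each one has a `run` method. The classes `DropNullAmounts` and `UppercaseCountry` are hypothetical, and `run_pipeline` is repeated so the block is self-contained.

```python
def run_pipeline(steps, data):
    for step in steps:
        data = step.run(data)
    return data


class DropNullAmounts:  # no PipelineStep base class
    def run(self, rows):
        return [r for r in rows if r.get("amount") is not None]


class UppercaseCountry:  # no base class either
    def run(self, rows):
        return [{**r, "country": r["country"].upper()} for r in rows]


duck_rows = [
    {"id": 1, "amount": 10, "country": "mx"},
    {"id": 2, "amount": None, "country": "us"},
]
duck_result = run_pipeline([DropNullAmounts(), UppercaseCountry()], duck_rows)
print(duck_result)  # [{'id': 1, 'amount': 10, 'country': 'MX'}]
```

The base class is still useful as documentation and for shared helpers such as `_log`, but the runner itself only relies on the method being present.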


---

## 8) Encapsulation with `@property`: enforce invariants

Encapsulation means: keep your internal state consistent and validated.

Python does not enforce strict privacy, but it provides conventions:

- `_x` means “internal/protected by convention”
- `__x` triggers **name mangling** (harder to access accidentally)

`@property` lets you expose a clean API while validating assignments.


In [18]:
class BatchConfig:
    def __init__(self, batch_size):
        self.batch_size = batch_size  # triggers setter

    @property
    def batch_size(self):
        return self._batch_size

    @batch_size.setter
    def batch_size(self, value):
        if not isinstance(value, int) or value <= 0:
            raise ValueError("batch_size must be a positive integer")
        self._batch_size = value

    @property
    def is_large_batch(self):
        return self._batch_size >= 1000


cfg = BatchConfig(batch_size=500)
print(cfg.batch_size, cfg.is_large_batch)

cfg.batch_size = 2000
print(cfg.batch_size, cfg.is_large_batch)

cfg.batch_size = -1  # raises ValueError: the setter rejects non-positive values


500 False
2000 True


ValueError: batch_size must be a positive integer

In [25]:
cfg.batch_size = -5

ValueError: batch_size must be a positive integer

In [23]:
cfg._batch_size = -5  # no error: writing to the underscore attribute bypasses the setter
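Assigning to `cfg._batch_size` directly, as in the cell above, skips the setter, so the single underscore is a convention rather than a safeguard. The double-underscore form goes one step further through name mangling. A minimal sketch, with a hypothetical class `ApiCredentials`:

```python
class ApiCredentials:
    def __init__(self, token):
        self.__token = token  # stored on the instance as _ApiCredentials__token

    def masked(self):
        # Inside the class body, self.__token resolves to the mangled name
        return self.__token[:2] + "***"


creds = ApiCredentials("abcdef")
print(creds.masked())                # ab***
print(hasattr(creds, "__token"))     # False: no attribute by that exact name
print(creds._ApiCredentials__token)  # abcdef: still reachable, just awkward
```

Mangling mainly prevents accidental clashes with subclass attributes. It is not a security boundary.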

---

## 9) Embedding preparation (toy example)

We’ll keep it simple: pretend we are preparing records for an embedding model.

- Input rows: `{"id": ..., "text": ...}`
- Output rows: add `text_len` and `chunk_id` fields

The point is not the embedding itself; it’s the **OOP structure** that supports AI workflows.


In [27]:
class PrepareForEmbedding(NamedStep):
    def __init__(self, name, max_chars=100):
        super().__init__(name=name)
        self.max_chars = max_chars

    @staticmethod
    def _chunk_text(text: str, max_chars: int):
        # naive chunking: split into fixed-size pieces
        return [text[i:i + max_chars] for i in range(0, len(text), max_chars)]

    def run(self, rows):
        self._log(f"Chunking text with max_chars={self.max_chars}")
        out = []
        for r in rows:
            chunks = self._chunk_text(r["text"], self.max_chars)
            for idx, chunk in enumerate(chunks):
                out.append({
                    "id": r["id"],
                    "chunk_id": idx,
                    "text": chunk,
                    "text_len": len(chunk),
                })
        return out


rows = [{"id": 1, "text": "hello " * 50}]
prep = PrepareForEmbedding(name="prep_embed", max_chars=40)
out = prep.run(rows)
print(out[:3])
print("total chunks:", len(out))


[prep_embed] Chunking text with max_chars=40
[{'id': 1, 'chunk_id': 0, 'text': 'hello hello hello hello hello hello hell', 'text_len': 40}, {'id': 1, 'chunk_id': 1, 'text': 'o hello hello hello hello hello hello he', 'text_len': 40}, {'id': 1, 'chunk_id': 2, 'text': 'llo hello hello hello hello hello hello ', 'text_len': 40}]
total chunks: 8


In [26]:
"hello " * 50

'hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello hello '
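The fixed-size split above cuts words in half (for example `'...hell'` followed by `'o hello...'`). One possible refinement, which is not part of the original step, is to break on whitespace with `textwrap.wrap` from the standard library. The helper name `chunk_on_whitespace` is hypothetical.

```python
import textwrap


def chunk_on_whitespace(text: str, max_chars: int) -> list[str]:
    # textwrap.wrap breaks at whitespace and keeps each piece within `width`;
    # it also normalizes whitespace and drops it at chunk boundaries
    return textwrap.wrap(text, width=max_chars)


word_chunks = chunk_on_whitespace("hello " * 50, max_chars=40)
print(word_chunks[0])                      # hello hello hello hello hello hello
print("total chunks:", len(word_chunks))   # total chunks: 9
print(max(len(c) for c in word_chunks))    # 35
```

Chunks come out shorter than `max_chars` because whole words are kept together. Note that joining the chunks does not reproduce the original text exactly, since boundary whitespace is discarded.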

---

## 10) (Optional) `@dataclass` for configuration objects

In DE/AI projects, you often have many config objects.
`@dataclass` reduces boilerplate and improves readability.

Use it when your class is mainly “data with light behavior”.


In [28]:
from dataclasses import dataclass

@dataclass
class JobConfigDC:
    job_name: str
    retries: int = 3
    timeout_seconds: int = 60


cfg = JobConfigDC(job_name="daily_sales_load", retries=5)
print(cfg)


JobConfigDC(job_name='daily_sales_load', retries=5, timeout_seconds=60)
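Dataclasses also guard against the shared-mutable-default pitfall from section 3: writing `tags: list = []` in a dataclass raises a `ValueError` at class definition. The sketch below uses `field(default_factory=list)` instead and adds light validation in `__post_init__`. The class `JobConfigSketch` and its `tags` field are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class JobConfigSketch:
    job_name: str
    retries: int = 3
    tags: list = field(default_factory=list)  # a fresh list per instance

    def __post_init__(self):
        # Runs right after the generated __init__
        if self.retries < 0:
            raise ValueError("retries must be >= 0")


job_a = JobConfigSketch("job_a")
job_b = JobConfigSketch("job_b")
job_a.tags.append("critical")
print(job_a.tags, job_b.tags)  # ['critical'] []

retries_error = None
try:
    JobConfigSketch("job_c", retries=-1)
except ValueError as exc:
    retries_error = exc
    print("ValueError:", exc)  # ValueError: retries must be >= 0
```

`__post_init__` only checks values at construction time. Later assignments are not validated unless the dataclass is frozen or the field is exposed through a property.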


---

## Summary

- **Classes** model reusable components; **instances** hold specific configurations.
- `__init__` sets instance state; `self` points to the current object.
- **Instance attributes** differ per object; **class attributes** are shared defaults.
- `@classmethod` is great for **alternative constructors** (`from_config`, `from_env`).
- `@staticmethod` is a **namespaced helper**.
- **Inheritance + `super()`** reuse and extend behavior.
- **Polymorphism** enables pluggable pipelines: “same interface, different implementation”.
- **Encapsulation** with `@property` helps enforce invariants.


> Content created by **Carlos Cruz-Maldonado**.  
> Updated with additional best practices and Data Engineering examples.

In [2]:
import argparse


In [None]:
%%writefile run_job.py
import argparse

from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class JobArgs:
    source_id: str
    full_load: bool
    job_id: str
    output_dir: Path
    retries: int
    
    
def run_pipeline(steps, data):
    for step in steps:
        data = step.run(data)
    return data


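# The step classes below are copied from the notebook cells above.
# run_job.py runs as a separate process, so it cannot see names defined in the notebook.
class PipelineStep:
    def run(self, data):
        raise NotImplementedError("Subclasses must implement run(data)")


class NamedStep(PipelineStep):
    def __init__(self, name):
        self.name = name

    def _log(self, message):
        print(f"[{self.name}] {message}")


class FilterColumns(NamedStep):
    def __init__(self, name, keep_columns):
        super().__init__(name=name)
        self.keep_columns = set(keep_columns)

    def run(self, rows):
        self._log(f"Keeping columns: {sorted(self.keep_columns)}")
        return [{k: v for k, v in r.items() if k in self.keep_columns} for r in rows]


class AddConstantColumn(PipelineStep):
    def __init__(self, column_name, value):
        self.column_name = column_name
        self.value = value

    def run(self, rows):
        return [{**r, self.column_name: self.value} for r in rows]


class MultiplyAmount(NamedStep):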
    def __init__(self, name, factor):
        super().__init__(name=name)
        self.factor = factor

    def run(self, rows):
        self._log(f"Multiplying amount by {self.factor}")
        out = []
        for r in rows:
            r2 = dict(r)
            r2["amount"] = r2["amount"] * self.factor
            out.append(r2)
        return out


def main() -> None:
    
    ### Setting arguments
    
    parser = argparse.ArgumentParser(
        prog="run_job.py",
        description="Run extraction pipeline."
        )
    
    parser.add_argument("--source-id", choices=["csv", "parquet", "sftp"], required=True, help="Metadata key for source extraction.") # Required argument
    parser.add_argument("--full-load", action="store_true", help="Specifies if full load will be executed.") # Boolean argument
    parser.add_argument("--job-id", required=True, help="Job ID for metadata retrieval.")
    parser.add_argument("--output-dir", default=".", help="Where output is being written.")
    
    parser.add_argument("--retries", type=int, default=3) # Integer argument with a default
    

    args = parser.parse_args()
    print("Parsed arguments:")
    print(f"  job_id = {args.job_id}")
    print(f"  source_id = {args.source_id}")
    print(f"  retries = {args.retries, type(args.retries)}")
    
    job_args = JobArgs(
        source_id=args.source_id,
        full_load=args.full_load,
        job_id=args.job_id,
        output_dir=Path(args.output_dir),
        retries=args.retries
    )
    
    print(job_args)
    
    ### Begin Job
    
    # example run

    pipeline = [
        FilterColumns(name="cols", keep_columns=["id", "amount"]),
        MultiplyAmount(name="mul", factor=2),
        AddConstantColumn(column_name="source", value="batch_2026_01_27"),
    ]

    rows = [{"id": 1, "amount": 10, "country": "MX"}, {"id": 2, "amount": 15, "country": "US"}]
    print(run_pipeline(pipeline, rows))

if __name__ == "__main__":
    main()

Overwriting run_job.py


In [26]:
! python run_job.py -h

usage: run_job.py [-h] --source-id SOURCE_ID [--full-load] --job-id JOB_ID
                  [--output-dir OUTPUT_DIR]

Run extraction pipeline.

options:
  -h, --help            show this help message and exit
  --source-id SOURCE_ID
                        Metadata key for source extraction.
  --full-load           Specifies if full load will be executed.
  --job-id JOB_ID       Job ID for metadata retrieval.
  --output-dir OUTPUT_DIR
                        Where output is being written.


In [50]:
! python run_job.py --job-id PS01 --source-id csv --full-load --retries 4

Parsed arguments:
  job_id = PS01
  source_id = csv
  retries = (4, <class 'int'>)
JobArgs(source_id='csv', full_load=True, job_id='PS01', output_dir=PosixPath('.'), retries=4)
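The same parsing logic can be exercised without a shell by passing an explicit argument list to `parse_args`. The sketch below repeats the `JobArgs` dataclass and the parser setup from `run_job.py`; the helper names `build_parser` and `parse_job_args` are hypothetical and not part of the original script.

```python
import argparse
from dataclasses import FrozenInstanceError, dataclass
from pathlib import Path


@dataclass(frozen=True)
class JobArgs:
    source_id: str
    full_load: bool
    job_id: str
    output_dir: Path
    retries: int


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="run_job.py", description="Run extraction pipeline.")
    parser.add_argument("--source-id", choices=["csv", "parquet", "sftp"], required=True)
    parser.add_argument("--full-load", action="store_true")
    parser.add_argument("--job-id", required=True)
    parser.add_argument("--output-dir", default=".")
    parser.add_argument("--retries", type=int, default=3)
    return parser


def parse_job_args(argv) -> JobArgs:
    # Passing a list avoids reading sys.argv, which is handy in notebooks and tests
    args = build_parser().parse_args(argv)
    return JobArgs(
        source_id=args.source_id,
        full_load=args.full_load,
        job_id=args.job_id,
        output_dir=Path(args.output_dir),
        retries=args.retries,
    )


parsed = parse_job_args(["--job-id", "PS01", "--source-id", "csv", "--retries", "4"])
print(parsed.job_id, parsed.full_load, parsed.retries)  # PS01 False 4

frozen_error = None
try:
    parsed.retries = 10  # frozen=True makes instances read-only
except FrozenInstanceError as exc:
    frozen_error = exc
    print("FrozenInstanceError:", exc)
```

Freezing the dataclass means the parsed arguments cannot be changed by accident once the job has started.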


In [None]:
step.run(job_args)

In [None]:
import sqlite3
from pathlib import Path

DB_PATH = 