# Python Classes and Objects

A **class** is a blueprint for creating objects. It defines attributes (data) and methods (functions) that the objects will have.

## Why Use Classes?
- Organize code into reusable structures
- Model real-world entities
- Bundle data and functionality together

## 1. Defining a Class

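Use the `class` keyword followed by the class name and a colon. The indented body holds the class's attributes and methods; `pass` is a placeholder for an empty body.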

In [None]:
# Define a simple DataSource class
class DataSource:
    pass

# Create an object of the class
source1 = DataSource()
print(source1)
print(type(source1))
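
Each call to the class creates a new, independent object of that class. A quick check using only built-ins:

```python
class DataSource:
    pass

# Two calls to the class produce two distinct objects
source1 = DataSource()
source2 = DataSource()

print(source1 is source2)               # False: different objects
print(isinstance(source1, DataSource))  # True: it is a DataSource instance
print(type(source1) is type(source2))   # True: both come from the same class
```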

## 2. Class Attributes

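Attributes are variables attached to a class or to its objects. **Class attributes** are defined directly in the class body and are shared by every instance; **instance attributes** (covered in the next section) belong to a single object.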

In [None]:
class DatabaseConfig:
    # Class attributes - shared across all instances
    default_port = 5432
    default_schema = "public"
    timeout_seconds = 30

# Create config objects
postgres_config = DatabaseConfig()
redshift_config = DatabaseConfig()

# Access class attributes
print(f"Default Port: {postgres_config.default_port}")
print(f"Default Schema: {postgres_config.default_schema}")
print(f"Timeout: {redshift_config.timeout_seconds}s")

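# Assigning through an instance creates an instance attribute that shadows
# the class attribute for that object only; the class value is unchanged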
redshift_config.default_port = 5439
redshift_config.default_schema = "analytics"

print(f"\nPostgres Port: {postgres_config.default_port}")
print(f"Redshift Port: {redshift_config.default_port}")
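
The assignments above do not modify the class attributes; they create instance attributes on `redshift_config` that hide the class values. A small sketch that makes the difference visible (the port `6543` is an arbitrary illustrative value):

```python
class DatabaseConfig:
    default_port = 5432  # class attribute, shared by all instances

postgres_config = DatabaseConfig()
redshift_config = DatabaseConfig()

# Assigning through an instance creates an instance attribute that
# shadows the class attribute; the class value itself is untouched.
redshift_config.default_port = 5439
print(DatabaseConfig.default_port)   # 5432
print(postgres_config.default_port)  # 5432 (read from the class)
print(redshift_config.default_port)  # 5439 (read from the instance)
print(vars(redshift_config))         # {'default_port': 5439}

# Assigning through the class changes what every non-shadowing instance sees
DatabaseConfig.default_port = 6543
print(postgres_config.default_port)  # 6543
print(redshift_config.default_port)  # 5439 (still shadowed)
```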

## 3. The `__init__()` Method (Constructor)

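The `__init__()` method runs automatically each time a new object is created and sets the object's initial attributes. Its first parameter, `self`, refers to the new object, so `self.host = host` stores the `host` argument on that particular instance. (Strictly speaking, `__init__()` is an initializer: the object has already been created by `__new__()` when it runs, but it is commonly called the constructor.)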

In [None]:
class DatabaseConnection:
    # Constructor
    def __init__(self, host, database, user, password, port=5432):
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.is_connected = False

# Create connection objects with initial values
prod_db = DatabaseConnection(
    host="prod-db.company.com",
    database="analytics",
    user="etl_user",
    password="secret123",  # demo only: never hard-code real credentials
    port=5432
)

staging_db = DatabaseConnection(
    host="staging-db.company.com",
    database="staging",
    user="dev_user",
    password="dev123"
)

print(f"Production DB: {prod_db.host}:{prod_db.port}/{prod_db.database}")
print(f"Staging DB: {staging_db.host}:{staging_db.port}/{staging_db.database}")
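
To see that `__init__()` gives every object its own set of attributes, you can inspect an instance with the built-in `vars()`. A minimal sketch reusing the class above; `dev_db` and its values are hypothetical:

```python
class DatabaseConnection:
    def __init__(self, host, database, user, password, port=5432):
        # `self` is the new object; each assignment creates an instance attribute
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.is_connected = False

staging_db = DatabaseConnection("staging-db.company.com", "staging", "dev_user", "dev123")
dev_db = DatabaseConnection("localhost", "dev", "dev_user", "dev123", port=5433)

# Each instance stores its attributes in its own dictionary
print(sorted(vars(staging_db)))
print(staging_db.port, dev_db.port)  # 5432 5433

# Changing one instance does not affect the other
dev_db.is_connected = True
print(staging_db.is_connected)  # False
```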

## 4. Methods

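Methods are functions defined inside a class that describe an object's behavior. An instance method takes `self` as its first parameter, so it can read and update that object's attributes (here, `records_extracted` and `last_extraction`).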

In [None]:
import json
from datetime import datetime

class DataExtractor:
    def __init__(self, source_name, source_type):
        self.source_name = source_name
        self.source_type = source_type
        self.records_extracted = 0
        self.last_extraction = None
    
    # Instance method
    def extract(self, query):
        """Simulate data extraction"""
        print(f"Extracting from {self.source_name} ({self.source_type})...")
        print(f"Query: {query}")
        
        # Simulated data
        data = [
            {"id": 1, "name": "Alice", "amount": 100.50},
            {"id": 2, "name": "Bob", "amount": 250.75},
            {"id": 3, "name": "Charlie", "amount": 180.25}
        ]
        
        self.records_extracted += len(data)
        self.last_extraction = datetime.now()
        print(f"Extracted {len(data)} records")
        return data
    
    # Method to get extraction stats
    def get_stats(self):
        return {
            "source": self.source_name,
            "type": self.source_type,
            "total_records": self.records_extracted,
            "last_run": str(self.last_extraction)
        }

# Create extractor and use methods
sales_extractor = DataExtractor("sales_db", "PostgreSQL")
data = sales_extractor.extract("SELECT * FROM transactions")

print("\nExtraction Stats:")
print(json.dumps(sales_extractor.get_stats(), indent=2))
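
Calling a method on an object is shorthand for calling the function on the class and passing the object as `self`. A minimal sketch with a trimmed-down extractor; `record_batch()` is a hypothetical method:

```python
class DataExtractor:
    def __init__(self, source_name):
        self.source_name = source_name
        self.records_extracted = 0

    def record_batch(self, count):
        # `self` is the instance the method was called on
        self.records_extracted += count
        return self.records_extracted

extractor = DataExtractor("sales_db")

# These two calls are equivalent: Python passes the instance as `self`
extractor.record_batch(3)
DataExtractor.record_batch(extractor, 2)

print(extractor.records_extracted)  # 5
```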

## 5. Complete Example: ETL Pipeline Class

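The following `ETLPipeline` class combines a class attribute, instance attributes set in `__init__()`, and one method per Extract, Transform, and Load step. Its `run()` method calls the steps in order and records the pipeline's status and any errors.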

In [None]:
from datetime import datetime
import hashlib

class ETLPipeline:
    # Class attribute
    supported_formats = ["csv", "json", "parquet"]
    
    def __init__(self, pipeline_name, source_path, target_path):
        # Instance attributes
        self.pipeline_name = pipeline_name
        self.source_path = source_path
        self.target_path = target_path
        self.status = "initialized"
        self.start_time = None
        self.end_time = None
        self.records_processed = 0
        self.errors = []
    
    def extract(self):
        """Extract data from source"""
        print(f"[EXTRACT] Reading from: {self.source_path}")
        self.status = "extracting"
        self.start_time = datetime.now()
        
        # Simulated raw data
        raw_data = [
            {"customer_id": "C001", "order_date": "2024-01-15", "amount": "150.50", "status": "completed"},
            {"customer_id": "C002", "order_date": "2024-01-16", "amount": "invalid", "status": "pending"},
            {"customer_id": "C003", "order_date": "2024-01-17", "amount": "275.00", "status": "completed"},
            {"customer_id": None, "order_date": "2024-01-18", "amount": "100.00", "status": "completed"},
        ]
        print(f"[EXTRACT] Found {len(raw_data)} records")
        return raw_data
    
    def transform(self, raw_data):
        """Transform and clean data"""
        print(f"\n[TRANSFORM] Processing {len(raw_data)} records...")
        self.status = "transforming"
        
        transformed_data = []
        for record in raw_data:
            try:
                # Skip records with null customer_id
                if record["customer_id"] is None:
                    self.errors.append({"record": record, "error": "NULL customer_id"})
                    continue
                
                # Parse and validate amount
                try:
                    amount = float(record["amount"])
                except ValueError:
                    self.errors.append({"record": record, "error": "Invalid amount"})
                    continue
                
                # Create transformed record
                transformed_record = {
                    "customer_id": record["customer_id"],
                    "order_date": datetime.strptime(record["order_date"], "%Y-%m-%d"),
                    "amount": amount,
                    "status": record["status"].upper(),
                    "processed_at": datetime.now(),
                    "row_hash": hashlib.md5(str(record).encode()).hexdigest()[:8]
                }
                transformed_data.append(transformed_record)
                
            except Exception as e:
                self.errors.append({"record": record, "error": str(e)})
        
        print(f"[TRANSFORM] Successfully transformed {len(transformed_data)} records")
        print(f"[TRANSFORM] Errors: {len(self.errors)}")
        return transformed_data
    
    def load(self, transformed_data):
        """Load data to target"""
        print(f"\n[LOAD] Writing to: {self.target_path}")
        self.status = "loading"
        
        # Simulate loading: count every transformed record as written
        self.records_processed += len(transformed_data)
        
        self.status = "completed"
        self.end_time = datetime.now()
        print(f"[LOAD] Successfully loaded {self.records_processed} records")
    
    def run(self):
        """Execute full ETL pipeline"""
        print(f"\n{'='*50}")
        print(f"Starting Pipeline: {self.pipeline_name}")
        print(f"{'='*50}\n")
        
        try:
            raw_data = self.extract()
            transformed_data = self.transform(raw_data)
            self.load(transformed_data)
        except Exception as e:
            self.status = "failed"
            self.errors.append({"error": str(e)})
            print(f"[ERROR] Pipeline failed: {e}")
        
        return self.get_run_summary()
    
    def get_run_summary(self):
        """Return pipeline run summary"""
        duration = None
        if self.start_time and self.end_time:
            duration = (self.end_time - self.start_time).total_seconds()
        
        return {
            "pipeline_name": self.pipeline_name,
            "status": self.status,
            "records_processed": self.records_processed,
            "errors_count": len(self.errors),
            "duration_seconds": duration,
            "errors": self.errors
        }


# Run the ETL Pipeline
orders_pipeline = ETLPipeline(
    pipeline_name="daily_orders_etl",
    source_path="s3://raw-data/orders/",
    target_path="s3://warehouse/fact_orders/"
)

summary = orders_pipeline.run()

print(f"\n{'='*50}")
print("Pipeline Summary:")
print(f"{'='*50}")
for key, value in summary.items():
    if key != "errors":
        print(f"{key}: {value}")
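
The `supported_formats` class attribute above is defined but never used. A minimal sketch of one way to use it, assuming a hypothetical `file_format` parameter that the original class does not have: the initializer rejects any format the class does not list.

```python
class ETLPipeline:
    # Class attribute shared by every pipeline
    supported_formats = ["csv", "json", "parquet"]

    def __init__(self, pipeline_name, source_path, target_path, file_format="csv"):
        # Hypothetical check: reject formats the class does not support
        if file_format not in self.supported_formats:
            raise ValueError(
                f"Unsupported format {file_format!r}; "
                f"expected one of {self.supported_formats}"
            )
        self.pipeline_name = pipeline_name
        self.source_path = source_path
        self.target_path = target_path
        self.file_format = file_format

ok = ETLPipeline("daily_orders_etl", "s3://raw-data/orders/",
                 "s3://warehouse/fact_orders/", file_format="parquet")
print(ok.file_format)  # parquet

error_message = None
try:
    ETLPipeline("daily_orders_etl", "s3://raw-data/orders/",
                "s3://warehouse/fact_orders/", file_format="xml")
except ValueError as e:
    error_message = str(e)
print(error_message)
```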

## 6. Practice Exercise

Create a `DataValidator` class with:
- Attributes: `dataset_name`, `rules`, `validation_results`
- Methods: `add_rule()`, `validate()`, `get_report()`

In [None]:
# Solution
class DataValidator:
    def __init__(self, dataset_name):
        self.dataset_name = dataset_name
        self.rules = []
        self.validation_results = []
    
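    def add_rule(self, column, rule_type, rule_value=None):
        """Add a validation rule"""
        if rule_type not in ("not_null", "min_value", "max_length", "in_list"):
            raise ValueError(f"Unknown rule type: {rule_type!r}")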
        rule = {
            "column": column,
            "rule_type": rule_type,
            "rule_value": rule_value
        }
        self.rules.append(rule)
        print(f"Added rule: {rule_type} on column '{column}'")
    
    def validate(self, data):
        """Validate data against all rules"""
        self.validation_results = []  # reset so repeated runs don't accumulate results
        print(f"\nValidating {len(data)} records against {len(self.rules)} rules...")
        
        for rule in self.rules:
            column = rule["column"]
            rule_type = rule["rule_type"]
            rule_value = rule["rule_value"]
            
            passed = 0
            failed = 0
            
            for record in data:
                value = record.get(column)
                
                # Check different rule types
                if rule_type == "not_null":
                    if value is not None and value != "":
                        passed += 1
                    else:
                        failed += 1
                        
                elif rule_type == "min_value":
                    if value is not None and float(value) >= rule_value:
                        passed += 1
                    else:
                        failed += 1
                        
                elif rule_type == "max_length":
                    if value is None or len(str(value)) <= rule_value:
                        passed += 1
                    else:
                        failed += 1
                        
                elif rule_type == "in_list":
                    if value in rule_value:
                        passed += 1
                    else:
                        failed += 1
            
            result = {
                "column": column,
                "rule_type": rule_type,
                "passed": passed,
                "failed": failed,
                "pass_rate": round(passed / len(data) * 100, 2) if data else 0.0
            }
            self.validation_results.append(result)
        
        print("Validation complete!")
    
    def get_report(self):
        """Generate validation report"""
        print(f"\n{'='*60}")
        print(f"Data Quality Report: {self.dataset_name}")
        print(f"{'='*60}")
        
        total_passed = 0
        total_failed = 0
        
        for result in self.validation_results:
            status = "PASS" if result["failed"] == 0 else "FAIL"
            print(f"\n{status} | {result['column']} ({result['rule_type']})")
            print(f"      Passed: {result['passed']} | Failed: {result['failed']} | Rate: {result['pass_rate']}%")
            total_passed += result["passed"]
            total_failed += result["failed"]
        
        total_checks = total_passed + total_failed
        overall_rate = round(total_passed / total_checks * 100, 2) if total_checks else 0.0
        print(f"\n{'='*60}")
        print(f"Overall Quality Score: {overall_rate}%")
        print(f"{'='*60}")
        
        return {"passed": total_passed, "failed": total_failed, "score": overall_rate}


# Test the DataValidator
sample_data = [
    {"user_id": "U001", "email": "alice@example.com", "age": 28, "status": "active"},
    {"user_id": "U002", "email": None, "age": 35, "status": "active"},
    {"user_id": "U003", "email": "charlie@example.com", "age": -5, "status": "inactive"},
    {"user_id": None, "email": "david@example.com", "age": 42, "status": "pending"},
    {"user_id": "U005", "email": "eve@example.com", "age": 31, "status": "unknown"},
]

# Create validator and add rules
validator = DataValidator("user_profiles")
validator.add_rule("user_id", "not_null")
validator.add_rule("email", "not_null")
validator.add_rule("age", "min_value", 0)
validator.add_rule("status", "in_list", ["active", "inactive", "pending"])

# Run validation
validator.validate(sample_data)

# Get report
report = validator.get_report()
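
`validate()` selects each check with an `if`/`elif` chain. An alternative design stores the checks in a dictionary, here as a class attribute, so supporting a new rule type means adding one entry. A minimal sketch; `RuleChecker` and `count()` are hypothetical names, and the checks mirror the four rule types above:

```python
class RuleChecker:
    # Class attribute: rule type -> check function taking (value, rule_value)
    checks = {
        "not_null": lambda value, _: value is not None and value != "",
        "min_value": lambda value, limit: value is not None and float(value) >= limit,
        "max_length": lambda value, limit: value is None or len(str(value)) <= limit,
        "in_list": lambda value, allowed: value in allowed,
    }

    def count(self, data, column, rule_type, rule_value=None):
        """Return (passed, failed) counts for one rule over a list of dicts."""
        if rule_type not in self.checks:
            raise ValueError(f"Unknown rule type: {rule_type!r}")
        check = self.checks[rule_type]
        passed = sum(1 for record in data if check(record.get(column), rule_value))
        return passed, len(data) - passed

sample_data = [
    {"user_id": "U001", "email": "alice@example.com", "age": 28, "status": "active"},
    {"user_id": "U002", "email": None, "age": 35, "status": "active"},
    {"user_id": "U003", "email": "charlie@example.com", "age": -5, "status": "inactive"},
    {"user_id": None, "email": "david@example.com", "age": 42, "status": "pending"},
    {"user_id": "U005", "email": "eve@example.com", "age": 31, "status": "unknown"},
]

checker = RuleChecker()
print(checker.count(sample_data, "age", "min_value", 0))  # (4, 1)
print(checker.count(sample_data, "status", "in_list", ["active", "inactive", "pending"]))  # (4, 1)
```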

## Summary

| Concept | Description | Data Engineering Example |
|---------|-------------|-------------------------|
| `class` | Blueprint for creating objects | `ETLPipeline`, `DataValidator` |
| Object | An instance created from a class | `orders_pipeline`, `validator` |
| `self` | Reference to the instance a method is called on | Access pipeline attributes |
| `__init__` | Initializer (often called the constructor); runs when an object is created | Initialize connection params |
| Attributes | Variables stored on a class (shared) or on an instance (per object) | `supported_formats`, `source_path`, `records_processed` |
| Methods | Functions belonging to a class | `extract()`, `transform()`, `load()` |

## Bonus: Real-World Data Engineering Class Patterns

Common classes you'll build as a Data Engineer:
- **DatabaseConnection** - Manage database connections
- **ETLPipeline** - Orchestrate Extract-Transform-Load jobs
- **DataValidator** - Check data quality rules
- **SchemaManager** - Handle schema evolution
- **FileHandler** - Read/write various file formats
- **BatchProcessor** - Process large datasets in chunks
- **ConfigManager** - Manage pipeline configurations
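
As an illustration of the **BatchProcessor** idea, here is a minimal sketch. The class, its `batch_size` parameter, and its `batches()` method are hypothetical. It splits any iterable into lists of at most `batch_size` records using `itertools.islice`, so the full dataset never has to be held in memory at once:

```python
from itertools import islice

class BatchProcessor:
    def __init__(self, batch_size):
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.batch_size = batch_size
        self.batches_processed = 0

    def batches(self, records):
        """Yield lists of at most batch_size records from any iterable."""
        iterator = iter(records)
        while True:
            batch = list(islice(iterator, self.batch_size))
            if not batch:
                return
            self.batches_processed += 1
            yield batch

processor = BatchProcessor(batch_size=4)
sizes = [len(batch) for batch in processor.batches(range(10))]
print(sizes)                        # [4, 4, 2]
print(processor.batches_processed)  # 3
```

On Python 3.12 and later, `itertools.batched` provides the chunking step directly (it yields tuples rather than lists).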