# Week 2: Advanced Pandas with Redshift

## Learning Objectives
- Read and process large datasets from Redshift efficiently
- Master chunked processing with pandas
- Implement memory optimization techniques
- Optimize SQL queries for large tables
- Write data back to Redshift efficiently
- Use Dask for out-of-memory datasets
- Benchmark and optimize performance
- Apply production-ready patterns

## Prerequisites
```bash
pip install pandas numpy psycopg2-binary sqlalchemy boto3 psutil
pip install "dask[complete]" pyarrow fastparquet
pip install s3fs awscli memory_profiler
```

## 1. Setup and Configuration

In [None]:
# Standard library
import os
import sys
import logging
import time
from datetime import datetime, timedelta
from typing import Iterator, List, Dict, Optional
import gc
import warnings

# Data manipulation
import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd
from dask.distributed import Client, progress

# Database
import psycopg2
from psycopg2.extras import execute_values
from sqlalchemy import create_engine, text
import boto3

# Utilities
from io import StringIO
import psutil

warnings.filterwarnings('ignore')

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Display settings
pd.set_option('display.max_columns', 50)
pd.set_option('display.max_rows', 100)
pd.set_option('display.float_format', '{:.2f}'.format)

print(f"Pandas version: {pd.__version__}")
print(f"Dask version: {dask.__version__}")
print(f"NumPy version: {np.__version__}")

## 2. Redshift Connection Manager

In [None]:
class RedshiftManager:
    """Production-ready Redshift connection and query manager."""
    
    def __init__(self, config: Dict[str, str]):
        self.config = config
        self.engine = None
        self.logger = logging.getLogger(self.__class__.__name__)
        self._create_engine()
    
    def _create_engine(self):
        """Create SQLAlchemy engine with connection pooling."""
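        # URL-escape the credentials so characters such as '@' or ':' in the
        # password do not corrupt the connection URL.
        from urllib.parse import quote_plus

        connection_string = (
            f"postgresql+psycopg2://{quote_plus(self.config['user'])}:"
            f"{quote_plus(self.config['password'])}"
            f"@{self.config['host']}:{self.config.get('port', 5439)}/{self.config['database']}"
        )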
        
        self.engine = create_engine(
            connection_string,
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True,
            pool_recycle=3600,
            connect_args={
                'connect_timeout': 10,
                'options': '-c statement_timeout=600000'  # 10 minutes
            }
        )
        self.logger.info("Redshift engine created")
    
    def read_query(self, query: str, chunksize: Optional[int] = None):
        """Execute query; return a DataFrame, or an iterator of DataFrames if chunksize is set."""
        self.logger.info(f"Executing query (chunksize={chunksize})")
        start_time = time.time()
        
        try:
            if chunksize:
                return pd.read_sql(query, self.engine, chunksize=chunksize)
            else:
                df = pd.read_sql(query, self.engine)
                elapsed = time.time() - start_time
                self.logger.info(f"Query returned {len(df):,} rows in {elapsed:.2f}s")
                return df
        except Exception as e:
            self.logger.error(f"Query failed: {e}")
            raise
    
    def read_table_chunks(self, table: str, chunk_size: int = 100000,
                         columns: Optional[List[str]] = None,
                         where: Optional[str] = None) -> Iterator[pd.DataFrame]:
        """Read table in chunks for memory-efficient processing."""
        cols = ', '.join(columns) if columns else '*'
        where_clause = f"WHERE {where}" if where else ""
        
        query = f"SELECT {cols} FROM {table} {where_clause}"
        
        self.logger.info(f"Reading table {table} in chunks of {chunk_size:,}")
        
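        # stream_results=True requests a server-side cursor, so rows are fetched
        # incrementally instead of being buffered client-side by the driver first.
        with self.engine.connect().execution_options(stream_results=True) as conn:
            for chunk_num, chunk in enumerate(pd.read_sql(query, conn, chunksize=chunk_size), 1):
                self.logger.debug(f"Processing chunk {chunk_num}: {len(chunk):,} rows")
                yield chunk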
    
    def write_dataframe(self, df: pd.DataFrame, table: str, 
                       if_exists: str = 'append', method: str = 'multi',
                       chunksize: int = 10000):
        """Write DataFrame to Redshift table."""
        self.logger.info(f"Writing {len(df):,} rows to {table}")
        start_time = time.time()
        
        try:
            df.to_sql(
                table,
                self.engine,
                if_exists=if_exists,
                index=False,
                method=method,
                chunksize=chunksize
            )
            elapsed = time.time() - start_time
            self.logger.info(f"Write completed in {elapsed:.2f}s ({len(df)/elapsed:.0f} rows/s)")
        except Exception as e:
            self.logger.error(f"Write failed: {e}")
            raise
    
    def execute_query(self, query: str) -> None:
        """Execute DDL/DML query without returning results."""
        self.logger.info("Executing query")
        
        # engine.begin() commits on success and rolls back on error; it works
        # on both SQLAlchemy 1.4 and 2.x.
        with self.engine.begin() as conn:
            conn.execute(text(query))
        
        self.logger.info("Query executed successfully")
    
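    def get_table_stats(self, table: str) -> Dict:
        """Get row count and size for a table from SVV_TABLE_INFO."""
        # Redshift does not support pg_total_relation_size/pg_relation_size.
        # SVV_TABLE_INFO reports size in 1 MB blocks and matches on the
        # unqualified table name; empty tables are not listed.
        query = text("""
        SELECT
            "schema" AS schema_name,
            "table" AS table_name,
            tbl_rows AS row_count,
            size AS size_mb
        FROM svv_table_info
        WHERE "table" = :table
        """)

        stats = pd.read_sql(query, self.engine, params={'table': table})
        return stats.to_dict('records')[0] if len(stats) else {}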
    
    def analyze_query(self, query: str) -> pd.DataFrame:
        """Get EXPLAIN plan for a query."""
        explain_query = f"EXPLAIN {query}"
        return self.read_query(explain_query)
    
    def close(self):
        """Close database connections."""
        if self.engine:
            self.engine.dispose()
            self.logger.info("Engine disposed")


# Configuration
REDSHIFT_CONFIG = {
    'host': os.getenv('REDSHIFT_HOST', 'your-cluster.region.redshift.amazonaws.com'),
    'port': int(os.getenv('REDSHIFT_PORT', 5439)),
    'database': os.getenv('REDSHIFT_DB', 'marketing'),
    'user': os.getenv('REDSHIFT_USER', 'analyst'),
    'password': os.getenv('REDSHIFT_PASSWORD', 'your-password')
}

# Initialize manager
# rs = RedshiftManager(REDSHIFT_CONFIG)

print("✓ Redshift manager configured")
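
`read_table_chunks` and several later helpers interpolate table names, column names, and filter values directly into SQL strings. As a minimal sketch of a safer pattern, the helpers below validate identifiers against a conservative pattern and keep values out of the SQL text as named placeholders. `safe_identifier` and `build_select` are hypothetical names, not part of the manager above, and the `:name` placeholder style assumes the query is later wrapped in SQLAlchemy's `text()`.

```python
import re
from typing import Dict, List, Optional, Tuple

# Conservative pattern: letters, digits, underscores, optional schema prefix.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?")


def safe_identifier(name: str) -> str:
    """Return name unchanged if it is a plain identifier, else raise ValueError."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"Unsafe SQL identifier: {name!r}")
    return name


def build_select(table: str,
                 columns: Optional[List[str]] = None,
                 date_column: Optional[str] = None,
                 start_date: Optional[str] = None,
                 end_date: Optional[str] = None) -> Tuple[str, Dict[str, str]]:
    """Build a SELECT with validated identifiers and placeholder values."""
    cols = ", ".join(safe_identifier(c) for c in columns) if columns else "*"
    sql = f"SELECT {cols} FROM {safe_identifier(table)}"
    params: Dict[str, str] = {}
    if date_column and start_date and end_date:
        col = safe_identifier(date_column)
        sql += f" WHERE {col} >= :start_date AND {col} < :end_date"
        params = {"start_date": start_date, "end_date": end_date}
    return sql, params


sql, params = build_select(
    "marketing_events", ["user_id", "revenue"],
    date_column="timestamp", start_date="2024-01-01", end_date="2024-02-01",
)
print(sql)
print(params)
```

The returned pair could then be passed along the lines of `pd.read_sql(text(sql), engine, params=params)`, so the driver, not string formatting, handles the values.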

## 3. Memory-Optimized Pandas Operations

In [None]:
def optimize_dataframe_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """Optimize memory usage by downcasting dtypes (modifies df in place and returns it)."""
    
    initial_memory = df.memory_usage(deep=True).sum() / 1024**2
    logger.info(f"Initial memory usage: {initial_memory:.2f} MB")
    
    # Optimize integer columns
    for col in df.select_dtypes(include=['int64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='integer')
    
    # Optimize float columns
    for col in df.select_dtypes(include=['float64']).columns:
        df[col] = pd.to_numeric(df[col], downcast='float')
    
    # Convert object columns to category if beneficial
    for col in df.select_dtypes(include=['object']).columns:
        num_unique = df[col].nunique()
        num_total = len(df)
        
        # Skip empty frames; convert only when fewer than 50% of values are unique
        if num_total > 0 and num_unique / num_total < 0.5:
            df[col] = df[col].astype('category')
    
    final_memory = df.memory_usage(deep=True).sum() / 1024**2
    reduction = (1 - final_memory / initial_memory) * 100
    
    logger.info(f"Final memory usage: {final_memory:.2f} MB")
    logger.info(f"Memory reduction: {reduction:.1f}%")
    
    return df


def analyze_dataframe_memory(df: pd.DataFrame) -> pd.DataFrame:
    """Analyze memory usage by column."""
    
    memory_usage = df.memory_usage(deep=True)
    
    analysis = pd.DataFrame({
        'column': memory_usage.index,
        'dtype': [df[col].dtype if col != 'Index' else 'Index' for col in memory_usage.index],
        'memory_mb': memory_usage.values / 1024**2,
        'percent': memory_usage.values / memory_usage.sum() * 100
    }).sort_values('memory_mb', ascending=False)
    
    return analysis


# Example
sample_df = pd.DataFrame({
    'user_id': np.random.randint(1, 100000, 1000000),
    'campaign_id': np.random.randint(1, 1000, 1000000),
    'channel': np.random.choice(['email', 'social', 'search'], 1000000),
    'revenue': np.random.uniform(0, 1000, 1000000)
})

print("Before optimization:")
print(analyze_dataframe_memory(sample_df).head(10))
print()

optimized_df = optimize_dataframe_dtypes(sample_df)
print("\nAfter optimization:")
print(analyze_dataframe_memory(optimized_df).head(10))
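
Downcasting floats halves their storage, but `float32` keeps only about seven significant decimal digits. The short sketch below uses made-up revenue values to show the size of the round-trip error, which may matter for monetary columns.

```python
import numpy as np

# Illustrative values only
revenue = np.array([0.10, 1234567.89, 99999.99], dtype=np.float64)
revenue32 = revenue.astype(np.float32)

# Half the bytes per value...
print(revenue.nbytes, revenue32.nbytes)

# ...at the cost of precision: compare each value after a float32 round trip.
roundtrip_error = np.abs(revenue32.astype(np.float64) - revenue)
print(roundtrip_error)
```

For columns that feed financial totals, keeping `float64` (or storing integer cents) is a reasonable alternative to downcasting.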

## 4. Chunked Processing Patterns

In [None]:
class ChunkedProcessor:
    """Process large datasets in chunks."""
    
    def __init__(self, chunk_size: int = 100000):
        self.chunk_size = chunk_size
        self.logger = logging.getLogger(self.__class__.__name__)
    
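    def aggregate_chunks(self, query: str, engine, 
                        agg_func: callable) -> pd.DataFrame:
        """Aggregate each chunk with agg_func, then sum the partial results by index.

        Only additive aggregations (such as sum and count) combine correctly;
        summing per-chunk nunique or mean values gives wrong totals.
        """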
        
        self.logger.info("Starting chunked aggregation")
        results = []
        total_rows = 0
        
        for chunk_num, chunk in enumerate(pd.read_sql(query, engine, chunksize=self.chunk_size), 1):
            chunk_result = agg_func(chunk)
            results.append(chunk_result)
            total_rows += len(chunk)
            
            if chunk_num % 10 == 0:
                self.logger.info(f"Processed {total_rows:,} rows")
                gc.collect()
        
        # Combine results
        final_result = pd.concat(results).groupby(level=0).sum()
        
        self.logger.info(f"Aggregation complete: {total_rows:,} rows processed")
        return final_result
    
    def filter_and_transform(self, query: str, engine,
                            filter_func: callable,
                            transform_func: callable) -> pd.DataFrame:
        """Filter and transform data in chunks."""
        
        self.logger.info("Starting chunked filter and transform")
        results = []
        
        for chunk in pd.read_sql(query, engine, chunksize=self.chunk_size):
            # Filter
            filtered = chunk[filter_func(chunk)]
            
            if len(filtered) > 0:
                # Transform
                transformed = transform_func(filtered)
                results.append(transformed)
        
        if results:
            return pd.concat(results, ignore_index=True)
        else:
            return pd.DataFrame()
    
    def write_chunks_to_redshift(self, df: pd.DataFrame, table: str,
                                 engine, chunk_size: int = 10000):
        """Write large DataFrame to Redshift in chunks."""
        
        self.logger.info(f"Writing {len(df):,} rows in chunks of {chunk_size:,}")
        
        total_chunks = (len(df) - 1) // chunk_size + 1
        
        for i in range(total_chunks):
            start_idx = i * chunk_size
            end_idx = min((i + 1) * chunk_size, len(df))
            chunk = df.iloc[start_idx:end_idx]
            
            if_exists = 'replace' if i == 0 else 'append'
            
            chunk.to_sql(
                table,
                engine,
                if_exists=if_exists,
                index=False,
                method='multi'
            )
            
            if (i + 1) % 10 == 0:
                self.logger.info(f"Written {end_idx:,} rows")
        
        self.logger.info("Write complete")


# Example usage
processor = ChunkedProcessor(chunk_size=100000)

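# Define aggregation function. Only additive metrics are used because
# aggregate_chunks() sums the partial results; a per-chunk nunique would
# double-count users who appear in more than one chunk.
def campaign_aggregation(chunk):
    return chunk.groupby('campaign_id').agg({
        'revenue': 'sum',
        'event_id': 'count'
    })

# Execute chunked aggregation (select only the columns the aggregation needs)
# query = "SELECT campaign_id, revenue, event_id FROM marketing_events WHERE date >= '2024-01-01'"
# results = processor.aggregate_chunks(query, rs.engine, campaign_aggregation)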

print("✓ Chunked processor defined")
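
Distinct counts are the classic case where per-chunk results cannot simply be summed. The toy example below, with made-up data split into two chunks, contrasts the naive sum of per-chunk `nunique` values with one possible exact approach: keep the distinct `(campaign_id, user_id)` pairs from each chunk and de-duplicate across chunks before counting.

```python
import pandas as pd

events = pd.DataFrame({
    "campaign_id": [1, 1, 1, 2, 2, 1],
    "user_id":     [10, 11, 10, 20, 20, 10],
})

# Simulate two chunks of three rows each.
chunks = [events.iloc[0:3], events.iloc[3:6]]

# Naive: per-chunk nunique, then sum. User 10 is counted in both chunks.
naive = pd.concat(
    [chunk.groupby("campaign_id")["user_id"].nunique() for chunk in chunks]
).groupby(level=0).sum()

# Exact: collect distinct pairs per chunk, de-duplicate across chunks, count.
pairs = pd.concat(
    [chunk[["campaign_id", "user_id"]].drop_duplicates() for chunk in chunks]
).drop_duplicates()
exact = pairs.groupby("campaign_id")["user_id"].nunique()

print(naive)
print(exact)
```

The exact approach holds every distinct pair in memory, so it only helps when the number of distinct pairs is much smaller than the number of rows; otherwise computing `COUNT(DISTINCT ...)` in the database is likely the better option.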

## 5. Query Optimization for Large Tables

In [None]:
class QueryOptimizer:
    """Optimize queries for Redshift."""
    
    @staticmethod
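    def add_limit_offset_pagination(base_query: str, page_size: int, page: int) -> str:
        """Add LIMIT/OFFSET pagination (page is zero-based).

        base_query should end with an ORDER BY clause; without one, the rows
        returned for a given page are not deterministic.
        """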
        offset = page * page_size
        return f"{base_query} LIMIT {page_size} OFFSET {offset}"
    
    @staticmethod
    def optimize_column_selection(table: str, required_columns: List[str]) -> str:
        """Select only required columns."""
        cols = ', '.join(required_columns)
        return f"SELECT {cols} FROM {table}"
    
    @staticmethod
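    def add_date_partition(query: str, date_column: str,
                          start_date: str, end_date: str) -> str:
        """Append a half-open date-range filter [start_date, end_date).

        Assumes query has no WHERE clause yet. The filter prunes the most data
        when date_column is the table's sort key.
        """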
        return f"""
        {query}
        WHERE {date_column} >= '{start_date}'
          AND {date_column} < '{end_date}'
        """
    
    @staticmethod
    def create_incremental_load_query(table: str, last_updated_column: str,
                                     last_loaded_timestamp: str) -> str:
        """Create query for incremental loading."""
        return f"""
        SELECT *
        FROM {table}
        WHERE {last_updated_column} > '{last_loaded_timestamp}'
        ORDER BY {last_updated_column}
        """
    
    @staticmethod
    def create_sampling_query(table: str, sample_rate: float) -> str:
        """Create query to sample data."""
        return f"""
        SELECT *
        FROM {table}
        WHERE RANDOM() < {sample_rate}
        """


# Example optimization
optimizer = QueryOptimizer()

# Instead of SELECT *
inefficient_query = "SELECT * FROM marketing_events"

# Use column selection
efficient_query = optimizer.optimize_column_selection(
    'marketing_events',
    ['event_id', 'user_id', 'campaign_id', 'revenue', 'timestamp']
)

# Add date partition
partitioned_query = optimizer.add_date_partition(
    efficient_query,
    'timestamp',
    '2024-01-01',
    '2024-02-01'
)

print("Optimized query:")
print(partitioned_query)
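
With LIMIT/OFFSET pagination, the database still has to produce and discard the skipped rows for each page, so later pages tend to get slower. A common alternative is keyset pagination: order by a unique key and resume after the last key seen. The sketch below uses an in-memory SQLite table purely as a local stand-in for Redshift; the table contents and the `iter_pages` helper are illustrative assumptions.

```python
import sqlite3

# Local stand-in for a warehouse table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE marketing_events (event_id INTEGER PRIMARY KEY, revenue REAL)")
conn.executemany(
    "INSERT INTO marketing_events VALUES (?, ?)",
    [(i, i * 1.5) for i in range(1, 26)],
)


def iter_pages(conn, page_size):
    """Yield pages ordered by event_id, resuming after the last key seen."""
    last_id = 0  # assumes event_id values are positive
    while True:
        rows = conn.execute(
            "SELECT event_id, revenue FROM marketing_events "
            "WHERE event_id > ? ORDER BY event_id LIMIT ?",
            (last_id, page_size),
        ).fetchall()
        if not rows:
            break
        yield rows
        last_id = rows[-1][0]


pages = list(iter_pages(conn, page_size=10))
print([len(page) for page in pages])
```

Each page query filters on the key instead of skipping rows, so the cost per page stays roughly constant as long as the key column supports efficient range filtering.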

## 6. Dask for Out-of-Memory Processing

In [None]:
# Initialize Dask client
# client = Client(n_workers=4, threads_per_worker=2, memory_limit='4GB')
# print(client)


def process_large_dataset_with_dask(file_pattern: str) -> dd.DataFrame:
    """Process large CSV files with Dask."""
    
    logger.info(f"Loading data from {file_pattern}")
    
    # Read CSV files (lazy loading)
    ddf = dd.read_csv(
        file_pattern,
        blocksize='64MB',  # Size of each partition
        dtype={
            'user_id': 'int32',
            'campaign_id': 'int16',
            'revenue': 'float32'
        },
        parse_dates=['timestamp']
    )
    
    logger.info(f"Partitions: {ddf.npartitions}")
    
    return ddf


def dask_aggregation_example(ddf: dd.DataFrame) -> pd.DataFrame:
    """Perform aggregations with Dask."""
    
    logger.info("Computing aggregations")
    
    # Perform aggregations (lazy)
    result = ddf.groupby('campaign_id').agg({
        'revenue': ['sum', 'mean', 'count'],
        'user_id': 'nunique'
    })
    
    # Compute (triggers execution)
    result_df = result.compute()
    
    logger.info("Aggregation complete")
    
    return result_df


def dask_to_redshift(ddf: dd.DataFrame, table: str, engine,
                    chunksize: int = 10000):
    """Write Dask DataFrame to Redshift."""
    
    logger.info(f"Writing Dask DataFrame to {table}")
    
    # Convert to pandas in chunks and write
    for i, partition in enumerate(ddf.to_delayed()):
        df_partition = partition.compute()
        
        if_exists = 'replace' if i == 0 else 'append'
        
        df_partition.to_sql(
            table,
            engine,
            if_exists=if_exists,
            index=False,
            method='multi',
            chunksize=chunksize
        )
        
        logger.info(f"Written partition {i + 1}")


# Example: Process large file with Dask
# ddf = process_large_dataset_with_dask('data/marketing_*.csv')
# result = dask_aggregation_example(ddf)

print("✓ Dask processing functions defined")
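
Conceptually, a Dask groupby aggregation computes partial results per partition and then combines them. The pandas-only sketch below imitates that two-step pattern on synthetic data to show why a mean has to be carried as a sum and a count; it is an illustration of the idea, not Dask's actual implementation.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df = pd.DataFrame({
    "campaign_id": rng.integers(1, 5, size=1000),
    "revenue": rng.uniform(0, 100, size=1000),
})

# Split into four "partitions", much as a large file would be split into blocks.
partitions = [df.iloc[start:start + 250] for start in range(0, len(df), 250)]

# Step 1 (per partition): partial sums and counts.
partials = [
    p.groupby("campaign_id")["revenue"].agg(["sum", "count"]) for p in partitions
]

# Step 2 (combine): add up the partials, then derive the mean.
combined = pd.concat(partials).groupby(level=0).sum()
combined["mean"] = combined["sum"] / combined["count"]

# Reference result computed on the full frame
expected = df.groupby("campaign_id")["revenue"].mean()
print(np.allclose(combined["mean"].to_numpy(), expected.to_numpy()))
```

Averaging the per-partition means directly would weight every partition equally and give a different answer whenever group sizes differ between partitions.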

## 7. Performance Benchmarking

In [None]:
import time
from contextlib import contextmanager


@contextmanager
def timer(description: str):
    """Context manager for timing operations."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Log the elapsed time even if the timed block raises
        elapsed = time.perf_counter() - start
        logger.info(f"{description}: {elapsed:.2f}s")


class PerformanceBenchmark:
    """Benchmark different approaches."""
    
    def __init__(self, engine):
        self.engine = engine
        self.results = []
    
    def benchmark_read_methods(self, query: str, n_rows: int):
        """Compare different read methods."""
        
        # Method 1: Full load
        with timer("Full load"):
            df1 = pd.read_sql(query, self.engine)
            mem1 = df1.memory_usage(deep=True).sum() / 1024**2
        
        self.results.append({
            'method': 'full_load',
            'rows': len(df1),
            'memory_mb': mem1
        })
        
        # Method 2: Chunked processing
        chunks_processed = 0
        with timer("Chunked processing"):
            for chunk in pd.read_sql(query, self.engine, chunksize=10000):
                chunks_processed += len(chunk)
        
        self.results.append({
            'method': 'chunked',
            'rows': chunks_processed,
            'memory_mb': None  # not measured; chunks are discarded after counting
        })
        
        # Method 3: Column optimization
        with timer("Optimized dtypes"):
            df3 = pd.read_sql(query, self.engine)
            df3 = optimize_dataframe_dtypes(df3)
            mem3 = df3.memory_usage(deep=True).sum() / 1024**2
        
        self.results.append({
            'method': 'optimized_dtypes',
            'rows': len(df3),
            'memory_mb': mem3
        })
        
        return pd.DataFrame(self.results)
    
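    def benchmark_aggregations(self, df: pd.DataFrame) -> pd.DataFrame:
        """Benchmark different aggregation methods and return their timings."""
        
        results = []
        
        # Method 1: Standard groupby
        start = time.perf_counter()
        result1 = df.groupby('campaign_id')['revenue'].sum()
        results.append({'method': 'groupby', 'seconds': time.perf_counter() - start})
        
        # Method 2: Python loop over boolean masks
        # (rescans the whole frame once per campaign, so it scales poorly)
        start = time.perf_counter()
        result2 = {}
        for cid in df['campaign_id'].unique():
            result2[cid] = df.loc[df['campaign_id'] == cid, 'revenue'].sum()
        results.append({'method': 'loop_with_masks', 'seconds': time.perf_counter() - start})
        
        # Method 3: Dask (if very large)
        # start = time.perf_counter()
        # ddf = dd.from_pandas(df, npartitions=4)
        # result3 = ddf.groupby('campaign_id')['revenue'].sum().compute()
        # results.append({'method': 'dask_groupby', 'seconds': time.perf_counter() - start})
        
        return pd.DataFrame(results)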


# Example benchmark
# benchmark = PerformanceBenchmark(rs.engine)
# query = "SELECT * FROM marketing_events LIMIT 100000"
# results = benchmark.benchmark_read_methods(query, 100000)
# print(results)

print("✓ Benchmarking tools defined")
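
Wall-clock time alone does not show the memory difference between a full load and streaming. As a rough sketch using only the standard library, the hypothetical `measure` helper below reports the best time over several runs together with the peak memory traced by `tracemalloc`; the two workloads are toy stand-ins for a full load and a streamed read.

```python
import time
import tracemalloc
from typing import Any, Callable, Dict


def measure(func: Callable[[], Any], repeats: int = 3) -> Dict[str, float]:
    """Run func several times; report best wall time and peak traced memory."""
    best = float("inf")
    peak_bytes = 0
    for _ in range(repeats):
        tracemalloc.start()
        start = time.perf_counter()
        func()
        elapsed = time.perf_counter() - start
        _, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        best = min(best, elapsed)
        peak_bytes = max(peak_bytes, peak)
    return {"best_seconds": best, "peak_mb": peak_bytes / 1024**2}


# Materializes a full list before summing (analogous to a full load)
full = measure(lambda: sum(list(range(200_000))))
# Iterates lazily (analogous to chunked processing)
streamed = measure(lambda: sum(range(200_000)))

print(full)
print(streamed)
```

Tracing allocations slows the code under test, so the timings here are best read relative to each other; for timing-only comparisons, running without `tracemalloc` gives more representative numbers.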

## 8. Production Patterns

In [None]:
class ProductionDataPipeline:
    """Production-ready data pipeline."""
    
    def __init__(self, rs_manager: RedshiftManager):
        self.rs = rs_manager
        self.logger = logging.getLogger(self.__class__.__name__)
        self.metrics = {}
    
    def extract_incremental(self, table: str, timestamp_col: str,
                          last_loaded: str) -> pd.DataFrame:
        """Extract data incrementally."""
        
        query = f"""
        SELECT *
        FROM {table}
        WHERE {timestamp_col} > '{last_loaded}'
        ORDER BY {timestamp_col}
        """
        
        self.logger.info(f"Extracting incremental data since {last_loaded}")
        
        df = self.rs.read_query(query)
        
        self.metrics['extract_rows'] = len(df)
        self.metrics['extract_time'] = datetime.now()
        
        return df
    
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply transformations."""
        
        self.logger.info("Applying transformations")
        
        # Optimize dtypes
        df = optimize_dataframe_dtypes(df)
        
        # Data quality checks
        initial_rows = len(df)
        df = df.dropna(subset=['user_id', 'timestamp'])
        # The comparison also drops rows with missing revenue; .copy() ensures
        # the column assignments below do not write to a view.
        df = df[df['revenue'] >= 0].copy()
        
        rows_removed = initial_rows - len(df)
        if rows_removed > 0:
            self.logger.warning(f"Removed {rows_removed} invalid rows")
        
        # Add derived columns (parse the timestamp once)
        ts = pd.to_datetime(df['timestamp'])
        df['date'] = ts.dt.date
        df['hour'] = ts.dt.hour
        
        self.metrics['transform_rows'] = len(df)
        
        return df
    
    def load(self, df: pd.DataFrame, target_table: str,
            load_type: str = 'append'):
        """Load data to Redshift."""
        
        self.logger.info(f"Loading {len(df):,} rows to {target_table}")
        
        start_time = time.time()
        
        self.rs.write_dataframe(
            df,
            target_table,
            if_exists=load_type,
            chunksize=10000
        )
        
        self.metrics['load_rows'] = len(df)
        self.metrics['load_time'] = time.time() - start_time
        
        self.logger.info(f"Load completed in {self.metrics['load_time']:.2f}s")
    
    def run_pipeline(self, source_table: str, target_table: str,
                    timestamp_col: str, last_loaded: str):
        """Run complete ETL pipeline."""
        
        self.logger.info("Starting ETL pipeline")
        pipeline_start = time.time()
        
        try:
            # Extract
            df = self.extract_incremental(source_table, timestamp_col, last_loaded)
            
            if len(df) == 0:
                self.logger.info("No new data to process")
                return
            
            # Transform
            df = self.transform(df)
            
            # Load
            self.load(df, target_table)
            
            # Update metrics
            self.metrics['pipeline_time'] = time.time() - pipeline_start
            self.metrics['status'] = 'success'
            
            self.logger.info(f"Pipeline completed in {self.metrics['pipeline_time']:.2f}s")
            self.logger.info(f"Metrics: {self.metrics}")
            
        except Exception as e:
            self.logger.error(f"Pipeline failed: {e}")
            self.metrics['status'] = 'failed'
            self.metrics['error'] = str(e)
            raise


# Example usage
# pipeline = ProductionDataPipeline(rs)
# pipeline.run_pipeline(
#     source_table='raw_events',
#     target_table='processed_events',
#     timestamp_col='created_at',
#     last_loaded='2024-01-01 00:00:00'
# )

print("✓ Production pipeline defined")
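
`run_pipeline` takes `last_loaded` as an argument, so something has to remember that value between runs. One simple option, sketched below under the assumption that a local JSON file is an acceptable state store, is to read the watermark before the run and write it back only after a successful load. The file name and the helper functions are hypothetical.

```python
import json
import os
import tempfile


def read_watermark(path: str, default: str) -> str:
    """Return the stored watermark, or default if no state file exists yet."""
    if not os.path.exists(path):
        return default
    with open(path) as f:
        return json.load(f)["last_loaded"]


def write_watermark(path: str, last_loaded: str) -> None:
    """Write to a temp file, then rename, so a crash cannot leave a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as f:
        json.dump({"last_loaded": last_loaded}, f)
    os.replace(tmp_path, path)


state_dir = tempfile.mkdtemp()
state_file = os.path.join(state_dir, "processed_events.watermark.json")

first = read_watermark(state_file, default="1970-01-01 00:00:00")
# ... run the pipeline with last_loaded=first, then record the newest
# timestamp that was actually loaded:
write_watermark(state_file, "2024-01-31 23:59:59")
second = read_watermark(state_file, default="1970-01-01 00:00:00")

print(first, "->", second)
```

Updating the watermark only after the load succeeds means a failed run is retried from the same point; combined with `if_exists='append'`, that can reload rows, so the target table may need de-duplication.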

## 9. Real-World Project: Analyze 100M Row Marketing Dataset

### Project: Multi-Channel Attribution Analysis

In [None]:
class MarketingAttributionAnalyzer:
    """Analyze large-scale marketing attribution data."""
    
    def __init__(self, rs_manager: RedshiftManager):
        self.rs = rs_manager
        self.logger = logging.getLogger(self.__class__.__name__)
    
    def get_user_journeys(self, start_date: str, end_date: str,
                         chunk_size: int = 100000) -> pd.DataFrame:
        """Compute per-channel journey statistics using chunked processing."""
        
        query = f"""
        SELECT 
            user_id,
            campaign_id,
            channel,
            event_type,
            revenue,
            timestamp
        FROM marketing_events
        WHERE date >= '{start_date}'
          AND date < '{end_date}'
        ORDER BY user_id, timestamp
        """
        
        self.logger.info(f"Extracting journeys for {start_date} to {end_date}")
        
        # Process in chunks
        journey_stats = []
        
        for chunk in pd.read_sql(query, self.rs.engine, chunksize=chunk_size):
            # Analyze chunk
            chunk_stats = self._analyze_journey_chunk(chunk)
            journey_stats.append(chunk_stats)
        
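        if not journey_stats:
            return pd.DataFrame()
        
        # Combine results. Event counts, revenue, and conversions are additive;
        # unique_users is an upper bound, because a user whose rows straddle a
        # chunk boundary is counted in both chunks.
        final_stats = pd.concat(journey_stats).groupby('channel').sum()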
        
        return final_stats
    
    def _analyze_journey_chunk(self, chunk: pd.DataFrame) -> pd.DataFrame:
        """Analyze a chunk of journey data."""
        
        # Calculate metrics by channel
        stats = chunk.groupby('channel').agg({
            'user_id': 'nunique',
            'event_type': 'count',
            'revenue': 'sum'
        }).rename(columns={
            'user_id': 'unique_users',
            'event_type': 'total_events',
            'revenue': 'total_revenue'
        })
        
        # Count conversions
        conversions = chunk[chunk['event_type'] == 'conversion'].groupby('channel').size()
        stats['conversions'] = conversions
        stats['conversions'] = stats['conversions'].fillna(0)
        
        return stats
    
    def calculate_attribution(self, start_date: str, end_date: str,
                            model: str = 'last_touch') -> pd.DataFrame:
        """Calculate attribution using specified model."""
        
        self.logger.info(f"Calculating {model} attribution")
        
        if model == 'last_touch':
            return self._last_touch_attribution(start_date, end_date)
        elif model in ('first_touch', 'linear'):
            # Only last-touch attribution is implemented in this notebook
            raise NotImplementedError(f"Attribution model not implemented: {model}")
        else:
            raise ValueError(f"Unknown model: {model}")
    
    def _last_touch_attribution(self, start_date: str, end_date: str) -> pd.DataFrame:
        """Last-touch attribution model."""
        
        query = f"""
        WITH conversions AS (
            SELECT 
                user_id,
                revenue,
                timestamp as conversion_time
            FROM marketing_events
            WHERE event_type = 'conversion'
              AND date >= '{start_date}'
              AND date < '{end_date}'
        ),
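        last_touch AS (
            -- Rank touches per conversion so that a user with several
            -- conversions is attributed once per conversion. The conversion
            -- event itself satisfies the <= filter; use < to attribute to
            -- the preceding touch instead.
            SELECT
                c.user_id,
                c.conversion_time,
                c.revenue,
                e.channel as last_channel,
                ROW_NUMBER() OVER (
                    PARTITION BY c.user_id, c.conversion_time
                    ORDER BY e.timestamp DESC
                ) as touch_rank
            FROM conversions c
            INNER JOIN marketing_events e ON e.user_id = c.user_id
            WHERE e.timestamp <= c.conversion_time
        )
        SELECT 
            last_channel as channel,
            COUNT(*) as attributed_conversions,
            SUM(revenue) as attributed_revenue
        FROM last_touch
        WHERE touch_rank = 1
        GROUP BY last_channel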
        """
        
        return self.rs.read_query(query)
    
    def generate_performance_report(self, start_date: str, end_date: str) -> Dict:
        """Generate comprehensive performance report."""
        
        self.logger.info("Generating performance report")
        
        # Overall metrics
        overall_query = f"""
        SELECT 
            COUNT(*) as total_events,
            COUNT(DISTINCT user_id) as unique_users,
            SUM(CASE WHEN event_type = 'conversion' THEN 1 ELSE 0 END) as conversions,
            SUM(revenue) as total_revenue,
            AVG(revenue) as avg_revenue
        FROM marketing_events
        WHERE date >= '{start_date}'
          AND date < '{end_date}'
        """
        
        overall = self.rs.read_query(overall_query).to_dict('records')[0]
        
        # Channel performance
        channel_query = f"""
        SELECT 
            channel,
            COUNT(*) as events,
            COUNT(DISTINCT user_id) as users,
            SUM(CASE WHEN event_type = 'conversion' THEN 1 ELSE 0 END) as conversions,
            SUM(revenue) as revenue
        FROM marketing_events
        WHERE date >= '{start_date}'
          AND date < '{end_date}'
        GROUP BY channel
        """
        
        channel_perf = self.rs.read_query(channel_query)
        
        # Attribution
        attribution = self.calculate_attribution(start_date, end_date, 'last_touch')
        
        return {
            'overall_metrics': overall,
            'channel_performance': channel_perf,
            'attribution': attribution,
            'date_range': {'start': start_date, 'end': end_date}
        }


# Example usage
# analyzer = MarketingAttributionAnalyzer(rs)
# report = analyzer.generate_performance_report('2024-01-01', '2024-02-01')
# print(report['overall_metrics'])
# print(report['channel_performance'])

print("✓ Attribution analyzer defined")
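
To make the last-touch model concrete, the sketch below applies it in pandas to a handful of made-up events. It treats only non-conversion events as touches, which is an assumption about the intended model, and uses `pd.merge_asof` to find, for each conversion, the most recent earlier touch by the same user.

```python
import pandas as pd

# Made-up events: (user_id, channel, event_type, revenue, timestamp)
events = pd.DataFrame(
    [
        (1, "email", "click", 0.0, "2024-01-01"),
        (1, "social", "click", 0.0, "2024-01-02"),
        (1, "direct", "conversion", 50.0, "2024-01-03"),
        (2, "social", "click", 0.0, "2024-01-02"),
        (2, "direct", "conversion", 30.0, "2024-01-04"),
        (1, "email", "click", 0.0, "2024-01-05"),
        (1, "direct", "conversion", 20.0, "2024-01-06"),
    ],
    columns=["user_id", "channel", "event_type", "revenue", "timestamp"],
)
events["timestamp"] = pd.to_datetime(events["timestamp"])

# merge_asof requires both frames to be sorted by the "on" key.
touches = events[events["event_type"] != "conversion"].sort_values("timestamp")
conversions = events[events["event_type"] == "conversion"].sort_values("timestamp")

# For each conversion, take the latest touch at or before it, per user.
attributed = pd.merge_asof(
    conversions[["user_id", "timestamp", "revenue"]],
    touches[["user_id", "timestamp", "channel"]],
    on="timestamp",
    by="user_id",
    direction="backward",
)

result = attributed.groupby("channel").agg(
    attributed_conversions=("revenue", "count"),
    attributed_revenue=("revenue", "sum"),
)
print(result)
```

Conversions with no earlier touch get a missing channel and drop out of the grouped result, so comparing the attributed total against overall revenue is a useful sanity check.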

## 10. Best Practices Summary

### Memory Optimization
1. Use appropriate dtypes (downcast integers and floats)
2. Convert low-cardinality objects to categories
3. Process data in chunks for large datasets
4. Use Dask for truly out-of-memory datasets
5. Monitor memory usage with `memory_profiler`
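
Points 1 and 2 can often be applied at load time rather than after the fact, which avoids ever holding the wide version of the data. A minimal sketch with an inline CSV (the column names and values are made up):

```python
from io import StringIO

import pandas as pd

csv_data = StringIO(
    "user_id,campaign_id,channel,revenue,notes\n"
    "1,10,email,12.5,first\n"
    "2,10,social,0.0,second\n"
    "3,11,email,99.9,third\n"
)

df = pd.read_csv(
    csv_data,
    # Skip columns the analysis does not need
    usecols=["user_id", "campaign_id", "channel", "revenue"],
    # Declare compact dtypes up front instead of converting afterwards
    dtype={
        "user_id": "int32",
        "campaign_id": "int16",
        "channel": "category",
        "revenue": "float64",
    },
)
print(df.dtypes)
```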

### Query Optimization
1. Select only required columns (avoid SELECT *)
2. Filter large tables by date range, ideally on the sort key
3. Implement incremental loading
4. Use EXPLAIN to analyze query plans
5. Leverage distribution and sort keys

### Redshift Best Practices
1. Use connection pooling
2. Read data in chunks with `chunksize`
3. Use COPY command for bulk loads
4. Vacuum and analyze tables regularly
5. Monitor query performance with system tables
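
For point 3, a typical flow is to stage the data as files in S3 and then issue a single `COPY`, rather than inserting rows through `to_sql`. The sketch below only builds the statement text; the bucket, prefix, and IAM role ARN are placeholders, and `build_copy_statement` is a hypothetical helper.

```python
def build_copy_statement(table: str, s3_path: str, iam_role: str) -> str:
    """Build a Redshift COPY statement for Parquet files under an S3 prefix."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role}'\n"
        "FORMAT AS PARQUET;"
    )


copy_sql = build_copy_statement(
    table="processed_events",
    s3_path="s3://example-bucket/processed_events/",        # placeholder
    iam_role="arn:aws:iam::123456789012:role/ExampleRole",  # placeholder
)
print(copy_sql)
```

The statement could then be run with something like `rs.execute_query(copy_sql)` after the Parquet files have been written to the prefix, for example with `df.to_parquet` and `s3fs`.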

### Performance
1. Benchmark different approaches
2. Use vectorized operations
3. Avoid loops when possible
4. Use appropriate aggregation methods
5. Profile code to identify bottlenecks
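
As a small illustration of points 2 and 3, the sketch below computes the same per-row ratio with a Python loop and with a vectorized column expression on synthetic data, then checks that the results agree.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
df = pd.DataFrame({
    "revenue": rng.uniform(0, 1000, size=10_000),
    "cost": rng.uniform(1, 500, size=10_000),
})

# Loop version: one Python-level iteration per row.
roi_loop = [
    (row.revenue - row.cost) / row.cost for row in df.itertuples(index=False)
]

# Vectorized version: one expression over whole columns.
roi_vectorized = (df["revenue"] - df["cost"]) / df["cost"]

print(np.allclose(roi_loop, roi_vectorized.to_numpy()))
```

Wrapping each version in the `timer` context manager from Section 7 is an easy way to see the difference on your own data.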

## 11. Exercises

### Exercise 1: Memory Optimization
Download the "Online Retail" dataset from Kaggle and:
1. Load the data and measure initial memory usage
2. Optimize dtypes to reduce memory by at least 50%
3. Compare processing time before and after optimization

### Exercise 2: Chunked Processing
1. Create a large CSV file (10M+ rows)
2. Process it in chunks to calculate aggregated metrics
3. Compare memory usage with full-load approach
4. Measure processing time and throughput

### Exercise 3: Dask Integration
1. Load a dataset too large for memory using Dask
2. Perform complex aggregations
3. Write results back to disk in partitioned format
4. Compare performance with pandas

### Exercise 4: Production Pipeline
Build an end-to-end ETL pipeline that:
1. Extracts data incrementally from Redshift
2. Applies transformations and data quality checks
3. Loads results back to Redshift
4. Logs metrics and handles errors gracefully
5. Runs on a schedule

## Resources

### Documentation
- [Pandas Performance](https://pandas.pydata.org/docs/user_guide/enhancingperf.html)
- [Dask DataFrame](https://docs.dask.org/en/latest/dataframe.html)
- [Redshift Best Practices](https://docs.aws.amazon.com/redshift/latest/dg/best-practices.html)
- [SQLAlchemy Core](https://docs.sqlalchemy.org/en/14/core/)

### Kaggle Datasets
- Online Retail Dataset
- Marketing Analytics
- E-commerce Data
- Customer Transactions

### Tools
- memory_profiler: Memory usage profiling
- Dask: Parallel computing library
- AWS Redshift: Cloud data warehouse
- psycopg2: PostgreSQL adapter