Connections to local SQL Express

Intially this session was slated for using pyodbc but the intial execution showed a warning: "UserWarning: pandas only supports SQLAlchemy connectable (engine/connection) or database string URI or sqlite3 DBAPI2 connection. Other DBAPI2 objects are not tested. Please consider using SQLAlchemy. "  Adjusted to move forward with SQLAlchemy instead.

- Old (pyodbc direct): Caused warnings
    - "DRIVER={ODBC Driver 17 for SQL Server};SERVER=server;DATABASE=db;Trusted_Connection=yes;"

- New (SQLAlchemy): No warnings, best practice
    - "mssql+pyodbc://@server/database?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes"

Updated code -  The first section is to test the connection:

In [1]:
from sqlalchemy import create_engine, text
import pandas as pd
import urllib.parse

class SQLServerConnection:
    """SQL Server database connection using SQLAlchemy (recommended)"""
    
    def __init__(self, server, database, username=None, password=None, trusted_connection=True):
        self.server = server
        self.database = database
        self.username = username
        self.password = password
        self.trusted_connection = trusted_connection
        self.engine = None
    
    def get_connection_string(self):
        """Build SQLAlchemy connection string"""
        if self.trusted_connection:
            # Windows Authentication
            connection_string = f"mssql+pyodbc://@{self.server}/{self.database}?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes"
        else:
            # SQL Server Authentication
            encoded_password = urllib.parse.quote_plus(self.password)
            connection_string = f"mssql+pyodbc://{self.username}:{encoded_password}@{self.server}/{self.database}?driver=ODBC+Driver+17+for+SQL+Server"
        
        return connection_string
    
    def connect(self):
        """Create SQLAlchemy engine"""
        try:
            connection_string = self.get_connection_string()
            self.engine = create_engine(connection_string)
            
            # Test the connection
            with self.engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            
            print(f"Connected to SQL Server: {self.server}")
            return True
        except Exception as e:
            print(f"SQL Server connection error: {e}")
            return False
    
    def execute_query(self, query, params=None):
        """Execute query and return DataFrame - no more pandas pyodbc warnings!"""
        if not self.engine:
            print("No database connection")
            return None
        
        try:
            if params:
                df = pd.read_sql_query(text(query), self.engine, params=params)
            else:
                df = pd.read_sql_query(text(query), self.engine)
            return df
        except Exception as e:
            print(f"Query execution error: {e}")
            return None
    
    def close(self):
        """Close database connection"""
        if self.engine:
            self.engine.dispose()
            print("SQL Server connection closed")

# Example connection (adjust for the environment)
sql_server = SQLServerConnection(
    server='localhost\\SQLEXPRESS',  # Or other server name
    database='master',  # Replace with your database name
    trusted_connection=True  # Uses Windows Authentication
)


# Or with SQL Server Authentication:
# sql_server = SQLServerConnection(
#     server='your-server.database.windows.net',
#     database='BusinessDB',
#     username='your-username',
#     password='your-password',
#     trusted_connection=False
# )

if sql_server.connect():
    # Test with a simple query first
    test_query = "SELECT @@VERSION as sql_server_version"
    result = sql_server.execute_query(test_query)
    if result is not None:
        print("SQL Server Version:")
        print(result)
    
    sql_server.close()



Connected to SQL Server: localhost\SQLEXPRESS
SQL Server Version:
                                  sql_server_version
0  Microsoft SQL Server 2022 (RTM-GDR) (KB5058712...
SQL Server connection closed


Now pulling data from SQL Express:

In [2]:
def setup_business_database():
    """Connect to the database and run queries using SQLAlchemy"""
    
    try:
        # Create SQLAlchemy engine for SQL Server with Windows Authentication
        server = 'localhost\\SQLEXPRESS'
        database = 'WideWorldImporters'
        
        # Create connection string for Windows Authentication
        connection_string = f"mssql+pyodbc://@{server}/{database}?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes"
        engine = create_engine(connection_string)
        
        # Test connection and execute query
        with engine.connect() as connection:
            print("Connected to BusinessDB successfully!")
            
            # SQL query to get top customers
            query = """
            SELECT TOP 10
                c.CustomerID,
                c.CustomerName,
                c.DeliveryPostalCode,
                COUNT(t.InvoiceID) as TotalInvoices,
                SUM(t.TransactionAmount) as TotalRevenue
            FROM Sales.Customers c
            LEFT JOIN Sales.CustomerTransactions t ON c.CustomerID = t.CustomerID
            GROUP BY c.CustomerID, c.CustomerName, c.DeliveryPostalCode
            ORDER BY SUM(t.TransactionAmount) DESC
            """
            
            # Execute query using pandas (eliminates the warning!)
            results = pd.read_sql(query, engine)
            
            if not results.empty:
                print("Top Customers:")
                print(results)
                return results
            else:
                print("No results found.")
                return None
                
    except Exception as e:
        print(f"Database connection or query failed: {e}")
        return None
    
# Call the business database function
def run_business_analysis():
    """Run the business database analysis"""
    results = setup_business_database()
    if results is not None:
        print(f"\nAnalysis complete! Found {len(results)} top customers.")
    else:
        print("Analysis failed or returned no results.")

# Run the function to get results
# results = setup_business_database()
# Or use the wrapper function to get results with a message
run_business_analysis()


Connected to BusinessDB successfully!
Top Customers:
   CustomerID                 CustomerName DeliveryPostalCode  TotalInvoices  \
0         401   Wingtip Toys (Head Office)              90625          22165   
1           1  Tailspin Toys (Head Office)              90410          22316   
2         995                Kumar Naicker              90515            112   
3         953                     Linh Dao              90147            130   
4         840           Rodrigo Figueiredo              90331            105   
5         813                 Shyam Poddar              90695            113   
6         835             Dominic Davignon              90192            116   
7         900                   Lilli Sokk              90357            125   
8         915                   Ana Florea              90140            109   
9         911                  Liidia Lepp              90137            112   

   TotalRevenue  
0      97053.58  
1      56435.84  
2      13212

Advanced Analytics with SQL Server

In [None]:
def sql_server_advanced_analytics():
    """Perform advanced analytics on SQL Server data"""
    
    sql_server = SQLServerConnection(
        server='localhost\\SQLEXPRESS',
        database='WideWorldImporters',  
        trusted_connection=True
    )
    
    if sql_server.connect():
        # Complex analytics query using SQL Server features
        analytics_query = """
        WITH CustomerMetrics AS (
            SELECT 
                    c.CustomerID,
                    c.CustomerName,
                    OrderAmount,
                    COUNT(o.OrderID) as TotalOrders,
                    SUM(sq1.OrderAmount) as TotalRevenue,
                    AVG(sq1.OrderAmount) as AvgOrderValue,
                    MIN(o.OrderDate) as FirstOrderDate,
                    MAX(o.OrderDate) as LastOrderDate,
                    DATEDIFF(day, MIN(o.OrderDate), MAX(o.OrderDate)) as CustomerLifespanDays
            FROM Sales.Customers c
            LEFT JOIN Sales.Orders o ON c.CustomerID = o.CustomerID
            LEFT JOIN ( Select OrderID, (PickedQuantity * UnitPrice) as OrderAmount FROM Sales.OrderLines) sq1 on sq1.OrderID = o.OrderID
            GROUP BY c.CustomerID, c.CustomerName, OrderAmount
        ),
        CustomerSegments AS (
            SELECT 
                *,
                CASE 
                    WHEN TotalRevenue > 2000 THEN 'Premium'
                    WHEN TotalRevenue > 1000 THEN 'Standard'
                    ELSE 'Basic'
                END as CustomerSegment,
                CASE 
                    WHEN CustomerLifespanDays > 0 
                    THEN TotalRevenue / CustomerLifespanDays * 365 
                    ELSE TotalRevenue 
                END as EstimatedAnnualValue
            FROM CustomerMetrics
        )
        SELECT 
            CustomerSegment,
            COUNT(*) as CustomerCount,
            AVG(TotalRevenue) as AvgRevenue,
            AVG(EstimatedAnnualValue) as AvgAnnualValue,
            SUM(TotalRevenue) as SegmentRevenue
        FROM CustomerSegments
        GROUP BY CustomerSegment
        ORDER BY AvgRevenue DESC
        """
        
        results = sql_server.execute_query(analytics_query)
        if results is not None:
            print("Customer Segment Analysis:")
            print(results)
            
            # Enhanced analysis with pandas
            results['revenue_percentage'] = (results['SegmentRevenue'] / results['SegmentRevenue'].sum()) * 100
            print("\nRevenue Distribution by Segment:")
            print(results[['CustomerSegment', 'revenue_percentage']])
        
        sql_server.close()
        return results
    
    return None

# Run advanced analytics
analytics_results = sql_server_advanced_analytics()

Handling Large Datasets with Chunking

In [None]:
def process_large_sql_server_dataset(query, chunk_size=10000):
    """Process large SQL Server datasets efficiently"""
    
    sql_server = SQLServerConnection(
        server='localhost\\SQLEXPRESS',
        database='WideWorldImporters',
        trusted_connection=True
    )
    
    if sql_server.connect():
        try:
            # Use chunking for large datasets
            chunk_iterator = pd.read_sql_query(
                text(query),
                sql_server.engine,
                chunksize=chunk_size
            )
            
            processed_data = []
            
            for chunk_num, chunk_df in enumerate(chunk_iterator):
                print(f"Processing chunk {chunk_num + 1}, size: {len(chunk_df)}")
                
                # Process each chunk
                chunk_summary = {
                    'chunk_number': chunk_num + 1,
                    'row_count': len(chunk_df),
                    'total_amount': chunk_df.get('OrderAmount', pd.Series([0])).sum(),
                    'avg_amount': chunk_df.get('OrderAmount', pd.Series([0])).mean()
                }
                
                processed_data.append(chunk_summary)
            
            sql_server.close()
            
            # Combine results
            summary_df = pd.DataFrame(processed_data)
            return summary_df
            
        except Exception as e:
            print(f"Chunk processing error: {e}")
            sql_server.close()
            return None
    
    return None

# Example usage for large datasets
large_query = """
SELECT 
    CustomerID,
    OrderDate,
    SalespersonPersonID
FROM Sales.Orders 
WHERE OrderDate >= '2020-01-01'
ORDER BY OrderDate
"""

# Process in chunks
chunk_summary = process_large_sql_server_dataset(large_query, 10000)

When I asked Claude.ai how to handle aggregates on chunked data, I got the following notes.  Adding here for reference.

# Aggregating Chunk Summaries

When processing large datasets with chunking, you often end up with **chunk-level summaries** rather than the raw data. To get meaningful insights about your entire dataset, you need to properly aggregate these chunk summaries.

## Understanding Chunk Summaries

After chunking, your `summary_df` contains statistics for each individual chunk:

```python
# Example chunk summary output
chunk_number  row_count  total_amount  avg_amount
1             10000      50000.00      5.00
2             10000      75000.00      7.50
3             8500       42500.00      5.00
```

**Important**: These are chunk-level metrics, not overall dataset metrics.

## Basic Aggregation Functions

### Simple Aggregates Function

```python
def get_overall_aggregates(summary_df):
    """Calculate overall aggregates from chunk summaries"""
    
    # Total rows across all chunks
    total_rows = summary_df['row_count'].sum()
    
    # Total amount across all chunks
    total_amount = summary_df['total_amount'].sum()
    
    # Overall average (weighted by chunk sizes)
    overall_avg = total_amount / total_rows
    
    return {
        'total_rows': total_rows,
        'total_amount': total_amount,
        'overall_average': overall_avg,
        'chunks_processed': len(summary_df)
    }
```

### Usage Example

```python
# Process your large dataset
chunk_summary = process_large_sql_server_dataset(large_query, 1000)

if chunk_summary is not None:
    # Get overall aggregates
    aggregates = get_overall_aggregates(chunk_summary)
    
    # Display results
    print(f"Total rows: {aggregates['total_rows']:,}")
    print(f"Total amount: ${aggregates['total_amount']:,.2f}")
    print(f"Overall average: ${aggregates['overall_average']:.2f}")
    print(f"Chunks processed: {aggregates['chunks_processed']}")
```

## Enhanced Aggregation with Detailed Statistics

### Enhanced Processing Function

```python
def process_large_sql_server_dataset_enhanced(query, chunk_size=10000):
    """Enhanced version with more detailed chunk tracking"""
    
    sql_server = SQLServerConnection(
        server='localhost\\SQLEXPRESS',
        database='BusinessTestDB',
        trusted_connection=True
    )
    
    if sql_server.connect():
        try:
            chunk_iterator = pd.read_sql_query(
                text(query),
                sql_server.engine,
                chunksize=chunk_size
            )
            
            processed_data = []
            running_totals = {'rows': 0, 'amount': 0}
            
            for chunk_num, chunk_df in enumerate(chunk_iterator):
                print(f"Processing chunk {chunk_num + 1}, size: {len(chunk_df)}")
                
                # Detailed chunk analysis
                order_amounts = chunk_df.get('OrderAmount', pd.Series([0]))
                
                chunk_summary = {
                    'chunk_number': chunk_num + 1,
                    'row_count': len(chunk_df),
                    'total_amount': order_amounts.sum(),
                    'avg_amount': order_amounts.mean(),
                    'min_amount': order_amounts.min(),
                    'max_amount': order_amounts.max(),
                    'std_amount': order_amounts.std()
                }
                
                # Update running totals
                running_totals['rows'] += len(chunk_df)
                running_totals['amount'] += order_amounts.sum()
                
                processed_data.append(chunk_summary)
            
            sql_server.close()
            
            # Create summary DataFrame
            summary_df = pd.DataFrame(processed_data)
            
            # Calculate overall aggregates
            overall_stats = {
                'total_rows': running_totals['rows'],
                'total_amount': running_totals['amount'],
                'overall_average': running_totals['amount'] / running_totals['rows'] if running_totals['rows'] > 0 else 0,
                'chunks_processed': len(summary_df),
                'global_min': summary_df['min_amount'].min(),
                'global_max': summary_df['max_amount'].max()
            }
            
            return summary_df, overall_stats
            
        except Exception as e:
            print(f"Chunk processing error: {e}")
            sql_server.close()
            return None, None
    
    return None, None
```

### Enhanced Usage Example

```python
# Process with enhanced statistics
chunk_summary, overall_stats = process_large_sql_server_dataset_enhanced(large_query, 1000)

if chunk_summary is not None:
    print("=== OVERALL STATISTICS ===")
    print(f"Total rows processed: {overall_stats['total_rows']:,}")
    print(f"Total amount: ${overall_stats['total_amount']:,.2f}")
    print(f"Overall average: ${overall_stats['overall_average']:.2f}")
    print(f"Global min: ${overall_stats['global_min']:.2f}")
    print(f"Global max: ${overall_stats['global_max']:.2f}")
    print(f"Chunks processed: {overall_stats['chunks_processed']}")
    
    print("\n=== CHUNK BREAKDOWN ===")
    display(chunk_summary)
```

## Aggregation Rules by Metric Type

### Additive Metrics
✅ **Simply sum across chunks**
- Row counts: `summary_df['row_count'].sum()`
- Total amounts: `summary_df['total_amount'].sum()`
- Any cumulative values

### Averages
⚠️ **Must be weighted by chunk sizes**
```python
# ❌ Wrong - this gives average of averages
wrong_avg = summary_df['avg_amount'].mean()

# ✅ Correct - weighted average
total_amount = summary_df['total_amount'].sum()
total_rows = summary_df['row_count'].sum()
correct_avg = total_amount / total_rows
```

### Min/Max Values
✅ **Take min/max across all chunk values**
```python
global_min = summary_df['min_amount'].min()
global_max = summary_df['max_amount'].max()
```

### Standard Deviation
⚠️ **Complex - requires special handling**
```python
# For standard deviation, you typically need:
# 1. Raw data access, or
# 2. Additional statistics from each chunk (sum of squares, etc.)
# Simple chunk std aggregation is not mathematically valid
```

## Stream Processing Alternative

For simpler aggregation needs, consider calculating running totals during processing:

```python
def stream_process_dataset(query, chunk_size=10000):
    """Process data with running aggregates - no chunk storage"""
    
    # Initialize running aggregates
    total_rows = 0
    total_amount = 0.0
    global_min = float('inf')
    global_max = float('-inf')
    
    sql_server = SQLServerConnection(
        server='localhost\\SQLEXPRESS',
        database='BusinessTestDB',
        trusted_connection=True
    )
    
    if sql_server.connect():
        try:
            chunk_iterator = pd.read_sql_query(
                text(query),
                sql_server.engine,
                chunksize=chunk_size
            )
            
            for chunk_num, chunk_df in enumerate(chunk_iterator):
                amounts = chunk_df.get('OrderAmount', pd.Series([0]))
                
                # Update running totals
                total_rows += len(chunk_df)
                total_amount += amounts.sum()
                global_min = min(global_min, amounts.min())
                global_max = max(global_max, amounts.max())
                
                # Progress update
                if chunk_num % 10 == 0:  # Every 10 chunks
                    print(f"Processed {total_rows:,} rows so far...")
            
            sql_server.close()
            
            return {
                'total_rows': total_rows,
                'total_amount': total_amount,
                'overall_average': total_amount / total_rows if total_rows > 0 else 0,
                'global_min': global_min if global_min != float('inf') else 0,
                'global_max': global_max if global_max != float('-inf') else 0
            }
            
        except Exception as e:
            print(f"Stream processing error: {e}")
            sql_server.close()
            return None
    
    return None

# Usage
results = stream_process_dataset(large_query, 1000)
if results:
    print(f"Final results: {results}")
```

## Key Takeaways

1. **Chunk summaries are intermediate results** - always aggregate them for final statistics
2. **Different metrics require different aggregation methods** - sums vs. averages vs. min/max
3. **Weighted averages are crucial** - never average the averages directly
4. **Consider stream processing** for simpler use cases where you don't need chunk-level details
5. **Complex statistics** (like standard deviation) may require raw data access or specialized formulas

## Next Steps

- Implement the appropriate aggregation method for your use case
- Test with smaller datasets first to verify your aggregation logic
- Consider adding data validation to ensure aggregation accuracy
- Monitor memory usage if storing chunk summaries for very large datasets