# SageMaker Database-Connected ML Pipeline

This notebook demonstrates how to:
1. Connect to your MySQL database
2. Train ML models using your transaction data
3. Deploy models to SageMaker endpoints
4. Make predictions and store results back in the database

## 1. Setup and Database Connection

In [None]:
# Install required packages
!pip install pymysql pandas numpy scikit-learn tensorflow prophet statsmodels boto3 sagemaker

In [None]:
import json
import os
from datetime import datetime, timedelta

import boto3
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pymysql
import sagemaker
import seaborn as sns
from sagemaker import get_execution_role
from sagemaker.sklearn import SKLearn
from sagemaker.tensorflow import TensorFlow

# SageMaker session setup: execution role, session, default bucket and region
role = get_execution_role()
session = sagemaker.Session()
bucket = session.default_bucket()
region = session.boto_region_name

# Echo the resolved settings so the reader can confirm the environment
for label, value in (
    ("SageMaker role", role),
    ("SageMaker bucket", bucket),
    ("Region", region),
):
    print(f"{label}: {value}")

In [None]:
# Database connection configuration.
# Each value is read from an environment variable so credentials do not have to
# be saved inside the notebook. The placeholder defaults preserve the previous
# "edit in place" workflow when a variable is not set.
DB_CONFIG = {
    'host': os.environ.get('DB_HOST', 'your-rds-endpoint.amazonaws.com'),  # Replace with your RDS endpoint
    'user': os.environ.get('DB_USER', 'admin'),
    'password': os.environ.get('DB_PASSWORD', 'your-password'),  # Replace with your password
    'database': os.environ.get('DB_NAME', 'inventory_management'),
    # Environment values are strings; pymysql is given the port as an int, as before.
    'port': int(os.environ.get('DB_PORT', '3306')),
}

def get_db_connection():
    """Open a new pymysql connection using the settings in ``DB_CONFIG``.

    Returns:
        The connection object returned by ``pymysql.connect``. The caller is
        responsible for closing it.
    """
    connection = pymysql.connect(**DB_CONFIG)
    return connection

# Test connection: open one connection and close it again.
# Any failure is reported rather than raised so the notebook keeps running and
# the reader can correct DB_CONFIG.
try:
    conn = get_db_connection()
    print("✅ Database connection successful")
    conn.close()
except Exception as e:
    print(f"❌ Database connection failed: {e}")

## 2. Data Extraction and Analysis

In [None]:
def fetch_training_data():
    """Fetch daily sales aggregates per product and store for ML training.

    Reads the last 180 days of ``inventory_transactions`` and returns one row
    per (product_id, store_id, calendar day).

    Returns:
        pandas.DataFrame with the columns product_id, product_name, category,
        store_id, date, sales_quantity, sales_amount, avg_price and
        transaction_count, ordered by product_id, store_id, date.

    Raises:
        Whatever ``get_db_connection`` or ``pd.read_sql`` raise; the connection
        is closed before the exception propagates.
    """
    # product_name and category are wrapped in MAX() because they are not part
    # of the GROUP BY; selecting them bare is rejected when the server runs with
    # ONLY_FULL_GROUP_BY. NOTE(review): this assumes one name/category per
    # product_id - confirm against the schema.
    query = """
    SELECT
        product_id,
        MAX(product_name) AS product_name,
        MAX(category) AS category,
        store_id,
        DATE(created_at) as date,
        SUM(CASE WHEN transaction_type = 'Sale' THEN quantity ELSE 0 END) as sales_quantity,
        SUM(CASE WHEN transaction_type = 'Sale' THEN total_amount ELSE 0 END) as sales_amount,
        AVG(unit_price) as avg_price,
        COUNT(*) as transaction_count
    FROM inventory_transactions
    WHERE created_at >= DATE_SUB(NOW(), INTERVAL 180 DAY)
    GROUP BY product_id, store_id, DATE(created_at)
    ORDER BY product_id, store_id, date
    """

    conn = get_db_connection()
    try:
        df = pd.read_sql(query, conn)
    finally:
        # Release the connection even when the query fails.
        conn.close()

    return df

# Pull the aggregated transactions once; later cells reuse `training_data`
training_data = fetch_training_data()
print(f"Training data shape: {training_data.shape}\n\nFirst few rows:")
training_data.head()

In [None]:
# Data analysis and visualization: one 15x10 figure with four panels
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
(ax_category, ax_daily), (ax_products, ax_store) = axes

# Top-left: total sales amount per category, largest first
category_sales = (
    training_data.groupby('category')['sales_amount']
    .sum()
    .sort_values(ascending=False)
)
category_sales.plot(kind='bar', ax=ax_category)
ax_category.set_title('Sales by Category')
plt.setp(ax_category.get_xticklabels(), rotation=45)

# Top-right: total sales amount per day
daily_sales = training_data.groupby('date')['sales_amount'].sum()
daily_sales.plot(ax=ax_daily)
ax_daily.set_title('Daily Sales Trend')
plt.setp(ax_daily.get_xticklabels(), rotation=45)

# Bottom-left: the ten products with the highest quantity sold
top_products = (
    training_data.groupby('product_name')['sales_quantity']
    .sum()
    .nlargest(10)
)
top_products.plot(kind='barh', ax=ax_products)
ax_products.set_title('Top 10 Products by Quantity Sold')

# Bottom-right: total sales amount per store
store_sales = training_data.groupby('store_id')['sales_amount'].sum()
store_sales.plot(kind='bar', ax=ax_store)
ax_store.set_title('Sales by Store')
plt.setp(ax_store.get_xticklabels(), rotation=0)

plt.tight_layout()
plt.show()

## 3. Prepare Data for Different ML Models

In [None]:
def prepare_lstm_data(product_id, store_id, sequence_length=30):
    """Build sliding-window LSTM samples for one product/store pair.

    Filters the module-level ``training_data`` frame, expands it to one row per
    calendar day (days without rows count as 0 sales), and slices the daily
    ``sales_quantity`` series into windows.

    Args:
        product_id: Value matched against ``training_data['product_id']``.
        store_id: Value matched against ``training_data['store_id']``.
        sequence_length: Number of consecutive days in each input window.

    Returns:
        ``(X, y)`` where ``X`` has shape ``(n_samples, sequence_length)`` and
        ``y`` has shape ``(n_samples,)``; ``y[i]`` is the value on the day right
        after window ``X[i]``. Returns ``(None, None)`` when the pair has fewer
        than ``sequence_length + 10`` rows.
    """
    pair_mask = (
        (training_data['product_id'] == product_id) &
        (training_data['store_id'] == store_id)
    )
    product_data = training_data[pair_mask].copy()

    if len(product_data) < sequence_length + 10:
        return None, None

    # The date column may hold datetime.date objects or strings (object dtype).
    # pandas refuses to merge those against the datetime64 range built below,
    # so normalise the column to datetime64 first.
    product_data['date'] = pd.to_datetime(product_data['date'])

    # Create complete date range and fill missing values
    date_range = pd.date_range(
        start=product_data['date'].min(),
        end=product_data['date'].max(),
        freq='D'
    )

    complete_data = pd.DataFrame({'date': date_range})
    complete_data = complete_data.merge(product_data, on='date', how='left')
    complete_data['sales_quantity'] = complete_data['sales_quantity'].fillna(0)

    # Create sequences: each window of `sequence_length` days predicts the next day
    sales_data = complete_data['sales_quantity'].values

    X, y = [], []
    for i in range(sequence_length, len(sales_data)):
        X.append(sales_data[i-sequence_length:i])
        y.append(sales_data[i])

    return np.array(X), np.array(y)

def prepare_classification_features():
    """Aggregate per product/store statistics used for ABC classification.

    Groups the module-level ``training_data`` by (product_id, store_id),
    computes summary statistics rounded to 4 decimals, and adds three ratio
    features. Infinite and missing values are replaced by 0.

    Returns:
        pandas.DataFrame indexed by (product_id, store_id) with flattened
        ``<column>_<statistic>`` names plus ``revenue_per_transaction``,
        ``price_volatility`` and ``demand_volatility``.
    """
    aggregation_spec = {
        'sales_quantity': ['sum', 'mean', 'std', 'count'],
        'sales_amount': ['sum', 'mean', 'std'],
        'avg_price': ['mean', 'std']
    }
    grouped = training_data.groupby(['product_id', 'store_id'])
    features = grouped.agg(aggregation_spec).round(4)

    # Collapse the two-level (column, statistic) header into single names
    features.columns = ['_'.join(name_parts).strip() for name_parts in features.columns]

    # Ratio features derived from the aggregates above
    features = features.assign(
        revenue_per_transaction=features['sales_amount_sum'] / features['sales_quantity_count'],
        price_volatility=features['avg_price_std'] / features['avg_price_mean'],
        demand_volatility=features['sales_quantity_std'] / features['sales_quantity_mean'],
    )

    # Divisions by zero give +/-inf and single-row groups give NaN std; map both to 0
    return features.replace([np.inf, -np.inf], 0).fillna(0)

# List the distinct (product, store) pairs present in the training frame
unique_products = training_data[['product_id', 'store_id']].drop_duplicates()
print(f"Unique product-store combinations: {len(unique_products)}")

# Build LSTM windows for the first pair as a smoke test of prepare_lstm_data
sample_product = unique_products.iloc[0]
X_sample, y_sample = prepare_lstm_data(
    sample_product['product_id'],
    sample_product['store_id'],
)

if X_sample is None:
    print("Insufficient data for sample product")
else:
    print(f"Sample LSTM data - X shape: {X_sample.shape}, y shape: {y_sample.shape}")

# Compute the aggregated classification features and list their names
classification_features = prepare_classification_features()
print(f"\nClassification features shape: {classification_features.shape}")
print("\nFeature names:")
print(list(classification_features.columns))

## 4. Train LSTM Model with SageMaker

In [None]:
# Write the training frame to a local CSV, then copy that file into the session bucket
training_data_key = 'training-data/transactions.csv'
local_csv_path = '/tmp/transactions.csv'
training_data.to_csv(local_csv_path, index=False)

s3_client = boto3.client('s3')
s3_client.upload_file(local_csv_path, bucket, training_data_key)

print(f"Training data uploaded to s3://{bucket}/{training_data_key}")

In [None]:
# Create LSTM training script.
# The string below is written verbatim to lstm_training.py, which the SageMaker
# TensorFlow estimator runs as its entry point inside the training container.
lstm_script = """
import argparse
import os
import json
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
import joblib

def create_sequences(data, seq_length):
    # Slide a window of seq_length values over data; the value right after
    # each window is its target.
    X, y = [], []
    for i in range(seq_length, len(data)):
        X.append(data[i-seq_length:i])
        y.append(data[i])
    return np.array(X), np.array(y)

def build_lstm_model(seq_length, n_features=1):
    # Three stacked LSTM layers with dropout, followed by two dense layers
    # producing a single regression output.
    model = Sequential([
        LSTM(50, return_sequences=True, input_shape=(seq_length, n_features)),
        Dropout(0.2),
        LSTM(50, return_sequences=True),
        Dropout(0.2),
        LSTM(50),
        Dropout(0.2),
        Dense(25),
        Dense(1)
    ])

    model.compile(optimizer='adam', loss='mse', metrics=['mae'])
    return model

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAINING'))
    parser.add_argument('--epochs', type=int, default=50)
    parser.add_argument('--batch-size', type=int, default=32)
    parser.add_argument('--seq-length', type=int, default=30)

    # The SageMaker TensorFlow estimator also passes a --model_dir (S3 URI)
    # argument that this script does not declare. parse_known_args() ignores
    # it instead of exiting with "unrecognized arguments".
    args, _unknown_args = parser.parse_known_args()

    # Load data
    df = pd.read_csv(os.path.join(args.train, 'transactions.csv'))

    # Prepare aggregated daily sales data
    daily_sales = df.groupby('date')['sales_quantity'].sum().sort_index()

    # At least two sequences are needed so that both the train and the test
    # split below are non-empty.
    min_days = args.seq_length + 2
    if len(daily_sales) < min_days:
        raise ValueError(
            f"Need at least {min_days} days of sales data for seq-length "
            f"{args.seq_length}, got {len(daily_sales)}"
        )

    # Scale data
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(daily_sales.values.reshape(-1, 1))

    # Create sequences
    X, y = create_sequences(scaled_data.flatten(), args.seq_length)

    # Split data chronologically: first 80% for training, the rest for testing
    train_size = int(0.8 * len(X))
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]

    # Reshape for LSTM: (samples, timesteps, features)
    X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
    X_test = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))

    # Build and train model
    model = build_lstm_model(args.seq_length)

    history = model.fit(
        X_train, y_train,
        epochs=args.epochs,
        batch_size=args.batch_size,
        validation_data=(X_test, y_test),
        verbose=1
    )

    # Evaluate (metrics are in the scaler's 0-1 units, not raw quantities)
    test_predictions = model.predict(X_test)
    test_mae = mean_absolute_error(y_test, test_predictions)
    test_rmse = np.sqrt(mean_squared_error(y_test, test_predictions))

    # Save model
    # NOTE(review): this writes a Keras .h5 file; confirm the serving container
    # used at deploy time can load this format.
    model.save(os.path.join(args.model_dir, 'lstm_model.h5'))

    # Save scaler
    joblib.dump(scaler, os.path.join(args.model_dir, 'scaler.pkl'))

    # Save metadata
    metadata = {
        'seq_length': args.seq_length,
        'test_mae': float(test_mae),
        'test_rmse': float(test_rmse),
        'model_type': 'lstm_demand_forecasting'
    }

    with open(os.path.join(args.model_dir, 'metadata.json'), 'w') as f:
        json.dump(metadata, f)

    print(f"Training completed. Test MAE: {test_mae:.4f}, RMSE: {test_rmse:.4f}")
"""

# Save training script next to the notebook so the estimator can upload it
with open('lstm_training.py', 'w') as f:
    f.write(lstm_script)

print("LSTM training script created")

In [None]:
# Train LSTM model with SageMaker
# Hyperparameters are forwarded to lstm_training.py as command-line arguments
lstm_hyperparameters = {
    'epochs': 50,
    'batch-size': 32,
    'seq-length': 30
}

lstm_estimator = TensorFlow(
    entry_point='lstm_training.py',
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    framework_version='2.8.0',
    py_version='py39',
    hyperparameters=lstm_hyperparameters
)

# Start training on the CSV uploaded under the training-data/ prefix;
# the 'training' channel name matches SM_CHANNEL_TRAINING read by the script
training_input = f's3://{bucket}/training-data/'
lstm_estimator.fit({'training': training_input})

print("LSTM model training completed!")

## 5. Deploy Model to SageMaker Endpoint

In [None]:
# Create inference script.
# The string below is written verbatim to inference.py. It defines the
# model_fn / input_fn / predict_fn / output_fn hooks for serving the LSTM.
inference_script = """
import json
import numpy as np
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler
import joblib
import os

def model_fn(model_dir):
    # Load the Keras model, the fitted scaler and the training metadata that
    # the training script saved into model_dir.
    model = tf.keras.models.load_model(os.path.join(model_dir, 'lstm_model.h5'))
    scaler = joblib.load(os.path.join(model_dir, 'scaler.pkl'))

    with open(os.path.join(model_dir, 'metadata.json'), 'r') as f:
        metadata = json.load(f)

    return {'model': model, 'scaler': scaler, 'metadata': metadata}

def input_fn(request_body, request_content_type):
    # Only JSON request bodies are accepted.
    if request_content_type == 'application/json':
        return json.loads(request_body)
    else:
        raise ValueError(f"Unsupported content type: {request_content_type}")

def predict_fn(input_data, model_dict):
    # Produce a multi-day forecast by feeding each prediction back into the
    # input window. Returns a dict with an 'error' key for invalid requests.
    model = model_dict['model']
    scaler = model_dict['scaler']
    metadata = model_dict['metadata']

    historical_data = input_data.get('historical_data', [])
    forecast_days = input_data.get('forecast_days', 30)

    if len(historical_data) == 0:
        return {'error': 'No historical data provided'}

    seq_length = metadata['seq_length']

    # The model consumes exactly seq_length points; with fewer, the reshape
    # below would raise instead of returning a usable error.
    if len(historical_data) < seq_length:
        return {
            'error': f'At least {seq_length} historical data points are required, '
                     f'got {len(historical_data)}'
        }

    # An empty forecast cannot be inverse-transformed, so reject it up front.
    if not isinstance(forecast_days, int) or forecast_days < 1:
        return {'error': 'forecast_days must be a positive integer'}

    # Scale input data
    scaled_data = scaler.transform(np.array(historical_data).reshape(-1, 1))

    # Generate predictions
    predictions = []
    current_sequence = scaled_data[-seq_length:].flatten()

    for _ in range(forecast_days):
        model_input = current_sequence.reshape(1, seq_length, 1)
        next_pred = model.predict(model_input, verbose=0)[0][0]
        predictions.append(next_pred)

        # Update sequence: drop the oldest value, append the new prediction
        current_sequence = np.append(current_sequence[1:], next_pred)

    # Inverse transform back to original units
    actual_predictions = scaler.inverse_transform(
        np.array(predictions).reshape(-1, 1)
    ).flatten()

    # Ensure non-negative
    actual_predictions = np.maximum(actual_predictions, 0)

    # The confidence bounds are a fixed +/-20% band around the point forecast.
    # 'model_accuracy' carries the test MAE recorded at training time.
    return {
        'predictions': actual_predictions.tolist(),
        'confidence_lower': (actual_predictions * 0.8).tolist(),
        'confidence_upper': (actual_predictions * 1.2).tolist(),
        'model_accuracy': metadata.get('test_mae', 0.0),
        'forecast_horizon': forecast_days
    }

def output_fn(prediction, content_type):
    # Only JSON responses are produced.
    if content_type == 'application/json':
        return json.dumps(prediction)
    else:
        raise ValueError(f"Unsupported content type: {content_type}")
"""

# Save inference script
with open('inference.py', 'w') as f:
    f.write(inference_script)

print("Inference script created")

In [None]:
# Deploy model to endpoint
# The minute-resolution timestamp suffix distinguishes endpoints created by
# separate runs of this cell.
endpoint_name = f'demand-forecasting-{datetime.now().strftime("%Y%m%d%H%M")}'

# NOTE(review): inference.py is written by the previous cell but is not passed
# to deploy() here, and the training script saves the model as a Keras .h5
# file. Confirm that the TensorFlow serving container can load that artifact
# and that the custom predict_fn is actually used by this endpoint.
predictor = lstm_estimator.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.large',
    endpoint_name=endpoint_name
)

print(f"Model deployed to endpoint: {endpoint_name}")

## 6. Test Predictions and Store in Database

In [None]:
# Test prediction: 30 example daily sales values and a 7-day forecast request
sample_history = [
    10, 12, 15, 8, 20, 18, 14, 16, 11, 13,
    19, 22, 17, 9, 25, 21, 16, 12, 14, 18,
    23, 19, 15, 11, 24, 20, 17, 13, 16, 22,
]
test_data = {'historical_data': sample_history, 'forecast_days': 7}

# Call the deployed endpoint and pretty-print its response
result = predictor.predict(test_data)
print("Prediction result:")
print(json.dumps(result, indent=2))

In [None]:
def store_prediction_in_db(product_id, store_id, prediction_result):
    """Insert one forecast into the ``demand_predictions`` table.

    The table is created on first use. The forecast lists are stored as JSON
    text and the row is tagged with model type ``'lstm'``.

    Args:
        product_id: Product identifier stored in the ``product_id`` column.
        store_id: Store identifier stored in the ``store_id`` column.
        prediction_result: Mapping with the keys ``forecast_horizon``,
            ``predictions``, ``confidence_lower``, ``confidence_upper`` and
            ``model_accuracy``.

    Raises:
        KeyError: If ``prediction_result`` lacks one of the required keys (for
            example when it is an error payload). Raised before any database
            connection is opened.
        Exception: Database errors propagate; the connection is closed first
            and nothing is committed.
    """
    # Build the row before touching the database so a malformed result cannot
    # leave a connection open.
    values = (
        product_id,
        store_id,
        'lstm',
        prediction_result['forecast_horizon'],
        json.dumps(prediction_result['predictions']),
        json.dumps(prediction_result['confidence_lower']),
        json.dumps(prediction_result['confidence_upper']),
        prediction_result['model_accuracy']
    )

    # Create predictions table if not exists
    create_table_query = """
    CREATE TABLE IF NOT EXISTS demand_predictions (
        id INT AUTO_INCREMENT PRIMARY KEY,
        product_id VARCHAR(50) NOT NULL,
        store_id VARCHAR(50) NOT NULL,
        model_type VARCHAR(50) NOT NULL,
        forecast_days INT NOT NULL,
        predictions JSON NOT NULL,
        confidence_lower JSON,
        confidence_upper JSON,
        model_accuracy DECIMAL(5,4),
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        INDEX idx_product_store (product_id, store_id),
        INDEX idx_created_at (created_at)
    )
    """

    # Insert prediction (parameterised, so values are escaped by the driver)
    insert_query = """
    INSERT INTO demand_predictions 
    (product_id, store_id, model_type, forecast_days, predictions, confidence_lower, confidence_upper, model_accuracy)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """

    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute(create_table_query)
        cursor.execute(insert_query, values)
        conn.commit()
    finally:
        # Always release the connection, including when an execute fails.
        conn.close()

    print(f"✅ Prediction stored for product {product_id} in store {store_id}")

# Save the test forecast under an example product/store pair
store_prediction_in_db('FV-BAN-001', 'store_001', result)

# Read back the five newest rows to confirm the insert
recent_predictions_query = "SELECT * FROM demand_predictions ORDER BY created_at DESC LIMIT 5"
conn = get_db_connection()
stored_predictions = pd.read_sql(recent_predictions_query, conn)
conn.close()

print("\nRecent predictions in database:")
print(stored_predictions)

## 7. Batch Predictions for All Products

In [None]:
def run_batch_predictions():
    """Forecast demand for the most active product/store pairs and store the results.

    Steps:
      1. Select pairs with at least 10 'Sale' transactions in the last 90
         days, ordered by transaction count (highest first).
      2. For the first five pairs (demo limit), load the daily sales totals of
         the last 60 days.
      3. For pairs with at least 30 daily rows, request a 30-day forecast from
         the module-level ``predictor`` and persist it with
         ``store_prediction_in_db``.

    A failure for one pair is reported and does not stop the rest of the
    batch. Every database connection is closed even when a query raises.

    Returns:
        None. Progress and the final success count are printed.
    """
    # Get products with sufficient transaction history
    candidates_query = """
    SELECT product_id, store_id, COUNT(*) as transaction_count
    FROM inventory_transactions 
    WHERE created_at >= DATE_SUB(NOW(), INTERVAL 90 DAY)
    AND transaction_type = 'Sale'
    GROUP BY product_id, store_id
    HAVING COUNT(*) >= 10
    ORDER BY transaction_count DESC
    """

    # Daily sales history for one pair; identical for every iteration, so it
    # is defined once outside the loop.
    history_query = """
    SELECT DATE(created_at) as date, SUM(quantity) as sales
    FROM inventory_transactions
    WHERE product_id = %s AND store_id = %s AND transaction_type = 'Sale'
    AND created_at >= DATE_SUB(NOW(), INTERVAL 60 DAY)
    GROUP BY DATE(created_at)
    ORDER BY date
    """

    conn = get_db_connection()
    try:
        products_to_predict = pd.read_sql(candidates_query, conn)
    finally:
        conn.close()

    print(f"Found {len(products_to_predict)} products for batch prediction")

    # Run predictions for top products
    successful_predictions = 0

    for _, product in products_to_predict.head(5).iterrows():  # Limit to 5 for demo
        product_id = product['product_id']
        store_id = product['store_id']
        try:
            # Get historical data for this product
            conn = get_db_connection()
            try:
                history_df = pd.read_sql(history_query, conn, params=[product_id, store_id])
            finally:
                # Close the per-product connection even when the query fails.
                conn.close()

            if len(history_df) < 30:  # Need at least 30 days
                print(
                    f"Skipping {product_id} - {store_id}: "
                    f"only {len(history_df)} days of sales history"
                )
                continue

            # Plain floats keep the request payload JSON-serialisable.
            historical_data = history_df['sales'].astype(float).tolist()

            # Make prediction
            test_data = {
                'historical_data': historical_data,
                'forecast_days': 30
            }
            prediction_result = predictor.predict(test_data)

            # Store in database
            store_prediction_in_db(product_id, store_id, prediction_result)

            successful_predictions += 1
            print(f"✅ Prediction {successful_predictions}: {product_id} - {store_id}")

        except Exception as e:
            # Best effort: report this pair and continue with the next one.
            print(f"❌ Failed prediction for {product_id}: {str(e)}")

    print(f"\nBatch prediction completed: {successful_predictions} successful predictions")

# Run batch predictions: forecasts the top product/store pairs through the
# deployed endpoint and stores each result in the database
run_batch_predictions()

## 8. Visualization and Analysis

In [None]:
# Visualize predictions
def visualize_predictions():
    """Plot the three most recently stored demand forecasts, one panel each.

    Reads the newest rows of ``demand_predictions``, decodes their JSON
    columns, and draws the point forecast with its confidence band (when both
    bounds are stored). Prints a message and returns when the table has no
    rows.

    Returns:
        None. The figure is shown with ``plt.show()``.

    Raises:
        Whatever ``get_db_connection`` or ``pd.read_sql`` raise; the connection
        is closed before the exception propagates.
    """
    conn = get_db_connection()
    try:
        # Get recent predictions
        predictions_df = pd.read_sql(
            "SELECT * FROM demand_predictions ORDER BY created_at DESC LIMIT 3",
            conn
        )
    finally:
        conn.close()

    if len(predictions_df) == 0:
        print("No predictions found in database")
        return

    n_panels = len(predictions_df)
    # squeeze=False always returns a 2-D axes array, so a single row needs no
    # special handling.
    fig, axes = plt.subplots(n_panels, 1, figsize=(12, 4 * n_panels), squeeze=False)

    for ax, (_, prediction) in zip(axes[:, 0], predictions_df.iterrows()):
        predictions = json.loads(prediction['predictions'])
        days = list(range(1, len(predictions) + 1))

        ax.plot(days, predictions, 'b-', label='Prediction', linewidth=2)

        # The confidence columns are nullable in the table definition; draw the
        # band only when both bounds are present.
        lower_raw = prediction['confidence_lower']
        upper_raw = prediction['confidence_upper']
        if pd.notna(lower_raw) and pd.notna(upper_raw):
            ax.fill_between(days, json.loads(lower_raw), json.loads(upper_raw),
                            alpha=0.3, color='blue', label='Confidence Interval')

        ax.set_title(f'Demand Forecast - {prediction["product_id"]} ({prediction["store_id"]})')
        ax.set_xlabel('Days')
        ax.set_ylabel('Predicted Demand')
        ax.legend()
        ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

# Show predictions: plots the most recent forecasts stored in the database
visualize_predictions()

## 9. API Integration Test

In [None]:
# Test API integration (assuming your Express server is running)
import requests

# Connection details for the ML API; update these with your actual values
API_BASE_URL = 'http://localhost:8080/api/ml'  # Replace with your server URL
ENDPOINT_NAME = endpoint_name  # The endpoint we just created

# Print the environment variables to set on the Express server
print("Set these environment variables in your Express server:")
for env_name, env_value in (
    ("LSTM_ENDPOINT_NAME", ENDPOINT_NAME),
    ("AWS_REGION", region),
):
    print(f"{env_name}={env_value}")

# Test API call (uncomment when your server is running)
# test_request = {
#     'product_id': 'FV-BAN-001',
#     'store_id': 'store_001',
#     'forecast_days': 14,
#     'model_type': 'lstm'
# }

# try:
#     response = requests.post(f'{API_BASE_URL}/forecast', json=test_request)
#     print("API Response:")
#     print(json.dumps(response.json(), indent=2))
# except Exception as e:
#     print(f"API test failed: {e}")
#     print("Make sure your Express server is running with the ML routes enabled")

## 10. Cleanup and Next Steps

In [None]:
# Summary of what we've accomplished.
# The status glyphs were mis-encoded in the original strings and are restored here.
completed_steps = (
    "Connected to MySQL database",
    "Extracted and analyzed transaction data",
    "Trained LSTM model with SageMaker",
    "Deployed model to real-time endpoint",
    "Made predictions and stored results in database",
    "Created API integration layer",
)
next_steps = (
    "Update your Express server environment variables",
    "Test the ML API endpoints",
    "Integrate predictions into your frontend",
    "Set up automated retraining schedule",
    "Add more ML models (ARIMA, Prophet, Classification)",
)

print("🎉 SageMaker Database Integration Complete!")
print("\nWhat we've accomplished:")
for completed_step in completed_steps:
    print(f"✅ {completed_step}")

print("\nNext steps:")
for step_number, next_step in enumerate(next_steps, start=1):
    print(f"{step_number}. {next_step}")

# Resources created by this notebook, for later clean-up
print(f"\nEndpoint created: {endpoint_name}")
print(f"S3 bucket used: {bucket}")
print(f"Region: {region}")

In [None]:
# Optional: Clean up endpoint to save costs
# predictor.delete_endpoint()
# print("Endpoint deleted to save costs")