# MongoDB to PostgreSQL Data Migration

This notebook demonstrates the complete process of migrating data from MongoDB to PostgreSQL.

## Overview
- **Source Database**: MongoDB (localhost:27017, database: `tutorial`)
- **Target Database**: PostgreSQL (localhost:5432, database: `tutorial_db`)
- **Data Collections**: Depression Index, News Articles, Stock Data, S&P 500, Rainfall

## Table of Contents
1. [Setup and Configuration](#setup)
2. [Create PostgreSQL Database](#create-db)
3. [Extract and Load Standard Tables](#standard-tables)
4. [Handle Stock Data (Normalized Format)](#stock-data)
5. [Verify Migration](#verify)
6. [Export to CSV Files](#export-csv)

## 1. Setup and Configuration

Import required libraries and configure database connections.

In [None]:
# Import required libraries
from pymongo import MongoClient
import pandas as pd
from sqlalchemy import create_engine
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
import os

print("✓ All libraries imported successfully")

In [None]:
# MongoDB Configuration
MONGO_HOST = 'localhost'
MONGO_PORT = 27017
MONGO_DB = 'tutorial'

# PostgreSQL Configuration
PG_USER = 'postgres'
PG_PASSWORD = '123'  # UPDATE THIS with your password
PG_HOST = 'localhost'
PG_PORT = 5432
PG_DB = 'tutorial_db'

print("✓ Configuration set")
print(f"MongoDB: {MONGO_HOST}:{MONGO_PORT}/{MONGO_DB}")
print(f"PostgreSQL: {PG_HOST}:{PG_PORT}/{PG_DB}")
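
Hard-coding the PostgreSQL password in the notebook makes it easy to commit by accident. Below is a minimal sketch that reads the settings from environment variables and falls back to the non-secret values above when a variable is unset. It assumes you export variables named `PG_USER`, `PG_PASSWORD`, and so on. Those names are our own choice, not something the notebook defines.

```python
import os

# Defaults mirror the configuration cell (minus the password).
# The environment variable names are illustrative assumptions.
DEFAULTS = {
    "PG_USER": "postgres",
    "PG_PASSWORD": "",
    "PG_HOST": "localhost",
    "PG_PORT": "5432",
    "PG_DB": "tutorial_db",
}

def load_pg_config(env=os.environ):
    """Return PostgreSQL settings, preferring environment variables over defaults."""
    config = {key: env.get(key, default) for key, default in DEFAULTS.items()}
    config["PG_PORT"] = int(config["PG_PORT"])  # environment values are strings
    return config
```

For example, `cfg = load_pg_config()` followed by `PG_PASSWORD = cfg["PG_PASSWORD"]` keeps the secret out of the notebook file.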

## 2. Create PostgreSQL Database

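Connect to the default `postgres` database and create the target database if it doesn't exist. `CREATE DATABASE` cannot run inside a transaction block, so the connection is put into autocommit mode first.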

In [None]:
# Create PostgreSQL database
from psycopg2 import sql

try:
    conn = psycopg2.connect(
        host=PG_HOST,
        port=PG_PORT,
        user=PG_USER,
        password=PG_PASSWORD,
        database='postgres'  # Connect to default database first
    )
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cursor = conn.cursor()
    
    # Check if database exists (the name is passed as a query parameter)
    cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s", (PG_DB,))
    if not cursor.fetchone():
        # A database name is an identifier, so quote it with sql.Identifier
        cursor.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(PG_DB)))
        print(f"✓ Database '{PG_DB}' created")
    else:
        print(f"✓ Database '{PG_DB}' already exists")
    
    cursor.close()
    conn.close()
    
except Exception as e:
    print(f"Error: {e}")

In [None]:
# Create connections
mongo_client = MongoClient(MONGO_HOST, MONGO_PORT)
mongo_db = mongo_client[MONGO_DB]

pg_engine = create_engine(f'postgresql://{PG_USER}:{PG_PASSWORD}@{PG_HOST}:{PG_PORT}/{PG_DB}')

print("✓ Database connections established")
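
The connection URL above embeds the password verbatim, so a password containing characters such as `@`, `:` or `/` would be misparsed. SQLAlchemy's documentation recommends percent-encoding the password with `urllib.parse.quote_plus`. Here is a minimal sketch; the helper name `pg_url` is hypothetical:

```python
from urllib.parse import quote_plus

def pg_url(user, password, host, port, db):
    """Build a postgresql:// URL with the user and password percent-encoded."""
    return (f"postgresql://{quote_plus(user)}:{quote_plus(password)}"
            f"@{host}:{port}/{db}")

print(pg_url("postgres", "p@ss:w/rd", "localhost", 5432, "tutorial_db"))
# postgresql://postgres:p%40ss%3Aw%2Frd@localhost:5432/tutorial_db
```

It would be used as `create_engine(pg_url(PG_USER, PG_PASSWORD, PG_HOST, PG_PORT, PG_DB))`. In SQLAlchemy 1.4 and later, `sqlalchemy.engine.URL.create(...)` is an alternative that avoids building the string by hand.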

## 3. Extract and Load Standard Tables

Migrate Depression Index, News Articles, S&P 500, and Rainfall data.

### 3.1 Depression Index

In [None]:
# Extract Depression Index data from MongoDB
col_1 = mongo_db.Depression_index
docs_1 = list(col_1.find({}, {"_id": 0, "date": 1, "depression": 1}))

# Transform to DataFrame
df_1 = pd.DataFrame(docs_1)
df_1["date"] = pd.to_datetime(df_1["date"], utc=True).dt.date
df_1.rename(columns={"depression": "depression_index"}, inplace=True)

print(f"Depression Index: {len(df_1)} records")
display(df_1.head())
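
With `utc=True`, every timestamp is converted to UTC before `.dt.date` keeps only the calendar date. As a result, a timestamp late in the evening at a negative UTC offset lands on the next day. The small sketch below uses made-up timestamps, not values from the `Depression_index` collection:

```python
import pandas as pd

# Two illustrative timestamps with different UTC offsets
raw = pd.Series(["2020-01-01T23:30:00-05:00", "2020-01-02T01:00:00+00:00"])

dates = pd.to_datetime(raw, utc=True).dt.date
print(dates.tolist())
# [datetime.date(2020, 1, 2), datetime.date(2020, 1, 2)]
```

If the source dates carry no timezone, `utc=True` treats them as UTC and the calendar dates are unchanged.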

In [None]:
# Load to PostgreSQL
df_1.to_sql('depression_index', pg_engine, if_exists='replace', index=False)
print("✓ depression_index table created")

### 3.2 News Articles (CCnews Depression)

In [None]:
# Extract News data from MongoDB
col_2 = mongo_db.CCnews_Depression
docs_2 = list(col_2.find({}, {"_id": 0, "date": 1, "title": 1, "text": 1}))

# Transform to DataFrame
df_2 = pd.DataFrame(docs_2)
df_2["date"] = pd.to_datetime(df_2["date"], errors="coerce").dt.strftime("%Y-%m-%d")

print(f"News Articles: {len(df_2)} records")
display(df_2.head())
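
With `errors="coerce"`, strings that cannot be parsed become `NaT` instead of raising an error. `.dt.strftime` then yields a missing value (`NaN`), which `to_sql` writes as `NULL`. A sketch with made-up values:

```python
import pandas as pd

raw = pd.Series(["2021-03-04", "not a date"])
formatted = pd.to_datetime(raw, errors="coerce").dt.strftime("%Y-%m-%d")
print(formatted.tolist())
# ['2021-03-04', nan]
```

Running `df_2["date"].isna().sum()` after the transform shows how many articles lost their date this way.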

In [None]:
# Load to PostgreSQL
df_2.to_sql('ccnews_depression', pg_engine, if_exists='replace', index=False)
print("✓ ccnews_depression table created")

### 3.3 S&P 500 Data

In [None]:
# Extract S&P 500 data from MongoDB
col_5 = mongo_db.SP500
docs_5 = list(col_5.find({}, {"_id": 0}))

# Transform to DataFrame
df_5 = pd.DataFrame(docs_5)
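df_5["Date"] = pd.to_datetime(df_5["Date"]).dt.strftime("%Y-%m-%d")
# Lowercase column names so SQL can reference them unquoted (e.g. s.date in the example queries)
df_5.columns = df_5.columns.str.lower()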

print(f"S&P 500: {len(df_5)} records")
display(df_5.head())

In [None]:
# Load to PostgreSQL
df_5.to_sql('sp500', pg_engine, if_exists='replace', index=False)
print("✓ sp500 table created")

### 3.4 Rainfall Data

In [None]:
# Extract Rainfall data from MongoDB
col_6 = mongo_db.Rainfall
docs_6 = list(col_6.find({}, {"_id": 0}))

# Transform to DataFrame
df_6 = pd.DataFrame(docs_6)
df_6["Date"] = pd.to_datetime(df_6["Date"]).dt.strftime("%Y-%m-%d")

print(f"Rainfall: {len(df_6)} records")
display(df_6.head())

In [None]:
# Load to PostgreSQL
df_6.to_sql('rainfall', pg_engine, if_exists='replace', index=False)
print("✓ rainfall table created")

## 4. Handle Stock Data (Normalized Format)

Stock data has 2,511 columns, which exceeds PostgreSQL's hard limit of 1,600 columns per table. We normalize it to a long format:
- **Original**: One row per date with 2,511 columns (Date, Open_AAPL, High_AAPL, ...)
- **Normalized**: Multiple rows per date with 7 columns (date, ticker, open, high, low, close, volume)
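
The column count matches the summary below: 502 tickers × 5 fields + 1 `Date` column = 2,511. The next cells reshape the data with a row-by-row loop. The same reshaping can also be expressed with pandas `melt` and `pivot`, as in the sketch below. It assumes every non-`Date` column is named `<Field>_<TICKER>`, where `Field` is one of Open, High, Low, Close, or Volume. The function name `wide_to_long` is hypothetical.

```python
import pandas as pd

# Source field prefix -> target column name
FIELDS = {"Open": "open", "High": "high", "Low": "low",
          "Close": "close", "Volume": "volume"}

def wide_to_long(df_wide: pd.DataFrame) -> pd.DataFrame:
    """Reshape one-row-per-date stock data into one row per (date, ticker)."""
    # One row per (Date, original column name, value)
    long = df_wide.melt(id_vars="Date", var_name="column", value_name="value")
    # Split on the first underscore only: "Open_X_Y" -> field "Open", ticker "X_Y"
    parts = long["column"].str.split("_", n=1, expand=True)
    long["field"] = parts[0]
    long["ticker"] = parts[1]
    # Spread the five fields back into columns; missing values stay NaN
    out = long.pivot(index=["Date", "ticker"], columns="field", values="value")
    out = out.reset_index().rename(columns={"Date": "date", **FIELDS})
    out.columns.name = None
    return out[["date", "ticker", "open", "high", "low", "close", "volume"]]
```

Calling `df_normalized = wide_to_long(df_3)` would produce the same seven columns as the loop. Unlike the loop's output, the rows come back sorted by date and then ticker. The intermediate `long` frame has one row per date per original column, so it is several times larger than the result.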

In [None]:
# Extract Stock data from MongoDB
print("Loading StockData from MongoDB...")
col_3 = mongo_db.StockData
docs_3 = list(col_3.find({}, {"_id": 0}))

df_3 = pd.DataFrame(docs_3)
df_3["Date"] = pd.to_datetime(df_3["Date"]).dt.strftime("%Y-%m-%d")

print(f"Original shape: {df_3.shape} (rows, columns)")
print(f"Sample columns: {df_3.columns.tolist()[:10]}")

In [None]:
# Transform from wide to long format
print("\nTransforming to normalized format...")

stock_cols = [col for col in df_3.columns if col != 'Date']

# Columns are named "<Field>_<TICKER>" (e.g. Open_AAPL). Collect the tickers once,
# splitting on the first underscore only so a ticker containing "_" stays intact.
tickers = sorted({col.split('_', 1)[1] for col in stock_cols if '_' in col})

records = []
for idx, row in df_3.iterrows():
    date = row['Date']
    
    # Create one record per ticker for this date
    for ticker in tickers:
        record = {
            'date': date,
            'ticker': ticker,
            'open': row.get(f'Open_{ticker}'),
            'high': row.get(f'High_{ticker}'),
            'low': row.get(f'Low_{ticker}'),
            'close': row.get(f'Close_{ticker}'),
            'volume': row.get(f'Volume_{ticker}')
        }
        records.append(record)
    
    if (idx + 1) % 500 == 0:
        print(f"  Processed {idx + 1} rows...")

df_normalized = pd.DataFrame(records)
print(f"\nNormalized shape: {df_normalized.shape}")
display(df_normalized.head(10))

In [None]:
# Load to PostgreSQL
print("\nSending to PostgreSQL...")
with pg_engine.begin() as conn:  # begin() commits the transaction when the block exits
    df_normalized.to_sql(
        'stock_data', 
        conn, 
        if_exists='replace', 
        index=False, 
        method='multi', 
        chunksize=5000
    )

print("✓ stock_data table created")
print(f"\nTotal records: {len(df_normalized):,}")
print(f"Unique tickers: {df_normalized['ticker'].nunique()}")
print(f"Date range: {df_normalized['date'].min()} to {df_normalized['date'].max()}")

## 5. Verify Migration

Query PostgreSQL to verify all tables were created successfully.

In [None]:
# List all tables in PostgreSQL
query = """
SELECT table_name 
FROM information_schema.tables 
WHERE table_schema = 'public'
ORDER BY table_name;
"""

tables = pd.read_sql(query, pg_engine)
print("Tables in PostgreSQL:")
display(tables)

In [None]:
# Get row counts for each table
table_stats = []

for table in ['depression_index', 'ccnews_depression', 'stock_data', 'sp500', 'rainfall']:
    count_query = f"SELECT COUNT(*) as count FROM {table}"
    result = pd.read_sql(count_query, pg_engine)
    table_stats.append({'table': table, 'rows': result['count'][0]})

df_stats = pd.DataFrame(table_stats)
print("\nTable Statistics:")
display(df_stats)
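
Non-zero row counts show that the tables exist, but not that every document arrived. The sketch below compares source and target counts; the helper `compare_counts` is hypothetical. The MongoDB counts would come from pymongo's `Collection.count_documents({})`, and the PostgreSQL counts from `df_stats` above.

```python
def compare_counts(source_counts, target_counts, multipliers=None):
    """Return (table, expected, actual) for every table whose counts disagree.

    multipliers maps a table to how many target rows one source document
    should produce (e.g. the number of tickers for the normalized stock_data).
    """
    multipliers = multipliers or {}
    mismatches = []
    for table, n_source in source_counts.items():
        expected = n_source * multipliers.get(table, 1)
        actual = target_counts.get(table, 0)
        if expected != actual:
            mismatches.append((table, expected, actual))
    return mismatches
```

In this notebook, the source side could be filled with calls such as `mongo_db.Depression_index.count_documents({})` (and likewise for `CCnews_Depression`, `StockData`, `SP500` and `Rainfall`). The target side could be built with `dict(zip(df_stats["table"], df_stats["rows"]))`. Because each `StockData` document is one date, `stock_data` needs the ticker count (`df_normalized["ticker"].nunique()`) as its multiplier.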

In [None]:
# Sample query: Get recent stock data for a specific ticker
sample_query = """
SELECT * FROM stock_data 
WHERE ticker = 'AAPL' 
ORDER BY date DESC 
LIMIT 10;
"""

sample_data = pd.read_sql(sample_query, pg_engine)
print("\nSample Query - Recent AAPL stock data:")
display(sample_data)

## 6. Export to CSV Files

Export all tables to CSV files for easy sharing with team members.

In [None]:
# Create output directory
output_dir = 'csv_exports'
os.makedirs(output_dir, exist_ok=True)

print(f"Exporting tables to '{output_dir}' folder...")
print("=" * 60)

In [None]:
# Export each table
tables_to_export = [
    {'name': 'depression_index', 'filename': 'depression_index.csv'},
    {'name': 'ccnews_depression', 'filename': 'ccnews_depression.csv'},
    {'name': 'stock_data', 'filename': 'stock_data.csv'},
    {'name': 'sp500', 'filename': 'sp500.csv'},
    {'name': 'rainfall', 'filename': 'rainfall.csv'}
]

export_results = []

for table_info in tables_to_export:
    table_name = table_info['name']
    filename = table_info['filename']
    filepath = os.path.join(output_dir, filename)
    
    try:
        # Read from PostgreSQL
        df = pd.read_sql_table(table_name, pg_engine)
        
        # Export to CSV
        df.to_csv(filepath, index=False, encoding='utf-8')
        
        # Get file size
        file_size = os.path.getsize(filepath) / (1024 * 1024)  # MB
        
        export_results.append({
            'table': table_name,
            'rows': len(df),
            'file': filename,
            'size_mb': round(file_size, 2),
            'status': '✓ Success'
        })
        
    except Exception as e:
        export_results.append({
            'table': table_name,
            'rows': 0,
            'file': filename,
            'size_mb': 0,
            'status': f'✗ Error: {e}'
        })

df_exports = pd.DataFrame(export_results)
print("\nExport Results:")
display(df_exports)
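
`read_sql_table` loads each table fully into memory before writing it, which may be heavy for the normalized `stock_data` table. Instead, pandas can return query results in chunks through the `chunksize` argument of `pd.read_sql`. The sketch below appends each chunk to the CSV and writes the header only once; the helper `export_in_chunks` is hypothetical. It is demonstrated on an in-memory SQLite table so it runs without a PostgreSQL server. With PostgreSQL you would pass a connection from `pg_engine` as `con`.

```python
import os
import sqlite3
import tempfile

import pandas as pd

def export_in_chunks(query, con, filepath, chunksize=100_000):
    """Write the result of `query` to a CSV file chunk by chunk; return the row count."""
    total = 0
    for i, chunk in enumerate(pd.read_sql(query, con, chunksize=chunksize)):
        # First chunk creates the file with a header; later chunks append rows only
        chunk.to_csv(filepath, mode="w" if i == 0 else "a",
                     header=(i == 0), index=False, encoding="utf-8")
        total += len(chunk)
    return total

# Demo against an in-memory SQLite table standing in for stock_data
con = sqlite3.connect(":memory:")
pd.DataFrame({"ticker": ["AAPL"] * 5, "close": [1.0, 2.0, 3.0, 4.0, 5.0]}).to_sql(
    "stock_data", con, index=False)
path = os.path.join(tempfile.mkdtemp(), "stock_data.csv")
print(export_in_chunks("SELECT * FROM stock_data", con, path, chunksize=2))  # 5
```

With psycopg2, the driver still buffers the full result set on the client unless the connection uses `execution_options(stream_results=True)`. Chunking alone only avoids building one large DataFrame.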

In [None]:
print(f"\n✅ All files exported to: {os.path.abspath(output_dir)}")
print("\nShare this folder with your team members!")

## Summary

### Migration Complete! 🎉

**Database Information:**
- **Source**: MongoDB (`tutorial` database)
- **Target**: PostgreSQL (`tutorial_db` database)

**Tables Created:**
1. `depression_index` - Depression index data by date
2. `ccnews_depression` - News articles related to depression
3. `stock_data` - Stock market data (normalized format with 502 tickers)
4. `sp500` - S&P 500 index data
5. `rainfall` - Rainfall data by US state

**CSV Exports:**
- All tables exported to `csv_exports/` folder
- Ready to share with team members

### Example Queries:

```sql
-- Get Apple stock data
SELECT * FROM stock_data WHERE ticker = 'AAPL' ORDER BY date DESC LIMIT 10;

-- Compare multiple stocks
SELECT ticker, AVG(close) as avg_close 
FROM stock_data 
WHERE ticker IN ('AAPL', 'MSFT', 'GOOGL') 
GROUP BY ticker;

-- Join depression index with S&P 500
SELECT d.date, d.depression_index, s.close_gspc 
FROM depression_index d 
JOIN sp500 s ON d.date::text = s.date;
```