# 🧩 Add-on Module 9: Pandas Integration with External Systems (SQL, Spark & APIs)

In large-scale data workflows, Pandas rarely operates alone. It often integrates with:

- Databases (MySQL, PostgreSQL, SQLite) via **SQLAlchemy**
- Big Data engines like **Apache Spark**
- **REST APIs** for ingestion and enrichment

This module explores how Pandas connects, exchanges, and synchronizes data with external systems efficiently.

## 1️⃣ Integrating Pandas with SQL Databases

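Pandas reads from and writes to SQL databases through **SQLAlchemy**, a database toolkit and ORM that handles connections and dialect differences.

You can write DataFrames to tables with `to_sql()`, run queries with `read_sql()`, and join DataFrame contents with existing SQL tables once the DataFrame has been written to the database.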

In [ ]:
import pandas as pd
from sqlalchemy import create_engine

# Create an in-memory SQLite database
engine = create_engine('sqlite:///:memory:')

# Sample DataFrame
sales = pd.DataFrame({
    'order_id': [101, 102, 103, 104],
    'customer': ['Alice', 'Bob', 'Charlie', 'David'],
    'amount': [250, 180, 600, 120]
})

# Write to SQL table
sales.to_sql('sales_table', con=engine, index=False, if_exists='replace')

# Read back using SQL query
df_sql = pd.read_sql('SELECT * FROM sales_table WHERE amount > 200', con=engine)
df_sql

### ⚙️ Advanced: Parameterized Queries & Joins

In [ ]:
from sqlalchemy import text

# Create another table
customers = pd.DataFrame({
    'customer': ['Alice', 'Bob', 'Charlie', 'David'],
    'city': ['NY', 'Paris', 'Berlin', 'Delhi']
})
customers.to_sql('customers_table', con=engine, index=False, if_exists='replace')

# Perform join via SQL
# :amt is a named bind parameter; wrapping the query in text() keeps that
# syntax portable across database drivers
query = text('''
SELECT s.order_id, s.customer, c.city, s.amount
FROM sales_table AS s
JOIN customers_table AS c ON s.customer = c.customer
WHERE s.amount > :amt
''')

# The value is passed separately from the SQL string, never interpolated into it
joined_df = pd.read_sql(query, con=engine, params={'amt': 150})
joined_df

## 2️⃣ Working with REST APIs

Pandas can easily ingest JSON responses from RESTful APIs, transforming them into DataFrames for analysis.

### Example: Fetching and Normalizing API Data

In [ ]:
import requests

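# Example API (placeholder JSON)
url = 'https://jsonplaceholder.typicode.com/posts'
response = requests.get(url, timeout=10)  # avoid hanging on a slow server
response.raise_for_status()  # fail loudly on HTTP 4xx/5xx responses
posts = pd.json_normalize(response.json())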
posts.head()
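
The placeholder API above returns flat records, but many APIs nest objects and lists inside each record. The following is a minimal sketch of how `json_normalize()` handles that case using its `record_path` and `meta` parameters. The payload and its field names are invented for illustration and do not come from any real API.

```python
import pandas as pd

# Hypothetical API payload (invented for illustration): each user has a
# nested "address" object and a list of "orders".
payload = [
    {
        "id": 1,
        "name": "Alice",
        "address": {"city": "NY", "zip": "10001"},
        "orders": [
            {"order_id": 101, "amount": 250},
            {"order_id": 105, "amount": 90},
        ],
    },
    {
        "id": 2,
        "name": "Bob",
        "address": {"city": "Paris", "zip": "75001"},
        "orders": [{"order_id": 102, "amount": 180}],
    },
]

# One row per user; nested dict keys become dotted column names
# such as "address.city". The raw list column is dropped.
users = pd.json_normalize(payload).drop(columns="orders")

# One row per order; the parent user's id and city are carried along
# as metadata columns.
orders = pd.json_normalize(
    payload,
    record_path="orders",
    meta=["id", ["address", "city"]],
)
```

Splitting the payload into a `users` table and an `orders` table keeps each DataFrame at one consistent grain, which tends to make later merges easier to reason about.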

### 🔍 API + Pandas Processing Workflow
Use case: Combine API data with existing datasets for enrichment.

For instance, enriching a transaction dataset with currency exchange rates or weather data from APIs.

In [ ]:
# Simulate enrichment workflow
transactions = pd.DataFrame({
    'txn_id': [1, 2, 3],
    'amount_usd': [200, 150, 350]
})

# Suppose API provides current USD → EUR rate
exchange_rate = 0.93  # Mocked API response
transactions['amount_eur'] = transactions['amount_usd'] * exchange_rate
transactions

## 3️⃣ Pandas with PySpark: Bridging Small and Big Data

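Pandas DataFrames convert to and from **PySpark DataFrames** with `spark.createDataFrame()` and `toPandas()`, which lets a pipeline hand heavy work to Spark's distributed engine.

Use this when a Pandas pipeline outgrows a single machine. Keep in mind that `toPandas()` collects the full result on the driver, so convert back only after the data has been filtered or aggregated down to a size that fits in memory. For Pandas-like syntax on distributed data, Spark 3.2+ also ships the pandas API on Spark (`pyspark.pandas`).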

In [ ]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName('PandasIntegration').getOrCreate()

# Convert Pandas → Spark
spark_df = spark.createDataFrame(sales)

# Spark transformations
spark_df.createOrReplaceTempView('sales')
spark_result = spark.sql('SELECT customer, SUM(amount) as total_spent FROM sales GROUP BY customer')

# Convert back to Pandas
pandas_result = spark_result.toPandas()
pandas_result

### 🧩 Real-World Problem 1: Data Warehouse ETL Pipeline

**Scenario:**
- Load millions of order records from PostgreSQL
- Clean, transform, and aggregate them in Pandas
- Write the summary back to SQL for reporting

**Workflow:**
```text
PostgreSQL → SQLAlchemy → Pandas Cleaning → Pandas GroupBy → SQL Write-Back
```
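
The workflow above can be sketched end to end as follows. This is an illustration under stated assumptions, not a production pipeline: an in-memory SQLite engine stands in for PostgreSQL so the cell runs anywhere, and the table names (`orders`, `order_summary`), the sample rows, and the cleaning rules are all hypothetical. Reading with `chunksize` keeps memory bounded when the source table holds millions of rows.

```python
import pandas as pd
from sqlalchemy import create_engine

# Assumption: in-memory SQLite stands in for PostgreSQL in this sketch.
engine = create_engine("sqlite:///:memory:")

# Hypothetical raw orders with a missing amount and inconsistent name casing.
raw = pd.DataFrame({
    "order_id": [1, 2, 3, 4, 5, 6],
    "customer": ["alice", "Bob", "ALICE", "bob ", "Charlie", "charlie"],
    "amount": [250.0, 180.0, None, 120.0, 600.0, 40.0],
})
raw.to_sql("orders", con=engine, index=False, if_exists="replace")

# Extract in chunks so the full table never has to fit in memory at once.
partials = []
for chunk in pd.read_sql(
    "SELECT customer, amount FROM orders", con=engine, chunksize=2
):
    # Clean: drop rows without an amount, normalize customer names.
    cleaned = chunk.dropna(subset=["amount"]).assign(
        customer=lambda d: d["customer"].str.strip().str.title()
    )
    # Aggregate each chunk; the partial sums are combined after the loop.
    partials.append(cleaned.groupby("customer")["amount"].sum())

# Combine the per-chunk sums into one total per customer.
summary = (
    pd.concat(partials)
    .groupby(level=0)
    .sum()
    .rename("total_spent")
    .reset_index()
)

# Load: write the summary back to SQL for reporting.
summary.to_sql("order_summary", con=engine, index=False, if_exists="replace")
```

Summing per chunk and then summing the partial results works because addition is associative. Aggregations such as a median would need a different strategy, for example pushing the aggregation into the SQL query itself.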

### 🧩 Real-World Problem 2: API Data Enrichment for E-Commerce

**Scenario:**
- Fetch product reviews from an external API.
- Merge API sentiment scores with internal sales.
- Generate a performance report combining both sources.

**Approach:**
- Use `requests` + `json_normalize()` to flatten API JSON.
- Merge on `product_id`.
- Save the merged DataFrame to Parquet for fast analytics.
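
A minimal sketch of this approach is shown here. The API response is mocked as a Python list, and every field name (`review.sentiment`, `units_sold`, and so on) is an assumption made for illustration. The Parquet write is left as a comment because `to_parquet()` needs `pyarrow` or `fastparquet` installed.

```python
import pandas as pd

# Hypothetical API response (invented for illustration): one record per
# review, with the sentiment score nested under "review".
api_reviews = [
    {"product_id": "P1", "review": {"sentiment": 0.8, "stars": 5}},
    {"product_id": "P1", "review": {"sentiment": 0.4, "stars": 3}},
    {"product_id": "P2", "review": {"sentiment": -0.2, "stars": 2}},
]

# Flatten: columns become product_id, review.sentiment, review.stars.
reviews = pd.json_normalize(api_reviews)

# Reduce to one row per product before merging.
sentiment = (
    reviews.groupby("product_id", as_index=False)["review.sentiment"]
    .mean()
    .rename(columns={"review.sentiment": "avg_sentiment"})
)

# Hypothetical internal sales data; P3 has no reviews.
product_sales = pd.DataFrame({
    "product_id": ["P1", "P2", "P3"],
    "units_sold": [120, 45, 30],
})

# A left merge keeps every product sold; products without reviews get NaN.
# validate= raises if either side has duplicate product_id values.
report = product_sales.merge(
    sentiment, on="product_id", how="left", validate="one_to_one"
)

# report.to_parquet("report.parquet")  # requires pyarrow or fastparquet
```

Aggregating reviews to one row per product before the merge avoids silently duplicating sales rows, and `validate="one_to_one"` turns that expectation into a check.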

## 🧠 Under the Hood

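- **SQLAlchemy** manages connections and translates Python expressions and bound parameters into the SQL dialect of the target database.
- **Pandas** can use `pyarrow`, an optional dependency, for Parquet I/O and Arrow-backed data types.
- **Spark Integration** can use Arrow serialization in `toPandas()` and `createDataFrame()` when `spark.sql.execution.arrow.pyspark.enabled` is set to `true`; without it, conversion falls back to a slower non-Arrow path.
- **REST APIs** commonly return JSON, which `json_normalize()` flattens into tabular form.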

## ✅ Best Practices

- Always use parameterized queries instead of string formatting to prevent SQL injection.
- Prefer **Parquet** or **Arrow** when exchanging large data.
- Use **connection pooling** in production for SQL.
- Cache API responses when possible to reduce latency.
- Benchmark I/O performance using `%timeit` or `perf_counter`.
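
Two of these practices, caching API responses and timing with `perf_counter`, can be combined in a small sketch. The `fetch_rate` function is hypothetical: it sleeps to mimic network latency and returns the same mocked rate used earlier in this module, so no real API is called.

```python
from functools import lru_cache
from time import perf_counter, sleep

# Counts how often the simulated API is actually hit.
calls = {"count": 0}


@lru_cache(maxsize=128)
def fetch_rate(base: str, quote: str) -> float:
    """Hypothetical stand-in for an API call; sleeps to mimic latency."""
    calls["count"] += 1
    sleep(0.05)
    return 0.93  # mocked rate, not a live value


start = perf_counter()
first = fetch_rate("USD", "EUR")   # slow: hits the simulated API
cold = perf_counter() - start

start = perf_counter()
second = fetch_rate("USD", "EUR")  # fast: served from the in-process cache
warm = perf_counter() - start

print(f"cold: {cold:.4f}s, warm: {warm:.6f}s, API calls: {calls['count']}")
```

`lru_cache` never expires its entries, so it suits data that changes rarely. For values such as live exchange rates, a cache with a time-to-live is likely a better fit.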

## ⚡ Challenge Exercise

You work at a logistics company managing millions of shipments.

1. Fetch live shipment status from a REST API.
2. Merge it with SQL-based shipment data in Pandas.
3. Transform timestamps to local timezones using the Pandas `dt` accessor (`dt.tz_localize()` and `dt.tz_convert()`).
4. Write the combined DataFrame to Parquet.
5. Push summary metrics (on-time %, delayed %, avg delivery time) back to the SQL database.
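
As a starting point for steps 3 and 5, here is a small sketch on made-up data. It assumes timestamps are stored as naive UTC values, picks `Asia/Kolkata` arbitrarily as the local zone, and uses hypothetical column names (`promised_at`, `delivered_at`). Your real schema will differ.

```python
import pandas as pd

# Hypothetical shipment data; timestamps are assumed to be naive UTC.
shipments = pd.DataFrame({
    "shipment_id": [1, 2, 3, 4],
    "promised_at": pd.to_datetime([
        "2024-03-01 12:00", "2024-03-01 18:00",
        "2024-03-02 09:00", "2024-03-02 15:00",
    ]),
    "delivered_at": pd.to_datetime([
        "2024-03-01 11:30", "2024-03-01 20:15",
        "2024-03-02 08:45", "2024-03-02 15:00",
    ]),
})

# Step 3: label the naive timestamps as UTC, then convert to a local zone.
shipments["delivered_local"] = (
    shipments["delivered_at"]
    .dt.tz_localize("UTC")
    .dt.tz_convert("Asia/Kolkata")
)

# Step 5: a shipment counts as on time if delivered at or before the promise.
on_time = shipments["delivered_at"] <= shipments["promised_at"]
metrics = pd.DataFrame([{
    "on_time_pct": 100 * on_time.mean(),
    "delayed_pct": 100 * (~on_time).mean(),
}])
```

The one-row `metrics` DataFrame can then be written back with `to_sql()`, as in the SQL section of this module. Average delivery time would additionally need a ship or pickup timestamp, which this sample data does not include.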