# Apache Spark with PostgreSQL Integration

This notebook demonstrates how to integrate Apache Spark with PostgreSQL using PySpark. We'll cover connecting to a PostgreSQL database, reading data into Spark DataFrames, performing analysis, and writing results back to the database.

## Overview

Apache Spark is a distributed computing framework that splits work on large datasets across many executors. Combining it with PostgreSQL:
- Lets Spark process large tables stored in PostgreSQL, which it reads over JDBC
- Provides Spark SQL alongside the DataFrame API on the same data
- Fits into existing database infrastructure, since results can be written back to PostgreSQL tables

## Setup and Environment

In [None]:
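# Install required libraries if not already installed.
# PySpark also needs a Java runtime; the PostgreSQL JDBC driver is fetched through spark.jars.packages when the session starts.
%pip install pyspark pandas matplotlib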

In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as sum_, avg, count
import matplotlib.pyplot as plt
import pandas as pd

# Configure matplotlib for notebook
%matplotlib inline
plt.style.use('ggplot')

In [None]:
# Initialize Spark Session with PostgreSQL JDBC driver
spark = SparkSession.builder \
    .appName("Spark-PostgreSQL Integration") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3") \
    .getOrCreate()

# Database connection properties
db_properties = {
    "url": "jdbc:postgresql://localhost:5432/sample_db",
    "user": "postgres",
    "password": "password",
    "driver": "org.postgresql.Driver"
}
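Hard-coding the password in the notebook is convenient for a local demo, but it exposes the credentials whenever the notebook is shared. One alternative is to read the settings from environment variables. The sketch below assumes the `PGHOST`, `PGPORT`, `PGDATABASE`, `PGUSER` and `PGPASSWORD` names used by libpq tools such as `psql`; `build_jdbc_config` is a hypothetical helper, and its defaults mirror the values above.

```python
import os


def build_jdbc_config(env=None):
    """Hypothetical helper: assemble JDBC settings from PG* environment variables.

    Pass a dict as `env` to override os.environ (useful for testing).
    """
    env = os.environ if env is None else env
    host = env.get("PGHOST", "localhost")
    port = env.get("PGPORT", "5432")
    database = env.get("PGDATABASE", "sample_db")
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "user": env.get("PGUSER", "postgres"),
        # Empty default: the password is never stored in the notebook itself
        "password": env.get("PGPASSWORD", ""),
        "driver": "org.postgresql.Driver",
    }


config = build_jdbc_config({"PGHOST": "db.internal", "PGPASSWORD": "s3cret"})
print(config["url"])  # jdbc:postgresql://db.internal:5432/sample_db
```

In the notebook, `db_properties = build_jdbc_config()` would then replace the literal dictionary.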

## Loading Data from PostgreSQL

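We assume the same sample data as in the previous notebooks, stored in three PostgreSQL tables. The queries below rely on these columns:
- `sales_data`: `product_id`, `customer_id`, `quantity`, `unit_price`, `total_price`
- `products`: `product_id`, `category`
- `customers`: `customer_id`, `region`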

In [None]:
# Read tables from PostgreSQL over JDBC.
# Reads are lazy: load() fetches only the table schema; rows are pulled when an action (e.g. show) runs.
def read_table(table_name):
    """Load one PostgreSQL table as a Spark DataFrame."""
    return spark.read \
        .format("jdbc") \
        .option("url", db_properties["url"]) \
        .option("dbtable", table_name) \
        .option("user", db_properties["user"]) \
        .option("password", db_properties["password"]) \
        .option("driver", db_properties["driver"]) \
        .load()

sales_data = read_table("sales_data")
products = read_table("products")
customers = read_table("customers")
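Without further options, each of these reads goes through a single JDBC connection, so Spark pulls a whole table through one task. For large tables the JDBC source accepts `partitionColumn`, `lowerBound`, `upperBound` and `numPartitions`, which split the read into parallel range queries on a numeric, date or timestamp column. The plain-Python sketch below is a simplified model of how those ranges turn into `WHERE` clauses, not Spark's actual source (which handles stride rounding and non-integer types more carefully). `sale_id` is a hypothetical integer key; the columns used in this notebook don't include one.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Simplified model of a partitioned JDBC read: one WHERE clause per Spark task.

    The first and last ranges are open-ended, so rows outside
    [lower_bound, upper_bound) are still read: the bounds set the stride,
    they do not filter rows.
    """
    if num_partitions <= 1:
        return [None]  # a single task with no WHERE clause
    stride = (upper_bound - lower_bound) // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i > 0 else None
        current += stride
        upper = f"{column} < {current}" if i < num_partitions - 1 else None
        if lower is None:
            # First range also picks up NULL keys
            predicates.append(f"{upper} OR {column} IS NULL")
        elif upper is None:
            predicates.append(lower)
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


for clause in jdbc_partition_predicates("sale_id", 1, 1001, 4):
    print(clause)
```

In the notebook, the corresponding read would add `.option("partitionColumn", "sale_id")`, `.option("lowerBound", 1)`, `.option("upperBound", 1001)` and `.option("numPartitions", 4)` before `.load()`. Each partition opens its own connection, so `numPartitions` also bounds the load placed on PostgreSQL.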

In [None]:
# Register tables as temporary views for SQL queries
sales_data.createOrReplaceTempView("sales_data")
products.createOrReplaceTempView("products")
customers.createOrReplaceTempView("customers")

In [None]:
# Display sample data
print("Sales Data Sample:")
sales_data.show(5)

print("Products Sample:")
products.show(5)

print("Customers Sample:")
customers.show(5)

## Basic Spark SQL Analysis

Let's perform some analysis using Spark SQL.

In [None]:
# Simple query: the ten largest transactions by quantity (quantity above 40 only)
top_sales_query = """
SELECT *
FROM sales_data
WHERE quantity > 40
ORDER BY quantity DESC
LIMIT 10
"""

top_sales = spark.sql(top_sales_query)
top_sales.show()
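This query runs in Spark against the `sales_data` view. Spark can push simple `WHERE` predicates down to PostgreSQL, but depending on the Spark version and options the sort and limit may run in Spark after rows have been transferred. To have PostgreSQL execute the whole statement, the JDBC source accepts a `query` option in place of `dbtable` (the two cannot be set together). A minimal sketch of building those reader options, assuming the `db_properties` dictionary from the setup cell; `jdbc_query_options` is a hypothetical helper.

```python
def jdbc_query_options(props, sql):
    """Hypothetical helper: reader options that run `sql` inside PostgreSQL.

    Spark wraps the query as a subquery in its own SELECT, so a trailing
    semicolon is removed. Only `query` is set, never `dbtable`.
    """
    options = {key: props[key] for key in ("url", "user", "password", "driver")}
    options["query"] = sql.strip().rstrip(";")
    return options


# Same connection settings as the setup cell
db_properties = {
    "url": "jdbc:postgresql://localhost:5432/sample_db",
    "user": "postgres",
    "password": "password",
    "driver": "org.postgresql.Driver",
}

pushed = jdbc_query_options(db_properties, """
SELECT * FROM sales_data WHERE quantity > 40 ORDER BY quantity DESC LIMIT 10;
""")
print(pushed["query"])
```

With a live session, `spark.read.format("jdbc").options(**pushed).load()` would return only the ten rows PostgreSQL selected.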

In [None]:
# Product summary analysis
product_summary_query = """
SELECT 
    product_id,
    COUNT(*) as num_transactions,
    SUM(quantity) as total_quantity,
    SUM(total_price) as total_revenue,
    AVG(unit_price) as avg_unit_price
FROM sales_data
GROUP BY product_id
ORDER BY total_revenue DESC
"""

product_summary = spark.sql(product_summary_query)
product_summary.show()
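To make the aggregation concrete, the plain-Python sketch below computes the same five columns as `product_summary_query` on a few hypothetical rows (not data from the database). It also shows that `AVG(unit_price)` is an unweighted per-transaction mean, not revenue divided by quantity. Unlike SQL's `AVG`, this sketch does not skip `NULL` values.

```python
from collections import defaultdict

# Hypothetical sales rows with the columns the query uses
rows = [
    {"product_id": 1, "quantity": 2, "unit_price": 10.0, "total_price": 20.0},
    {"product_id": 1, "quantity": 1, "unit_price": 12.0, "total_price": 12.0},
    {"product_id": 2, "quantity": 5, "unit_price": 3.0, "total_price": 15.0},
]


def product_summary(rows):
    """Mirror of the GROUP BY product_id query, ordered by total_revenue DESC."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["product_id"]].append(row)
    summary = []
    for product_id, group in groups.items():
        summary.append({
            "product_id": product_id,
            "num_transactions": len(group),                      # COUNT(*)
            "total_quantity": sum(r["quantity"] for r in group),  # SUM(quantity)
            "total_revenue": sum(r["total_price"] for r in group),  # SUM(total_price)
            # AVG(unit_price): each transaction counts once, regardless of quantity
            "avg_unit_price": sum(r["unit_price"] for r in group) / len(group),
        })
    summary.sort(key=lambda s: s["total_revenue"], reverse=True)
    return summary


for entry in product_summary(rows):
    print(entry)
```

For product 1 the unweighted average is 11.0, while the quantity-weighted price would be 32.0 / 3, about 10.67. If the weighted figure is wanted, `SUM(total_price) / SUM(quantity)` computes it.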

## Advanced Analysis with Joins

Let's perform some analysis by joining our tables.

In [None]:
# Sales by category using DataFrame API
category_sales = sales_data.join(products, "product_id") \
    .groupBy("category") \
    .agg(
        count("*").alias("num_transactions"),
        sum_("total_price").alias("total_revenue"),
        avg("total_price").alias("avg_transaction_value")
    ) \
    .orderBy(col("total_revenue").desc())

category_sales.show()
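`sales_data.join(products, "product_id")` is an inner join, so any sale whose `product_id` has no row in `products` silently drops out of the category totals. The plain-Python sketch below models that behavior on hypothetical rows and assumes `product_id` is unique in `products`. Passing `how="left"` to `join` would instead keep unmatched sales with a null `category`.

```python
from collections import defaultdict


def revenue_by_category(sales, products):
    """Inner-join sales to products on product_id, then sum total_price per category.

    Assumes product_id is unique in `products` (a duplicate key would
    multiply rows in a real join; here the last one simply wins).
    """
    category_of = {p["product_id"]: p["category"] for p in products}
    revenue = defaultdict(float)
    for sale in sales:
        category = category_of.get(sale["product_id"])
        if category is None:
            continue  # inner join: no matching product, so the sale is dropped
        revenue[category] += sale["total_price"]
    return dict(revenue)


# Hypothetical data: product 99 does not exist in `products`
products = [
    {"product_id": 1, "category": "Electronics"},
    {"product_id": 2, "category": "Books"},
]
sales = [
    {"product_id": 1, "total_price": 100.0},
    {"product_id": 2, "total_price": 20.0},
    {"product_id": 99, "total_price": 50.0},
]

result = revenue_by_category(sales, products)
print(result)  # {'Electronics': 100.0, 'Books': 20.0}
```

Comparing `sales_data.count()` with the count after the join is a quick check for orphaned sales in the real tables.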

In [None]:
# Visualize category sales
# Convert to Pandas for plotting
category_sales_pd = category_sales.toPandas()

plt.figure(figsize=(10, 6))
plt.bar(category_sales_pd['category'], category_sales_pd['total_revenue'])
plt.title('Revenue by Product Category')
plt.xlabel('Category')
plt.ylabel('Total Revenue')
plt.tight_layout()
plt.show()

In [None]:
# Regional category analysis using SQL
regional_category_query = """
SELECT 
    c.region,
    p.category,
    COUNT(*) as num_transactions,
    SUM(s.quantity) as total_quantity,
    SUM(s.total_price) as total_revenue,
    AVG(s.total_price) as avg_transaction_value
FROM sales_data s
JOIN products p ON s.product_id = p.product_id
JOIN customers c ON s.customer_id = c.customer_id
GROUP BY c.region, p.category
ORDER BY total_revenue DESC
"""

regional_category_sales = spark.sql(regional_category_query)
regional_category_sales.show()

In [None]:
# Visualize regional category sales
regional_category_pd = regional_category_sales.toPandas()

# One row per region, one column per category; region/category pairs with no sales become 0
regional_category_pivot = regional_category_pd.pivot(index='region', columns='category', values='total_revenue').fillna(0)

# DataFrame.plot creates its own figure, so pass figsize here instead of calling plt.figure() first
ax = regional_category_pivot.plot(kind='bar', stacked=True, figsize=(12, 6))
ax.set_title('Revenue by Region and Category')
ax.set_xlabel('Region')
ax.set_ylabel('Total Revenue')
ax.legend(title='Category')
plt.tight_layout()
plt.show()
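The `pivot(...).fillna(0)` step reshapes the long query result into a wide region-by-category table, which is the layout a stacked bar chart needs. The plain-Python sketch below models that reshape on hypothetical rows. Like pandas `pivot`, it rejects duplicate (index, column) pairs. The SQL `GROUP BY c.region, p.category` guarantees each pair appears once, so the real data should never trigger that error.

```python
def pivot_fill_zero(records, index, columns, values):
    """Reshape long records into {index_value: {column_value: value}}, filling gaps with 0."""
    row_keys = sorted({r[index] for r in records})
    col_keys = sorted({r[columns] for r in records})
    table = {row: {col: 0 for col in col_keys} for row in row_keys}
    seen = set()
    for r in records:
        key = (r[index], r[columns])
        if key in seen:
            raise ValueError(f"duplicate entry for {key}")
        seen.add(key)
        table[r[index]][r[columns]] = r[values]
    return table


# Hypothetical query output: the South region has no Toys sales
records = [
    {"region": "North", "category": "Books", "total_revenue": 10.0},
    {"region": "North", "category": "Toys", "total_revenue": 5.0},
    {"region": "South", "category": "Books", "total_revenue": 7.0},
]

wide = pivot_fill_zero(records, "region", "category", "total_revenue")
print(wide)  # {'North': {'Books': 10.0, 'Toys': 5.0}, 'South': {'Books': 7.0, 'Toys': 0}}
```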

## Writing Results Back to PostgreSQL

Let's save our analysis results back to PostgreSQL.

In [None]:
# Write analysis results to PostgreSQL.
# mode("overwrite") replaces any existing table with the same name.
def write_table(df, table_name):
    """Save a Spark DataFrame to a PostgreSQL table, overwriting any existing table."""
    df.write \
        .format("jdbc") \
        .option("url", db_properties["url"]) \
        .option("dbtable", table_name) \
        .option("user", db_properties["user"]) \
        .option("password", db_properties["password"]) \
        .option("driver", db_properties["driver"]) \
        .mode("overwrite") \
        .save()

write_table(category_sales, "category_sales_summary")
write_table(regional_category_sales, "regional_category_summary")
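By default, `mode("overwrite")` drops the target table and recreates it from the DataFrame schema, which discards any indexes, grants or column types defined in PostgreSQL. The JDBC writer's `truncate` option makes an overwrite empty the existing table instead. This does not work if the new data has a different schema. The `batchsize` option sets how many rows are sent per insert round trip; Spark's default is 1000. A minimal sketch of building these writer options, assuming the `db_properties` dictionary from the setup cell; `jdbc_write_options` is a hypothetical helper.

```python
def jdbc_write_options(props, table, keep_table_definition=True, batch_size=1000):
    """Hypothetical helper: writer options for a JDBC overwrite."""
    options = {key: props[key] for key in ("url", "user", "password", "driver")}
    options["dbtable"] = table
    # With mode("overwrite"), truncate=true empties the existing table rather than
    # dropping and recreating it, so its PostgreSQL definition is preserved.
    options["truncate"] = "true" if keep_table_definition else "false"
    # Rows per JDBC batch insert
    options["batchsize"] = str(batch_size)
    return options


# Same connection settings as the setup cell
db_properties = {
    "url": "jdbc:postgresql://localhost:5432/sample_db",
    "user": "postgres",
    "password": "password",
    "driver": "org.postgresql.Driver",
}

opts = jdbc_write_options(db_properties, "category_sales_summary")
print(opts["dbtable"], opts["truncate"], opts["batchsize"])
```

With a live session, this would be used as `category_sales.write.format("jdbc").options(**opts).mode("overwrite").save()`. On the first run, when the table does not exist yet, Spark creates it as usual.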

## Cleanup

In [None]:
# Stop Spark session
spark.stop()

## Conclusion

This notebook demonstrated how to use Apache Spark with PostgreSQL:

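1. **Connection**: Connected to PostgreSQL through the PostgreSQL JDBC driver
2. **Data Operations**: Read from and wrote to PostgreSQL tables
3. **Analysis**: Ran Spark SQL queries and DataFrame operations, including joins and aggregations
4. **Visualization**: Converted results to Pandas for plotting
5. **Persistence**: Saved analysis results back to the database

Key benefits:
- Scalability for datasets too large to process comfortably with Pandas on a single machine
- Distributed execution of joins and aggregations across Spark executors
- A familiar SQL interface through Spark SQL

Considerations:
- Requires a Java runtime, a working Spark installation, and the PostgreSQL JDBC driver
- More setup than working with Pandas alone
- An unpartitioned JDBC read uses a single connection, so large tables need the partitioning options to be read in parallel
- Best suited to large datasets where distributed processing pays off; for small tables, querying PostgreSQL directly or using Pandas is simpler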