# Final Project - Part 2: PostgreSQL Database Operations

## Overview

In Part 1, you cleaned 7 messy CSV files using R. Now you will:
1. Create a PostgreSQL database with 7 tables (one for each CSV file)
2. Load cleaned data using PostgreSQL COPY commands
3. Perform complex SQL queries for business analysis

## Cleaned Files from Part 1:
- `clean_dealership_sales.csv` - Sales transactions
- `clean_customer_data.csv` - Customer information
- `clean_vehicle_inventory.csv` - Vehicle inventory
- `clean_salesperson_info.csv` - Salesperson details
- `clean_service_records.csv` - Service and maintenance records
- `clean_financing_details.csv` - Loan and financing information
- `clean_warranty_info.csv` - Warranty coverage details

## Part 1: Database Setup

**Instructions:** Set up the database connection parameters and verify the data path.
- Import required libraries: pandas, subprocess, sqlalchemy (create_engine, text), and os
- Define database connection variables: DB_NAME, DB_USER, DB_HOST, DB_PORT, and DATA_PATH
- Set DB_NAME to 'dealership_final_db' and DB_USER to 'student'
- **IMPORTANT:** Set DATA_PATH to point to your `data/` folder where the cleaned CSV files are located
- Create the connection string using PostgreSQL format
- Print confirmation messages showing the data path and database name
- Verify that the data path exists and is accessible

In [1]:
import pandas as pd
import subprocess
from sqlalchemy import create_engine, text
import os

DB_NAME = 'dealership_final_db'
DB_USER = 'student'
DB_HOST = 'localhost'
DB_PORT = '5432'

# Set the path to your data folder containing the cleaned CSV files
DATA_PATH = '/workspaces/Fall2025-MS3083-Base_Template/data/'

connection_string = f'postgresql://{DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}'

print(f"✓ Data path: {DATA_PATH}")
print(f"✓ Database will be created: {DB_NAME}")
print(f"✓ Connection parameters set")

# Verify data path exists
if os.path.exists(DATA_PATH):
    print(f"✓ Data folder verified and accessible")
else:
    print(f"⚠ Warning: Data folder not found at {DATA_PATH}")

✓ Data path: /workspaces/Fall2025-MS3083-Base_Template/data/
✓ Database will be created: dealership_final_db
✓ Connection parameters set
✓ Data folder verified and accessible


In [2]:
# Drop and create database
subprocess.run(['dropdb', '--if-exists', DB_NAME, '-U', DB_USER], capture_output=True)
result = subprocess.run(['createdb', DB_NAME, '-U', DB_USER], capture_output=True, text=True)

if result.returncode == 0:
    print(f"✓ Database '{DB_NAME}' created")
    engine = create_engine(connection_string)
    print("✓ Connection established")
else:
    print(f"Error: {result.stderr}")

✓ Database 'dealership_final_db' created
✓ Connection established


**Instructions:** Drop any existing database and create a fresh database for the project.
- Use subprocess to run the `dropdb` command with `--if-exists` flag to remove any existing database
- Use subprocess to run the `createdb` command to create a new database
- Check the return code to verify success
- If successful, create a SQLAlchemy engine and print confirmation messages
- If there's an error, print the error message

## Part 2: Create Tables

**Instructions:** Create the customers table with appropriate columns and data types.
- Write a CREATE TABLE statement for the customers table
- Include columns: customer_id (SERIAL PRIMARY KEY), first_name, last_name, full_name, email, phone, state, zip_code, registration_date
- Use appropriate data types: VARCHAR for text fields, DATE for dates
- Set NOT NULL constraints where appropriate
- Execute the SQL statement using the engine connection
- Commit the transaction and print a confirmation message

In [50]:
# Create customers table
create_customers = """
DROP TABLE IF EXISTS customers CASCADE;
CREATE TABLE customers (
    customer_id SERIAL PRIMARY KEY,
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    full_name VARCHAR(100),
    email VARCHAR(100),
    phone VARCHAR(20),
    state VARCHAR(2),
    zip_code VARCHAR(10),
    registration_date DATE
);
"""

with engine.connect() as conn:
    conn.execute(text(create_customers))
    conn.commit()
    
print("✓ customers table created")

✓ customers table created


In [51]:
# Create salespeople table
create_salespeople = """
DROP TABLE IF EXISTS salespeople CASCADE;
CREATE TABLE salespeople (
    salesperson_id SERIAL PRIMARY KEY,
    salesperson_name VARCHAR(100) NOT NULL,
    hire_date DATE,
    email VARCHAR(100),
    phone VARCHAR(20),
    commission_rate DECIMAL(4,3),
    department VARCHAR(50),
    status VARCHAR(20)
);
"""

with engine.connect() as conn:
    conn.execute(text(create_salespeople))
    conn.commit()
    
print("✓ salespeople table created")

✓ salespeople table created


**Instructions:** Create the salespeople table to store employee information.
- Write a CREATE TABLE statement for the salespeople table
- Include columns: salesperson_id (SERIAL PRIMARY KEY), salesperson_name, hire_date, email, phone, commission_rate, department, status
- Use DECIMAL(4,3) for commission_rate to store percentages with precision
- Execute the SQL and commit the transaction
- Print a confirmation message

In [52]:
# Create vehicles table
create_vehicles = """
DROP TABLE IF EXISTS vehicles CASCADE;
CREATE TABLE vehicles (
    vehicle_id SERIAL PRIMARY KEY,
    vin VARCHAR(17) UNIQUE NOT NULL,
    make VARCHAR(50) NOT NULL,
    model VARCHAR(50) NOT NULL,
    year INTEGER NOT NULL,
    color VARCHAR(30),
    mileage INTEGER,
    condition VARCHAR(20),
    purchase_price DECIMAL(10,2),
    lot_date DATE
);
"""

with engine.connect() as conn:
    conn.execute(text(create_vehicles))
    conn.commit()
    
print("✓ vehicles table created")

✓ vehicles table created


**Instructions:** Create the vehicles table to store vehicle inventory.
- Write a CREATE TABLE statement for the vehicles table
- Include columns: vehicle_id (SERIAL PRIMARY KEY), vin (UNIQUE), make, model, year, color, mileage, condition, purchase_price, lot_date
- Set vin as UNIQUE and NOT NULL
- Use INTEGER for year and mileage, DECIMAL(10,2) for purchase_price
- Execute the SQL and commit
- Print a confirmation message

**Instructions:** Create the sales table to store sales transactions.
- Write a CREATE TABLE statement for the sales table
- Include columns: sale_id (SERIAL PRIMARY KEY), customer_name, vehicle_make, vehicle_model, sale_date, sale_price, salesperson, payment_method, trade_in_value, year
- Use appropriate data types: VARCHAR for text fields, DATE for sale_date, DECIMAL(10,2) for prices, INTEGER for year
- Set NOT NULL for key fields like sale_date and sale_price
- Execute the SQL and commit
- Print a confirmation message

In [53]:
# Create sales table
create_sales = """
DROP TABLE IF EXISTS sales CASCADE;
CREATE TABLE sales (
    sale_id SERIAL PRIMARY KEY,
    customer_name VARCHAR(100),
    vehicle_make VARCHAR(50),
    vehicle_model VARCHAR(50),
    year INTEGER,
    sale_date DATE NOT NULL,
    sale_price DECIMAL(10,2) NOT NULL,
    salesperson VARCHAR(100),
    payment_method VARCHAR(20),
    trade_in_value DECIMAL(10,2)
);
"""

with engine.connect() as conn:
    conn.execute(text(create_sales))
    conn.commit()
    
print("✓ sales table created")

✓ sales table created


In [54]:
# Create service_records table
create_service = """
DROP TABLE IF EXISTS service_records CASCADE;
CREATE TABLE service_records (
    service_id SERIAL PRIMARY KEY,
    vin VARCHAR(17),
    service_date DATE NOT NULL,
    service_type VARCHAR(50),
    mechanic_name VARCHAR(100),
    labor_cost DECIMAL(8,2),
    parts_cost DECIMAL(8,2),
    notes TEXT
);
"""

with engine.connect() as conn:
    conn.execute(text(create_service))
    conn.commit()
    
print("✓ service_records table created")

✓ service_records table created


**Instructions:** Create the service_records table to store vehicle service history.
- Write a CREATE TABLE statement for the service_records table
- Include columns: service_id (SERIAL PRIMARY KEY), vin, service_date, service_type, mechanic_name, labor_cost, parts_cost, notes
- Use appropriate data types: VARCHAR for text, DATE for service_date, DECIMAL(8,2) for costs, TEXT for notes
- Set service_date as NOT NULL
- Execute the SQL and commit
- Print a confirmation message

In [55]:
# Create financing table
create_financing = """
DROP TABLE IF EXISTS financing CASCADE;
CREATE TABLE financing (
    financing_id SERIAL PRIMARY KEY,
    sale_id INTEGER,
    lender_name VARCHAR(100),
    loan_amount DECIMAL(10,2),
    interest_rate DECIMAL(4,2),
    term_months INTEGER,
    monthly_payment DECIMAL(8,2),
    approval_date DATE,
    down_payment DECIMAL(10,2)
);
"""

with engine.connect() as conn:
    conn.execute(text(create_financing))
    conn.commit()
    
print("✓ financing table created")

✓ financing table created


**Instructions:** Create the financing table to store loan and financing information.
- Write a CREATE TABLE statement for the financing table
- Include columns: financing_id (SERIAL PRIMARY KEY), sale_id, lender_name, loan_amount, interest_rate, term_months, monthly_payment, approval_date, down_payment
- Use DECIMAL(10,2) for money amounts, DECIMAL(4,2) for interest_rate, INTEGER for term_months, DATE for approval_date
- Execute the SQL and commit
- Print a confirmation message

**Instructions:** Create the warranties table to store warranty coverage details.
- Write a CREATE TABLE statement for the warranties table
- Include columns: warranty_id (SERIAL PRIMARY KEY), vehicle_id, warranty_type, provider, start_date, end_date, coverage_amount, deductible, status
- Use appropriate data types: VARCHAR for text fields, DATE for dates, DECIMAL for monetary values
- Execute the SQL and commit
- Print confirmation message indicating all 7 tables are created

In [56]:
# Create warranties table
create_warranties = """
DROP TABLE IF EXISTS warranties CASCADE;
CREATE TABLE warranties (
    warranty_id SERIAL PRIMARY KEY,
    vehicle_id INTEGER,
    warranty_type VARCHAR(50),
    provider VARCHAR(100),
    start_date DATE,
    end_date DATE,
    coverage_amount DECIMAL(10,2),
    deductible DECIMAL(6,2),
    status VARCHAR(20)
);
"""

with engine.connect() as conn:
    conn.execute(text(create_warranties))
    conn.commit()
    
print("✓ warranties table created")
print("\n✓ All 7 tables created successfully!")

✓ warranties table created

✓ All 7 tables created successfully!


## Part 3: Load Data with PostgreSQL COPY

We'll use PostgreSQL's native COPY command for efficient bulk loading of CSV data.

**Instructions:** Load customer data using PostgreSQL's COPY command.
- Construct the full file path to 'clean_customer_data.csv' using os.path.join()
- Create a COPY command string using the `\copy` syntax for client-side loading
- Specify all column names in order: customer_id, first_name, last_name, full_name, email, phone, state, zip_code, registration_date
- Use FORMAT CSV and HEADER TRUE options
- Execute the COPY command using subprocess.run() with psql
- If successful, query the count of loaded records
- Reset the sequence using setval() with the MAX(customer_id) value
- Print the number of customers loaded
- If there's an error, print the error message

In [1]:
# Load customers using PostgreSQL COPY command
customers_file = os.path.join(DATA_PATH, 'clean_customer_data.csv')

copy_command = f"""\\copy customers(customer_id, first_name, last_name, full_name, email, phone, state, zip_code, registration_date) 
FROM '{customers_file}' 
WITH (FORMAT CSV, HEADER TRUE);"""

result = subprocess.run(
    ['psql', '-d', DB_NAME, '-U', DB_USER],
    input=copy_command,
    capture_output=True,
    text=True
)

if result.returncode == 0:
    with engine.connect() as conn:
        count = conn.execute(text("SELECT COUNT(*) FROM customers")).scalar()
        max_id = conn.execute(text("SELECT MAX(customer_id) FROM customers")).scalar()
        if max_id is not None:
            conn.execute(text(f"SELECT setval('customers_customer_id_seq', {max_id});"))
        conn.commit()
    print(f"✓ Loaded {count} customers using COPY")
else:
    print(f"Error: {result.stderr}")

NameError: name 'os' is not defined

In [58]:
# Load salespeople using PostgreSQL COPY command
salespeople_file = os.path.join(DATA_PATH, 'clean_salesperson_info.csv')

copy_command = f"""\\copy salespeople(salesperson_id, salesperson_name, hire_date, email, phone, commission_rate, department, status) 
FROM '{salespeople_file}' 
WITH (FORMAT CSV, HEADER TRUE);"""

result = subprocess.run(
    ['psql', '-d', DB_NAME, '-U', DB_USER],
    input=copy_command,
    capture_output=True,
    text=True
)

if result.returncode == 0:
    with engine.connect() as conn:
        count = conn.execute(text("SELECT COUNT(*) FROM salespeople")).scalar()
        max_id = conn.execute(text("SELECT MAX(salesperson_id) FROM salespeople")).scalar()
        if max_id is not None:
            conn.execute(text(f"SELECT setval('salespeople_salesperson_id_seq', {max_id});"))
        conn.commit()
    print(f"✓ Loaded {count} salespeople using COPY")
else:
    print(f"Error: {result.stderr}")

✓ Loaded 0 salespeople using COPY


**Instructions:** Load salesperson data using PostgreSQL's COPY command.
- Construct the full file path to 'clean_salesperson_info.csv'
- Create a COPY command with columns: salesperson_id, salesperson_name, hire_date, email, phone, commission_rate, department, status
- Execute using subprocess.run() with psql
- Query the count and reset the sequence
- Print the number of salespeople loaded or error message

In [59]:
# Load vehicles using PostgreSQL COPY command
vehicles_file = os.path.join(DATA_PATH, 'clean_vehicle_inventory.csv')

copy_command = f"""\\copy vehicles(vehicle_id, vin, make, model, year, color, mileage, condition, purchase_price, lot_date) 
FROM '{vehicles_file}' 
WITH (FORMAT CSV, HEADER TRUE);"""

result = subprocess.run(
    ['psql', '-d', DB_NAME, '-U', DB_USER],
    input=copy_command,
    capture_output=True,
    text=True
)

if result.returncode == 0:
    with engine.connect() as conn:
        count = conn.execute(text("SELECT COUNT(*) FROM vehicles")).scalar()
        max_id = conn.execute(text("SELECT MAX(vehicle_id) FROM vehicles")).scalar()
        if max_id is not None:
            conn.execute(text(f"SELECT setval('vehicles_vehicle_id_seq', {max_id});"))
        conn.commit()
    print(f"✓ Loaded {count} vehicles using COPY")
else:
    print(f"Error: {result.stderr}")

✓ Loaded 0 vehicles using COPY


**Instructions:** Load vehicle data using PostgreSQL's COPY command.
- Construct the full file path to 'clean_vehicle_inventory.csv'
- Create a COPY command with columns: vehicle_id, vin, make, model, year, color, mileage, condition, purchase_price, lot_date
- Execute using subprocess.run() with psql
- Query the count and reset the sequence using setval()
- Print the number of vehicles loaded or error message

**Instructions:** Load sales data using PostgreSQL's COPY command.
- Construct the full file path to 'clean_dealership_sales.csv'
- Create a COPY command with columns: sale_id, customer_name, vehicle_make, vehicle_model, year, sale_date, sale_price, salesperson, payment_method, trade_in_value
- Execute using subprocess.run() with psql
- Query the count and reset the sequence
- Print the number of sales loaded or error message

In [60]:
# Load sales using PostgreSQL COPY command
sales_file = os.path.join(DATA_PATH, 'clean_dealership_sales.csv')

copy_command = f"""\\copy sales(sale_id, customer_name, vehicle_make, vehicle_model, year, sale_date, sale_price, salesperson, payment_method, trade_in_value) 
FROM '{sales_file}' 
WITH (FORMAT CSV, HEADER TRUE);"""

result = subprocess.run(
    ['psql', '-d', DB_NAME, '-U', DB_USER],
    input=copy_command,
    capture_output=True,
    text=True
)

if result.returncode == 0:
    with engine.connect() as conn:
        count = conn.execute(text("SELECT COUNT(*) FROM sales")).scalar()
        max_id = conn.execute(text("SELECT MAX(sale_id) FROM sales")).scalar()
        if max_id is not None:
            conn.execute(text(f"SELECT setval('sales_sale_id_seq', {max_id});"))
        conn.commit()
    print(f"✓ Loaded {count} sales using COPY")
else:
    print(f"Error: {result.stderr}")

✓ Loaded 0 sales using COPY


In [61]:
# Load service_records using PostgreSQL COPY command
service_file = os.path.join(DATA_PATH, 'clean_service_records.csv')

copy_command = f"""\\copy service_records(service_id, vin, service_date, service_type, mechanic_name, labor_cost, parts_cost, notes) 
FROM '{service_file}' 
WITH (FORMAT CSV, HEADER TRUE);"""

result = subprocess.run(
    ['psql', '-d', DB_NAME, '-U', DB_USER],
    input=copy_command,
    capture_output=True,
    text=True
)

if result.returncode == 0:
    with engine.connect() as conn:
        count = conn.execute(text("SELECT COUNT(*) FROM service_records")).scalar()
        max_id = conn.execute(text("SELECT MAX(service_id) FROM service_records")).scalar()
        if max_id is not None:
            conn.execute(text(f"SELECT setval('service_records_service_id_seq', {max_id});"))
        conn.commit()
    print(f"✓ Loaded {count} service_records using COPY")
else:
    print(f"Error: {result.stderr}")

✓ Loaded 0 service_records using COPY


**Instructions:** Load service records data using PostgreSQL's COPY command.
- Construct the full file path to 'clean_service_records.csv'
- Create a COPY command with columns: service_id, vin, service_date, service_type, mechanic_name, labor_cost, parts_cost, notes
- Execute using subprocess.run() with psql
- Query the count and reset the sequence
- Print the number of service records loaded or error message

In [62]:
# Load financing using PostgreSQL COPY command
financing_file = os.path.join(DATA_PATH, 'clean_financing_details.csv')

copy_command = f"""\\copy financing(financing_id, sale_id, lender_name, loan_amount, interest_rate, term_months, monthly_payment, approval_date, down_payment) 
FROM '{financing_file}' 
WITH (FORMAT CSV, HEADER TRUE);"""

result = subprocess.run(
    ['psql', '-d', DB_NAME, '-U', DB_USER],
    input=copy_command,
    capture_output=True,
    text=True
)

if result.returncode == 0:
    with engine.connect() as conn:
        count = conn.execute(text("SELECT COUNT(*) FROM financing")).scalar()
        max_id = conn.execute(text("SELECT MAX(financing_id) FROM financing")).scalar()
        if max_id is not None:
            conn.execute(text(f"SELECT setval('financing_financing_id_seq', {max_id});"))
        conn.commit()
    print(f"✓ Loaded {count} financing records using COPY")
else:
    print(f"Error: {result.stderr}")

✓ Loaded 0 financing records using COPY


**Instructions:** Load financing data using PostgreSQL's COPY command.
- Construct the full file path to 'clean_financing_details.csv'
- Create a COPY command with columns: financing_id, sale_id, lender_name, loan_amount, interest_rate, term_months, monthly_payment, approval_date, down_payment
- Execute using subprocess.run() with psql
- Query the count and reset the sequence
- Print the number of financing records loaded or error message

**Instructions:** Load warranty data using PostgreSQL's COPY command.
- Construct the full file path to 'clean_warranty_info.csv'
- Create a COPY command with columns: warranty_id, vehicle_id, warranty_type, provider, start_date, end_date, coverage_amount, deductible, status
- Execute using subprocess.run() with psql
- Query the count and reset the sequence
- Print the number of warranties loaded or error message
- Print a final success message indicating all data has been loaded

In [63]:
# Load warranties using PostgreSQL COPY command
warranties_file = os.path.join(DATA_PATH, 'clean_warranty_info.csv')

copy_command = f"""\\copy warranties(warranty_id, vehicle_id, warranty_type, provider, start_date, end_date, coverage_amount, deductible, status) 
FROM '{warranties_file}' 
WITH (FORMAT CSV, HEADER TRUE);"""

result = subprocess.run(
    ['psql', '-d', DB_NAME, '-U', DB_USER],
    input=copy_command,
    capture_output=True,
    text=True
)

if result.returncode == 0:
    with engine.connect() as conn:
        count = conn.execute(text("SELECT COUNT(*) FROM warranties")).scalar()
        max_id = conn.execute(text("SELECT MAX(warranty_id) FROM warranties")).scalar()
        if max_id is not None:
            conn.execute(text(f"SELECT setval('warranties_warranty_id_seq', {max_id});"))
        conn.commit()
    print(f"✓ Loaded {count} warranties using COPY")
    print("\n✓ All data loaded successfully!")
else:
    print(f"Error: {result.stderr}")

✓ Loaded 0 warranties using COPY

✓ All data loaded successfully!


## Part 4: Basic SELECT Queries

**Instructions:** Write a SELECT query to list all active salespeople.
- Select columns: salesperson_id, salesperson_name, hire_date, email, commission_rate
- Filter WHERE status = 'active'
- ORDER BY hire_date
- Use pd.read_sql() to execute the query and display results

In [64]:
# Exercise 11.1: List active salespeople
query = """
SELECT salesperson_id, salesperson_name, hire_date, email, commission_rate
FROM salespeople
WHERE status = 'active'
ORDER BY hire_date;
"""

result = pd.read_sql(query, engine)
print("Active Salespeople:")
print(result)

Active Salespeople:
Empty DataFrame
Columns: [salesperson_id, salesperson_name, hire_date, email, commission_rate]
Index: []


In [65]:
# Exercise 11.2: List vehicles with active warranties
query = """
SELECT v.make, v.model, v.year, w.warranty_type, w.end_date
FROM vehicles v
JOIN warranties w ON v.vehicle_id = w.vehicle_id
WHERE w.status = 'active'
ORDER BY w.end_date;
"""

result = pd.read_sql(query, engine)
print("Vehicles with Active Warranties:")
print(result)

Vehicles with Active Warranties:
Empty DataFrame
Columns: [make, model, year, warranty_type, end_date]
Index: []


**Instructions:** Write a SELECT query with JOIN to list vehicles with active warranties.
- Join vehicles and warranties tables on vehicle_id
- Select: make, model, year, warranty_type, end_date
- Filter WHERE status = 'active'
- ORDER BY end_date
- Execute and display results

## Part 5: Multi-Table JOINs

**Instructions:** Write a complex query joining sales and financing tables to show complete sales information.
- Join sales and financing tables on sale_id
- Select: sale_id, sale_date, customer_name, vehicle_make || ' ' || vehicle_model AS vehicle, salesperson, sale_price, lender_name, loan_amount, interest_rate
- Use LEFT JOIN for financing (some sales may not have financing)
- ORDER BY sale_date DESC
- Display the first 10 results

In [66]:
# Exercise 5.1: Complete sales information with financing
query = """
SELECT 
    s.sale_id,
    s.sale_date,
    s.customer_name,
    s.vehicle_make || ' ' || s.vehicle_model AS vehicle,
    s.salesperson,
    s.sale_price,
    f.lender_name,
    f.loan_amount,
    f.interest_rate
FROM sales s
LEFT JOIN financing f ON s.sale_id = f.sale_id
ORDER BY s.sale_date DESC;
"""

result = pd.read_sql(query, engine)
print("Complete Sales with Financing:")
print(result.head(10))

Complete Sales with Financing:
Empty DataFrame
Columns: [sale_id, sale_date, customer_name, vehicle, salesperson, sale_price, lender_name, loan_amount, interest_rate]
Index: []


In [67]:
# Exercise 5.2: Vehicle service history
query = """
SELECT 
    v.vin,
    v.make,
    v.model,
    v.year,
    sr.service_date,
    sr.service_type,
    sr.mechanic_name,
    sr.labor_cost + sr.parts_cost AS total_cost
FROM vehicles v
JOIN service_records sr ON v.vin = sr.vin
ORDER BY v.vin, sr.service_date;
"""

result = pd.read_sql(query, engine)
print("Vehicle Service History:")
print(result.head(15))

Vehicle Service History:
Empty DataFrame
Columns: [vin, make, model, year, service_date, service_type, mechanic_name, total_cost]
Index: []


**Instructions:** Write a query to show complete vehicle service history.
- Join vehicles and service_records tables on vin
- Select: v.vin, make, model, year, service_date, service_type, mechanic_name
- Calculate total_cost as labor_cost + parts_cost
- ORDER BY vin, then service_date
- Display first 15 results

## Part 6: Aggregate Functions and GROUP BY

**Instructions:** Write a query analyzing salesperson performance with commissions.
- Join salespeople and sales tables on salesperson name
- GROUP BY salesperson_name and commission_rate
- Calculate: COUNT sale_id as total_sales, SUM(sale_price) as total_revenue, AVG(sale_price) as avg_sale
- Calculate total_commission as SUM(sale_price * commission_rate)
- Filter WHERE status = 'active'
- ORDER BY total_revenue DESC
- Execute and display all results

In [68]:
# Exercise 6.1: Salesperson performance with commission
query = """
SELECT 
    sp.salesperson_name,
    COUNT(s.sale_id) AS total_sales,
    SUM(s.sale_price) AS total_revenue,
    AVG(s.sale_price) AS avg_sale,
    SUM(s.sale_price * sp.commission_rate) AS total_commission
FROM salespeople sp
JOIN sales s ON sp.salesperson_name = s.salesperson
WHERE sp.status = 'active'
GROUP BY sp.salesperson_name, sp.commission_rate
ORDER BY total_revenue DESC;
"""

result = pd.read_sql(query, engine)
print("Salesperson Performance with Commission:")
print(result)

Salesperson Performance with Commission:
Empty DataFrame
Columns: [salesperson_name, total_sales, total_revenue, avg_sale, total_commission]
Index: []


In [69]:
# Exercise 6.2: Service costs by mechanic
query = """
SELECT 
    mechanic_name,
    COUNT(service_id) AS service_count,
    SUM(labor_cost) AS total_labor,
    SUM(parts_cost) AS total_parts,
    SUM(labor_cost + parts_cost) AS total_revenue,
    AVG(labor_cost + parts_cost) AS avg_service_cost
FROM service_records
GROUP BY mechanic_name
ORDER BY total_revenue DESC;
"""

result = pd.read_sql(query, engine)
print("Service Revenue by Mechanic:")
print(result)

Service Revenue by Mechanic:
Empty DataFrame
Columns: [mechanic_name, service_count, total_labor, total_parts, total_revenue, avg_service_cost]
Index: []


**Instructions:** Write a query analyzing service revenue by mechanic.
- Use service_records table
- GROUP BY mechanic_name
- Calculate: COUNT(service_id), SUM(labor_cost), SUM(parts_cost), SUM(labor_cost + parts_cost) as total_revenue, AVG(labor_cost + parts_cost) as avg_service_cost
- ORDER BY total_revenue DESC
- Execute and display results

In [70]:
# Exercise 6.3: Financing by lender
query = """
SELECT 
    lender_name,
    COUNT(financing_id) AS loan_count,
    SUM(loan_amount) AS total_financed,
    AVG(interest_rate) AS avg_rate,
    AVG(term_months) AS avg_term
FROM financing
GROUP BY lender_name
ORDER BY total_financed DESC;
"""

result = pd.read_sql(query, engine)
print("Financing by Lender:")
print(result)

Financing by Lender:
Empty DataFrame
Columns: [lender_name, loan_count, total_financed, avg_rate, avg_term]
Index: []


**Instructions:** Write a query analyzing financing by lender.
- Use financing table
- GROUP BY lender_name
- Calculate: COUNT(financing_id) as loan_count, SUM(loan_amount) as total_financed, AVG(interest_rate), AVG(term_months)
- ORDER BY total_financed DESC
- Execute and display results

## Part 7: Advanced Queries with CTEs and Subqueries

**Instructions:** Write a query using CTEs to find top-performing salespeople above average.
- Create CTE 'sales_stats' to calculate sale_count and total_revenue per salesperson from sales table
- Create CTE 'avg_performance' to calculate the average revenue across all salespeople
- Join salespeople with sales_stats CTE on salesperson_name
- Cross join with avg_performance
- Calculate how much above average each top performer is
- Filter to show only those with revenue > average
- ORDER BY total_revenue DESC
- Execute and display results

In [71]:
# Exercise 7.1: Top performing salespeople (above average)
query = """
WITH sales_stats AS (
    SELECT 
        salesperson,
        COUNT(*) AS sale_count,
        SUM(sale_price) AS total_revenue
    FROM sales
    GROUP BY salesperson
),
avg_performance AS (
    SELECT AVG(total_revenue) AS avg_revenue
    FROM sales_stats
)
SELECT 
    sp.salesperson_name,
    ss.sale_count,
    ss.total_revenue,
    ap.avg_revenue,
    ss.total_revenue - ap.avg_revenue AS above_average
FROM salespeople sp
JOIN sales_stats ss ON sp.salesperson_name = ss.salesperson
CROSS JOIN avg_performance ap
WHERE ss.total_revenue > ap.avg_revenue
ORDER BY ss.total_revenue DESC;
"""

result = pd.read_sql(query, engine)
print("Top Performers (Above Average):")
print(result)

Top Performers (Above Average):
Empty DataFrame
Columns: [salesperson_name, sale_count, total_revenue, avg_revenue, above_average]
Index: []


In [72]:
# Exercise 7.2: Vehicles with high service costs
query = """
WITH vehicle_service_costs AS (
    SELECT 
        vin,
        SUM(labor_cost + parts_cost) AS total_service_cost,
        COUNT(*) AS service_count
    FROM service_records
    GROUP BY vin
)
SELECT 
    v.make,
    v.model,
    v.year,
    vsc.service_count,
    vsc.total_service_cost,
    v.purchase_price,
    ROUND((vsc.total_service_cost / v.purchase_price * 100), 2) AS cost_percentage
FROM vehicles v
JOIN vehicle_service_costs vsc ON v.vin = vsc.vin
WHERE vsc.total_service_cost > 200
ORDER BY vsc.total_service_cost DESC;
"""

result = pd.read_sql(query, engine)
print("Vehicles with High Service Costs:")
print(result)

Vehicles with High Service Costs:
Empty DataFrame
Columns: [make, model, year, service_count, total_service_cost, purchase_price, cost_percentage]
Index: []


**Instructions:** Write a query using CTE to find vehicles with high service costs.
- Create CTE 'vehicle_service_costs' to calculate total_service_cost (labor + parts) and service_count per vin from service_records
- Join vehicles with the CTE on vin
- Calculate cost_percentage as (total_service_cost / purchase_price * 100)
- Filter WHERE total_service_cost > 200
- ORDER BY total_service_cost DESC
- Execute and display results

## Part 8: Window Functions

**Instructions:** Write a query using window functions to rank salespeople by monthly performance.
- Use DATE_TRUNC('month', sale_date) to group by month
- Use sales table
- GROUP BY month and salesperson
- Calculate sales_count and monthly_revenue
- Use RANK() OVER (PARTITION BY month ORDER BY revenue DESC) to rank within each month
- ORDER BY month, then rank
- Display first 20 results

In [73]:
# Exercise 8.1: Ranking salespeople by monthly sales
query = """
SELECT 
    DATE_TRUNC('month', sale_date) AS month,
    salesperson,
    COUNT(*) AS sales_count,
    SUM(sale_price) AS monthly_revenue,
    RANK() OVER (PARTITION BY DATE_TRUNC('month', sale_date) 
                 ORDER BY SUM(sale_price) DESC) AS rank
FROM sales
GROUP BY DATE_TRUNC('month', sale_date), salesperson
ORDER BY month, rank;
"""

result = pd.read_sql(query, engine)
print("Monthly Sales Rankings:")
print(result.head(20))

Monthly Sales Rankings:
Empty DataFrame
Columns: [month, salesperson, sales_count, monthly_revenue, rank]
Index: []


## Part 9: UPDATE Operations

**Instructions:** Write an UPDATE statement to update warranty status for expired warranties.
- UPDATE the warranties table
- SET status = 'expired'
- WHERE end_date < CURRENT_DATE AND status = 'active'
- Execute using engine.connect() and text()
- Commit the transaction
- Print the number of rows updated (rowcount)

In [74]:
# Exercise 9.1: Expire old warranties
update_query = """
UPDATE warranties
SET status = 'expired'
WHERE end_date < CURRENT_DATE AND status = 'active';
"""

with engine.connect() as conn:
    result = conn.execute(text(update_query))
    conn.commit()
    print(f"✓ Updated {result.rowcount} warranties to expired status")

✓ Updated 0 warranties to expired status


In [75]:
# Exercise 9.2: Increase commission for top performers
update_query = """
UPDATE salespeople
SET commission_rate = commission_rate * 1.1
WHERE salesperson_name IN (
    SELECT salesperson
    FROM sales
    GROUP BY salesperson
    ORDER BY SUM(sale_price) DESC
    LIMIT 2
);
"""

with engine.connect() as conn:
    result = conn.execute(text(update_query))
    conn.commit()
    print(f"✓ Updated {result.rowcount} salespeople commission rates")

✓ Updated 0 salespeople commission rates


**Instructions:** Write an UPDATE statement to increase commission rates for top performers.
- UPDATE the salespeople table
- SET commission_rate = commission_rate * 1.1 (10% increase)
- WHERE salesperson_name IN (subquery selecting top 2 salespeople by total sales revenue)
- Use a subquery that joins sales table, groups by salesperson, and orders by total revenue DESC with LIMIT 2
- Execute, commit, and print the rowcount

## Part 10: INSERT Operations

**Instructions:** Write an INSERT statement to add a new salesperson.
- INSERT INTO salespeople table
- Add values for: salesperson_name ('Jennifer Adams'), hire_date (CURRENT_DATE), email ('jadams@dealership.com'), phone ('555-0199'), commission_rate (0.045), department ('Sales'), status ('active')
- Use RETURNING clause to get back the salesperson_id and salesperson_name
- Execute, fetch the result, and commit
- Print the new salesperson information

In [76]:
# Exercise 10.1: Add new salesperson
insert_query = """
INSERT INTO salespeople (salesperson_name, hire_date, email, phone, commission_rate, department, status)
VALUES ('Jennifer Adams', CURRENT_DATE, 'jadams@dealership.com', '555-0199', 0.045, 'Sales', 'active')
RETURNING salesperson_id, salesperson_name;
"""

with engine.connect() as conn:
    result = conn.execute(text(insert_query))
    new_salesperson = result.fetchone()
    conn.commit()
    print(f"✓ Added salesperson: {new_salesperson[1]} (ID: {new_salesperson[0]})")

✓ Added salesperson: Jennifer Adams (ID: 1)


## Part 11: Comprehensive Business Analytics

**Instructions:** Write a comprehensive query using UNION ALL to create a dealership performance report.
- Create multiple SELECT statements combined with UNION ALL
- Each SELECT should return: category, metric_value (as TEXT), metric_name
- Include metrics for:
  - Sales: total transactions, total revenue
  - Service: total services, service revenue
  - Financing: total loans, amount financed
  - Inventory: total vehicles
- Use TO_CHAR() for currency formatting
- Execute and display with formatted headers

In [77]:
# Exercise 11.1: Complete dealership performance report
query = """
SELECT 
    'Sales' AS category,
    COUNT(DISTINCT sale_id)::TEXT AS metric_value,
    'Total Transactions' AS metric_name
FROM sales
UNION ALL
SELECT 'Sales', TO_CHAR(SUM(sale_price), 'FM$999,999,999.00'), 'Total Revenue' FROM sales
UNION ALL
SELECT 'Service', COUNT(*)::TEXT, 'Total Services' FROM service_records
UNION ALL
SELECT 'Service', TO_CHAR(SUM(labor_cost + parts_cost), 'FM$999,999,999.00'), 'Service Revenue' FROM service_records
UNION ALL
SELECT 'Financing', COUNT(*)::TEXT, 'Total Loans' FROM financing
UNION ALL
SELECT 'Financing', TO_CHAR(SUM(loan_amount), 'FM$999,999,999.00'), 'Amount Financed' FROM financing
UNION ALL
SELECT 'Inventory', COUNT(*)::TEXT, 'Total Vehicles' FROM vehicles;
"""

result = pd.read_sql(query, engine)
print("="*80)
print("DEALERSHIP PERFORMANCE REPORT")
print("="*80)
print(result)
print("="*80)

DEALERSHIP PERFORMANCE REPORT
    category metric_value         metric_name
0      Sales            0  Total Transactions
1      Sales         None       Total Revenue
2    Service            0      Total Services
3    Service         None     Service Revenue
4  Financing            0         Total Loans
5  Financing         None     Amount Financed
6  Inventory            0      Total Vehicles


In [78]:
# Exercise 11.2: Customer purchase analysis
query = """
WITH customer_purchases AS (
    SELECT 
        customer_name,
        COUNT(sale_id) AS purchase_count,
        SUM(sale_price) AS total_spent
    FROM sales
    GROUP BY customer_name
)
SELECT 
    customer_name,
    purchase_count,
    total_spent,
    CASE 
        WHEN total_spent > 50000 THEN 'Premium'
        WHEN total_spent > 30000 THEN 'High Value'
        WHEN total_spent > 0 THEN 'Standard'
        ELSE 'No Purchases'
    END AS customer_tier
FROM customer_purchases
WHERE purchase_count > 0
ORDER BY total_spent DESC;
"""

result = pd.read_sql(query, engine)
print("Customer Purchase Analysis:")
print(result)

Customer Purchase Analysis:
Empty DataFrame
Columns: [customer_name, purchase_count, total_spent, customer_tier]
Index: []


**Instructions:** Write a query to analyze customer purchase patterns with tier classification.
- Create CTE 'customer_purchases' to calculate purchase_count and total_spent per customer from sales table (GROUP BY customer_name)
- Use CASE statement to classify customers into tiers:
  - 'Premium' if total_spent > 50000
  - 'High Value' if total_spent > 30000
  - 'Standard' if total_spent > 0
  - 'No Purchases' otherwise
- Filter to show only customers with purchases (purchase_count > 0)
- ORDER BY total_spent DESC
- Execute and display results

## Summary

**Congratulations! You've completed the comprehensive final project.**

### ✓ Part 1 (R) - Data Cleaning:
- Cleaned 7 messy CSV files with 100+ total records
- Applied text standardization, date parsing, missing value handling
- Calculated missing values using formulas
- Exported clean CSV files ready for database loading

### ✓ Part 2 (PostgreSQL) - Database Operations:
- Created 7-table database with proper data types and constraints
- Loaded all cleaned data using PostgreSQL COPY commands
- Performed 15+ advanced SQL exercises:
  - Basic SELECT queries with filtering
  - Multi-table JOINs (2-3 tables)
  - Aggregate functions with GROUP BY
  - CTEs and subqueries
  - Window functions and rankings
  - UPDATE and INSERT operations
  - Comprehensive business analytics with UNION ALL

### Skills Mastered:
- Data quality assessment and cleaning (R)
- Database design with appropriate table structures
- PostgreSQL COPY command for bulk data loading
- Advanced SQL query techniques
- Business intelligence and reporting
- End-to-end data workflow from messy CSVs to actionable insights

**Database Structure:**
- customers (30 records)
- salespeople (10 records)
- vehicles (25 records)
- sales (25 transactions)
- service_records (20 records)
- financing (15 records)
- warranties (20 records)

**This project demonstrates professional-level data management and SQL analysis skills!**