## Project Checkpoint #4: "Dream Homes NYC" Project ETL 

David Skorodinsky, Daniel Jesus Jesurum Cumberbatch, Arden Haggin <br>
Professor Machairas <br> 
APAN PS5310: SQL & Relational Databases <br>
1 August 2025 <br>

### Jupyter Notebook Setup and PostgreSQL Engine Connection

In [1]:
# Importing packages

import pandas as pd
from sqlalchemy import create_engine, text

In [2]:
# Establishing constants (environment variables)
DATABASE = "SQL_Class_Project"
PORT = 5432
PASSWORD = "123" # changed from password because Arden's is different

conn_url = f'postgresql://postgres:{PASSWORD}@localhost:{str(PORT)}/{DATABASE}'

# Create an engine that connects to PostgreSQL server
engine = create_engine(conn_url)

### 1. Clearing Existing Tables (ETL Reproducibility)

In [3]:
# Drop existing tables if they exist

# Establish a connection
connection = engine.connect()
transaction = connection.begin()
stmt = text("""

DROP TABLE IF EXISTS properties, clients, sales, listings, parks, parks_properties, schools, schools_properties, service_requests, violations, offices, positions, departments, employees, payrolls, commissions, expense_categories, expenses, equipment_types, equipment, appointments, listing_features;

""")

# Execute the statement
connection.execute(stmt)
transaction.commit()
connection.close()

### 2. Table Creation

In [4]:
# Create tables

connection = engine.connect()

transaction = connection.begin()

stmt = text("""

CREATE TABLE properties (
    property_id SERIAL PRIMARY KEY,
    building_number VARCHAR(50) NOT NULL,
    street_name VARCHAR(50) NOT NULL,
    apartment VARCHAR(5),
    city VARCHAR(20) NOT NULL,
    state CHAR(2) NOT NULL,
    zip_code CHAR(5) NOT NULL
);

CREATE TABLE clients (
    client_id SERIAL PRIMARY KEY,
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    family_size INT,
    income NUMERIC(10,2)
);

CREATE TABLE sales (
    sale_id SERIAL PRIMARY KEY,
    client_id INT NOT NULL REFERENCES clients(client_id) ON UPDATE CASCADE,
    property_id INT NOT NULL REFERENCES properties(property_id) ON UPDATE CASCADE,
    sale_price NUMERIC(11,2) NOT NULL,
    sale_date DATE NOT NULL
);

CREATE TABLE listings (
    listing_id SERIAL PRIMARY KEY,
    property_id INT NOT NULL REFERENCES properties(property_id) ON UPDATE CASCADE,
    listing_type VARCHAR(20) NOT NULL,
    listing_price NUMERIC(11,2) NOT NULL,
    sale_id INT REFERENCES sales(sale_id) ON UPDATE CASCADE,
    active BOOLEAN NOT NULL DEFAULT TRUE
);

CREATE TABLE parks (
    p_id SERIAL PRIMARY KEY,
    park_name VARCHAR(50) NOT NULL,
    zip_code CHAR(5) NOT NULL
);

CREATE TABLE parks_properties (
    p_id INT NOT NULL REFERENCES parks(p_id) ON UPDATE CASCADE,
    property_id INT NOT NULL REFERENCES properties(property_id) ON UPDATE CASCADE,
    PRIMARY KEY (p_id, property_id)
);

CREATE TABLE schools (
    s_id SERIAL PRIMARY KEY,
    school_name VARCHAR(50) NOT NULL,
    zip_code CHAR(5) NOT NULL
);

CREATE TABLE schools_properties (
    s_id INT NOT NULL REFERENCES schools(s_id) ON UPDATE CASCADE,
    property_id INT NOT NULL REFERENCES properties(property_id) ON UPDATE CASCADE,
    PRIMARY KEY (s_id, property_id)
);

CREATE TABLE service_requests (
    sr_id SERIAL PRIMARY KEY,
    property_id INT NOT NULL REFERENCES properties(property_id) ON UPDATE CASCADE,
    complaint_type VARCHAR(30) NOT NULL
);

CREATE TABLE violations (
    v_id SERIAL PRIMARY KEY,
    property_id INT NOT NULL REFERENCES properties(property_id) ON UPDATE CASCADE,
    class CHAR(1) NOT NULL
);

CREATE TABLE offices (
    office_id SERIAL PRIMARY KEY,
    address VARCHAR(50) NOT NULL,
    city VARCHAR(20) NOT NULL,
    state CHAR(2) NOT NULL,
    phone_number VARCHAR(15) NOT NULL
);

CREATE TABLE positions (
    position_id SERIAL PRIMARY KEY,
    position_title VARCHAR(50) NOT NULL
);

CREATE TABLE departments (
    department_id SERIAL PRIMARY KEY,
    department_name VARCHAR(20) NOT NULL
);

CREATE TABLE employees (
    employee_id SERIAL PRIMARY KEY,
    office_id INT NOT NULL REFERENCES offices(office_id) ON UPDATE CASCADE,
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(50) NOT NULL,
    position_id INT NOT NULL REFERENCES positions(position_id) ON UPDATE CASCADE,
    department_id INT NOT NULL REFERENCES departments(department_id) ON UPDATE CASCADE,
    employee_since DATE NOT NULL,
    email VARCHAR(50) NOT NULL UNIQUE
);

CREATE TABLE payrolls (
    payroll_id SERIAL PRIMARY KEY,
    employee_id INT NOT NULL REFERENCES employees(employee_id) ON UPDATE CASCADE,
    base_salary NUMERIC(10,2) NOT NULL
);

CREATE TABLE commissions (
    commission_id SERIAL PRIMARY KEY,
    employee_id INT NOT NULL REFERENCES employees(employee_id) ON UPDATE CASCADE,
    sale_id INT NOT NULL REFERENCES sales(sale_id) ON UPDATE CASCADE,
    commission_amount NUMERIC(12,2) NOT NULL
);

CREATE TABLE expense_categories (
    expense_category_id SERIAL PRIMARY KEY,
    expense_category_name VARCHAR(20) NOT NULL
);

CREATE TABLE expenses (
    expense_id SERIAL PRIMARY KEY,
    office_id INT NOT NULL REFERENCES offices(office_id) ON UPDATE CASCADE,
    employee_id INT NOT NULL REFERENCES employees(employee_id) ON UPDATE CASCADE,
    expense_date DATE NOT NULL,
    expense_category_id INT NOT NULL REFERENCES expense_categories(expense_category_id) ON UPDATE CASCADE,
    expense_amount NUMERIC(8,2) NOT NULL
);

CREATE TABLE equipment_types (
    equipment_type_id SERIAL PRIMARY KEY,
    equipment_type_name VARCHAR(50) NOT NULL
);

CREATE TABLE equipment (
    equipment_id SERIAL PRIMARY KEY,
    office_id INT NOT NULL REFERENCES offices(office_id) ON UPDATE CASCADE,
    employee_id INT NOT NULL REFERENCES employees(employee_id) ON UPDATE CASCADE,
    equipment_type_id INT NOT NULL REFERENCES equipment_types(equipment_type_id) ON UPDATE CASCADE
);

CREATE TABLE appointments (
    appointment_id SERIAL PRIMARY KEY,
    appointment_type VARCHAR(20) NOT NULL,
    employee_id INT NOT NULL REFERENCES employees(employee_id) ON UPDATE CASCADE,
    client_id INT NOT NULL REFERENCES clients(client_id) ON UPDATE CASCADE,
    listing_id INT NOT NULL REFERENCES listings(listing_id) ON UPDATE CASCADE
);

CREATE TABLE listing_features (
    l_features_id SERIAL PRIMARY KEY,
    listing_id INT NOT NULL REFERENCES listings(listing_id) ON UPDATE CASCADE,
    bedrooms INT NOT NULL,
    bathrooms INT NOT NULL,
    square_feet INT NOT NULL,
    in_unit_washer BOOLEAN NOT NULL,
    dishwasher BOOLEAN NOT NULL,
    outdoor_space BOOLEAN NOT NULL,
    elevator BOOLEAN NOT NULL,
    doorman BOOLEAN NOT NULL,
    laundry_room BOOLEAN NOT NULL,
    pool BOOLEAN NOT NULL,
    gym BOOLEAN NOT NULL,
    rec_room BOOLEAN NOT NULL,
    parking BOOLEAN NOT NULL
);
""")

connection.execute(stmt)

transaction.commit()

connection.close()

### 3. Data Loading

In [5]:
# Properties Table
# Removed Not Null constraint from apartment due to properties w/ no apt
properties = pd.read_csv('properties.csv')
properties.to_sql(name='properties', con=engine, if_exists='append', index=False)

771

In [6]:
# Clients Table
clients = pd.read_csv('clients.csv')
clients.to_sql(name='clients', con=engine, if_exists='append', index=False)

1000

In [7]:
# Sales Table
sales = pd.read_csv('sales.csv')
sales.to_sql(name='sales', con=engine, if_exists='append', index=False)

900

In [8]:
# Listings Table
# Converted active from int to bool
listings = pd.read_csv('listings.csv')
listings[['active']] = listings[['active']].astype(bool)
listings.dtypes
listings.to_sql(name='listings', con=engine, if_exists='append', index=False)

771

In [9]:
# Parks Table
# Removed "property_id"
parks = pd.read_csv('parks.csv')
parks = parks.drop('property_id', axis=1)
parks.to_sql(name='parks', con=engine, if_exists='append', index=False)

573

In [10]:
# Parks properties Table
parks_properties = pd.read_csv('parks_addresses.csv')
parks_properties.to_sql(name='parks_properties', con=engine, if_exists='append', index=False)

573

In [11]:
# Schools Table
# Removed "property_id"
schools = pd.read_csv('schools.csv')
schools = schools.drop('property_id', axis=1)
schools.to_sql(name='schools', con=engine, if_exists='append', index=False)

951

In [12]:
# Schools Properties Table
schools_properties = pd.read_csv('schools_addresses.csv')
schools_properties.to_sql(name='schools_properties', con=engine, if_exists='append', index=False)

951

In [13]:
# Service Requests Table
service_requests = pd.read_csv('service_requests.csv')
service_requests.to_sql(name='service_requests', con=engine, if_exists='append', index=False)

500

In [14]:
# Violations Table
violations = pd.read_csv('violations.csv')
violations.to_sql(name='violations', con=engine, if_exists='append', index=False)

500

In [15]:
# Listing Features Table
# Converting features from int to bool
listing_features = pd.read_csv('listing_features.csv')
bool_columns = ['in_unit_washer', 'dishwasher', 'outdoor_space', 'elevator', 'doorman', 'laundry_room', 'pool', 'gym', 'rec_room', 'parking']
listing_features[bool_columns] = listing_features[bool_columns].astype(bool)
listing_features.dtypes
listing_features.to_sql(name='listing_features', con=engine, if_exists='append', index=False)

771

In [16]:
# Offices Table
offices = pd.read_csv("offices.csv")
offices.to_sql(name='offices', con=engine, if_exists='append', index=False)

3

In [17]:
# Positions Table
# Increased varchar limit to 50 to accomodate longer position titles
positions = pd.read_csv("positions.csv")
positions.to_sql(name='positions', con=engine, if_exists='append', index=False)

10

In [18]:
# Departments Table
departments = pd.read_csv("departments.csv")
departments.to_sql(name='departments', con=engine, if_exists='append', index=False)

5

In [19]:
# Employees Table
employees = pd.read_csv("employees.csv")
employees[['first_name', 'last_name']] = employees[['last_name', 'first_name']] # fixing mismatch originating in R file
employees.to_sql(name='employees', con=engine, if_exists='append', index=False)

250

In [20]:
# Payrolls Table
payrolls = pd.read_csv("payrolls.csv")
payrolls.to_sql(name='payrolls', con=engine, if_exists='append', index=False)

250

In [21]:
# Commissions Table
commissions = pd.read_csv("commissions.csv")
commissions.to_sql(name='commissions', con=engine, if_exists='append', index=False)

900

In [22]:
# Expense Categories Table
expense_categories = pd.read_csv("expense_categories.csv")
expense_categories.to_sql(name='expense_categories', con=engine, if_exists='append', index=False)

5

In [23]:
# Expenses Table
expenses = pd.read_csv("expenses.csv")
expenses.to_sql(name='expenses', con=engine, if_exists='append', index=False)

500

In [24]:
# Equipment Types Table
# Increased varchar limit to 50 to accomodate longer equipment types titles
equipment_types = pd.read_csv("equipment_types.csv")
equipment_types.to_sql(name='equipment_types', con=engine, if_exists='append', index=False)

4

In [25]:
# Equipment Table
equipment = pd.read_csv("equipment.csv")
equipment.to_sql(name='equipment', con=engine, if_exists='append', index=False)

12

In [26]:
# Appointments Table
appointments = pd.read_csv('appointments.csv')
appointments.to_sql(name='appointments', con=engine, if_exists='append', index=False)

575