
# Real Estate Sales ETL Pipeline

## Introduction
This notebook outlines the ETL (Extract, Transform, Load) process for real estate sales data obtained from the [Office of Policy and Management](https://data.ct.gov/Housing-and-Development/Real-Estate-Sales-2001-2022-GL/5mzw-sjtu/about_data). The dataset contains records of real estate sales, including property type, sales price, and assessment values. The primary goal of this project is to clean, transform, and load this data into a PostgreSQL data warehouse to facilitate efficient analysis and reporting on property trends over the years.

By implementing this ETL pipeline, the project demonstrates the ability to clean, structure, and load complex real estate data, making it suitable for analysis. This is crucial for understanding market trends and supporting data-driven decision-making in the real estate industry.


In [1]:
# Import necessary libraries
import pandas as pd

# Data Ingestion

## Load Real Estate Sales Data

In [2]:
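# Load the real estate sales data from a local CSV file.
# low_memory=False makes pandas infer each column's dtype from the whole file instead of chunk by chunk,
# which avoids mixed-type columns (and the DtypeWarning) on large files, at the cost of more memory.
real_estate_sales = pd.read_csv('data/Real_Estate_Sales_2001-2022_GL.csv', low_memory=False)

# Alternatively, load the latest data directly from the source (uncomment the following line).
# Note: the Socrata API endpoint returns a limited number of rows per request by default and may use its own
# field names, so paging (e.g. the $limit and $offset parameters) and column renaming may be needed.
# real_estate_sales = pd.read_csv("https://data.ct.gov/resource/5mzw-sjtu.csv", low_memory=False)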

In [3]:
# Load Non-Usable Codes Data
# This dataset contains special codes that explain why a sale might not be used for property value assessments.
non_usable_codes = pd.read_csv('data/Non_Usable_codes.csv')

In [4]:
real_estate_sales.head()

|   | Serial Number | List Year | Date Recorded | Town | Address | Assessed Value | Sale Amount | Sales Ratio | Property Type | Residential Type | Non Use Code | Assessor Remarks | OPM remarks | Location |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2020177 | 2020 | 04/14/2021 | Ansonia | 323 BEAVER ST | 133000.0 | 248400.0 | 0.5354 | Residential | Single Family | NaN | NaN | NaN | POINT (-73.06822 41.35014) |
| 1 | 2020225 | 2020 | 05/26/2021 | Ansonia | 152 JACKSON ST | 110500.0 | 239900.0 | 0.4606 | Residential | Three Family | NaN | NaN | NaN | NaN |
| 2 | 2020348 | 2020 | 09/13/2021 | Ansonia | 230 WAKELEE AVE | 150500.0 | 325000.0 | 0.463 | Commercial | NaN | NaN | NaN | NaN | NaN |
| 3 | 2020090 | 2020 | 12/14/2020 | Ansonia | 57 PLATT ST | 127400.0 | 202500.0 | 0.6291 | Residential | Two Family | NaN | NaN | NaN | NaN |
| 4 | 200500 | 2020 | 09/07/2021 | Avon | 245 NEW ROAD | 217640.0 | 400000.0 | 0.5441 | Residential | Single Family | NaN | NaN | NaN | NaN |


In [5]:
non_usable_codes.head()

|   | Id | Title | Description |
|---|---|---|---|
| 0 | 0 | Not special | Nothing special |
| 1 | 1 | FAMILY | Sale between members of the same family. |
| 2 | 2 | LOVE AND AFFECTION | Sale in which 'Love and affection' are part of... |
| 3 | 3 | INTER CORPORATION | Sales between a corporation and stockholder, s... |
| 4 | 4 | CORRECTING DEED | Transfers of convenience; for example, sales f... |


### Data Description
The dataset contains detailed records of real estate transactions. Below is a summary of each column:

#### Real Estate Sales Data:
| **Column Name**        | **Description**                                                                                     | **Data Type**              | **Example Value**          |
|------------------------|-----------------------------------------------------------------------------------------------------|----------------------------|----------------------------|
| **Serial Number**       | A unique identifier for each property sale record.                                                 | Integer                    | 2020177                    |
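| **List Year**           | The grand list year under which the sale is reported; e.g., list year 2020 covers sales recorded from October 1, 2020 through September 30, 2021. | Integer                    | 2020                       |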
| **Date Recorded**       | The date when the sale was recorded in the local jurisdiction.                                      | Date (Floating Timestamp)  | 04/14/2021                 |
| **Town**                | The name of the town where the property is located.                                                | Text                       | Ansonia                    |
| **Address**             | The full address of the property sold.                                                              | Text                       | 323 BEAVER ST              |
| **Assessed Value**      | The value of the property used for local tax assessment.                                            | Float (Number)             | 133000.0                   |
| **Sale Amount**         | The final sale price for the property.                                                              | Float (Number)             | 248400.0                   |
| **Sales Ratio**         | The ratio of the assessed value to the sale price (Assessed Value / Sale Amount); e.g., 133000 / 248400 ≈ 0.5354. | Float (Number)             | 0.5354                     |
| **Property Type**       | The type of property sold (e.g., Residential, Commercial, Industrial, etc.).                        | Text                       | Residential                |
| **Residential Type**    | The residential subtype of the property (e.g., Single Family, Two Family, Three Family, Condo).     | Text                       | Single Family              |
| **Non Use Code**        | A code indicating that the sale price is not reliable for determining property value.               | Text                       | NaN                        |
| **Assessor Remarks**    | Remarks or comments made by the property assessor regarding the property.                           | Text                       | NaN                        |
| **OPM remarks**         | Comments from the Office of Policy and Management regarding the property.                          | Text                       | NaN                        |
| **Location**            | Geographic coordinates of the property as a WKT point, in (longitude latitude) order.              | Point                      | POINT (-73.06822 41.35014) |
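Which way round the Sales Ratio is computed can be checked directly against the five sample rows shown earlier. Dividing Assessed Value by Sale Amount reproduces the stored ratio to within rounding (the file stores 0.463 for 150500 / 325000 ≈ 0.46308), while the inverse is greater than 1 on every row. This is a quick sanity check, not part of the notebook; on the full dataset, any row with a zero Sale Amount would produce an infinite ratio and would need to be excluded first.

```python
import pandas as pd

# The five sample rows shown by real_estate_sales.head() above
sample_rows = pd.DataFrame({
    'Assessed Value': [133000.0, 110500.0, 150500.0, 127400.0, 217640.0],
    'Sale Amount': [248400.0, 239900.0, 325000.0, 202500.0, 400000.0],
    'Sales Ratio': [0.5354, 0.4606, 0.463, 0.6291, 0.5441],
})

# Assessed Value / Sale Amount reproduces the stored ratio up to rounding
computed = sample_rows['Assessed Value'] / sample_rows['Sale Amount']
print(((computed - sample_rows['Sales Ratio']).abs() < 1e-3).all())  # True

# The inverse (Sale Amount / Assessed Value) exceeds 1 on every row, so it is not the stored ratio
inverse = sample_rows['Sale Amount'] / sample_rows['Assessed Value']
print((inverse > 1).all())  # True
```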

#### Non-Usable Codes Data:
This dataset provides a mapping of codes that describe special cases when property sale data is not reliable for analysis. These cases include sales such as transfers within the same family or sales made for convenience reasons.

These codes will be used to filter or tag specific sale transactions for analysis later.

| **Id** | **Title**              | **Description**                                                                                     |
|--------|------------------------|-----------------------------------------------------------------------------------------------------|
| 0      | Not special            | Nothing special.                                                                                   |
| 1      | FAMILY                 | Sale between members of the same family.                                                           |
| 2      | LOVE AND AFFECTION      | Sale in which 'Love and affection' are part of the deal.                                           |
| 3      | INTER CORPORATION      | Sales between a corporation and stockholder.                                                       |
| 4      | CORRECTING DEED        | Transfers of convenience; for example, sales for correction of deeds.                             |
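As one way of tagging transactions with these codes, sketched under the assumption that Non Use Code has already been cleaned to integers, the Id/Title pairs from the table above can be turned into a lookup and mapped onto each sale; sales with code 0 ("Not special") can then be kept for analysis. The sale rows and the `Non Use Title` column are hypothetical.

```python
import pandas as pd

# Id -> Title lookup built from the table above
code_titles = {
    0: 'Not special',
    1: 'FAMILY',
    2: 'LOVE AND AFFECTION',
    3: 'INTER CORPORATION',
    4: 'CORRECTING DEED',
}

# Hypothetical sales with already-cleaned integer codes
sample_sales = pd.DataFrame({
    'Sale Amount': [248400.0, 1000.0, 150000.0, 325000.0],
    'Non Use Code': [0, 1, 4, 0],
})

# Tag each sale with its code title, then keep only the 'Not special' sales
sample_sales['Non Use Title'] = sample_sales['Non Use Code'].map(code_titles)
usable_sales = sample_sales[sample_sales['Non Use Code'] == 0]

print(sample_sales['Non Use Title'].tolist())  # ['Not special', 'FAMILY', 'CORRECTING DEED', 'Not special']
print(len(usable_sales))  # 2
```

In the notebook, the full lookup could be built from the loaded file instead of typed by hand, e.g. `dict(zip(non_usable_codes['Id'], non_usable_codes['Title']))` before its columns are renamed.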

### Next Steps:
At this stage, the data has been ingested into pandas DataFrames, and we've inspected a sample of the data. The next steps will involve:
- **Data Cleaning**: Removing any irrelevant or missing data.
- **Data Transformation**: Converting data types, handling missing values, and creating derived columns if necessary.
- **Data Loading**: Loading the cleaned and transformed data into a PostgreSQL data warehouse for analysis.


# Data Cleaning

In [6]:
# Check for missing values and their percentage in the dataset
missing_values = real_estate_sales.isnull().sum()
missing_percentage = (missing_values / len(real_estate_sales)) * 100

# Creating a DataFrame to summarize the missing values and their percentage
missing_analysis = pd.DataFrame({'Missing Values': missing_values, 'Percentage': missing_percentage})

# Displaying the missing analysis for the user to inspect
missing_analysis

|   | Missing Values | Percentage |
|---|---|---|
| Serial Number | 0 | 0.0 |
| List Year | 0 | 0.0 |
| Date Recorded | 2 | 0.000182 |
| Town | 0 | 0.0 |
| Address | 51 | 0.004646 |
| Assessed Value | 0 | 0.0 |
| Sale Amount | 0 | 0.0 |
| Sales Ratio | 0 | 0.0 |
| Property Type | 382446 | 34.842921 |
| Residential Type | 398389 | 36.295415 |


In [7]:
# Removing irrelevant columns that have too many missing values or aren't useful for analysis
real_estate_sales = real_estate_sales.drop(['Assessor Remarks', 'OPM remarks', 'Location', 'Serial Number'], axis=1)

# Dropping rows where critical fields like 'Address' and 'Date Recorded' are missing
real_estate_sales = real_estate_sales.dropna(subset=['Address', 'Date Recorded'])

## Non-Use Code Cleaning

In [8]:
# Handle 'Non Use Code': missing values become 0 ('Not special')
real_estate_sales['Non Use Code'] = real_estate_sales['Non Use Code'].fillna(0)

# Some rows hold 'Single Family' in 'Non Use Code' (a residential type in the wrong column).
# Build the mask once, before the code is overwritten, so all three fixes apply to the same rows.
single_family_mask = real_estate_sales['Non Use Code'] == 'Single Family'
real_estate_sales.loc[single_family_mask, 'Residential Type'] = 'Single Family'
real_estate_sales.loc[single_family_mask, 'Property Type'] = 'Residential'
real_estate_sales.loc[single_family_mask, 'Non Use Code'] = 0

# Keep the numeric prefix of '<code> - <label>' values; anything non-numeric becomes NaN
real_estate_sales['Non Use Code'] = real_estate_sales['Non Use Code'].apply(lambda x: x.split(" -")[0] if isinstance(x, str) else x)
real_estate_sales['Non Use Code'] = pd.to_numeric(real_estate_sales['Non Use Code'], errors='coerce')

# Reset codes above 30 to 0
real_estate_sales.loc[real_estate_sales['Non Use Code'] > 30, 'Non Use Code'] = 0
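To see what the parsing steps in the cell above do to different raw values, they can be isolated in a small function and run on a few hypothetical inputs (not taken from the dataset). `parse_non_use_code` is an illustrative name. Note that a value with no numeric prefix ends up as NaN rather than 0, so it will later find no match in dim_non_use_code.

```python
import numpy as np
import pandas as pd

def parse_non_use_code(raw: pd.Series) -> pd.Series:
    """Apply the notebook's parsing steps to a raw 'Non Use Code' column."""
    code = raw.fillna(0)
    # Keep the part before ' -', e.g. '14 - label' -> '14'
    code = code.apply(lambda x: x.split(" -")[0] if isinstance(x, str) else x)
    code = pd.to_numeric(code, errors='coerce')
    # Codes above 30 are reset to 0, as in the cell above
    return code.mask(code > 30, 0)

# Hypothetical raw values, not taken from the dataset
raw_codes = pd.Series([np.nan, '14 - label', '07', '50', 'unknown'], dtype='object')
parsed_codes = parse_non_use_code(raw_codes)
print(parsed_codes.tolist())  # [0.0, 14.0, 7.0, 0.0, nan]
```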

## Property Type and Residential Type Cleaning

In [9]:
# Standardizing the 'Property Type' column and handling 'Apartments' as a special case
real_estate_sales.loc[:, 'Property Type'] = real_estate_sales['Property Type'].replace({
    'Two Family': 'Residential',
    'Single Family': 'Residential',
    'Three Family': 'Residential',
    'Four Family': 'Residential',
    'Condo': 'Residential',
})

# Handling apartments as a separate residential type
real_estate_sales.loc[real_estate_sales['Property Type'] == 'Apartments', 'Residential Type'] = 'Apartments'
real_estate_sales.loc[real_estate_sales['Property Type'] == 'Apartments', 'Property Type'] = 'Residential'
real_estate_sales.loc[:, 'Property Type'] = real_estate_sales['Property Type'].fillna('Unknown')
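The standardization rules in the cell above can be expressed as one function and checked on a few hypothetical rows; `standardize_property_type` and the sample values are illustrative only. Note that Residential Type stays NaN for non-residential rows, which matters later because it is part of the dim_property_type key.

```python
import numpy as np
import pandas as pd

# Property Type values folded into 'Residential', as in the cell above
RESIDENTIAL_SUBTYPES = ['Two Family', 'Single Family', 'Three Family', 'Four Family', 'Condo']

def standardize_property_type(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out['Property Type'] = out['Property Type'].replace(dict.fromkeys(RESIDENTIAL_SUBTYPES, 'Residential'))
    # Apartments become a residential subtype
    apartments = out['Property Type'] == 'Apartments'
    out.loc[apartments, 'Residential Type'] = 'Apartments'
    out.loc[apartments, 'Property Type'] = 'Residential'
    # Anything still missing is labeled explicitly
    out['Property Type'] = out['Property Type'].fillna('Unknown')
    return out

# Hypothetical rows covering each rule
sample_types = pd.DataFrame({
    'Property Type': ['Condo', 'Apartments', np.nan, 'Commercial'],
    'Residential Type': ['Condo', np.nan, np.nan, np.nan],
})
standardized = standardize_property_type(sample_types)
print(standardized)
```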

## List Year and Date Recorded Cleaning

In [10]:
# Converting 'List Year' to numeric and 'Date Recorded' to datetime format, and extracting time-based features
real_estate_sales['List Year'] = pd.to_numeric(real_estate_sales['List Year'], errors='coerce')
real_estate_sales['Date Recorded'] = pd.to_datetime(real_estate_sales['Date Recorded'], errors='coerce', format='%m/%d/%Y')

# Extracting day, month, year, and quarter from 'Date Recorded'
real_estate_sales.loc[:, 'Sale Day'] = real_estate_sales['Date Recorded'].dt.day
real_estate_sales.loc[:, 'Sale Month'] = real_estate_sales['Date Recorded'].dt.month
real_estate_sales.loc[:, 'Sale Year'] = real_estate_sales['Date Recorded'].dt.year
real_estate_sales.loc[:, 'Sale Quarter'] = real_estate_sales['Date Recorded'].dt.quarter
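The conversion and feature extraction in the cell above can be checked on two dates from the sample rows plus one hypothetical malformed value. With `errors='coerce'` the bad value becomes NaT, which turns every extracted column into floats; converting to pandas' nullable `Int64` dtype, an optional step that is not in the notebook, keeps them as integers, which may match integer columns in dim_sale_date more cleanly.

```python
import pandas as pd

# Two dates from the sample rows above plus one hypothetical malformed value
raw_dates = pd.Series(['04/14/2021', '12/14/2020', '13/45/2021'])
parsed_dates = pd.to_datetime(raw_dates, errors='coerce', format='%m/%d/%Y')

date_features = pd.DataFrame({
    'Sale Day': parsed_dates.dt.day,
    'Sale Month': parsed_dates.dt.month,
    'Sale Year': parsed_dates.dt.year,
    'Sale Quarter': parsed_dates.dt.quarter,
}).astype('Int64')  # nullable integers keep whole numbers despite the missing row

print(date_features)
```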

# Data Loading

1. **Connecting to PostgreSQL Database**:
   - A connection to the PostgreSQL database is created with SQLAlchemy's `create_engine`, using a connection string built from the PostgreSQL credentials (username, password, host, port, and database name).
   - A test query (`SELECT version()`) is then executed to confirm that the connection works.

In [11]:
# Import necessary libraries
from sqlalchemy import create_engine, text

# PostgreSQL credentials
username = 'postgres'
password = 'password'  # Replace with your own password
host = 'localhost'     # or your server's IP address
port = '5432'          # default PostgreSQL port
database = 're_sales_dw'

# Create the connection string; the plain 'postgresql://' scheme uses the psycopg2 driver by default
connection_string = f'postgresql://{username}:{password}@{host}:{port}/{database}'
engine = create_engine(connection_string)

# Test the connection
with engine.connect() as connection:
    result = connection.execute(text("SELECT version();"))
    for row in result:
        print(row)

('PostgreSQL 16.3, compiled by Visual C++ build 1938, 64-bit',)
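The cell above interpolates the password directly into the connection string, which breaks if the password contains characters such as `@`, `:` or `/`. A minimal sketch, assuming SQLAlchemy 1.4 or later, builds the URL with `URL.create`, which escapes each component. Reading the password from the `PGPASSWORD` environment variable (the variable libpq uses) is an assumption, with the notebook's placeholder as the fallback; `build_url` is a hypothetical helper name.

```python
import os
from sqlalchemy.engine import URL, make_url

def build_url() -> URL:
    """Hypothetical helper: assemble the warehouse URL from separate, escaped parts."""
    return URL.create(
        drivername='postgresql+psycopg2',
        username='postgres',
        password=os.environ.get('PGPASSWORD', 'password'),  # assumption: password kept out of the notebook
        host='localhost',
        port=5432,
        database='re_sales_dw',
    )

# A password with reserved characters survives the round trip through the rendered string
demo_url = build_url().set(password='p@ss:w/rd')
rendered = demo_url.render_as_string(hide_password=False)
print(make_url(rendered).password == 'p@ss:w/rd')
```

The result can be passed straight to `create_engine(build_url())`; the psycopg2 driver must be installed for the engine to be created.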


2. **Loading and Inserting dim_list_year Data**:
   - The unique List Year values from the real estate dataset are extracted and loaded into a DataFrame (dim_list_year_df).
   - The code checks if these years already exist in the dim_list_year table in the database and inserts only the new years that are not yet present.

In [12]:
dim_list_year_df = pd.DataFrame({'year': real_estate_sales['List Year'].unique()})
with engine.connect() as conn:
    # Step 1: Load existing years from dim_list_year table
    existing_years = pd.read_sql('SELECT year FROM dim_list_year', conn)

    # Step 2: Find new years that are not already in dim_list_year
    new_years_df = dim_list_year_df[~dim_list_year_df['year'].isin(existing_years['year'])]
    
    # Step 3: Append only the new records to dim_list_year table
    if not new_years_df.empty:
        new_years_df.to_sql('dim_list_year', conn, if_exists='append', index=False)
        conn.commit()
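The cell above reads every existing year into pandas before inserting. An alternative that pushes the duplicate check into the database is PostgreSQL's `INSERT ... ON CONFLICT DO NOTHING`, sketched below against an in-memory SQLite database (SQLite 3.24+ accepts the same clause) so that it runs without a server. The schema, in particular a PRIMARY KEY on `year`, is an assumption since the notebook does not show its DDL, and `insert_years` is a hypothetical helper.

```python
import sqlite3

# Same statement text should work with sqlalchemy.text() against PostgreSQL,
# assuming dim_list_year.year has a PRIMARY KEY or UNIQUE constraint
INSERT_YEAR = 'INSERT INTO dim_list_year (year) VALUES (:year) ON CONFLICT (year) DO NOTHING'

def insert_years(conn, years):
    # Convert NumPy integers to plain ints; DB drivers may not adapt NumPy scalar types
    conn.executemany(INSERT_YEAR, [{'year': int(y)} for y in years])
    conn.commit()

demo_conn = sqlite3.connect(':memory:')  # SQLite stand-in for the PostgreSQL warehouse
demo_conn.execute('CREATE TABLE dim_list_year (year INTEGER PRIMARY KEY)')  # hypothetical schema
insert_years(demo_conn, [2019, 2020])
insert_years(demo_conn, [2020, 2021])  # 2020 is skipped instead of raising an error

demo_years = [row[0] for row in demo_conn.execute('SELECT year FROM dim_list_year ORDER BY year')]
print(demo_years)  # [2019, 2020, 2021]
```

Against the warehouse, the same statement could be run as `conn.execute(text(INSERT_YEAR), rows)` inside `with engine.begin() as conn:`, which commits on exit.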

3. **Loading and Inserting dim_location Data**:
   - The Town and Address columns are used to create unique location records.
   - The code checks for new locations not yet stored in the dim_location table and appends them if not already present.

In [13]:
# Create dim_location_df by dropping duplicates from the original DataFrame
dim_location_df = real_estate_sales[['Town', 'Address']].drop_duplicates()
dim_location_df.columns = ['city', 'street_address']  # Rename columns to match your dimension table
with engine.connect() as conn:
    # Step 1: Retrieve existing records from dim_location table
    existing_locations = pd.read_sql('SELECT city, street_address FROM dim_location', conn)

    # Step 2: Find new locations that are not already in dim_location
    new_locations_df = dim_location_df.merge(
        existing_locations, 
        how='left', 
        on=['city', 'street_address'], 
        indicator=True
    ).loc[lambda x: x['_merge'] == 'left_only'].drop(columns='_merge')

    # Step 3: Append only the new records to dim_location table
    if not new_locations_df.empty:
        new_locations_df.to_sql('dim_location', conn, if_exists='append', index=False)
        conn.commit()
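The dimension cells repeat the same three steps: read the existing keys, anti-join with `merge(..., indicator=True)`, and append the `left_only` rows. A sketch of that pattern as one reusable function follows; `append_new_rows` is a hypothetical name, and the demo uses an in-memory SQLite database with an assumed dim_location schema so it runs without PostgreSQL (pandas' `read_sql` and `to_sql` accept both a `sqlite3` connection and a SQLAlchemy connection).

```python
import sqlite3
import pandas as pd

def append_new_rows(df: pd.DataFrame, table: str, conn, key_cols: list) -> int:
    """Append rows of df whose key_cols combination is not yet in table; return how many were added."""
    # table and key_cols are interpolated into SQL, so they must come from trusted code, not user input
    existing = pd.read_sql(f"SELECT {', '.join(key_cols)} FROM {table}", conn)
    new_rows = (
        df.merge(existing.drop_duplicates(), how='left', on=key_cols, indicator=True)
          .loc[lambda x: x['_merge'] == 'left_only']
          .drop(columns='_merge')
    )
    if not new_rows.empty:
        new_rows.to_sql(table, conn, if_exists='append', index=False)
        conn.commit()
    return len(new_rows)

# Demo against an in-memory SQLite database with a hypothetical dim_location schema
demo_conn = sqlite3.connect(':memory:')
demo_conn.execute('CREATE TABLE dim_location (id INTEGER PRIMARY KEY, city TEXT, street_address TEXT)')
locations = pd.DataFrame({'city': ['Ansonia', 'Avon'], 'street_address': ['323 BEAVER ST', '245 NEW ROAD']})

first_load = append_new_rows(locations, 'dim_location', demo_conn, ['city', 'street_address'])
second_load = append_new_rows(locations, 'dim_location', demo_conn, ['city', 'street_address'])
print(first_load, second_load)  # 2 0
```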

4. **Loading and Inserting dim_non_use_code Data**:
   - The columns of the non-usable codes lookup table are renamed to match the target table (`id`, `title`, `description`).
   - Codes that are not yet present in the dim_non_use_code table are identified and inserted.

In [14]:
# Rename columns to match your dimension table
non_usable_codes.columns = ['id', 'title', 'description']

# Connect to the database and perform the insert operation
with engine.connect() as conn:
    # Step 1: Retrieve existing records from dim_non_use_code table
    existing_codes = pd.read_sql('SELECT id, title, description FROM dim_non_use_code', conn)

    # Step 2: Find new codes that are not already in dim_non_use_code
    new_codes_df = non_usable_codes.merge(
        existing_codes, 
        how='left', 
        on=['id', 'title', 'description'], 
        indicator=True
    ).loc[lambda x: x['_merge'] == 'left_only'].drop(columns='_merge')

    # Step 3: Append only the new records to dim_non_use_code table
    if not new_codes_df.empty:
        new_codes_df.to_sql('dim_non_use_code', conn, if_exists='append', index=False)
        conn.commit()
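Because the merge above matches on `id`, `title`, and `description`, a code whose description changed in the source file is treated as new. If `id` is the table's primary key (an assumption, since the DDL is not shown), appending it would fail with a duplicate-key error. The sketch compares both choices on a hypothetically reworded description; `anti_join` is an illustrative helper.

```python
import pandas as pd

def anti_join(left: pd.DataFrame, right: pd.DataFrame, on: list) -> pd.DataFrame:
    """Rows of left whose `on` values have no match in right."""
    merged = left.merge(right[on].drop_duplicates(), how='left', on=on, indicator=True)
    return merged.loc[merged['_merge'] == 'left_only'].drop(columns='_merge')

# Codes already stored in the table (values from the lookup shown earlier)
stored_codes = pd.DataFrame({
    'id': [0, 1],
    'title': ['Not special', 'FAMILY'],
    'description': ['Nothing special', 'Sale between members of the same family.'],
})
# Incoming file: code 1 has a hypothetically reworded description, code 2 is genuinely new
source_codes = pd.DataFrame({
    'id': [0, 1, 2],
    'title': ['Not special', 'FAMILY', 'LOVE AND AFFECTION'],
    'description': ['Nothing special', 'Sale between family members.',
                    "Sale in which 'Love and affection' are part of the deal."],
})

print(anti_join(source_codes, stored_codes, ['id', 'title', 'description'])['id'].tolist())  # [1, 2]
print(anti_join(source_codes, stored_codes, ['id'])['id'].tolist())  # [2]
```

Matching on the key alone skips changed rows; refreshing their descriptions would need a separate update step, for example `INSERT ... ON CONFLICT (id) DO UPDATE` in PostgreSQL.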

5. **Loading and Inserting dim_property_type Data**:
   - The Property Type and Residential Type columns are used to create unique records in the dimension table.
   - The code checks for new property types not yet in the dim_property_type table and inserts them if needed.

In [15]:
dim_property_type_df = real_estate_sales[['Property Type', 'Residential Type']].drop_duplicates()
dim_property_type_df.columns = ['property_type', 'residential_type']  # Rename columns to match the dimension table
# Connect to the database and perform the insert operation
with engine.connect() as conn:
    # Step 1: Retrieve existing records from dim_property_type table
    existing_property_types = pd.read_sql('SELECT * FROM dim_property_type', conn)

    # Step 2: Find new property types that are not already in dim_property_type
    new_property_types_df = dim_property_type_df.merge(
        existing_property_types, 
        how='left', 
        on=dim_property_type_df.columns.tolist(),  # Join on property_type and residential_type
        indicator=True
    ).loc[lambda x: x['_merge'] == 'left_only'].drop(columns='_merge')

    # Step 3: Append only the new records to dim_property_type table
    if not new_property_types_df.empty:
        new_property_types_df[['property_type', 'residential_type']].to_sql('dim_property_type', conn, if_exists='append', index=False)
        conn.commit()
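Residential Type is NaN for non-residential rows (for example, the Commercial row in the sample output). pandas `merge` matches null keys to each other, unlike a SQL join, so the anti-join above treats a NaN `residential_type` as equal to a NULL read back from the table, as this small sketch with hypothetical rows shows. On the database side, if the table has a UNIQUE constraint on both columns (an assumption), PostgreSQL treats NULLs as distinct by default (PostgreSQL 15 added `NULLS NOT DISTINCT`), so such a constraint would not by itself reject a second ("Commercial", NULL) row.

```python
import numpy as np
import pandas as pd

# Hypothetical rows prepared for dim_property_type; commercial rows have no residential type
incoming_types = pd.DataFrame({
    'property_type': ['Commercial', 'Residential'],
    'residential_type': [np.nan, 'Condo'],
})
# A NULL residential_type read back from the database arrives as None
existing_types = pd.DataFrame({
    'property_type': ['Commercial'],
    'residential_type': [None],
})

merged_types = incoming_types.merge(
    existing_types, how='left', on=['property_type', 'residential_type'], indicator=True
)
print(merged_types['_merge'].tolist())  # ['both', 'left_only']
```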

6. **Loading and Inserting dim_sale_date Data**:
   - The Sale Day, Sale Month, Sale Year, and Sale Quarter columns are used to create a unique set of sale date records.
   - New sale dates are identified and inserted into the dim_sale_date table.

In [16]:
dim_sale_date_df = real_estate_sales[['Sale Day','Sale Month','Sale Year','Sale Quarter']].drop_duplicates()
dim_sale_date_df.columns = ['day','month','year','quarter'] 
# Connect to the database and perform the insert operation
with engine.connect() as conn:
    # Step 1: Retrieve existing records from dim_sale_date table
    existing_sale_dates = pd.read_sql('SELECT * FROM dim_sale_date', conn)

    # Step 2: Find new sale dates that are not already in dim_sale_date
    new_sale_dates_df = dim_sale_date_df.merge(
        existing_sale_dates, 
        how='left', 
        on=dim_sale_date_df.columns.tolist(),  # Join on all columns
        indicator=True
    ).loc[lambda x: x['_merge'] == 'left_only'].drop(columns='_merge')

    # Step 3: Append only the new records to dim_sale_date table
    if not new_sale_dates_df.empty:
        new_sale_dates_df[['day','month','year','quarter']].to_sql('dim_sale_date', conn, if_exists='append', index=False)
        conn.commit()
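The cell above stores only the dates that actually occur in the sales. A common alternative for a date dimension, sketched here as an option rather than the notebook's approach, is to generate every day in a range with `pd.date_range`, so later loads never meet a missing date. `build_sale_date_dimension` is a hypothetical helper whose output columns match `dim_sale_date_df`, and the date range is illustrative.

```python
import pandas as pd

def build_sale_date_dimension(start: str, end: str) -> pd.DataFrame:
    """One row per calendar day from start to end (inclusive), with dim_sale_date's columns."""
    days = pd.date_range(start, end, freq='D')
    return pd.DataFrame({
        'day': days.day,
        'month': days.month,
        'year': days.year,
        'quarter': days.quarter,
    })

# Hypothetical range; in practice it could span the min and max of 'Date Recorded'
calendar_demo = build_sale_date_dimension('2020-10-01', '2021-09-30')
print(len(calendar_demo))  # 365
print(calendar_demo.iloc[0].tolist())  # [1, 10, 2020, 4]
```

Its rows could then go through the same anti-join as in the cell above before being appended.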

7. **Creating the Final fact_sales Table**:
   - After the dimension tables are loaded, they are read back and merged with the cleaned sales data to look up the surrogate key of each dimension.
   - Rows that share the same combination of the five foreign keys are treated as duplicates (only the first is kept), and only key combinations not already present in the fact_sales table are inserted.

In [17]:
with engine.connect() as connection:
    dim_list_year = pd.read_sql('SELECT * FROM dim_list_year', connection)
    dim_location = pd.read_sql('SELECT * FROM dim_location', connection)
    dim_property_type = pd.read_sql('SELECT * FROM dim_property_type', connection)
    dim_non_use_code = pd.read_sql('SELECT * FROM dim_non_use_code', connection)
    dim_sale_date = pd.read_sql('SELECT * FROM dim_sale_date', connection)
final_fact_sales = (
    real_estate_sales.merge(
        dim_list_year.rename(columns={'year': 'id_list_year'}),
        left_on='List Year',
        right_on='id_list_year',
        how='left'
    ).merge(
        dim_location.rename(columns={'id': 'id_location'}),
        left_on=['Town', 'Address'],
        right_on=['city', 'street_address'],
        how='left'
    ).merge(
        dim_property_type,
        left_on=['Property Type', 'Residential Type'],
        right_on=['property_type', 'residential_type'],
        how='left'
    ).merge(
        dim_non_use_code.rename(columns={'id': 'id_non_use_code'}),
        left_on='Non Use Code',
        right_on='id_non_use_code',
        how='left'
    ).merge(
        dim_sale_date,
        left_on=['Sale Day', 'Sale Month', 'Sale Year', 'Sale Quarter'],
        right_on=['day', 'month', 'year', 'quarter'],
        how='left'
    )[
        [
            'id_list_year', 'id_location', 'id_property_type', 'id_non_use_code', 'id_sale_date',
            'Assessed Value', 'Sale Amount', 'Sales Ratio'
        ]
    ].rename(columns={
        'Assessed Value': 'assessed_value',
        'Sale Amount': 'sale_amount',
        'Sales Ratio': 'sales_ratio'
    })
)
# Columns covered by the unique constraint in fact_sales
unique_columns = ['id_list_year', 'id_location', 'id_property_type', 'id_non_use_code', 'id_sale_date']

# Drop rows that share the same foreign-key combination, keeping the first occurrence
final_fact_sales_unique = final_fact_sales.drop_duplicates(subset=unique_columns)

with engine.connect() as conn:
    # Step 1: Retrieve existing unique values from the fact_sales table
    existing_fact_sales = pd.read_sql(f'SELECT {", ".join(unique_columns)} FROM fact_sales', conn)

    # Step 2: Identify new records that are not in fact_sales
    new_fact_sales_df = final_fact_sales_unique.merge(
        existing_fact_sales,
        how='left',
        on=unique_columns,
        indicator=True
    ).loc[lambda x: x['_merge'] == 'left_only'].drop(columns='_merge')

    # Step 3: Insert only the new records
    if not new_fact_sales_df.empty:
        new_fact_sales_df.to_sql('fact_sales', conn, if_exists='append', index=False)
        conn.commit()
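Because every merge in the cell above uses `how='left'`, a sale whose dimension row is missing keeps NaN in the corresponding `id_*` column instead of failing. A minimal check, sketched here with hypothetical sample tables and a hypothetical `missing_foreign_keys` helper, counts such rows before inserting. The standard `merge` option `validate='many_to_one'` additionally raises a `MergeError` if a dimension contains duplicate keys, which would otherwise multiply fact rows.

```python
import pandas as pd

def missing_foreign_keys(facts: pd.DataFrame, fk_cols: list) -> dict:
    """Return {column: count} for foreign-key columns that contain NaN after the lookups."""
    counts = facts[fk_cols].isna().sum()
    return {col: int(n) for col, n in counts.items() if n > 0}

# Two sales from the sample rows; the hypothetical dimension only knows the first address
sales_demo = pd.DataFrame({
    'Town': ['Ansonia', 'Avon'],
    'Address': ['323 BEAVER ST', '245 NEW ROAD'],
    'Sale Amount': [248400.0, 400000.0],
})
dim_location_demo = pd.DataFrame({'id': [1], 'city': ['Ansonia'], 'street_address': ['323 BEAVER ST']})

fact_demo = sales_demo.merge(
    dim_location_demo.rename(columns={'id': 'id_location'}),
    left_on=['Town', 'Address'],
    right_on=['city', 'street_address'],
    how='left',
    validate='many_to_one',  # raises MergeError if the dimension has duplicate keys
)
print(missing_foreign_keys(fact_demo, ['id_location']))  # {'id_location': 1}
```

Applied to `final_fact_sales` with its five `id_*` columns, an empty result would indicate that every sale found all of its dimension rows.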

### Conclusion

In this notebook, we successfully implemented a data ingestion pipeline to load data from the real estate sales dataset into a PostgreSQL database. The process involved:

1. **Connecting to the PostgreSQL database**: We established a connection to the database using SQLAlchemy and verified the connection with a simple query.
2. **Loading data into dimension tables**: We processed and inserted unique records into the `dim_list_year`, `dim_location`, `dim_non_use_code`, `dim_property_type`, and `dim_sale_date` tables, ensuring no duplicates were inserted.
3. **Populating the fact table**: By merging the cleaned dataset with the dimension tables to look up surrogate keys, we built the `fact_sales` rows, removed duplicate key combinations, and inserted only new records.

Because every load step inserts only rows that are not already present, the notebook can be re-run on an updated extract without duplicating dimension or fact rows, and the same steps can be adapted to future datasets loaded into the data warehouse.
