In [22]:
import logging
import os
import sys
import traceback
from urllib.parse import quote

import numpy as np
import pandas as pd
import psycopg2
import requests
from dotenv import load_dotenv
from sqlalchemy import create_engine
load_dotenv()

True

In [2]:
# Base directory for files written by this notebook. A notebook has no
# __file__, so the kernel's working directory is used; in a .py script this
# would be os.path.dirname(os.path.abspath(__file__)).
BASE_DIR = os.getcwd()

# Log file for the ETL run, created inside BASE_DIR.
LOG_PATH = os.path.join(BASE_DIR, "etl.log")
logging.basicConfig(
    filename=LOG_PATH,
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s"
)

# Only record the start here. Unconditional WARNING/ERROR/"finished" entries
# written at configuration time would put false failures into etl.log on
# every run, before any extraction or load has happened.
logging.info("ETL job started")

READ CONFIGURATION (API KEY AND DATABASE SETTINGS) FROM THE .env FILE

In [3]:
# RentCast sale-listings endpoint; the query string requests up to 150 listings.
url = "https://api.rentcast.io/v1/listings/sale?limit=150"

# API credential, read from the environment (populated from .env by load_dotenv()).
api_key = os.getenv("API_KEY")

# PostgreSQL connection settings.
host = os.getenv("DB_HOST")
port = os.getenv("PORT")
# Prefer DB_USER when it is defined: on Unix-like systems USER is normally
# already set to the OS login name, and load_dotenv() does not override
# existing environment variables by default, so a USER entry in .env can be
# silently ignored. USER is kept as a fallback for existing .env files.
user_name = os.getenv("DB_USER") or os.getenv("USER")
password = os.getenv("DB_PASSWORD")
db_name = os.getenv("DB_NAME")

# NOTE(review): `db` is not referenced by any cell visible here; `db_name` is
# the value used to connect.
db = os.getenv("DATABASE")


In [4]:
# Headers sent with every API request: ask for a JSON response and pass the
# key read from the API_KEY environment variable.
headers = {"Accept": "application/json", "X-Api-Key": api_key}


EXTRACT: FETCH LISTINGS FROM THE API ENDPOINT FOR DATA INGESTION

In [5]:
def extract_data(url, timeout=30):
    """Fetch JSON from ``url`` and return it flattened into a DataFrame.

    The request is sent with the module-level ``headers`` dictionary.

    Parameters
    ----------
    url : str
        Endpoint to send the GET request to.
    timeout : float or tuple, optional
        Seconds to wait for the server, forwarded to ``requests.get``.
        Defaults to 30. Without a timeout ``requests`` waits indefinitely,
        which would hang the whole ETL run on an unresponsive server.

    Returns
    -------
    pandas.DataFrame
        The decoded JSON payload passed through ``pd.json_normalize``, so
        nested objects become dot-separated columns.

    Raises
    ------
    Exception
        If the response status is not 200; the message carries the status
        code and the response body. Errors raised by ``requests`` itself
        (connection failures, timeouts) are not caught here.
    """
    response = requests.get(url, headers=headers, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f"Error fetching data: {response.status_code}- {response.text}")
    return pd.json_normalize(response.json())



In [6]:
# Fetch the listings from the API into a flat DataFrame and preview the first rows.
df=extract_data(url)
df.head()


Unnamed: 0,id,formattedAddress,addressLine1,addressLine2,city,state,stateFips,zipCode,county,countyFips,...,history.2025-07-10.listingType,history.2025-07-10.listedDate,history.2025-07-10.removedDate,history.2025-07-10.daysOnMarket,history.2025-05-15.event,history.2025-05-15.price,history.2025-05-15.listingType,history.2025-05-15.listedDate,history.2025-05-15.removedDate,history.2025-05-15.daysOnMarket
0,"12763-Battista-Ln,-Henderson,-NV-89044","12763 Battista Ln, Henderson, NV 89044",12763 Battista Ln,,Henderson,NV,32,89044,Clark,3,...,,,,,,,,,,
1,"1014-Lone-Pine-River-Ave,-Henderson,-NV-89002","1014 Lone Pine River Ave, Henderson, NV 89002",1014 Lone Pine River Ave,,Henderson,NV,32,89002,Clark,3,...,,,,,,,,,,
2,"6955-Elysian-Valley-Ave,-Las-Vegas,-NV-89113","6955 Elysian Valley Ave, Las Vegas, NV 89113",6955 Elysian Valley Ave,,Las Vegas,NV,32,89113,Clark,3,...,,,,,,,,,,
3,"1105-Aubrey-Springs-Ave,-Henderson,-NV-89014","1105 Aubrey Springs Ave, Henderson, NV 89014",1105 Aubrey Springs Ave,,Henderson,NV,32,89014,Clark,3,...,,,,,,,,,,
4,"2486-Sabado-St,-Las-Vegas,-NV-89121","2486 Sabado St, Las Vegas, NV 89121",2486 Sabado St,,Las Vegas,NV,32,89121,Clark,3,...,,,,,,,,,,


In [7]:
# (rows, columns) of the raw extract.
df.shape

(150, 394)

In [8]:
# Column labels of the raw extract; nested JSON keys show up as dot-separated names.
df.columns

Index(['id', 'formattedAddress', 'addressLine1', 'addressLine2', 'city',
       'state', 'stateFips', 'zipCode', 'county', 'countyFips',
       ...
       'history.2025-07-10.listingType', 'history.2025-07-10.listedDate',
       'history.2025-07-10.removedDate', 'history.2025-07-10.daysOnMarket',
       'history.2025-05-15.event', 'history.2025-05-15.price',
       'history.2025-05-15.listingType', 'history.2025-05-15.listedDate',
       'history.2025-05-15.removedDate', 'history.2025-05-15.daysOnMarket'],
      dtype='object', length=394)

In [9]:
# SQLAlchemy engine used for the DataFrame writes. The credentials are
# percent-encoded because characters such as "@", ":" or "/" in a raw
# user name or password would otherwise be parsed as URL delimiters.
engine = create_engine(
    f"postgresql+psycopg2://{quote(str(user_name), safe='')}:{quote(str(password), safe='')}"
    f"@{host}:{port}/{db_name}"
)

In [10]:
# Open a direct psycopg2 connection and report whether it succeeded.
try:
    conn = psycopg2.connect(
        host=host,
        database=db_name,
        user=user_name,
        password=password,
        port=port
    )
    print("Connection successful")
except Exception as e:
    print("Connection was unsuccessful:", e)
    # Re-raise after reporting: if the connection failed, `conn` was never
    # assigned, and the following cells would otherwise fail with an
    # unrelated NameError instead of the real connection error.
    raise

Connection successful


In [11]:
# NOTE(review): this cursor is not used by any cell visible here; the
# DataFrame writes go through the SQLAlchemy `engine`, not through `conn`.
cur=conn.cursor()

In [12]:
# Load the untransformed extract into the raw_listings table, replacing any
# existing table of that name; rows are sent as multi-row INSERTs and the
# DataFrame index is not written.
df.to_sql('raw_listings', con=engine, if_exists='replace', index=False,method="multi")

150

In [13]:
# NOTE(review): no statement is executed on `conn` in the cells visible here
# (the to_sql write uses `engine`), so this commit has nothing to commit.
conn.commit()

In [14]:
# Materialise the raw column labels as a plain Python list.
Total_list =list(df.columns)


In [16]:
# Display the complete list of raw column names.
Total_list

['id',
 'formattedAddress',
 'addressLine1',
 'addressLine2',
 'city',
 'state',
 'stateFips',
 'zipCode',
 'county',
 'countyFips',
 'latitude',
 'longitude',
 'propertyType',
 'bedrooms',
 'bathrooms',
 'squareFootage',
 'lotSize',
 'yearBuilt',
 'status',
 'price',
 'listingType',
 'listedDate',
 'removedDate',
 'createdDate',
 'lastSeenDate',
 'daysOnMarket',
 'mlsName',
 'mlsNumber',
 'hoa.fee',
 'listingAgent.name',
 'listingAgent.phone',
 'listingAgent.email',
 'listingOffice.name',
 'listingOffice.phone',
 'listingOffice.email',
 'history.2025-07-08.event',
 'history.2025-07-08.price',
 'history.2025-07-08.listingType',
 'history.2025-07-08.listedDate',
 'history.2025-07-08.removedDate',
 'history.2025-07-08.daysOnMarket',
 'listingOffice.website',
 'history.2025-04-18.event',
 'history.2025-04-18.price',
 'history.2025-04-18.listingType',
 'history.2025-04-18.listedDate',
 'history.2025-04-18.removedDate',
 'history.2025-04-18.daysOnMarket',
 'listingAgent.website',
 'history.

In [15]:
def transform_data(df):
    """Select the listing columns of interest and give them snake_case names.

    Parameters
    ----------
    df : pandas.DataFrame
        Flattened listings frame, as returned by ``extract_data``.

    Returns
    -------
    pandas.DataFrame
        A new frame (``df`` is not modified) holding the selected columns in a
        fixed order. ``formattedAddress`` is renamed to ``full_address`` and
        ``zipCode`` to ``postal_code``; every other label is converted from
        camelCase / dotted form to lower-case snake_case, e.g.
        ``listingAgent.name`` becomes ``listing_agent_name``.

    Notes
    -----
    A selected column that is absent from ``df`` is created and filled with
    NaN (and a warning is logged) instead of raising ``KeyError``, so the
    output always has the same set of columns.
    """
    selected_columns = [
        'id', 'formattedAddress', 'city',
        'state', 'zipCode', 'county', 'latitude', 'longitude', 'propertyType',
        'bedrooms', 'bathrooms', 'squareFootage', 'lotSize', 'yearBuilt',
        'status', 'price', 'listingType', 'listedDate', 'removedDate',
        'createdDate', 'lastSeenDate', 'daysOnMarket', 'mlsName', 'mlsNumber',
        'listingAgent.name', 'listingAgent.phone', 'listingAgent.email',
        'listingAgent.website', 'listingOffice.name', 'listingOffice.phone',
        'listingOffice.email', 'listingOffice.website',
    ]

    missing_columns = [name for name in selected_columns if name not in df.columns]
    if missing_columns:
        logging.warning(
            "transform_data: columns absent from input, filled with NaN: %s",
            missing_columns,
        )

    # reindex() returns a new frame and adds any missing column as NaN.
    listing_data_cleaned = (
        df.reindex(columns=selected_columns)
        .rename(columns={'formattedAddress': 'full_address', 'zipCode': 'postal_code'})
    )

    listing_data_cleaned.columns = (
        listing_data_cleaned.columns
        .str.strip()
        # camelCase -> camel_Case
        .str.replace(r'([A-Z])', r'_\1', regex=True)
        # spaces, dots and any other non-identifier character -> underscore
        .str.replace(r'[^0-9a-zA-Z_]', '_', regex=True)
        .str.lower()
        # collapse every run of underscores, not just a single pair
        .str.replace(r'_+', '_', regex=True)
        .str.strip('_')
    )
    return listing_data_cleaned

In [16]:
# Build the cleaned listings frame (selected columns, snake_case names) from the raw extract.
listing_data_cleaned = transform_data(df)

LOAD THE TRANSFORMED DATA INTO THE DATABASE

In [17]:
# Rebuilds the SQLAlchemy engine from the same settings as the engine created
# earlier in the notebook. The credentials are percent-encoded because
# characters such as "@", ":" or "/" in a raw user name or password would
# otherwise be parsed as URL delimiters.
engine = create_engine(
    f"postgresql+psycopg2://{quote(str(user_name), safe='')}:{quote(str(password), safe='')}"
    f"@{host}:{port}/{db_name}"
)

In [18]:
# Load the cleaned frame into the transformed_listings table, replacing any
# existing table of that name; rows are sent as multi-row INSERTs and the
# DataFrame index is not written.
listing_data_cleaned.to_sql('transformed_listings', con=engine, if_exists='replace', index=False, method="multi")

150

In [19]:
# NOTE(review): this cursor is not used by any cell visible here; the
# DataFrame writes go through the SQLAlchemy `engine`, not through `conn`.
cur=conn.cursor()

In [20]:
# NOTE(review): no statement is executed on `conn` in the cells visible here
# (the to_sql write uses `engine`), so this commit has nothing to commit.
conn.commit()

In [21]:
def main() -> int:
    """Run the pipeline end to end: extract, load raw, transform, load clean.

    Reads the module-level ``url`` and ``engine``. Both tables are written
    with ``if_exists="replace"``.

    Returns:
        0 when every step completed, 1 if any step raised. In the failure
        case the traceback is written to the log rather than propagated.
    """
    exit_code = 1
    logging.info("ETL start")
    try:
        raw_listings = extract_data(url)
        raw_listings.to_sql(
            "raw_listings", engine,
            if_exists="replace", index=False, method="multi",
        )

        clean_listings = transform_data(raw_listings)
        clean_listings.to_sql(
            "transformed_listings", engine,
            if_exists="replace", index=False, method="multi",
        )

        logging.info("Rows: raw=%s clean=%s", len(raw_listings), len(clean_listings))
        logging.info("ETL success")
        exit_code = 0
    except Exception:
        logging.error("ETL failed\n%s", traceback.format_exc())
    return exit_code

if __name__ == "__main__":
    # main() reports failure only through its return code (details go to the
    # log), so a discarded return code makes a failed run look like a
    # successful one. Exit with the code when it is non-zero.
    exit_code = main()
    if exit_code != 0:
        sys.exit(exit_code)
