In [13]:
import re
import requests
import json
from collections import defaultdict
import copy
import pandas as pd
import uuid
from sqlalchemy import create_engine
import os
from dotenv import load_dotenv
import psycopg2

In [14]:

with open('all_data_22.json', 'r', encoding='utf-8') as f:
    all_data_22 = json.load(f)

# print(f"Loaded {len(all_data_22)} listings")  

# print(json.dumps(all_data_22, indent=2))



# def pre_cleaning():
#     # 1) extract…
#     all_data = data_extraction()

#     # 2) clean…
#     print(json.dumps(all_data, indent=2))
#     # …more cleaning…

#     # 3) return cleaned
#     return all_data

# if __name__ == "__main__":
#     cleaned = pre_cleaning()
#     print("Done, got", len(cleaned), "records")





def pre_cleaning(all_data_22):
    # Extracting the 2 main data endpoints from the global data
    sale_listings = all_data_22["sale_listings"]
    rental_listings = all_data_22["rental_listings"]

    # # saving the jason data to current directory
    # with open('sale_listings.json', 'w', encoding='utf-8') as f:
    #     json.dump(sale_listings, f, ensure_ascii=False, indent=2)

    # with open('rental_listings.json', 'w', encoding='utf-8') as f:
    #     json.dump(rental_listings, f, ensure_ascii=False, indent=2)

    # Making a copy of the data before editing
    sale_listings_copy = copy.deepcopy(sale_listings)
    rental_listings_copy = copy.deepcopy(rental_listings)

    # Further extraction and pre-cleaning starting with sale_listings_copy
    property_history = defaultdict(list)
    property_agent = defaultdict(list)
    property_officer = defaultdict(list)
    property_sales_info = defaultdict(list)

    for state, listings in sale_listings_copy.items():
        for listing in listings:
            # grab the unique id
            listing_id = listing["id"]

            # extract & inject id into history
            history = listing.pop("history", {})
            for date_key, event in history.items():
                event["id"] = listing_id

            # extract & inject id into agent
            listing_agent = listing.pop("listingAgent", {})
            if listing_agent:
                listing_agent["id"] = listing_id

            # extract & inject id into office
            listing_office = listing.pop("listingOffice", {})
            if listing_office:
                listing_office["id"] = listing_id

            # whatever remains is my “sales info” (it still has 'id' there)
            sales = listing

            # putting all together
            property_history[state].append(history)
            property_agent[state].append(listing_agent)
            property_officer[state].append(listing_office)
            property_sales_info[state].append(sales)

    # Next is the rental_listings_copy

    # Prepare collectors
    rental_history = defaultdict(list)
    rental_listing_info = defaultdict(list)

    # Extract history + rest for each listing
    for state, listings in rental_listings_copy.items():
        for listing in listings:
            listing_id = listing["id"]

            # pop & tag history
            history = listing.pop("history", {})
            for date_key, event in history.items():
                event["id"] = listing_id
            rental_history[state].append(history)

            # everything left is the “rest” of the listing
            #    (it still includes 'id' plus all other top-level fields)
            rest = listing
            rental_listing_info[state].append(rest)

    # Exacting the fields i want to keep in property_sales_info:
    fields = [
        "id", "addressLine1", "city", "state", "zipCode", "county",
        "propertyType", "bedrooms", "bathrooms", "squareFootage",
        "lotSize", "yearBuilt", "price", "listedDate", "daysOnMarket"
    ]

    # Preparing a flat list to collect each listing’s info
    extracted_property_sales_info = []

    for listings in property_sales_info.values():        # loop each state’s list
        for listing in listings:                  # loop each listing dict
            # Building a new dict with only the desired keys
            info = {}
            for field in fields:
                info[field] = listing.get(field)
            # Add it to the flat list
            extracted_property_sales_info.append(info)

    # Exacting the fields i want to keep in property_history:
    fields = [
        "event", "price", "listingType", "listedDate",
        "removedDate", "daysOnMarket", "id"
    ]

    extracted_property_history = []

    for state_list in property_history.values():        # loop each state’s list
        for date_dict in state_list:                  # loop each {"2025-06-25": {...}} entry
            # date_dict has one key (the date) and one value (the actual listing dict)
            for date_key, listing in date_dict.items():
                # building info dict
                info = {}
                for field in fields:
                    # copy the field value (or None if missing)
                    value = listing.get(field)
                    info[field] = value
                extracted_property_history.append(info)

    # Exacting the fields you want to keep in property_agent:
    fields = ["name", "phone", "email", "id"]

    # Preparing a flat list to collect each listing’s info
    extracted_property_agent = []

    for listings in property_agent.values():        # loop each state’s list
        for listing in listings:                  # loop each listing dict
            # Build a new dict with only the desired keys
            info = {}
            for field in fields:
                info[field] = listing.get(field)
            # Add it to the flat list
            extracted_property_agent.append(info)

    # Exacting the fields i want to keep in property_officer:
    fields = ["name", "phone", "email", "website", "id"]

    # Preparing a flat list to collect each listing’s info
    extracted_property_officer = []

    for listings in property_officer.values():        # loop each state’s list
        for listing in listings:                  # loop each listing dict
            # Building a new dict with only the desired keys
            info = {}
            for field in fields:
                info[field] = listing.get(field)
            # Add it to the flat list
            extracted_property_officer.append(info)

    # Exacting the fields i want to keep in rental_listing_info:
    fields = [
        "id", "addressLine1", "city", "state", "zipCode", "county",
        "propertyType", "bedrooms", "bathrooms", "squareFootage",
        "price", "listedDate", "daysOnMarket"
    ]

    # Preparing a flat list to collect each listing’s info
    extracted_rental_listing_info = []

    for listings in rental_listing_info.values():        # loop each state’s list
        for listing in listings:                  # loop each listing dict
            # Building a new dict to collect
            info = {}
            for field in fields:
                info[field] = listing.get(field)
            extracted_rental_listing_info.append(info)

    # Exacting the fields i want to keep in rental_history:
    fields = [
        "event", "price", "listingType", "listedDate",
        "removedDate", "daysOnMarket", "id"
    ]

    extracted_rental_history = []

    for state_list in rental_history.values():        # loop each state’s list
        for date_dict in state_list:                  # loop each {"2025-06-25": {...}} entry
            for date_key, listing in date_dict.items():
                # building info dict
                info = {}
                for field in fields:
                    value = listing.get(field)
                    info[field] = value
                extracted_rental_history.append(info)
    
#   print(json.dumps(extracted_property_sales_info, indent=2))
    return {
        "extracted_property_sales_info": extracted_property_sales_info,
        "extracted_property_history": extracted_property_history,
        "extracted_property_agent": extracted_property_agent,
        "extracted_property_officer": extracted_property_officer,
        "extracted_rental_listing_info": extracted_rental_listing_info,
        "extracted_rental_history": extracted_rental_history
    }

# x = pre_cleaning(all_data_22)
# print(json.dumps(x, indent=2))


In [15]:

def cleaning_job_1(all_data_22):
    # Call the function and grab its output
    cleaned = pre_cleaning(all_data_22)

    # Extract each list by key
    extracted_property_history = cleaned["extracted_property_history"]
    extracted_property_agent   = cleaned["extracted_property_agent"]
    extracted_property_officer = cleaned["extracted_property_officer"]
    extracted_property_sales   = cleaned["extracted_property_sales_info"]
    extracted_rental_history   = cleaned["extracted_rental_history"]
    extracted_rental_listing_info = cleaned["extracted_rental_listing_info"]

    #### Creating the dataframes for sales_listings data points
    property_history_df = pd.DataFrame(extracted_property_history)
    property_agent_df = pd.DataFrame(extracted_property_agent)
    property_officer_df = pd.DataFrame(extracted_property_officer)
    property_sales_info_df = pd.DataFrame(extracted_property_sales)

    # ***Reordering property_history_df***
    property_history_df['sales_history_id'] = None
    for num in property_history_df.index:
        property_history_df.at[num, 'sales_history_id'] = str(uuid.uuid4())
    
    # Move history_id to be the very first column
    property_history_df.insert(0, 'sales_history_id', property_history_df.pop('sales_history_id'))
    
    # Pop out the old “id” column and insert it as “sales_id” in position 1
    property_history_df.insert(1, 'sales_id', property_history_df.pop('id'))

    # ***Reordering property_agent_df***
    property_agent_df['agent_id'] = None
    for num in property_agent_df.index:
        property_agent_df.at[num, 'agent_id'] = str(uuid.uuid4())
    property_agent_df.insert(0, 'agent_id', property_agent_df.pop('agent_id'))
    property_agent_df.insert(1, 'sales_id', property_agent_df.pop('id'))

    # ***Reordering property_officer_df***
    property_officer_df['officer_id'] = None
    for num in property_officer_df.index:
        property_officer_df.at[num, 'officer_id'] = str(uuid.uuid4())
    property_officer_df.insert(0, 'officer_id', property_officer_df.pop('officer_id'))
    property_officer_df.insert(1, 'sales_id', property_officer_df.pop('id'))

    # *****Reordering property_sales_info_df*****
    property_sales_info_df.insert(0, 'sales_id', property_sales_info_df.pop('id'))

    #### Creating the dataframes for rental_listings data points
    rental_history_df = pd.DataFrame(extracted_rental_history)
    rental_listing_info_df = pd.DataFrame(extracted_rental_listing_info)

    # ***Reordering rental_history_df***
    rental_history_df['rental_history_id'] = None
    for num in rental_history_df.index:
        rental_history_df.at[num, 'rental_history_id'] = str(uuid.uuid4())
    rental_history_df.insert(0, 'rental_history_id', rental_history_df.pop('rental_history_id'))
    rental_history_df.insert(1, 'rental_id', rental_history_df.pop('id'))

    # *****Reordering rental_listing_info_df*****
    rental_listing_info_df.insert(0, 'rental_id', rental_listing_info_df.pop('id'))

    return {
        'property_history_df': property_history_df,
        'property_agent_df': property_agent_df,
        'property_officer_df': property_officer_df,
        'property_sales_info_df': property_sales_info_df,
        'rental_history_df': rental_history_df,
        'rental_listing_info_df': rental_listing_info_df
    }




x = cleaning_job_1(all_data_22)
print(x)

{'property_history_df':                           sales_history_id  \
0     f276c125-2929-4b2a-8e5c-13f2d9309f6a   
1     b15eb71d-8072-4bf1-96e4-6e01a2ea83eb   
2     86ff3774-ce13-48e4-9741-19f93c56f4f0   
3     7a64cc74-efb5-4e10-abee-0b0abfe39252   
4     3dfae47c-ff76-40b2-a420-ff828c82cf19   
...                                    ...   
4352  888986a5-c4d6-4606-8dd1-5d0ede4f95bc   
4353  b3481a19-782d-496a-8a47-f3cd02a0a709   
4354  b6e0073e-a7aa-4d93-a245-4a11a5b14df8   
4355  e14a1c64-93ad-481b-a58b-d38e72ef3b70   
4356  cf30e1b6-082b-47d6-8b04-0276d9942029   

                                              sales_id         event  \
0                   540-Marker-Ln,-Wellsburg,-WV-26070  Sale Listing   
1                    206-Crystal-Ln,-Weirton,-WV-26062  Sale Listing   
2                 203-Park-Ave,-Middlebourne,-WV-26149  Sale Listing   
3                       963-Ivydale-Rd,-Clay,-WV-25043  Sale Listing   
4                  13088-Clay-Hwy,-Lizemores,-WV-25125  Sale Li

In [16]:
def cleaning_job_2(all_data):
    # 1. Call the function and grab its output
    cleaned_2 = cleaning_job_1(all_data)

    # 2. Extract each list by key
    property_history_df = cleaned_2["property_history_df"]
    property_agent_df = cleaned_2["property_agent_df"]
    property_officer_df = cleaned_2["property_officer_df"]
    property_sales_info_df = cleaned_2["property_sales_info_df"]
    rental_history_df = cleaned_2["rental_history_df"]
    rental_listing_info_df = cleaned_2["rental_listing_info_df"]

    # Cleaning procedure for property_history_df
    # Drop multiple columns, returning a new DataFrame
    property_history_df.drop(columns=["event", "listingType", "removedDate"], inplace=True)
    # Dealing withd duplicates
    property_history_df.drop_duplicates(subset=['sales_id'], keep='first')
    # Dealing with datetime
    property_history_df['listedDate']  = pd.to_datetime(property_history_df['listedDate'])
    # Handling Numerics → integer or float
    property_history_df['daysOnMarket'] = property_history_df['daysOnMarket'].astype('Int64')
    # converting price to int by rounding off
    property_history_df['price'] = property_history_df['price'].round(0).astype('Int64')

    # Cleaning procedure for property_agent_df
    # Dropping any rows where sales_id is null
    property_agent_df.dropna(subset=['sales_id'], inplace=True)
    # Now property_agent_df has no rows where sales_id is null.
    # Filling the missing values
    property_agent_df['phone'].fillna('000-000-0000', inplace=True)
    property_agent_df['email'].fillna('unknown@example.com', inplace=True)
    # Trim whitespace & normalize casing
    property_agent_df['name']  = property_agent_df['name'].str.strip().str.title()
    property_agent_df['email'] = property_agent_df['email'].str.strip().str.lower()
    # Dealing with duplicates
    property_agent_df.drop_duplicates(subset=['sales_id'], keep='first')

    # Cleaning procedure for property_officer_df
    # Dropping any rows where sales_id is null
    property_officer_df.dropna(subset=['sales_id'], inplace=True)
    # Now property_officer_df has no rows where sales_id is null.
    # Drop redundant columns returning
    property_officer_df.drop(columns=["website"], inplace=True)
    # Filling the missing values
    property_officer_df['phone'].fillna('000-000-0000', inplace=True)
    property_officer_df['email'].fillna('unknown@example.com', inplace=True)
    # Trim whitespace & normalize casing
    property_officer_df['name']  = property_officer_df['name'].str.strip().str.title()
    property_officer_df['email'] = property_officer_df['email'].str.strip().str.lower()
    # Dealing withd duplicates
    property_officer_df.drop_duplicates(subset=['sales_id'], keep='first')

    # Cleaning procedure for property_sales_info_df
    # Converting to the right data type for strings and categorise
    property_sales_info_df['sales_id'] = property_sales_info_df['sales_id'].astype(str)
    property_sales_info_df['addressLine1'] = property_sales_info_df['addressLine1'].astype(str)
    property_sales_info_df['zipCode'] = property_sales_info_df['zipCode'].astype(str)
    property_sales_info_df['propertyType'] = property_sales_info_df['propertyType'].astype('category')
    # Drop redundant columns returning
    property_sales_info_df.drop(columns=["lotSize"], inplace=True)
    # Convert your float‐with‐NaNs column into a nullable integer column:
    property_sales_info_df['bedrooms'] = property_sales_info_df['bedrooms'].astype('Int64')
    property_sales_info_df['squareFootage'] = property_sales_info_df['squareFootage'].astype('Int64')
    property_sales_info_df['daysOnMarket'] = property_sales_info_df['daysOnMarket'].astype('Int64')
    property_sales_info_df['yearBuilt'] = property_sales_info_df['yearBuilt'].astype('Int64')
    # converting price to int by rounding off
    property_sales_info_df['price'] = property_sales_info_df['price'].round(0).astype('Int64')
    # Dealing with datetime
    property_sales_info_df['listedDate'] = pd.to_datetime(property_sales_info_df['listedDate'], utc=True, errors='coerce')
    # Address normalization
    property_sales_info_df['addressLine1'] = property_sales_info_df['addressLine1'].str.strip()
    property_sales_info_df['addressLine1'] = property_sales_info_df['addressLine1'].str.replace(r'\s+', ' ', regex=True)
    property_sales_info_df['addressLine1'] = property_sales_info_df['addressLine1'].str.title()
    # Dealing withd duplicates
    property_sales_info_df.drop_duplicates(subset=['sales_id'], keep='first')

    # Cleaning procedure for rental_history_df
    # Drop multiple columns, returning a new DataFrame
    rental_history_df.drop(columns=["event", "listingType", "removedDate"], inplace=True)
    # Dealing withd duplicates
    rental_history_df.drop_duplicates(subset=['rental_id'], keep='first')
    # Dealing with datetime
    rental_history_df['listedDate']  = pd.to_datetime(rental_history_df['listedDate'])
    # Handling Numerics → integer or float
    rental_history_df['price'] = rental_history_df['price'].astype('Int64')
    rental_history_df['daysOnMarket']= rental_history_df['daysOnMarket'].astype('Int64')

    # Cleaning procedure for rental_listing_info_df
    # Converting to the right data type for strings and categorise
    rental_listing_info_df['rental_id'] = rental_listing_info_df['rental_id'].astype(str)
    rental_listing_info_df['addressLine1'] = rental_listing_info_df['addressLine1'].astype(str)
    rental_listing_info_df['zipCode'] = rental_listing_info_df['zipCode'].astype(str)
    rental_listing_info_df['propertyType']  = rental_listing_info_df['propertyType'].astype('category')
    # Convert your float‐with‐NaNs column into a nullable integer column:
    rental_listing_info_df['bedrooms'] = rental_listing_info_df['bedrooms'].astype('Int64')
    rental_listing_info_df['squareFootage'] = rental_listing_info_df['squareFootage'].astype('Int64')
    rental_listing_info_df['daysOnMarket']  = rental_listing_info_df['daysOnMarket'].astype('Int64')
    # converting price to int by rounding off
    rental_listing_info_df['price'] = rental_listing_info_df['price'].round(0).astype('Int64')
    # Dealing with datetime
    rental_listing_info_df['listedDate'] = pd.to_datetime(rental_listing_info_df['listedDate'], utc=True, errors='coerce')
    # Address normalization
    rental_listing_info_df['addressLine1'] = rental_listing_info_df['addressLine1'].str.strip()
    rental_listing_info_df['addressLine1'] = rental_listing_info_df['addressLine1'].str.replace(r'\s+', ' ', regex=True)
    rental_listing_info_df['addressLine1'] = rental_listing_info_df['addressLine1'].str.title()
    # Dealing withd duplicates
    rental_listing_info_df.drop_duplicates(subset=['rental_id'], keep='first')

    return {
        'property_history_df': property_history_df,
        'property_agent_df': property_agent_df,
        'property_officer_df': property_officer_df,
        'property_sales_info_df': property_sales_info_df,
        'rental_history_df': rental_history_df,
        'rental_listing_info_df': rental_listing_info_df
    }


x = cleaning_job_2(all_data_22)
print(x)

{'property_history_df':                           sales_history_id  \
0     3e26560b-d2bb-46da-8c36-84d0602cc0d2   
1     3065a596-b4fc-4fc1-9ac3-a5cd8e148bf2   
2     e42ec8e2-7b53-4916-a42c-d31519e382e2   
3     212933ff-a17c-44e2-97e0-97ba8e2fb855   
4     789ea7b4-89a8-45d7-9cb9-fc1ce6a0d30e   
...                                    ...   
4352  99b8e673-2025-413a-9392-db8a6e923965   
4353  03f6a621-8ad0-4347-918f-286a06cb2e5e   
4354  6db95a51-fcef-4fd2-a864-4874bb05d762   
4355  0f74de9b-a0a3-427d-a3a0-c7b3c8ddce9e   
4356  9cd851e4-80b6-45b1-8174-f19663e047b5   

                                              sales_id   price  \
0                   540-Marker-Ln,-Wellsburg,-WV-26070  315000   
1                    206-Crystal-Ln,-Weirton,-WV-26062  184900   
2                 203-Park-Ave,-Middlebourne,-WV-26149  309900   
3                       963-Ivydale-Rd,-Clay,-WV-25043  255000   
4                  13088-Clay-Hwy,-Lizemores,-WV-25125  235000   
...                        

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  property_agent_df['phone'].fillna('000-000-0000', inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  property_agent_df['email'].fillna('unknown@example.com', inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermedi

In [17]:

def load_data(all_data_22):
    # 1. Call the function and grab its output
    cleaned_3 = cleaning_job_2(all_data_22)

    # 2. Extract each list by key
    property_history_df         = cleaned_3["property_history_df"]
    property_agent_df           = cleaned_3["property_agent_df"]
    property_officer_df         = cleaned_3["property_officer_df"]
    property_sales_info_df      = cleaned_3["property_sales_info_df"]
    rental_history_df           = cleaned_3["rental_history_df"]
    rental_listing_info_df      = cleaned_3["rental_listing_info_df"]

    # Load your .env
    load_dotenv()  # looks for a .env file in cwd

    DB_USER     = os.getenv("DB_USER")
    DB_PASSWORD = os.getenv("DB_PASSWORD")
    DB_HOST     = os.getenv("DB_HOST", "localhost")
    DB_PORT     = os.getenv("DB_PORT", "5432")
    DB_NAME     = os.getenv("DB_NAME")

    # Create the engine
    engine = create_engine(
        f"postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    )

    # Prepare your DataFrames and target table names
    tables = {
        "sales_info": property_sales_info_df,
        "sales_history": property_history_df,
        "sales_agent": property_agent_df,
        "sales_officer": property_officer_df,
        "rental_history": rental_history_df,
        "rental_info": rental_listing_info_df
    }

    # Loop and write each one
    for table_name, df in tables.items():
        df.to_sql(
            name=table_name,
            con=engine,
            schema="public",         # adjust if you use another schema
            if_exists="replace",     # or "append" There are risk to using append for future batch jobs, like if it appends PK i already have in there, it'll break.
            index=False,             # drop the DataFrame’s index column
            chunksize=500            # adjust batch size to manage memory/performance
        )
        print(f"→ Loaded {len(df)} rows into {table_name}")


load_data(all_data_22)


# # run_job
# if __name__ == "__main__":
#     # # 1) extract your raw JSON‐like payload
#     # all_data = data_extraction()

#     # 2) hand it off to your loader
#     load_data(all_data)

#     print("✅ All tables written to the database.")


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  property_agent_df['phone'].fillna('000-000-0000', inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  property_agent_df['email'].fillna('unknown@example.com', inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermedi

→ Loaded 3542 rows into sales_info
→ Loaded 4357 rows into sales_history
→ Loaded 2190 rows into sales_agent
→ Loaded 2218 rows into sales_officer
→ Loaded 7813 rows into rental_history
→ Loaded 3416 rows into rental_info
