In [None]:
from sqlalchemy import create_engine
import pandas as pd
from sqlalchemy import text

In [None]:
DATABASE_URL = "postgresql+psycopg2://mds:mds@152.53.248.27:5432/postgres"

In [None]:
engine = create_engine(DATABASE_URL)

In [47]:
CHUNK_SIZE = 10000
invalid_rows = []

In [48]:
def prepare_bulk_data(df):
    listings_input = []
    zip_codes = set()
    brokers = set()
    row_map = {}

    for idx, row in df.iterrows():
        try:
            if any(pd.isna(row[col]) or str(row[col]).strip() == '' for col in ['zip_code', 'brokered_by', 'status', 'price']):
                invalid_rows.append((idx, row.to_dict()))
                continue

            zip_codes.add((int(row['zip_code']), row['city'], row['state']))
            brokers.add((int(row['brokered_by']),))

            listings_input.append((
                int(row['brokered_by']),
                row['status'],
                row['price'],
                row['prev_sold_date'] if not pd.isna(row['prev_sold_date']) else None
            ))

            row_map[len(listings_input) - 1] = row  # Map insert order index to row

        except Exception as e:
            invalid_rows.append((idx, row.to_dict(), str(e)))

    return listings_input, zip_codes, brokers, row_map

In [49]:
def bulk_named_insert(conn, sql, tuple_data):
    if not tuple_data:
        return
    named_data = [{str(i + 1): val for i, val in enumerate(row)} for row in tuple_data]
    conn.execute(text(sql), named_data)

In [50]:
def bulk_insert_all(df):
    with engine.begin() as conn:
        for i in range(0, len(df), CHUNK_SIZE):
            print(f"working on chunk number {i}")
            chunk = df.iloc[i:i + CHUNK_SIZE]

            listings_input, zip_codes, brokers, row_map = prepare_bulk_data(chunk)

            # Insert zip codes
            bulk_named_insert(conn,
                "INSERT INTO zip_codes (zip_code, city, state) VALUES (:1, :2, :3) ON CONFLICT DO NOTHING",
                list(zip_codes)
            )

            # Insert brokers
            bulk_named_insert(conn,
                "INSERT INTO brokers (brokered_by) VALUES (:1) ON CONFLICT DO NOTHING",
                list(brokers)
            )

            # Insert listings and capture listing_ids
            inserted_listing_ids = []
            if listings_input:
                result = conn.execute(
                    text("""
                        INSERT INTO listings (brokered_by, status, price, prev_sold_date)
                        VALUES (:brokered_by, :status, :price, :prev_sold_date)
                        RETURNING listing_id
                    """),
                    [
                        {
                            'brokered_by': b,
                            'status': s,
                            'price': p,
                            'prev_sold_date': d
                        } for b, s, p, d in listings_input
                    ]
                )
                inserted_listing_ids = result.scalars().all()

            # Prepare dependent inserts
            estate_details = []
            land_data = []
            addresses = []

            for i, listing_id in enumerate(inserted_listing_ids):
                row = row_map[i]
                try:
                    if any(not pd.isna(row[col]) and str(row[col]).strip() != '' for col in ['bed', 'bath', 'house_size']):
                        estate_details.append((
                            listing_id,
                            row['bed'],
                            row['bath'],
                            row['house_size']
                        ))

                    if not pd.isna(row['acre_lot']):
                        area_m2 = row['acre_lot'] * 4046.8564224
                        land_data.append((listing_id, area_m2))

                    addresses.append((
                        listing_id,
                        row['street'] if not pd.isna(row['street']) else None,
                        int(row['zip_code'])
                    ))
                except Exception as e:
                    invalid_rows.append((i, row.to_dict(), str(e)))

            # Insert dependent tables
            bulk_named_insert(conn,
                "INSERT INTO estate_details (listing_id, bed, bath, house_size) VALUES (:1, :2, :3, :4)",
                estate_details
            )

            bulk_named_insert(conn,
                "INSERT INTO land_data (listing_id, area_size_in_square_m) VALUES (:1, :2)",
                land_data
            )

            bulk_named_insert(conn,
                "INSERT INTO addresses (listing_id, street, zip_code) VALUES (:1, :2, :3)",
                addresses
            )

In [51]:
# Load data
df = pd.read_csv(r"C:\Users\ahmed\Desktop\Master_Studium\MDBS\realtor-data.csv")

In [52]:
df

Unnamed: 0,brokered_by,status,price,bed,bath,acre_lot,street,city,state,zip_code,house_size,prev_sold_date
0,103378.0,for_sale,105000.0,3.0,2.0,0.12,1962661.0,Adjuntas,Puerto Rico,601.0,920.0,
1,52707.0,for_sale,80000.0,4.0,2.0,0.08,1902874.0,Adjuntas,Puerto Rico,601.0,1527.0,
2,103379.0,for_sale,67000.0,2.0,1.0,0.15,1404990.0,Juana Diaz,Puerto Rico,795.0,748.0,
3,31239.0,for_sale,145000.0,4.0,2.0,0.10,1947675.0,Ponce,Puerto Rico,731.0,1800.0,
4,34632.0,for_sale,65000.0,6.0,2.0,0.05,331151.0,Mayaguez,Puerto Rico,680.0,,
...,...,...,...,...,...,...,...,...,...,...,...,...
2226377,23009.0,sold,359900.0,4.0,2.0,0.33,353094.0,Richland,Washington,99354.0,3600.0,2022-03-25
2226378,18208.0,sold,350000.0,3.0,2.0,0.10,1062149.0,Richland,Washington,99354.0,1616.0,2022-03-25
2226379,76856.0,sold,440000.0,6.0,3.0,0.50,405677.0,Richland,Washington,99354.0,3200.0,2022-03-24
2226380,53618.0,sold,179900.0,2.0,1.0,0.09,761379.0,Richland,Washington,99354.0,933.0,2022-03-24


In [53]:
bulk_insert_all(df)

working on chunk number 0


KeyboardInterrupt: 

In [None]:
print("Rows that could not be inserted:")
for bad_row in invalid_rows:
    print(bad_row)
