In [27]:
import pandas as pd
import mysql.connector

conn = mysql.connector.connect(
    host="localhost",
    user="root",
    password="Riteshmattoo@2345",  # keep private in real projects
    database="county_migration"
)

cursor = conn.cursor()
print("Database Connected Successfully!")


Database Connected Successfully!


In [28]:
#Read & Merge CSV Files
RAW_FILES = {
    "San Joaquin": "san_joaquin_raw.csv",
    "Yolo":        "yolo_raw.csv",
    "Grant":       "grant_raw.csv"
}

frames = []
for county,file in RAW_FILES.items():
    df = pd.read_csv(file)
    df["county"] = county
    frames.append(df)

df = pd.concat(frames, ignore_index=True)
df.head()



Unnamed: 0,parcel_id,owner_first,owner_last,property_address,sale_price,sale_date,tax_status,county
0,SJ-1001,John,Smith,123 River Rd,350000,2021-01-10,PAID,San Joaquin
1,SJ-1002,Emily,Brown,88 Palm St,420000,2020-09-13,UNPAID,San Joaquin
2,SJ-1003,Michael,Clark,67 Maple Ave,389500,2022-03-18,PAID,San Joaquin
3,SJ-1004,Olivia,Lopez,45 Lake Blvd,512000,2019-06-29,PAID,San Joaquin
4,SJ-1005,Raj,Kumar,19 Sunset Dr,305000,2018-07-14,EXEMPT,San Joaquin


In [29]:
#Standardization
df["parcel_id"] = df["parcel_id"].astype(str).str.upper().str.strip()

df["owner_name"] = (
    df["owner_first"].fillna("").str.upper().str.strip()
    + " "
    + df["owner_last"].fillna("").str.upper().str.strip()
).str.strip()

df["address"] = df["property_address"].str.title()


In [30]:
#Clean Price + Date + Tax Status
def clean_price(x):
    try: return float(str(x).replace(",","").replace("$","").strip())
    except: return None

df["sale_price_num"] = df["sale_price"].apply(clean_price)
df["sale_date_parsed"] = pd.to_datetime(df["sale_date"], errors="coerce")
df["tax_status_clean"] = df["tax_status"].str.upper().str.strip()


In [31]:
#Validation Rules
rule_parcel_ok = df["parcel_id"] != ""
rule_price_ok = df["sale_price_num"].notna()
rule_date_ok = df["sale_date_parsed"].notna()
dup_mask = df.duplicated(subset=["county","parcel_id"], keep=False)

df["reason"] = ""
df.loc[~rule_parcel_ok, "reason"] += "Missing parcel_id; "
df.loc[~rule_price_ok,  "reason"] += "Invalid sale_price; "
df.loc[~rule_date_ok,   "reason"] += "Invalid sale_date; "
df.loc[dup_mask,        "reason"] += "Duplicate parcel_id; "


In [32]:
#Split Clean vs Rejected
valid_mask = rule_parcel_ok & rule_price_ok & rule_date_ok & ~dup_mask

clean_df  = df[valid_mask].copy()
reject_df = df[~valid_mask].copy()

clean_df.head(), reject_df.head()


(  parcel_id owner_first owner_last property_address  sale_price   sale_date  \
 0   SJ-1001        John      Smith     123 River Rd      350000  2021-01-10   
 1   SJ-1002       Emily      Brown       88 Palm St      420000  2020-09-13   
 2   SJ-1003     Michael      Clark     67 Maple Ave      389500  2022-03-18   
 3   SJ-1004      Olivia      Lopez     45 Lake Blvd      512000  2019-06-29   
 4   SJ-1005         Raj      Kumar     19 Sunset Dr      305000  2018-07-14   
 
   tax_status       county     owner_name       address  sale_price_num  \
 0       PAID  San Joaquin     JOHN SMITH  123 River Rd        350000.0   
 1     UNPAID  San Joaquin    EMILY BROWN    88 Palm St        420000.0   
 2       PAID  San Joaquin  MICHAEL CLARK  67 Maple Ave        389500.0   
 3       PAID  San Joaquin   OLIVIA LOPEZ  45 Lake Blvd        512000.0   
 4     EXEMPT  San Joaquin      RAJ KUMAR  19 Sunset Dr        305000.0   
 
   sale_date_parsed tax_status_clean reason  
 0       2021-01-10 

In [35]:
#Insert Clean data Into SQL
insert_clean = """
INSERT INTO property_master(county, parcel_id, owner_name, address, sale_price, sale_date, tax_status)
VALUES (%s,%s,%s,%s,%s,%s,%s)
ON DUPLICATE KEY UPDATE
    owner_name=VALUES(owner_name),
    address=VALUES(address),
    sale_price=VALUES(sale_price),
    sale_date=VALUES(sale_date),
    tax_status=VALUES(tax_status);
"""


for _, row in clean_df.iterrows():
    cursor.execute(insert_clean, (
        row["county"], row["parcel_id"], row["owner_name"], row["address"],
        row["sale_price_num"], str(row["sale_date_parsed"].date()),
        row["tax_status_clean"]
    ))
conn.commit()

print("Clean Data Inserted Successfully")


Clean Data Inserted Successfully


In [26]:
#Insert Rejected Data Into SQL
insert_reject = """
INSERT INTO rejected_records(county, parcel_id, owner_name, address, sale_price, sale_date, tax_status, reason)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
"""

for _, row in reject_df.iterrows():
    cursor.execute(insert_reject, (
        row["county"], row["parcel_id"], row["owner_name"], row["address"],
        row["sale_price"], row["sale_date"],
        row["tax_status"], row["reason"]
    ))

conn.commit()
cursor.close()
conn.close()

print("Rejected Data Inserted Successfully")
print("Data Migration Completed ðŸš€")


Rejected Data Inserted Successfully
Data Migration Completed ðŸš€
