In [10]:

import io
import boto3
import pandas as pd

# ============================
# Configuration — adjust these for your environment
# ============================
BUCKET = "nyc-aws-bucket-sumanth-1325"             # S3 bucket used for both input and output
KEY = "raw_data/reference/taxi_zone_lookup.csv"    # object key of the source CSV
OUT_PREFIX = "validated_data/reference/"           # key prefix under which reports are written

# ============================
# Step 1: fetch the CSV object from S3 and parse it with pandas
# ============================
s3 = boto3.client("s3")
obj = s3.get_object(Bucket=BUCKET, Key=KEY)

# The whole object body is pulled into memory before parsing.
# NOTE(review): read_csv's default NA handling turns strings such as "N/A" / "NA"
# into NaN — confirm that is what you want for this lookup file.
_raw_bytes = obj["Body"].read()
df = pd.read_csv(io.BytesIO(_raw_bytes))

# Quick sanity output: size, column names, first rows
_row_count = len(df)
_column_names = df.columns.tolist()
print("Rows:", _row_count)
print("Columns:", _column_names)
df.head()


Rows: 265
Columns: ['LocationID', 'Borough', 'Zone', 'service_zone']


Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [2]:
# Add snake_case working columns next to the raw lookup headers
# (source headers: LocationID, Borough, Zone, service_zone).

df["zone_id"] = df["LocationID"]  # copied as-is, no cast

# Text fields are cast to str. "service_zone" maps onto itself, so that raw
# column is overwritten; the other raw columns are kept.
for _target, _source in (
    ("borough", "Borough"),
    ("zone_name", "Zone"),
    ("service_zone", "service_zone"),
):
    df[_target] = df[_source].astype(str)

# Matching-key normalisation: lowercase, trim, collapse whitespace runs
def norm(s: pd.Series) -> pd.Series:
    """Return a normalised string version of ``s`` for duplicate matching.

    Every value is cast to ``str``, lowercased, stripped of leading/trailing
    whitespace, and each run of whitespace is replaced by a single space.
    """
    lowered = s.astype(str).str.lower()
    trimmed = lowered.str.strip()
    return trimmed.str.replace(r"\s+", " ", regex=True)

# Build the normalised matching keys, then blank out placeholder tokens
# ('nan', 'none', empty string) so they are treated as missing.
_MISSING_TOKENS = {"nan": None, "none": None, "": None}

for _norm_col, _source_col in (("zone_norm", "zone_name"), ("borough_norm", "borough")):
    _normalised = norm(df[_source_col])
    df[_norm_col] = _normalised.replace(_MISSING_TOKENS)


In [6]:
# Rows whose zone_id occurs more than once; keep=False marks every member
# of a duplicate group, not just the repeats.
_id_dup_mask = df.duplicated(subset=["zone_id"], keep=False)
dup_by_id = df.loc[_id_dup_mask].sort_values("zone_id")
print("Duplicate zone_id rows:", len(dup_by_id))

# Narrow to the reporting columns and preview the first rows
_id_report_cols = ["zone_id", "zone_name", "borough", "service_zone"]
dup_by_id_view = dup_by_id[_id_report_cols]
dup_by_id_view.head(20)


Duplicate zone_id rows: 0


Unnamed: 0,zone_id,zone_name,borough,service_zone


In [7]:
# Rows sharing the same normalised (zone, borough) pair; keep=False marks
# every member of a duplicate group.
_nat_key_cols = ["zone_norm", "borough_norm"]
_nat_dup_mask = df.duplicated(subset=_nat_key_cols, keep=False)
dup_by_nat = df.loc[_nat_dup_mask].sort_values(["borough_norm", "zone_norm"])

print("Duplicate (zone, borough) rows:", len(dup_by_nat))

# Narrow to the reporting columns and preview the first rows
_nat_report_cols = ["zone_id", "zone_name", "borough", "service_zone"]
dup_by_nat_view = dup_by_nat[_nat_report_cols]
dup_by_nat_view.head(20)


Duplicate (zone, borough) rows: 5


Unnamed: 0,zone_id,zone_name,borough,service_zone
102,103,Governor's Island/Ellis Island/Liberty Island,Manhattan,Yellow Zone
103,104,Governor's Island/Ellis Island/Liberty Island,Manhattan,Yellow Zone
104,105,Governor's Island/Ellis Island/Liberty Island,Manhattan,Yellow Zone
55,56,Corona,Queens,Boro Zone
56,57,Corona,Queens,Boro Zone


In [11]:
def upload_df_to_s3_csv(dataframe: pd.DataFrame, bucket: str, key: str, s3_client=None) -> None:
    """Serialize ``dataframe`` to CSV (without the index) and upload it to S3.

    Parameters
    ----------
    dataframe : pd.DataFrame
        Frame to export.
    bucket : str
        Destination S3 bucket name.
    key : str
        Destination object key inside ``bucket``.
    s3_client : optional
        Object exposing ``put_object(Bucket=..., Key=..., Body=..., ContentType=...)``.
        When omitted, the notebook-level ``s3`` client is used, so existing
        three-argument calls keep working.

    Returns
    -------
    None
    """
    # Resolve the client lazily so the global is only needed when no client is passed in.
    client = s3 if s3_client is None else s3_client

    csv_buf = io.StringIO()
    dataframe.to_csv(csv_buf, index=False)

    # put_object takes the payload as bytes; the CSV text is encoded as UTF-8.
    client.put_object(
        Bucket=bucket,
        Key=key,
        Body=csv_buf.getvalue().encode("utf-8"),
        ContentType="text/csv"
    )

# Destination keys for the two duplicate reports
dup_by_id_key = f"{OUT_PREFIX}dup_taxi_zones_by_zone_id.csv"
dup_by_nat_key = f"{OUT_PREFIX}dup_taxi_zones_by_zone_borough.csv"

# Pair each report view with its key, then upload in order
_dup_reports = [
    (dup_by_id_view, dup_by_id_key),
    (dup_by_nat_view, dup_by_nat_key),
]
for _report_view, _report_key in _dup_reports:
    upload_df_to_s3_csv(_report_view, BUCKET, _report_key)

print("Uploaded duplicate reports to:")
for _report_view, _report_key in _dup_reports:
    print(f" - s3://{BUCKET}/{_report_key}")


Uploaded duplicate reports to:
 - s3://nyc-aws-bucket-sumanth-1325/validated_data/reference/dup_taxi_zones_by_zone_id.csv
 - s3://nyc-aws-bucket-sumanth-1325/validated_data/reference/dup_taxi_zones_by_zone_borough.csv


In [16]:
# One-row run summary: when the check ran, what it read, and how many
# duplicate rows each check reported.
summary = pd.DataFrame([{
    # Timestamp.utcnow() is deprecated in recent pandas; now("UTC") yields the
    # same timezone-aware UTC timestamp and ISO-8601 string.
    "run_ts": pd.Timestamp.now("UTC").isoformat(),
    "bucket": BUCKET,
    "source_key": KEY,
    "dup_by_zone_id_rows": len(dup_by_id_view),
    "dup_by_zone_borough_rows": len(dup_by_nat_view)
}])

# The summary is written to a fixed key, so each run replaces the previous one.
summary_key = f"{OUT_PREFIX}matching_summary.csv"
upload_df_to_s3_csv(summary, BUCKET, summary_key)

summary


Unnamed: 0,run_ts,bucket,source_key,dup_by_zone_id_rows,dup_by_zone_borough_rows
0,2026-01-03T06:46:57.969498+00:00,nyc-aws-bucket-sumanth-1325,raw_data/reference/taxi_zone_lookup.csv,0,5


In [18]:
import numpy as np

# ============================
# Master-data configuration
# ============================
# Key prefix under which the golden record is written
MASTER_PREFIX = "master_data/reference/"

# Borough values accepted by the survivorship rule further down
VALID_BOROUGHS = {
    "Manhattan",
    "Queens",
    "Brooklyn",
    "Bronx",
    "Staten Island",
    "EWR",
    "Unknown",
}

def clean_str(x):
    """Return ``x`` as a stripped string, or ``None`` when it carries no value.

    ``None`` is returned for anything ``pd.isna`` reports as missing, for
    empty / whitespace-only text, and for the placeholder words "nan" and
    "none" (case-insensitive). Any other input is converted with ``str`` and
    stripped.
    """
    if pd.isna(x):
        return None

    text = str(x).strip()
    if text == "" or text.lower() in {"nan", "none"}:
        return None
    return text

# Rename the raw lookup headers to the snake_case names used from here on.
#
# The earlier standardisation cell adds zone_id / zone_name / borough as derived
# copies while keeping the raw headers. A plain rename on such a frame creates
# duplicate column labels; df["zone_name"] then returns a DataFrame and the
# cleaning step below raises. Dropping the derived copies first keeps this cell
# working on a fresh top-to-bottom run, and makes re-running it a no-op.
_RENAME_MAP = {"LocationID": "zone_id", "Zone": "zone_name", "Borough": "borough"}
_stale_copies = [
    new_name
    for raw_name, new_name in _RENAME_MAP.items()
    if raw_name in df.columns and new_name in df.columns
]
df = df.drop(columns=_stale_copies).rename(columns=_RENAME_MAP)

# Ensure clean: blank / placeholder text becomes None, everything else is stripped
for _col in ("zone_name", "borough", "service_zone"):
    df[_col] = df[_col].apply(clean_str)

# Helper: choose best string by length (survivorship rule)
def pick_longest(values):
    """Return the longest non-None string in ``values``.

    Ties on length are broken alphabetically: the lexicographically smallest
    candidate wins. This matches the alphabetical-first choice made by
    ``pick_mode`` and by the borough conflict rule. Returns ``None`` when
    ``values`` holds no non-None entry.
    """
    candidates = [v for v in values if v is not None]
    if not candidates:
        return None
    # Negated length puts the longest first; the string itself then orders
    # equal-length candidates A→Z, so min() picks the intended survivor.
    return min(candidates, key=lambda s: (-len(s), s))

# Helper: most frequent non-null value
def pick_mode(values):
    """Return the most frequent entry of ``values``, ignoring ``None``.

    The result is the first element of ``pandas.Series.mode()`` computed over
    the non-None entries. Returns ``None`` when there are no non-None entries.
    """
    non_null = [item for item in values if item is not None]
    if len(non_null) == 0:
        return None
    modes = pd.Series(non_null).mode()
    return modes.iloc[0]

# Build one golden (surviving) record per zone_id and collect governance
# exceptions along the way.

# Fixed column layouts so that empty results still carry their schema and the
# sort below cannot fail on a frame without a zone_id column.
GOLDEN_COLUMNS = [
    "zone_id", "zone_name", "borough", "service_zone", "source_system",
    "version", "created_by", "updated_by", "approved_by",
]
EXCEPTION_COLUMNS = ["domain", "entity_key", "exception_type", "details", "severity", "status"]

golden_rows = []
exceptions = []

# dropna=False keeps the group of rows whose zone_id is missing so it can be reported
for zone_id, g in df.groupby("zone_id", dropna=False):
    # Skip null zone_id as exception
    if pd.isna(zone_id):
        exceptions.append({
            "domain": "taxi_zone",
            "entity_key": "NULL",
            "exception_type": "INVALID",
            "details": "zone_id is NULL",
            "severity": "HIGH",
            "status": "OPEN"
        })
        continue

    entity_key = str(int(zone_id))

    # Candidate values
    zone_candidates = list(g["zone_name"])
    borough_candidates = list(g["borough"])
    service_candidates = list(g["service_zone"])

    # Survivorship: zone_name = longest
    zone_best = pick_longest(zone_candidates)

    # Survivorship with governance: borough must be valid
    borough_unique_valid = sorted({b for b in borough_candidates if b in VALID_BOROUGHS})

    if len(borough_unique_valid) > 1:
        # Conflict exception: same zone_id maps to multiple valid boroughs
        exceptions.append({
            "domain": "taxi_zone",
            "entity_key": entity_key,
            "exception_type": "CONFLICT",
            "details": f"Multiple valid boroughs found for same zone_id: {borough_unique_valid}",
            "severity": "HIGH",
            "status": "OPEN"
        })
        # choose deterministic borough anyway (so analytics can proceed), but flag it
        borough_best = borough_unique_valid[0]
    elif borough_unique_valid:
        borough_best = borough_unique_valid[0]
    else:
        # No candidate passes validation: fall back to the most frequent raw
        # value (possibly None) so a record is still produced, and flag it
        # instead of letting it through silently.
        borough_best = pick_mode(borough_candidates)
        exceptions.append({
            "domain": "taxi_zone",
            "entity_key": entity_key,
            "exception_type": "INVALID",
            "details": f"No valid borough found for zone_id; surviving value: {borough_best!r}",
            "severity": "HIGH",
            "status": "OPEN"
        })

    # Survivorship: service_zone = most frequent non-null
    service_best = pick_mode(service_candidates)

    golden_rows.append({
        "zone_id": int(zone_id),
        "zone_name": zone_best,
        "borough": borough_best,
        "service_zone": service_best,
        "source_system": "taxi_zone_lookup",
        "version": 1,
        "created_by": "mdm_pipeline",
        "updated_by": "mdm_pipeline",
        "approved_by": None
    })

golden_df = pd.DataFrame(golden_rows, columns=GOLDEN_COLUMNS).sort_values("zone_id")
exceptions_df = pd.DataFrame(exceptions, columns=EXCEPTION_COLUMNS)

print("Golden records:", len(golden_df))
print("Exceptions:", len(exceptions_df))

golden_df.head()


Golden records: 265
Exceptions: 0


Unnamed: 0,zone_id,zone_name,borough,service_zone,source_system,version,created_by,updated_by,approved_by
0,1,Newark Airport,EWR,EWR,taxi_zone_lookup,1,mdm_pipeline,mdm_pipeline,
1,2,Jamaica Bay,Queens,Boro Zone,taxi_zone_lookup,1,mdm_pipeline,mdm_pipeline,
2,3,Allerton/Pelham Gardens,Bronx,Boro Zone,taxi_zone_lookup,1,mdm_pipeline,mdm_pipeline,
3,4,Alphabet City,Manhattan,Yellow Zone,taxi_zone_lookup,1,mdm_pipeline,mdm_pipeline,
4,5,Arden Heights,Staten Island,Boro Zone,taxi_zone_lookup,1,mdm_pipeline,mdm_pipeline,


In [19]:
# NOTE: this re-defines the helper already defined in the earlier duplicate-report
# cell and shadows it from here on. Keep the two copies in sync, or delete this one.
def upload_df_to_s3_csv(dataframe: pd.DataFrame, bucket: str, key: str, s3_client=None) -> None:
    """Serialize ``dataframe`` to CSV (without the index) and upload it to S3.

    Parameters
    ----------
    dataframe : pd.DataFrame
        Frame to export.
    bucket : str
        Destination S3 bucket name.
    key : str
        Destination object key inside ``bucket``.
    s3_client : optional
        Object exposing ``put_object(Bucket=..., Key=..., Body=..., ContentType=...)``.
        When omitted, the notebook-level ``s3`` client is used, so existing
        three-argument calls keep working.

    Returns
    -------
    None
    """
    # Resolve the client lazily so the global is only needed when no client is passed in.
    client = s3 if s3_client is None else s3_client

    csv_buf = io.StringIO()
    dataframe.to_csv(csv_buf, index=False)

    # put_object takes the payload as bytes; the CSV text is encoded as UTF-8.
    client.put_object(
        Bucket=bucket,
        Key=key,
        Body=csv_buf.getvalue().encode("utf-8"),
        ContentType="text/csv"
    )

# Destination key for the golden taxi-zone table
golden_key = f"{MASTER_PREFIX}taxi_zone_golden.csv"

# Write the golden records to S3 as CSV
upload_df_to_s3_csv(golden_df, BUCKET, golden_key)

# Report where the file landed (one line per item, same text as before)
print("Uploaded:", f" - s3://{BUCKET}/{golden_key}", sep="\n")



Uploaded:
 - s3://nyc-aws-bucket-sumanth-1325/master_data/reference/taxi_zone_golden.csv


In [20]:
pip install psycopg2-binary


Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.11-cp311-cp311-win_amd64.whl.metadata (5.1 kB)
Downloading psycopg2_binary-2.9.11-cp311-cp311-win_amd64.whl (2.7 MB)
   ---------------------------------------- 0.0/2.7 MB ? eta -:--:--
   ---------------------------------------- 0.0/2.7 MB ? eta -:--:--
   ---------------------------------------- 0.0/2.7 MB 330.3 kB/s eta 0:00:09
   ---------------------------------------- 0.0/2.7 MB 330.3 kB/s eta 0:00:09
   -- ------------------------------------- 0.2/2.7 MB 1.3 MB/s eta 0:00:02
   ---------------------------- ----------- 2.0/2.7 MB 10.4 MB/s eta 0:00:01
   ---------------------------------------- 2.7/2.7 MB 12.3 MB/s eta 0:00:00
Installing collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.11
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip
