# The section below uses cuDF to process and clean the data

In [1]:
import pandas as pd #pandas uses cpu to process data
import cudf #cudf uses gpu to process data (RTX 5090 & RTX 4070Ti Dual Card + CUDA 12.8 + Python 3.11.12 + lastest Nvidia Driver)


In [2]:
gdf = cudf.read_csv("/home/jiajun_li/Documents/MDPROJ/Housing_Maintenance_Code_Violations_20250427.csv")

In [3]:
print(gdf.columns)

Index(['ViolationID', 'BuildingID', 'RegistrationID', 'BoroID', 'Borough',
       'HouseNumber', 'LowHouseNumber', 'HighHouseNumber', 'StreetName',
       'StreetCode', 'Postcode', 'Apartment', 'Story', 'Block', 'Lot', 'Class',
       'InspectionDate', 'ApprovedDate', 'OriginalCertifyByDate',
       'OriginalCorrectByDate', 'NewCertifyByDate', 'NewCorrectByDate',
       'CertifiedDate', 'OrderNumber', 'NOVID', 'NOVDescription',
       'NOVIssuedDate', 'CurrentStatusID', 'CurrentStatus',
       'CurrentStatusDate', 'NovType', 'ViolationStatus', 'RentImpairing',
       'Latitude', 'Longitude', 'CommunityBoard', 'CouncilDistrict',
       'CensusTract', 'BIN', 'BBL', 'NTA'],
      dtype='object')


In [4]:
#Drop columns that are not needed
cols_drop = [
    'RegistrationID',
    'LowHouseNumber',
    'HighHouseNumber',
    'CertifiedDate',
    'OrderNumber',
    'NOVID',
    'NOVIssuedDate',
    'CurrentStatusID',
    'CouncilDistrict',
    'CensusTract',
    'BIN',
    'BBL',
    'Block',
    'Lot',
    'OriginalCertifyByDate',
    'OriginalCorrectByDate',
    'NewCertifyByDate',
    'NewCorrectByDate',
    'Latitude',
    'Longitude',
    'CommunityBoard',
]
gdf = gdf.drop(columns = cols_drop)
print(gdf.columns)

Index(['ViolationID', 'BuildingID', 'BoroID', 'Borough', 'HouseNumber',
       'StreetName', 'StreetCode', 'Postcode', 'Apartment', 'Story', 'Class',
       'InspectionDate', 'ApprovedDate', 'NOVDescription', 'CurrentStatus',
       'CurrentStatusDate', 'NovType', 'ViolationStatus', 'RentImpairing',
       'NTA'],
      dtype='object')


In [5]:
print(count := gdf.shape[1])

20


In [6]:
print(gdf.shape)
print(gdf.dtypes)

#find the number of unique values in each column
for col in gdf.columns:
    print(f"{col}: {gdf[col].nunique()}")

(9936776, 20)
ViolationID           int64
BuildingID            int64
BoroID                int64
Borough              object
HouseNumber          object
StreetName           object
StreetCode            int64
Postcode              int64
Apartment            object
Story                object
Class                object
InspectionDate       object
ApprovedDate         object
NOVDescription       object
CurrentStatus        object
CurrentStatusDate    object
NovType              object
ViolationStatus      object
RentImpairing        object
NTA                  object
dtype: object
ViolationID: 9936776
BuildingID: 230898
BoroID: 5
Borough: 5
HouseNumber: 21940
StreetName: 6039
StreetCode: 5648
Postcode: 220
Apartment: 14633
Story: 250
Class: 4
InspectionDate: 18101
ApprovedDate: 20287
NOVDescription: 6110015
CurrentStatus: 23
CurrentStatusDate: 10905
NovType: 2
ViolationStatus: 2
RentImpairing: 2
NTA: 203


In [7]:
print(gdf["Postcode"])

0          11206
1          11206
2          11206
3          11206
4          11206
           ...  
9936771    11238
9936772    11238
9936773    11226
9936774    11230
9936775    11230
Name: Postcode, Length: 9936776, dtype: int64


In [8]:
import pandas as pd
#todo data cast that is compatible with postgres


df = gdf.copy().to_pandas()
#Convert the data types to be compatible with PostgreSQL
int_cols = ['ViolationID', 
            'BuildingID', 
            'BoroID',
            'StreetCode',
            'Postcode'
                              ]
for _ in int_cols:
    df[_] = pd.to_numeric(df[_], errors = 'coerce').astype('Int64')

In [35]:
print(df[['building_id']].nunique())

building_id    230898
dtype: int64


In [9]:
print(df['ViolationID'].dtypes)
print(df['ViolationID'].isnull().sum())
print(df['Postcode'].dtypes)
print(df['Postcode'].isnull().sum())

Int64
0
Int64
8253


In [10]:

#todo Convert date columns to datetime
date_cols = ['InspectionDate','ApprovedDate','CurrentStatusDate']
for _ in date_cols:
    df[_] = pd.to_datetime(df[_], errors = 'coerce')

In [11]:
for _ in date_cols:
    print(df[_].dtypes)
    print(df[_].head(10))

datetime64[ns]
0   2013-12-30
1   2014-07-02
2   2014-07-02
3   2014-07-02
4   2014-07-02
5   2015-03-02
6   2015-06-30
7   2015-06-30
8   2015-08-31
9   2013-10-21
Name: InspectionDate, dtype: datetime64[ns]
datetime64[ns]
0   2014-01-04
1   2014-07-07
2   2014-07-07
3   2014-07-07
4   2014-07-07
5   2015-03-02
6   2015-07-03
7   2015-07-03
8   2015-09-01
9   2013-10-22
Name: ApprovedDate, dtype: datetime64[ns]
datetime64[ns]
0   2014-01-25
1   2015-08-09
2   2015-08-09
3   2015-08-09
4   2015-08-09
5   2015-03-16
6   2016-01-06
7   2016-01-06
8   2015-09-02
9   2023-09-10
Name: CurrentStatusDate, dtype: datetime64[ns]


In [12]:
text_cols = [
    'Borough','HouseNumber','StreetName',
    'Apartment','Story','Class','NOVDescription',
    'CurrentStatus','NovType','ViolationStatus','NTA'
]

for _ in text_cols:
     df[_] = df[_].where(df[_].notna(), pd.NA).astype('string')

In [13]:
class_dict = {
    'A': 'Non-hazardous',
    'B': 'Hazardous',
    'C': 'Immediately hazardous',
    'I': 'Order to repair/vacate'
}
df['ClassDescription'] = df['Class'].map(class_dict)

In [14]:
df['RentImpairing'] = (
    df['RentImpairing']
      .map({'Yes': True, 'No': False, 'Y': True, 'N': False})
      .astype('boolean')
)

In [15]:
print(df.shape)
print(df.dtypes)
df['ClassDescription'] = df['ClassDescription'].astype('string')
print(df['ClassDescription'].isnull().sum())
print(df['ClassDescription'].dtypes)

(9936776, 21)
ViolationID                   Int64
BuildingID                    Int64
BoroID                        Int64
Borough              string[python]
HouseNumber          string[python]
StreetName           string[python]
StreetCode                    Int64
Postcode                      Int64
Apartment            string[python]
Story                string[python]
Class                string[python]
InspectionDate       datetime64[ns]
ApprovedDate         datetime64[ns]
NOVDescription       string[python]
CurrentStatus        string[python]
CurrentStatusDate    datetime64[ns]
NovType              string[python]
ViolationStatus      string[python]
RentImpairing               boolean
NTA                  string[python]
ClassDescription             object
dtype: object
0
string


In [None]:
import psycopg

DB_HOST = "localhost"
DB_PORT = '5432'
DB_NAME = "housing_maintenance_cv"
DB_USER = "postgres"
DB_PASS = "12345678"

def main():
    # Connect to PostgreSQL
    conn = psycopg.connect(
    host = DB_HOST,
    port = DB_PORT,
    dbname = DB_NAME,
    user = DB_USER,
    password = DB_PASS)

    cur = conn.cursor()

    # For replicability
    cur.execute("DROP TABLE IF EXISTS borough CASCADE;")
    conn.commit()

    cur.execute("DROP TABLE IF EXISTS building CASCADE;")
    conn.commit()

    cur.execute("DROP TABLE IF EXISTS nta CASCADE;")
    conn.commit()

    cur.execute("DROP TABLE IF EXISTS violation CASCADE;")
    conn.commit()

    cur.execute("DROP TABLE IF EXISTS violation_class CASCADE;")
    conn.commit()
 

    borough = """ CREATE TABLE borough (
                  borough_id BIGINT PRIMARY KEY,
                  borough_name TEXT NOT NULL
                  );
              """
    
    violation_class = """ CREATE TABLE violation_class (
                          class_type VARCHAR(1) PRIMARY KEY,
                          type_description VARCHAR(100) NOT NULL         
                          );
                      """


    nta = """ CREATE TABLE nta (
              nta_id BIGSERIAL PRIMARY KEY,
              nta_name TEXT NOT NULL,
              borough_id BIGINT NOT NULL,
              FOREIGN KEY (borough_id) REFERENCES borough(borough_id)
              );
          """

    building = """ CREATE TABLE building (
                   building_id BIGINT PRIMARY KEY,
                   house_number TEXT,
                   street_name TEXT,
                   street_code BIGINT,
                   postcode BIGINT,
                   nta_id BIGINT,
                   borough_id BIGINT NOT NULL,
                   FOREIGN KEY (borough_id) REFERENCES borough(borough_id),
                   FOREIGN KEY (nta_id) REFERENCES nta(nta_id)
                   );
               """

    violation = """ CREATE TABLE violation (
                    violation_id BIGINT PRIMARY KEY,
                    inspection_date DATE,
                    approved_date DATE,
                    nov_description TEXT,
                    current_status TEXT,
                    current_status_date DATE,
                    nov_type TEXT,
                    violation_status TEXT,
                    rent_impairing BOOLEAN,
                    apartment TEXT,
                    story TEXT,
                    building_id BIGINT NOT NULL,
                    class_type VARCHAR(1) NOT NULL,
                    FOREIGN KEY (building_id) REFERENCES building(building_id),
                    FOREIGN KEY (class_type) REFERENCES violation_class(class_type)
                    );
                """



    cur.execute(borough)
    cur.execute(violation_class)
    cur.execute(nta)
    cur.execute(building)
    cur.execute(violation)
    conn.commit()

if __name__ == "__main__":
    main()

In [17]:
DB_HOST = "localhost"
DB_PORT = '5432'
DB_NAME = "housing_maintenance_cv"
DB_USER = "postgres"
DB_PASS = "12345678"

In [None]:
df = df.rename(columns={
    'ViolationID'         : 'violation_id',
    'BuildingID'          : 'building_id',
    'BoroID'              : 'borough_id',
    'Class'               : 'class_type',
    'InspectionDate'      : 'inspection_date',
    'ApprovedDate'        : 'approved_date',
    'CurrentStatusDate'   : 'current_status_date',
    'NOVDescription'      : 'nov_description',
    'NovType'             : 'nov_type',
    'ViolationStatus'     : 'violation_status',
    'RentImpairing'       : 'rent_impairing',
    'NTA'                 : 'nta_name',
    'StreetCode'          : 'street_code',
    'Postcode'            : 'postcode',
    'HouseNumber'         : 'house_number',
    'StreetName'          : 'street_name',
    'Apartment'           : 'apartment',
    'Story'               : 'story',
    'CurrentStatus'       : 'current_status',
    'ClassDescription'    : 'class_description',
    'Borough'             : 'borough_name'
    
})

In [19]:
print(df.columns)

Index(['violation_id', 'building_id', 'borough_id', 'borough_name',
       'house_number', 'street_name', 'street_code', 'postcode', 'apartment',
       'story', 'class_type', 'inspection_date', 'approved_date',
       'nov_description', 'current_status', 'current_status_date', 'nov_type',
       'violation_status', 'rent_impairing', 'nta_name', 'class_description'],
      dtype='object')


In [32]:
#populate the borough table
import psycopg2
unique_boros = (
    df[['borough_id','borough_name']]
      .dropna(subset=['borough_id','borough_name'])
      .drop_duplicates()
      .values
)

conn = psycopg2.connect(
    host = DB_HOST,
    port = DB_PORT,
    dbname = DB_NAME,
    user = DB_USER,
    password = DB_PASS
)
cur = conn.cursor()

insert_sql = """
  INSERT INTO borough (borough_id, borough_name)
  VALUES (%s, %s)
  ON CONFLICT (borough_id) DO NOTHING;
"""
cur.executemany(insert_sql, unique_boros.tolist())
conn.commit()


In [33]:
#populate the nta table
conn = psycopg2.connect(
    host = DB_HOST,
    port = DB_PORT,
    dbname = DB_NAME,
    user = DB_USER,
    password = DB_PASS
)
cur = conn.cursor()
unique_ntas = (
    df[['nta_name','borough_id']]
      .dropna(subset=['nta_name','borough_id'])
      .drop_duplicates()
)
records = [
    (row.nta_name, int(row.borough_id))
    for row in unique_ntas.itertuples(index=False)
]

insert_sql = """
  INSERT INTO nta (nta_name, borough_id)
  VALUES (%s, %s)
"""
cur.executemany(insert_sql, records)
conn.commit()

In [None]:
#populate the building table
import psycopg2
conn = psycopg2.connect(
    host = DB_HOST,
    port = DB_PORT,
    dbname = DB_NAME,
    user = DB_USER,
    password = DB_PASS
)
cur = conn.cursor()

# Get NTA mappings
cur.execute("SELECT nta_id, nta_name FROM nta")
nta_map = {nta_name: nta_id for nta_id, nta_name in cur.fetchall()}

# Extract all buildings with required fields
buildings = (
    df[['building_id', 'house_number', 'street_name', 'street_code', 
        'postcode', 'nta_name', 'borough_id']]
    .dropna(subset=['building_id', 'borough_id'])  
    .drop_duplicates(subset=['building_id'])
    .copy()
)

# Map NTA names to IDs, leaving NULL where no match exists
buildings['nta_fk'] = buildings['nta_name'].map(nta_map)

# Build records with nullable NTA foreign keys
records = []
for row in buildings.itertuples(index=False):
    records.append((
        int(row.building_id),
        row.house_number,
        row.street_name,
        int(row.street_code) if pd.notna(row.street_code) else None,
        int(row.postcode) if pd.notna(row.postcode) else None,
        int(row.nta_fk) if pd.notna(row.nta_fk) else None,
        int(row.borough_id) 
    ))

# Insert buildings with ON CONFLICT handling
insert_sql = """
  INSERT INTO building (
    building_id,
    house_number,
    street_name,
    street_code,
    postcode,
    nta_id,
    borough_id
  ) VALUES (%s, %s, %s, %s, %s, %s, %s)
  ON CONFLICT (building_id) DO NOTHING;
"""
cur.executemany(insert_sql, records)
conn.commit()

In [22]:
#populate the violation_class table
conn = psycopg2.connect(
    host = DB_HOST,
    port = DB_PORT,
    dbname = DB_NAME,
    user = DB_USER,
    password = DB_PASS
)
cur = conn.cursor()
unique_classes = (
    df[['class_type','class_description']]
      .dropna(subset=['class_type','class_description'])
      .drop_duplicates()
)

records = [
    (row.class_type, row.class_description)
    for row in unique_classes.itertuples(index=False)
]

insert_sql = """
  INSERT INTO violation_class (class_type, type_description)
  VALUES (%s, %s)
  ON CONFLICT (class_type) DO NOTHING;
"""
cur.executemany(insert_sql, records)
conn.commit()

In [38]:
#populate the violation_class table
conn = psycopg2.connect(
    host = DB_HOST,
    port = DB_PORT,
    dbname = DB_NAME,
    user = DB_USER,
    password = DB_PASS
)
cur = conn.cursor()
unique_classes = (
    df[['class_type','class_description']]
      .dropna(subset=['class_type','class_description'])
      .drop_duplicates()
)

records = [
    (row.class_type, row.class_description)
    for row in unique_classes.itertuples(index=False)
]

# Insert with conflict handling
insert_sql = """
  INSERT INTO violation_class (class_type, type_description)
  VALUES (%s, %s)
  ON CONFLICT (class_type) DO NOTHING;
"""
cur.executemany(insert_sql, records)
conn.commit()

# Build records with nullable NTA foreign keys


In [40]:
#populate the violation table
print(df[['class_type']].isnull().sum())
print(df[['building_id']].isnull().sum())

class_type    0
dtype: int64
building_id    0
dtype: int64


In [44]:
violation = """ CREATE TABLE violation (
                    violation_id BIGINT PRIMARY KEY,
                    inspection_date DATE,
                    approved_date DATE,
                    nov_description TEXT,
                    current_status TEXT,
                    current_status_date DATE,
                    nov_type TEXT,
                    violation_status TEXT,
                    rent_impairing BOOLEAN,
                    apartment TEXT,
                    story TEXT,
                    building_id BIGINT NOT NULL,
                    class_type VARCHAR(1) NOT NULL,
                    FOREIGN KEY (building_id) REFERENCES building(building_id),
                    FOREIGN KEY (class_type) REFERENCES violation_class(class_type)
                    );
                """
cur.execute(violation)
conn.commit()

In [None]:
import psycopg2
from psycopg2.extras import execute_values
import time

conn = psycopg2.connect(
    host = DB_HOST,
    port = DB_PORT,
    dbname = DB_NAME,
    user = DB_USER,
    password = DB_PASS
)
conn.autocommit = False
cur = conn.cursor()

# Get existing building IDs for faster lookup (in-memory)
cur.execute("SELECT building_id FROM building")
valid_building_ids = set([row[0] for row in cur.fetchall()])

# Get valid class types
cur.execute("SELECT class_type FROM violation_class")
valid_class_types = set([row[0] for row in cur.fetchall()])

# Filter violations that have valid foreign keys
valid_violations = df[
    df['building_id'].isin(valid_building_ids) & 
    df['class_type'].isin(valid_class_types) &
    df['violation_id'].notna()
].copy()

print(f"Total violations: {len(df)}")
print(f"Valid violations for insert: {len(valid_violations)}")

# Configure batch size
BATCH_SIZE = 10000
total_batches = (len(valid_violations) // BATCH_SIZE) + 1

# Prepare SQL statement
insert_sql = """
    INSERT INTO violation (
        violation_id, inspection_date, approved_date, nov_description,
        current_status, current_status_date, nov_type, violation_status,
        rent_impairing, apartment, story, building_id, class_type
    ) VALUES %s
    ON CONFLICT (violation_id) DO NOTHING;
"""

start_time = time.time()
rows_inserted = 0

try:
    # Process in batches
    for batch_num in range(total_batches):
        start_idx = batch_num * BATCH_SIZE
        end_idx = min(start_idx + BATCH_SIZE, len(valid_violations))
        
        if start_idx >= end_idx:
            break
            
        batch = valid_violations.iloc[start_idx:end_idx]
        
        # Convert to list of tuples for insertion
        records = []
        for row in batch.itertuples(index=False):
            records.append((
                int(row.violation_id),
                #replace NaN and NAT
                None if pd.isna(row.inspection_date) else row.inspection_date,
                None if pd.isna(row.approved_date) else row.approved_date,
                None if pd.isna(row.nov_description) else row.nov_description,
                None if pd.isna(row.current_status) else row.current_status,
                None if pd.isna(row.current_status_date) else row.current_status_date,
                None if pd.isna(row.nov_type) else row.nov_type,
                None if pd.isna(row.violation_status) else row.violation_status,
                bool(row.rent_impairing) if pd.notna(row.rent_impairing) else None,
                None if pd.isna(row.apartment) else row.apartment,
                None if pd.isna(row.story) else row.story,
                int(row.building_id),
                row.class_type
            ))
        
        # Use execute_values for faster batch insert
        execute_values(cur, insert_sql, records, template=None, page_size=BATCH_SIZE)
        
        # Commit each batch
        conn.commit()
        
        rows_inserted += len(records)
        elapsed = time.time() - start_time
        rate = rows_inserted / elapsed if elapsed > 0 else 0
        
        # Progress update every 10 batches or at the end
        if batch_num % 10 == 0 or batch_num == total_batches - 1:
            print(f"Batch {batch_num+1}/{total_batches}: Inserted {rows_inserted:,} rows " 
                  f"({rows_inserted/len(valid_violations)*100:.1f}%) - "
                  f"Rate: {rate:.0f} rows/sec")
    
    print(f"\nCompleted: {rows_inserted:,} rows inserted in {time.time() - start_time:.1f} seconds")

except Exception as e:
    conn.rollback()
    print(f"Error: {e}")

Total violations: 9936776
Valid violations for insert: 9936776
Batch 1/994: Inserted 10,000 rows (0.1%) - Rate: 22080 rows/sec
Batch 11/994: Inserted 110,000 rows (1.1%) - Rate: 32338 rows/sec
Batch 21/994: Inserted 210,000 rows (2.1%) - Rate: 32042 rows/sec
Batch 31/994: Inserted 310,000 rows (3.1%) - Rate: 32580 rows/sec
Batch 41/994: Inserted 410,000 rows (4.1%) - Rate: 32473 rows/sec
Batch 51/994: Inserted 510,000 rows (5.1%) - Rate: 30816 rows/sec
Batch 61/994: Inserted 610,000 rows (6.1%) - Rate: 31139 rows/sec
Batch 71/994: Inserted 710,000 rows (7.1%) - Rate: 31446 rows/sec
Batch 81/994: Inserted 810,000 rows (8.2%) - Rate: 31686 rows/sec
Batch 91/994: Inserted 910,000 rows (9.2%) - Rate: 31880 rows/sec
Batch 101/994: Inserted 1,010,000 rows (10.2%) - Rate: 32045 rows/sec
Batch 111/994: Inserted 1,110,000 rows (11.2%) - Rate: 32073 rows/sec
Batch 121/994: Inserted 1,210,000 rows (12.2%) - Rate: 32152 rows/sec
Batch 131/994: Inserted 1,310,000 rows (13.2%) - Rate: 32171 rows/sec

## Query Buildings by Borough and Neighborhood
This section shows how to look up `borough_id` and `nta_id` from their tables and then retrieve matching records from the `building` table based on user inputs.

In [None]:
import psycopg2
# Example user inputs; replace with dynamic values as needed
input_borough = 'Bronx'
input_neighborhood = 'Fordham'

# Connect to the database
conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASS)
cur = conn.cursor()

# Retrieve borough_id
cur.execute('SELECT borough_id FROM borough WHERE borough_name = %s', (input_borough,))
row = cur.fetchone()
borough_id = row[0] if row else None

# Retrieve nta_id filtering by borough
cur.execute('SELECT nta_id FROM nta WHERE nta_name = %s AND borough_id = %s', (input_neighborhood, borough_id))
row = cur.fetchone()
nta_id = row[0] if row else None

# Build and execute building query
query = 'SELECT * FROM building WHERE 1=1'
params = []
if borough_id is not None:
    query += ' AND borough_id = %s'; params.append(borough_id)
if nta_id is not None:
    query += ' AND nta_id = %s'; params.append(nta_id)

cur.execute(query, tuple(params))
buildings = cur.fetchall()
print('Matched Buildings:')
for b in buildings: print(b)

cur.close()
conn.close()