In [26]:
import pandas as pd
# import sqlalchemy
from sqlalchemy import create_engine
import urllib.parse

import configparser
# import os
from pathlib import Path
import numpy as np

import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

from neo4j import GraphDatabase

import datetime
import os
from typing import Dict
import traceback

In [2]:
import warnings
warnings.filterwarnings('ignore')

In [3]:
config_file_path = './config.ini'

if not Path(config_file_path).exists():
    logger.warning(f"Config file {config_file_path} not found!")

config = configparser.ConfigParser()
config.read(config_file_path)

if 'DATABASE' not in config:
    raise ValueError("DATABASE section not found in config")

db_config = {
    'host': config['DATABASE']['host'],
    'port': int(config['DATABASE']['port']),
    'username': config['DATABASE']['username'],
    'password': config['DATABASE']['password'],
    'database': config['DATABASE']['database'],
    'query_request': config['DATABASE']['query1'],
    'query_assets': config['DATABASE']['query2'],
    'query_request_with_activities': config['DATABASE']['query3'],
    'schema': config['DATABASE']['schema']
}

db_host = db_config.get('host')
db_port = db_config.get('port')
db_username = db_config.get('username')
db_password = db_config.get('password')
db_database = db_config.get('database')
db_query1 = db_config.get('query_request')
db_query2 = db_config.get('query_assets')
db_query3 = db_config.get('query_request_with_activities')
db_schema = db_config.get('schema')

In [4]:
def database_connector( db_type, host, port, database, username, password, **kwargs):
    
        encoded_password = urllib.parse.quote_plus(password)
        connection_strings = {
            'postgresql': f"postgresql://{username}:{encoded_password}@{host}:{port}/{database}",
        }
        return connection_strings[db_type]

In [5]:
conn_string = database_connector(
            db_type='postgresql',
            host=db_host,
            port=db_port,
            database=db_database,
            username=db_username,
            password=db_password,
            schema=db_schema
            )

In [6]:
last_sync_date_time = '2025-11-01 07:00:00.000'
# '2025-11-01 07:00:02.929'

In [7]:
diff_time = pd.to_datetime(last_sync_date_time) + datetime.timedelta(days=1)
upper_bound = pd.to_datetime(diff_time, format='mixed').strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]



In [8]:
upper_bound = '2025-11-02 08:00:00.000'

In [9]:
try:
        engine = create_engine(conn_string)
        
        if not db_query1:
            logger.warning("Query for v_request is missing!")
            
        else:
            db_query1 = db_query1.replace(';',f'\nWHERE "requestCreatedDate" >=\'{last_sync_date_time}\' and "requestCreatedDate" <= \'{upper_bound}\';')
    
        if not db_query2:
            logger.warning("Query for v_assets is missing!")
           
        # else:
        #     self.db_query2 = self.db_query2.replace(';',f'\nWHERE "requestCreatedDate" >=\'{self.last_sync_date_time}\';')
        
        if not db_query3:
            logger.warning("Query for v_request_with_activities is missing!")
            
        else:
            db_query3 = db_query3.replace(';',f'\nWHERE "requestCreatedDate" >=\'{last_sync_date_time}\' and "requestCreatedDate" <= \'{upper_bound}\';')

        # print(self.db_query1)
        # print(self.db_query2)
        # print(self.db_query3)

        df_request = pd.read_sql(db_query1, engine)
        df_assets = pd.read_sql(db_query2, engine)
        # df_assets = pd.read_csv('./fetched_data/v_assets.csv')
        df_request_with_activities = pd.read_sql(db_query3, engine)
        
        logger.info(f" Downloaded {len(df_request)} rows from 'v_requests', {len(df_assets)} rows from 'v_assets', and {len(df_request_with_activities)} rows from 'v_request_with_activities'.")
        
        # Saving the data (Don't save when pushing incrementally!!)
        # # self.df_request.to_csv(f"{target_dir_path}/v_requests.csv",index=False)
        # # self.df_assets.to_csv(f"{target_dir_path}/v_assets.csv",index=False)
        # # self.df_request_with_activities.to_csv(f"{target_dir_path}/v_requests_with_activities.csv",index=False)
        
        # logger.info(f"Data exported to {target_dir_path}")
        
except Exception as e:
    logger.warning(f"Error connecting to database: {e}")
    

finally:
    if 'engine' in locals():
            engine.dispose()


INFO:__main__: Downloaded 1 rows from 'v_requests', 508863 rows from 'v_assets', and 1 rows from 'v_request_with_activities'.


In [33]:
df_request

Unnamed: 0,requestId,requestAlternateId,workType,requestDescription,requestCreatedDate,requestTargetCompletionDate,requestCompletionDate,serviceClassificationId,serviceClassificationAlternateId,serviceClassificationPath,...,locationAlternateId,locationPath,assetId,assetAlternateId,assetDescription,completionNotes,customer,country,isSelfAssign,priorityCode
0,b71e6dbd-4a3f-41f2-ad82-1174c7f8c5b7,RRBC100309,Proactive,TESTING,2025-11-01 16:12:15.788,,,6d4a97a7-f1e4-4c1c-b37a-46a9c8cf35ee,SCRBC100235,Cleaning Services | Carpet Cleaning | Carpet -...,...,LUS508405,"US, AZ, Peoria / RBC-AZ001DR, Peoria-Sun City ...",,,,,Royal Bank of Canada,US,False,


In [34]:
df_request_with_activities

Unnamed: 0,requestId,requestAlternateId,workType,requestDescription,requestCreatedDate,requestTargetCompletionDate,requestCompletionDate,activityId,activityAlternateId,activityDescription,activityStartDate,activityCompletionDate,completionNotes,providertype,customer,country
0,b71e6dbd-4a3f-41f2-ad82-1174c7f8c5b7,RRBC100309,Proactive,TESTING,2025-11-01 16:12:15.788,,,,,,,,,,Royal Bank of Canada,US


In [37]:
is_hvac_df_ = pd.read_csv('./data/hvac_assets/IFM_Assets_RuleBasedEngineResults(IFM_Assets_RuleBasedEngineResul).csv')
suggested_asset_df = pd.read_csv('./data/asset_suggest_data/asset_suggest_model.csv')
vendor_data = pd.read_csv('./data/asset_vendor/request_act_vendor.csv')

In [None]:
# def data_preprocessor(self):
try:
    # Activity:
    activity_df = df_request_with_activities[df_request_with_activities['activityAlternateId'].notna()][['providertype','activityAlternateId','activityDescription']]
    activity_df.drop_duplicates(inplace = True)

    # Asset:
    requests_subset = df_request[['requestId','assetAlternateId', 'requestAlternateId']]
    requests_subset = requests_subset[requests_subset.assetAlternateId.notna()]

    v_assets = df_assets[['assetId','Asset Alt Id', 'Asset Description', 'manufacturer', 'model', 'serialNumber']]
    v_assets = v_assets.merge(requests_subset, left_on = 'Asset Alt Id', right_on = 'assetAlternateId', how= 'left')
    v_assets = v_assets[v_assets['requestAlternateId'].notna()] # keeping only those asset records which are associated to the presently fetched serviceRequests

    is_hvac_df = is_hvac_df_.copy()
    is_hvac_df['is_HVAC'] = True
    is_hvac_df.drop(columns=['Asset Description'], inplace = True)
    v_assets_with_hvac = v_assets.merge(is_hvac_df, on='Asset Alt Id', how='left')

    v_assets_with_hvac.loc[v_assets_with_hvac['is_HVAC'] == True, 'asset_type'] = 'HVAC'

    final_assets_df = v_assets_with_hvac[['assetId', 'Asset Description', 'Asset Alt Id', 'manufacturer', 'model',
                                            'serialNumber', 'is_HVAC', 'asset_type', 'requestId','assetAlternateId', 'requestAlternateId']]
    final_assets_df.loc[:, 'is_HVAC'] = final_assets_df['is_HVAC'].fillna(False)

    suggested_asset_df = suggested_asset_df.copy()
    suggested_asset_df.rename(columns={'asset_id': 'suggested_asset'}, inplace=True)
    suggested_asset_df_subset = suggested_asset_df[['request_id', 'suggested_asset']]

    final_assets_df = final_assets_df.merge(suggested_asset_df_subset, left_on = 'requestId', right_on = 'request_id', how = 'left')
    final_assets_df = final_assets_df[['assetId', 'Asset Description', 'Asset Alt Id', 'manufacturer', 'model',
                                        'serialNumber', 'is_HVAC', 'asset_type', 'suggested_asset','requestAlternateId']]

    vendor_data = vendor_data.copy()
    vendor_data = vendor_data[['requestAlternateId','vendorName', 'vendorAddress1', 'vendorCity',
                        'vendorRegion', 'vendorCountry', 'vendorPostalCode']]
    assets_with_vendors = final_assets_df.merge(vendor_data, on = 'requestAlternateId', how = 'left')
    assets_df = assets_with_vendors[['Asset Description', 'Asset Alt Id', 'manufacturer', 'model','serialNumber', 
                                                'is_HVAC', 'asset_type', 'suggested_asset','vendorName', 'vendorAddress1', 'vendorCity',
                                                'vendorRegion', 'vendorCountry', 'vendorPostalCode']]

    # Country:
    country_df = df_request[['country']].drop_duplicates()

    # Customer:
    customer_df = df_request[['customer']].drop_duplicates()

    # Location:
    location_df = df_request[['locationAlternateId', 'locationPath']].drop_duplicates()

    # Service Requests:
    temp_ser_req = df_request[['isSelfAssign', 'priorityCode', 
                'requestCreatedDate', 'requestDescription', 'requestAlternateId', 'completionNotes', 
                'requestTargetCompletionDate', 'serviceClassificationAlternateId', 'serviceClassificationPath',  
                'requestCompletionDate', 'workType']]

    def to_local_datetime(date_col):

        if date_col is None:
            return None
        
        if date_col.isna().all():
            return date_col
        
        dt_series = pd.to_datetime(date_col, format='mixed')
        formatted = dt_series.dt.strftime('%Y-%m-%d %H:%M:%S.%f').str[:-3]
        
        return formatted.str.replace(' ', 'T')


    def process_service_requests(df_service_request):
            
            date_cols = ['requestCreatedDate', 'requestTargetCompletionDate', 'requestCompletionDate']
            
            for col in date_cols:
                if col in df_service_request.columns:
                    df_service_request.loc[:, col] = to_local_datetime(df_service_request[col])
            
            df_service_request['createdYear'] = pd.to_datetime(df_service_request['requestCreatedDate']).dt.year
            df_service_request['createdMonth'] = pd.to_datetime(df_service_request['requestCreatedDate']).dt.month
                
            df_service_request['isCompleted'] = df_service_request['requestCompletionDate'].notna()
            
            conditions = [
                df_service_request['requestCompletionDate'].isna(),
                df_service_request['requestTargetCompletionDate'].isna(),
                df_service_request['requestCompletionDate'] <= df_service_request['requestTargetCompletionDate'],
                df_service_request['requestCompletionDate'] > df_service_request['requestTargetCompletionDate']
            ]
            
            choices = ['Open', 'Open', 'Met', 'Miss']
            
            df_service_request['sla'] = np.select(conditions, choices, default='Unknown')
            
            return df_service_request


    service_req_df = process_service_requests(temp_ser_req)

    activity_df = activity_df.copy()
    assets_df = assets_df.copy()
    country_df = country_df.copy()
    customer_df = customer_df.copy()
    location_df = location_df.copy()
    service_req_df = service_req_df.copy()
    logger.info("Node and Property data prepared!")

except Exception as e:
    logger.warning(f"Error preprocessing data: {e}")
    traceback.print_exc()


Traceback (most recent call last):
  File "/var/folders/r2/5_8vrrd90lsdfnwbdxcdvqxw0000gp/T/ipykernel_79002/18777002.py", line 20, in <module>
    v_assets_with_hvac.loc[v_assets_with_hvac['is_HVAC'] == True, 'asset_type'] = 'HVAC'
    ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/SChauhan17/Desktop/projects/data_ingestion_pipeline/data_injestion/lib/python3.13/site-packages/pandas/core/indexing.py", line 912, in __setitem__
    iloc._setitem_with_indexer(indexer, value, self.name)
    ~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/SChauhan17/Desktop/projects/data_ingestion_pipeline/data_injestion/lib/python3.13/site-packages/pandas/core/indexing.py", line 1848, in _setitem_with_indexer
    raise ValueError(
    ...<2 lines>...
    )
ValueError: cannot set a frame with no defined index and a scalar


Bad pipe message: %s [b'\x9b\x04\x81i?\xa1.\xb7\xfb\xa9\x17d', b'hl\x99Q\x00\x01|\x00\x00\x00\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00\x06\x00\x07\x00\x08\x00\t\x00\n\x00\x0b\x00\x0c\x00\r\x00\x0e\x00\x0f\x00\x10\x00\x11\x00\x12\x00\x13\x00\x14\x00\x15\x00\x16\x00\x17\x00\x18\x00\x19\x00\x1a\x00\x1b\x00/\x000\x001\x002\x003\x004\x005\x006\x007\x008\x009\x00:\x00;\x00<\x00=\x00>\x00?\x00@\x00A\x00B\x00C\x00D\x00E\x00F\x00g\x00h\x00i\x00j\x00k\x00l\x00m\x00\x84\x00\x85\x00\x86\x00\x87\x00\x88\x00\x89\x00\x96\x00\x97\x00\x98\x00\x99\x00\x9a\x00\x9b\x00\x9c\x00\x9d\x00\x9e\x00\x9f\x00\xa0\x00\xa1\x00\xa2\x00\xa3\x00\xa4\x00\xa5\x00\xa6']
Bad pipe message: %s [b'\x16w\xf2\xff\xeb.ZDW7s\x8e\xec\x8e\xf2I\xc9\x85\x00\x01|\x00\x00\x00\x01\x00\x02\x00\x03\x00\x04\x00\x05\x00\x06\x00\x07\x00\x08\x00\t\x00\n\x00\x0b\x00\x0c\x00\r\x00\x0e\x00\x0f\x00\x10\x00\x11\x00\x12\x00\x13\x00\x14\x00\x15\x00\x16\x00\x17\x00\x18\x00\x19\x00\x1a\x00\x1b\x00/\x000\x001\x002\x003\x004\x005\x006\x007\x008\x009\x00:

In [10]:
try:
    # renaming the features:
    activity_df.rename(columns={'activityAlternateId': 'activityId', 'providertype':'providerType'}, inplace=True)

    assets_df.rename(columns={'Asset Alt Id': 'assetId', 'Asset Description':'assetDescription', 'vendorAddress1':'vendorAddress'}, inplace=True)

    location_df.rename(columns={'locationAlternateId': 'locationId'}, inplace=True)

    service_req_df.rename(columns={'requestAlternateId': 'requestId', 'serviceClassificationAlternateId': 'serviceClassificationId'}, inplace=True)

    # logger.info(f"Data for migration to Neo4J is saved on path: {neo4j_dir_path} and ready to be imported!")

except Exception as e:
    logger.warning(f"Error Renaming Features: {e}")

In [11]:
activity_df

Unnamed: 0,providerType,activityId,activityDescription
0,,AC557088,HVAC | General - Knight Cancer Research Buildi...
1,TECHNICIAN,AC557890,Plumbing | Pipefitting - STFTNMU1 | RSM-08.4-0...
2,,AC553975,General Building | Structural & Roofing - Roof...
3,,AC558680,Fire Life Safety | Fire Extinguishers - One Jn...
4,DISPATCHSITE,AC558704,HVAC | General - Split System Quarterly | AIR...
...,...,...,...
9811,TECHNICIAN,AC560109,Fire Life Safety | Fire Protection - 052310-40...
9812,,AC556335,"HVAC | General - Pump, Condensate - Quarterly ..."
9813,TECHNICIAN,AC555114,General Building | Building Services - TX576 |...
9814,TECHNICIAN,AC557863,"HVAC | General - Heater (Gas) PM | Heater, Uni..."


In [12]:
try:

    LOCATED_AT = df_request[['assetAlternateId','locationAlternateId']].dropna().drop_duplicates()
    LOCATED_AT.rename(columns={'assetAlternateId': 'assetId', 'locationAlternateId': 'locationId'}, inplace=True)
    # self.LOCATED_AT.to_csv(f"{neo4j_relationship_dir_path}/LOCATED_AT.csv",index=False)

    AT_LOCATION = df_request[['requestAlternateId','locationAlternateId']].dropna().drop_duplicates()
    AT_LOCATION.rename(columns={'requestAlternateId': 'requestId', 'locationAlternateId': 'locationId'}, inplace=True)
    # self.AT_LOCATION.to_csv(f"{neo4j_relationship_dir_path}/AT_LOCATION.csv",index=False)

    HAS_ACTIVITY = df_request_with_activities[['activityAlternateId', 'requestAlternateId']].dropna().drop_duplicates()
    HAS_ACTIVITY.rename(columns={'requestAlternateId': 'requestId', 'activityAlternateId': 'activityId'}, inplace=True)
    # self.HAS_ACTIVITY.to_csv(f"{neo4j_relationship_dir_path}/HAS_ACTIVITY.csv",index=False)

    FOR_ASSET = df_request[['requestAlternateId','assetAlternateId']].dropna().drop_duplicates()
    FOR_ASSET.rename(columns={'requestAlternateId': 'requestId', 'assetAlternateId': 'assetId'}, inplace=True)
    # self.FOR_ASSET.to_csv(f"{neo4j_relationship_dir_path}/FOR_ASSET.csv",index=False)

    OPERATES_IN = df_request[['customer','country']].dropna().drop_duplicates()
    # self.OPERATES_IN.to_csv(f"{neo4j_relationship_dir_path}/OPERATES_IN.csv",index=False)

    RESIDES_AT = df_request[['customer','locationAlternateId']].dropna().drop_duplicates()
    RESIDES_AT.rename(columns={'locationAlternateId': 'locationId'}, inplace=True)
    # self.RESIDES_AT.to_csv(f"{neo4j_relationship_dir_path}/RESIDES_AT.csv",index=False)

    OWNS = df_request[['customer','assetAlternateId']].dropna().drop_duplicates()
    OWNS.rename(columns={'assetAlternateId': 'assetId'}, inplace=True)
    # self.OWNS.to_csv(f"{neo4j_relationship_dir_path}/OWNS.csv",index=False)

    CREATES = df_request[['customer','requestAlternateId']].dropna().drop_duplicates()
    CREATES.rename(columns={'requestAlternateId': 'requestId'}, inplace=True)
    # self.CREATES.to_csv(f"{neo4j_relationship_dir_path}/CREATES.csv",index=False)

    IN = df_request[['country','locationAlternateId']].dropna().drop_duplicates()
    IN.rename(columns={'locationAlternateId': 'locationId'}, inplace=True)
    # self.IN.to_csv(f"{neo4j_relationship_dir_path}/IN.csv",index=False)

    logger.info(f"Relationships created!")

except Exception as e:
    logger.warning(f"Error while creating and saving relationships: {e}")



INFO:__main__:Relationships created!


In [13]:
FOR_ASSET

Unnamed: 0,requestId,assetId
0,ROHS101050,AUS434275
1,RSTF129401,AUS303002
3,RJNJ122478,ACO100054
4,RMFG121180,AUS408425
5,RCHR249629,AUS330504
...,...,...
8860,RALK116010,AUS286004
8861,RCHR247411,AUS314632
8862,RSTF129381,AUS294405
8863,RCHR249274,AUS312181
