In [1]:
import sys
import os

# Add the integration_helper folder to sys.path
sys.path.append(os.path.abspath('/Users/gokulr/Developer/ProfessionalWorkspace/Worley/Notebooks/integration_helper'))


In [2]:
!aws s3 ls

2025-09-16 13:57:32 archival-io-227
2025-09-16 13:59:09 aws-glue-assets-894168368672-us-east-1
2025-09-17 06:36:36 worley-integrattion-work


In [3]:

import os
import re
import time
import json
import uuid
import base64
import boto3
import requests
import pandas as pd
import numpy as np
from io import StringIO
from datetime import datetime
from urllib.parse import urlencode
from integration_helper.logger import get_logger
from integration_helper.aws import DynamoDB, get_secret, update_secret
from integration_helper.constants import REGION
from integration_helper.jwt_auth_token import create_token, validate_jwt_token

In [20]:

REGION = "us-east-1"
# Logger setup
logger = get_logger(__name__)

S3_BUCKET = 'worley-integrattion-work'
S3_KEY = 'INT024_source_files/Sample_file_INT24_test.csv'
FAILED_LOG_KEY = "INT024/failed_pre_validation_logs.json"
metadata_table_name = 'sydney-int-mf-stg-metadata-table'
primary_keys = 'INT024_TEST_DATA'
input_keys = 'INT024_TEST_DATA'
POST_URL_fail = 'https://reqres.in/api/users' 
POST_URL_pass = "https://eol99qtwfxfpjp.m.pipedream.net"
host_url = "emzo-dev1.fa.us6.oraclecloud.com"
workers_endpoint = "/hcmRestApi/resources/11.13.18.05/workers"
salaries_endpoint = "/hcmRestApi/resources/11.13.18.05/workers"
workers_query_param = {}
salaries_query_param = {}
api_timeout = 30


In [5]:
# DynamoDB setup and metadata fetching
ddb = DynamoDB(metadata_table_name=metadata_table_name, default_region=REGION)
metadata = ddb.get_metadata_from_ddb(
    source_system_id=primary_keys, metadata_type=input_keys
)

2025-09-17 06:40:07 IST - INFO - Given params are - source_system_id='INT024_TEST_DATA', metadata_type='INT024_TEST_DATA', primary_key='SourceSystemId', sort_key='MetadataType'
Here is the dynamoDB response: {'Items': [{'SourceSystemId': 'INT024_TEST_DATA', 'host_url': 'emzo-dev1.fa.us6.oraclecloud.com', 'auth_api_parameter': {'auth_certificate_key': 'worley-systemintegration-sydney-stg-eightfold-hcm-oauth-certificate', 'auth_type': 'OAuth', 'auth_query_params': None, 'auth_headers': {'alg': 'RS256', 'typ': 'JWT', 'x5t': ''}, 'auth_private_key': 'worley-systemintegration-sydney-stg-eightfold-hcm-oauth-private-key', 'auth_body': {'iss': 'aws.worley.com', 'aud': '{host_url}', 'sub': '{username}', 'exp': '', 'iat': ''}, 'auth_method': None, 'endpoint': None, 'auth_jwt_token_key': 'worley-systemintegration-sydney-stg-eightfold-hcm-oauth-token', 'auth_ssl_verify': None, 'auth_expire': Decimal('600'), 'secret_credentials': 'worley-systemintegration-sydney-stg-eightfold-hcm-credentials', 'aut

In [6]:
auth_info = metadata['auth_api_parameter']
print("Auth API Parameter:", auth_info)
workers_payload_json = metadata['workers_payload_json']
print("Workers Payload JSON:", workers_payload_json)
salaries_payload_json = metadata['salaries_payload_json']
print("Salaries Payload JSON:", salaries_payload_json)
field_transformations = metadata['field_transformations']
print("Field Transformations:", field_transformations)
conditional_field_rules = metadata['conditional_field_rules']
print("Conditional Field Rules:", conditional_field_rules)
pre_validation_rules = metadata['pre_validation_rules']
print("Pre-validation Rules:", pre_validation_rules)
lookup_rules = metadata['lookup_rules']
print("Lookup Rules:", lookup_rules)

Auth API Parameter: {'auth_certificate_key': 'worley-systemintegration-sydney-stg-eightfold-hcm-oauth-certificate', 'auth_type': 'OAuth', 'auth_query_params': None, 'auth_headers': {'alg': 'RS256', 'typ': 'JWT', 'x5t': ''}, 'auth_private_key': 'worley-systemintegration-sydney-stg-eightfold-hcm-oauth-private-key', 'auth_body': {'iss': 'aws.worley.com', 'aud': '{host_url}', 'sub': '{username}', 'exp': '', 'iat': ''}, 'auth_method': None, 'endpoint': None, 'auth_jwt_token_key': 'worley-systemintegration-sydney-stg-eightfold-hcm-oauth-token', 'auth_ssl_verify': None, 'auth_expire': 600, 'secret_credentials': 'worley-systemintegration-sydney-stg-eightfold-hcm-credentials', 'auth_api_name': 'hcm'}
Workers Payload JSON: {'workRelationships|0|assignments|0|assignmentsEFF|0|InformationWOR__Contractor__RateprivateVO|0|amount': 'ContractorPayRate', 'workRelationships|0|assignments|0|assignmentsEFF|0|InformationWOR__Contractor__RateprivateVO|0|frequency': 'PayFrequency', 'externalIdentifiers|0|Fro

In [7]:
# ----------------- Helper: JWT Auth ----------------- #
def get_valid_jwt_token(auth_info, host_url):
    try:
        jwt_token_key = auth_info.get('auth_jwt_token_key')
        past_jwt_token = get_secret(jwt_token_key, REGION)

        # Validate token if exists
        if past_jwt_token:
            token_details = validate_jwt_token(past_jwt_token)
            if token_details:
                expire_epoch = token_details.get('exp')
                if int(time.time()) < expire_epoch and token_details.get('aud').count('https://') == 1:
                    return past_jwt_token  # Still valid

        # Generate new token
        secret_param_key = json.loads(get_secret(auth_info['secret_credentials'], REGION))
        username = secret_param_key.get("username")

        certificate_info = get_secret(auth_info['auth_certificate_key'], REGION)
        priv_key_bytes = get_secret(auth_info['auth_private_key'], REGION).encode('ascii')
        der_private_key = base64.b64decode(priv_key_bytes)

        expiry_time_secs = auth_info.get('auth_expire')

        jwt_token = create_token(
            der_private_key,
            expiry_time_secs,
            certificate_info=certificate_info,
            host_url=host_url,
            username=username,
            auth_info=auth_info
        )

        update_secret(secret_name=jwt_token_key, region_name=REGION, secret_value=jwt_token)
        return jwt_token

    except Exception as e:
        logger.error(f"JWT Auth Error: {e}")
        return None

In [8]:
# ----------------- HCM Request Function ----------------- #
def call_hcm_api_get_old(resource, params, jwt_token):
    url = f"https://{host_url}{resource}"
    headers = {
        #"Authorization": f"Bearer {jwt_token}",
        "REST-Framework-Version": 2
        # "Content-Type": "application/json"
    }

    full_url = f"{url}?{urlencode(params)}"
    
    username="integration.user"
    password="WorleyCloud@2023#"

    # Log the full request URL and body
    logger.info("Calling HCM service.")
    logger.info("Request URL: %s", full_url)
    logger.info("Request Query Parameters: %s", params)
    logger.info("Request Headers: %s", headers)
    try:
        response = requests.get(
            url,
            #headers=headers,
            auth=(username, password),
            params=params,
            timeout=api_timeout
            #verify=api_ssl_verify,
            #proxies=api_proxy,
            #cert=api_cert,
            #allow_redirects=api_allow_redirects
        )

        logger.info("HCM API response status: %d", response.status_code)

        return {
            "statusCode": response.status_code,
            "headers": {
                "Content-Type": response.headers.get("Content-Type", "application/octet-stream")
            },
            "body": response.text
        }

    except Exception as e:
        # Try to get status code from the response if possible, else default to 500
        status_code = 500
        if hasattr(e, 'response') and e.response is not None:
            status_code = e.response.status_code
        logger.error("Error calling HCM service: %s", str(e))
        return {
            "statusCode": status_code,
            "body": json.dumps({"error": str(e)})
        }



In [9]:

# ----------------- Generic HCM Request Function ----------------- #
def call_hcm_api_get(resource: str, params: dict, jwt_token=None):
    # Clean resource path
    if not resource.startswith("/"):
        resource = "/" + resource
    resource = resource.replace("//", "/")

    # Build readable full URL (for logging)
    readable_url = f"https://{host_url}{resource}?" + "&".join([f"{k}={v}" for k, v in params.items()])

    # Build encoded URL for actual request
    url = f"https://{host_url}{resource}"

    headers = {
        "REST-Framework-Version": "2"
        #"Authorization": f"Bearer {jwt_token}"
    }
    username="integration.user"
    password="WorleyCloud@2023#"

    # Log request details
    logger.info("Calling HCM service.")
    logger.info("Request URL (Readable): %s", readable_url)
    logger.info("Request Query Parameters: %s", params)
    logger.info("Request Headers: %s", headers)

    try:
        response = requests.get(
            url,
            auth=(username, password),
            headers=headers,
            params=params,  # requests handles proper encoding internally
            timeout=api_timeout
        )

        logger.info("HCM API response status: %d", response.status_code)
        #logger.info("Response body: %s", response.text[:1000])  # limit log size

        return {
            "statusCode": response.status_code,
            "headers": dict(response.headers),
            "body": response.json() if response.headers.get("Content-Type", "").startswith("application/json") else response.text
        }

    except Exception as e:
        logger.error("Error calling HCM service: %s", str(e))
        return {
            "statusCode": 500,
            "body": {"error": str(e)}
        }


In [10]:
# ----------------- HCM Request Function ----------------- #
def call_hcm_api_post(resource, params, hcm_body, jwt_token, base_url):
    url = f"https://{base_url}{resource}"
    headers = {
        #"Authorization": f"Bearer {jwt_token}",
        "Content-Type": "application/json"
    }

    full_url = f"{url}?{urlencode(params)}"
    
    username="integration.user"
    password="WorleyCloud@2023#"

    # Log the full request URL and body
    logger.info("Calling HCM service.")
    logger.info("Request URL: %s", full_url)
    logger.info("Request Query Parameters: %s", params)
    logger.info("Request Headers: %s", headers)
    try:
        response = requests.post(
            url,
            headers=headers,
            auth=(username, password),
            params=params,
            #data = json.dumps(hcm_body),
            #data = json.dumps(hcm_body, indent=2),
            json = hcm_body,
            timeout=api_timeout
            #verify=api_ssl_verify,
            #proxies=api_proxy,
            #cert=api_cert,
            #allow_redirects=api_allow_redirects
        )

        logger.info("HCM API response status: %d", response.status_code)
        logger.info("=== FINAL JSON Payload ===")
        logger.info(json.dumps(hcm_body, indent=2))
        return {
            "statusCode": response.status_code,
            "headers": dict(response.headers),
            "body": response.text
        }

    except Exception as e:
        status_code = getattr(e.response, "status_code", 500)
        logger.error("Error calling HCM service: %s", str(e))
        return {
            "statusCode": status_code,
            "body": json.dumps({"error": str(e)})
        }



In [21]:
def read_csv_from_s3(bucket, key):
    # Connect to S3 and read file
    s3 = boto3.client('s3')
    response = s3.get_object(Bucket=bucket, Key=key)
    content = response['Body'].read().decode('utf-8')
    
    # Load CSV into DataFrame
    df = pd.read_csv(StringIO(content))
    
    # Set display options (optional, for better debugging)
    pd.set_option('display.max_columns', None)
    pd.set_option('display.max_colwidth', None)
    
    # STEP 1: Replace string 'nan' variations with proper NA
    df = df.replace(['nan', 'NaN', 'NAN'], pd.NA)
    
    # STEP 2: Replace numpy NaNs with pd.NA
    df = df.mask(df.isna(), pd.NA)
    
    # STEP 3: Convert columns to nullable types
    def convert_nullable(col):
        if col.dtype.kind == 'f':
            # Check if all values are whole numbers
            try:
                return pd.to_numeric(col, errors='coerce').astype('Int64')
            except:
                return col.astype('Float64')  # Use nullable float
        elif col.dtype == object:
            return col.astype('string')  # Nullable string type
        return col  # Leave other types unchanged

    df = df.apply(convert_nullable)
    
    # Optional: force all columns to string if needed
    df = df.astype("string")
    print(df.dtypes)
    df.head(20)
    
    return df


In [12]:
def map_csv_row_to_flat_json(row_dict, payload_mapping):
    flat_json = {}
    csv_columns = set(row_dict.keys())

    for json_key, value in payload_mapping.items():
        if value in csv_columns:
            cell_value = row_dict.get(value)
            flat_json[json_key] = cell_value if pd.notna(cell_value) else ""
        else:
            # Keep the static/default value from mapping if not in CSV
            flat_json[json_key] = value if value else ""

    return flat_json


In [13]:
def flatten_to_nested(flat_json, sep='|'):
    """
    Convert a flat JSON dict with keys joined by a separator into a nested dict/list structure.
    Numeric keys are treated as list indices.
    """
    def unflatten_json(flat_json, sep):
        nested = {}

        for flat_key, value in flat_json.items():
            keys = flat_key.split(sep)
            current = nested

            for i, key in enumerate(keys):
                # Convert numeric keys to int for list indices
                try:
                    key_int = int(key)
                    key = key_int
                except ValueError:
                    pass

                if i == len(keys) - 1:
                    # Last key: assign value
                    if isinstance(key, int):
                        while len(current) <= key:
                            current.append(None)
                        current[key] = value
                    else:
                        current[key] = value
                else:
                    next_key = keys[i + 1]
                    try:
                        next_key_int = int(next_key)
                        next_is_list = True
                    except ValueError:
                        next_is_list = False

                    if isinstance(key, int):
                        while len(current) <= key:
                            current.append(None)
                        if current[key] is None:
                            current[key] = [] if next_is_list else {}
                        current = current[key]
                    else:
                        if key not in current:
                            current[key] = [] if next_is_list else {}
                        current = current[key]

        return nested

    def convert_numeric_dicts_to_lists(obj):
        if isinstance(obj, dict):
            obj = {k: convert_numeric_dicts_to_lists(v) for k, v in obj.items()}
            if all(k.isdigit() for k in obj.keys()):
                sorted_items = sorted(((int(k), v) for k, v in obj.items()), key=lambda x: x[0])
                return [v for _, v in sorted_items]
            return obj
        elif isinstance(obj, list):
            return [i for i in (convert_numeric_dicts_to_lists(v) for v in obj) if i is not None]
        else:
            return obj

    # Perform unflattening and normalization
    nested = unflatten_json(flat_json, sep)
    return convert_numeric_dicts_to_lists(nested)


In [14]:
def post_json(url, payload):
    headers = {'Content-Type': 'application/json'}
    response = requests.post(url, headers=headers, data=json.dumps(payload))
    return response.status_code, response.text

In [15]:
def transform_special_fields(flat_payload, field_transformations):
    def format_date(val, fmt):
        try:
            # Parse with pandas or datetime
            dt = pd.to_datetime(str(val), errors='coerce')
            if pd.isnull(dt):
                return val  # Return original if not parseable
            return dt.strftime(fmt)
        except Exception:
            return val

    for field, rules in field_transformations.items():
        if field not in flat_payload:
            continue
        val = flat_payload[field]
        if val in [None, "", float('nan')]:
            continue

        rule_type = rules.get("type")
        if rule_type == "date":
            fmt = rules.get("format", "%Y-%m-%d")
            flat_payload[field] = format_date(val, fmt)

        elif rule_type == "datetime":
            fmt = rules.get("format", "%Y-%m-%d %H:%M:%S")
            flat_payload[field] = format_date(val, fmt)

        elif rule_type == "prefix":
            prefix = rules.get("prefix", "")
            val_str = str(val)
            if not val_str.startswith(prefix):
                flat_payload[field] = f"{prefix}{val_str}"

    return flat_payload


In [16]:
def apply_conditional_field_rules_generic(flat_payload, row_dict, conditional_config):
    """
    Apply conditional field rules to flat_payload based on values in row_dict.

    - Evaluates conditions for each rule.
    - Tracks which keys are managed by rules.
    - Applies conditional_add / conditional_remove / field_group logic deterministically.
    - Ensures CategoryCode is present if any assignmentsEFF field is set.
    """

    def _is_valid(val):
        return pd.notna(val) and str(val).strip().lower() not in ["", "nan", "none"]

    def _evaluate_conditions(conditions, row_dict):
        for field, expected in (conditions or {}).items():
            actual = row_dict.get(field, "")
            actual = "" if pd.isna(actual) else str(actual).strip()
            expected = str(expected).strip()
            if expected == "__nonempty__":
                if actual.lower() in ["", "nan", "none"]:
                    return False
            else:
                if actual != expected:
                    return False
        return True

    # ---- accumulators ----
    desired_additions = {}      # flat_key -> value
    managed_keys = set()        # all keys controlled by any rule
    fg_all_prefixes = []        # prefixes from field_group rules
    fg_keep_prefixes = set()    # prefixes that survive based on CSV values

    # ---- pass 1: evaluate rules ----
    for rule_name, rule in (conditional_config or {}).items():
        rule_type = rule.get("type")

        # ---- field_group ----
        if rule_type == "field_group":
            prefixes = rule.get("field_prefixes", []) or []
            fg_all_prefixes.extend(prefixes)
            conditions = rule.get("conditions", {}) or {}
            for csv_field, prefix in conditions.items():
                if _is_valid(row_dict.get(csv_field)):
                    fg_keep_prefixes.add(prefix)
            continue

        # ---- conditional_add / conditional_add_default ----
        if rule_type in ("conditional_add", "conditional_add_default"):
            conditions = rule.get("conditions", {}) or {}
            fields_dict = rule.get("fields", {}) or {}

            for fk in fields_dict.keys():
                managed_keys.add(fk)

            if _evaluate_conditions(conditions, row_dict):
                for flat_key, source in fields_dict.items():
                    value = None
                    """
                    if isinstance(source, str) and source.startswith("__from_csv__:"):
                        csv_field = source.replace("__from_csv__:", "")
                        if _is_valid(row_dict.get(csv_field)):
                            value = str(row_dict[csv_field]).strip()
                    elif source in row_dict and _is_valid(row_dict[source]):
                        value = str(row_dict[source]).strip()
                    else:
                        value = source
                    """    
                    if isinstance(source, str):
                        if source.startswith("__from_csv__:"):
                            csv_field = source.replace("__from_csv__:", "")
                            if _is_valid(row_dict.get(csv_field)):
                                value = str(row_dict[csv_field]).strip()
                            else:
                                continue  # skip if source is missing
                        else:
                            value = source  # always treat as literal if not explicitly __from_csv__

                    
                    
                    desired_additions[flat_key] = value
            continue

        # ---- conditional_remove ----
        if rule_type == "conditional_remove":
            if _evaluate_conditions(rule.get("conditions", {}), row_dict):
                for fk in rule.get("fields", []) or []:
                    managed_keys.add(fk)
                    desired_additions.pop(fk, None)
            continue

        # ---- catchall ----
        for fk in (rule.get("fields", {}) or {}).keys():
            managed_keys.add(fk)

    # ---- pass 2: removals ----
    fg_remove_prefixes = [p for p in fg_all_prefixes if p not in fg_keep_prefixes]

    for k in list(flat_payload.keys()):
        if k in managed_keys and k not in desired_additions:
            flat_payload.pop(k, None)
            continue
        if any(k.startswith(p) for p in fg_remove_prefixes):
            flat_payload.pop(k, None)
            continue

    # ---- pass 3: additions ----
    for k, v in desired_additions.items():
        if any(k.startswith(p) for p in fg_remove_prefixes):
            continue
        flat_payload[k] = v

    # ---- final enforcement ----
    eff_prefix = "workRelationships|0|assignments|0|assignmentsEFF|0|"
    eff_category_key = eff_prefix + "CategoryCode"
    if any(k.startswith(eff_prefix) for k in flat_payload.keys()) and eff_category_key not in flat_payload:
        flat_payload[eff_category_key] = "PER_ASG_EIT"

    return flat_payload


In [17]:

def lookup_values(row_dict, lookup_rules, call_hcm_api_get, host_url, jwt_token):
    """
    Apply lookup rules using metadata config, return a dict of {target_field: resolved_value}.
    Uses HCM API GET calls via call_hcm_api_get(), with internal caching to avoid redundant calls.
    """
    lookup_results = {}
    lookup_cache = {}  

    for rule in lookup_rules:
        input_field = rule.get("input_field")
        if not input_field or not row_dict.get(input_field):
            continue

        # Prepare query parameters
        query_params = rule.get("query_params", {})
        formatted_params = {}
        for key, value in query_params.items():
            formatted_params[key] = value.format(**row_dict)

        resource = rule.get("resource")
        if not resource:
            continue

        # Minimal change: create cache key from resource + params
        cache_key = (resource, frozenset(formatted_params.items()))

        if cache_key in lookup_cache:
            response = lookup_cache[cache_key]
        else:
            response = call_hcm_api_get(resource, formatted_params, jwt_token)
            lookup_cache[cache_key] = response

        if response.get("statusCode") != 200:
            continue

        try:
            body = json.loads(response.get("body", "{}"))
            items = body.get("items", [])
            if not items:
                continue

            response_value = items[0].get(rule.get("response_field"))
            if response_value:
                lookup_results[rule["target_field"]] = response_value
        except Exception as e:
            logger.warning(f"Lookup failed for rule {rule['name']}: {str(e)}")
            continue

    return lookup_results


In [18]:
def apply_lookup_values_to_payload(payload_dict, lookup_results):
    """
    For each target field in lookup_results, replace *every* matching key
    in the flat payload dict that ends with that field.
    """
    for target_field, resolved_value in lookup_results.items():
        for key in payload_dict.keys():
            if key.endswith(f"|{target_field}") or key == target_field:
                payload_dict[key] = resolved_value
    return payload_dict


In [22]:
df = read_csv_from_s3(S3_BUCKET, S3_KEY)
print(df.dtypes)
df.head(20)

ProposedPersonType       string[python]
TaleoCandidateID         string[python]
WPEmployeeID             string[python]
FirstName                string[python]
MiddleName               string[python]
LastName                 string[python]
Email                    string[python]
Address                  string[python]
Address2                 string[python]
Country                  string[python]
State                    string[python]
County                   string[python]
City                     string[python]
ZipCode                  string[python]
HomePhone                string[python]
MobilePhone              string[python]
SSN                      string[python]
Birthdate                string[python]
Department               string[python]
WorkAssignmentManager    string[python]
PrimaryLocationName      string[python]
LegalEmployer            string[python]
JobGrade                 string[python]
GlobalJobTitle           string[python]
StartDate                string[python]


Unnamed: 0,ProposedPersonType,TaleoCandidateID,WPEmployeeID,FirstName,MiddleName,LastName,Email,Address,Address2,Country,State,County,City,ZipCode,HomePhone,MobilePhone,SSN,Birthdate,Department,WorkAssignmentManager,PrimaryLocationName,LegalEmployer,JobGrade,GlobalJobTitle,StartDate,BasePay,DateFrom,ActionCode,PayFrequency,Schedule,Currency,SalaryBasis,ContractorPayRate,AgencyBillRate,WorkerCategory
0,Employee,122011771,,Franklin,De,Lamar,franklin@test.com,9/999 main road,9/999 main road,United States,TX,Montgomery,Spring,77380,3334505675,,637-91-3953,2/2/03,800666304812500135 USA-ANCHORAGE OPS AND MAINT,736449,AR Buenos Aires - Alem,Worley Engineering Services Inc.,,1040.Ppl.Admin5D,19-09-25,25,19-09-25,ADD_PEN_WKR,Hourly,FT,USD,AR Monthly Salary Basis,,,WOR_STAFF


In [21]:
jwt_token = get_valid_jwt_token(auth_info, host_url)
if not jwt_token:
    print("Failed to obtain JWT token", str(auth_info))
print("JWT Token:", jwt_token)

2025-09-15 10:04:44 IST - INFO - Getting secret worley-systemintegration-sydney-stg-eightfold-hcm-oauth-token
2025-09-15 10:04:44 IST - INFO - Region: us-east-1
2025-09-15 10:04:45 IST - INFO - Validating the JWT token
2025-09-15 10:04:45 IST - INFO - Decoded the JWT token. Details are - {'iss': 'aws.worley.com', 'aud': 'https://emzo-dev1.fa.us6.oraclecloud.com', 'sub': 'eightfold_hcm.user', 'exp': 1757931207, 'iat': 1757930607}
JWT Token: eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsIng1dCI6IllDRV9yampYRHdfbWVDMTVpeERRbDBhWk8xVSJ9.eyJpc3MiOiJhd3Mud29ybGV5LmNvbSIsImF1ZCI6Imh0dHBzOi8vZW16by1kZXYxLmZhLnVzNi5vcmFjbGVjbG91ZC5jb20iLCJzdWIiOiJlaWdodGZvbGRfaGNtLnVzZXIiLCJleHAiOjE3NTc5MzEyMDcsImlhdCI6MTc1NzkzMDYwN30.LtnlmLYqFn00vpbJmHBuT4RFMj1n1Y_oyKuji6jyo4BUBOr6o7MuTut8uNU_G5X6-FpUzc8ozUBuQp3u1iL66-nsUL0gWH__y9u0ZhP7Ugx5qi6D0ZXqaUmQZOR6VJjuKGFlbpuFUQOBBG9sLtuxp7VFDhJTvTssjiDGPTaQCyhXCSTra3jrFN8BwRbAdyn5GUaNWOlwSpGy2J8FmGs8yLNXg_3yhNsTPxgnD2r9cENFW1ssyyzW77_MEbtysfyoqBRO_mgar1k6TgTghuT2K7QpjgYE-SaCTMD

In [22]:
hcm_response = call_hcm_api_get(workers_endpoint, workers_query_param, jwt_token)
print("HCM Response:", hcm_response)

2025-09-15 10:04:45 IST - INFO - Calling HCM service.
2025-09-15 10:04:45 IST - INFO - Request URL: https://emzo-dev1.fa.us6.oraclecloud.com/hcmRestApi/resources/11.13.18.05/workers?
2025-09-15 10:04:45 IST - INFO - Request Query Parameters: {}
2025-09-15 10:04:45 IST - INFO - Request Headers: {'Authorization': 'Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCIsIng1dCI6IllDRV9yampYRHdfbWVDMTVpeERRbDBhWk8xVSJ9.eyJpc3MiOiJhd3Mud29ybGV5LmNvbSIsImF1ZCI6Imh0dHBzOi8vZW16by1kZXYxLmZhLnVzNi5vcmFjbGVjbG91ZC5jb20iLCJzdWIiOiJlaWdodGZvbGRfaGNtLnVzZXIiLCJleHAiOjE3NTc5MzEyMDcsImlhdCI6MTc1NzkzMDYwN30.LtnlmLYqFn00vpbJmHBuT4RFMj1n1Y_oyKuji6jyo4BUBOr6o7MuTut8uNU_G5X6-FpUzc8ozUBuQp3u1iL66-nsUL0gWH__y9u0ZhP7Ugx5qi6D0ZXqaUmQZOR6VJjuKGFlbpuFUQOBBG9sLtuxp7VFDhJTvTssjiDGPTaQCyhXCSTra3jrFN8BwRbAdyn5GUaNWOlwSpGy2J8FmGs8yLNXg_3yhNsTPxgnD2r9cENFW1ssyyzW77_MEbtysfyoqBRO_mgar1k6TgTghuT2K7QpjgYE-SaCTMD4VsjAZZ9hyeZ8gTbbPhbjTFixYT-8xZgf2SgGDahUQZ3QdwOugA'}
2025-09-15 10:04:46 IST - INFO - HCM API response status: 200
HCM Res

In [23]:
def log_failed_post(failures, bucket, key, df_success_count, total_calls, api_type):
    """
    Convert failures list into a clean dataframe and return it.
    Each failure dict is expected to contain:
      - row_index
      - payload_json (string JSON of the payload or '{}' fallback)
      - CandidateId / CandidateName (optional)
      - person_id / person_number (optional)
      - workers_status / workers_error_message
      - salaries_status / salaries_error_message
    This function logs details, builds the DF, prints head/types, and returns the df.
    """
    if not failures:
        logger.info(f"No failed {api_type} POST requests to log.")
        return pd.DataFrame()

    logger.info(f"Preparing to log {len(failures)} failure records for {api_type}.")

    formatted_failures = []
    for i, f in enumerate(failures):
        logger.debug(f"[log_failed_post] Processing failure {i}: keys={list(f.keys())}")

        # Safely parse payload_json (may be missing or invalid)
        payload = {}
        try:
            payload = json.loads(f.get('payload_json', '{}') or '{}')
        except Exception as e:
            logger.warning(f"[log_failed_post] Failed to parse payload_json for row {f.get('row_index')}: {e}")
            payload = {}

        # Candidate details: prefer explicit fields in failure dict, else fall back to payload/row content
        candidate_id = f.get('TaleoCandidateID') or payload.get('TaleoCandidateID') or \
                       (payload.get('externalIdentifiers', [{}])[0].get('ExternalIdentifierNumber', "") if isinstance(payload, dict) else "")
        # Candidate name: prefer explicit CandidateName, else row fields or payload names
        if f.get('CandidateName'):
            candidate_name = f.get('CandidateName')
        else:
            candidate_name = " ".join(filter(None, [
                payload.get('FirstName') or payload.get('names', [{}])[0].get('FirstName', "") if isinstance(payload, dict) else "",
                payload.get('MiddleNames') or payload.get('names', [{}])[0].get('MiddleNames', "") if isinstance(payload, dict) else "",
                payload.get('LastName') or payload.get('names', [{}])[0].get('LastName', "") if isinstance(payload, dict) else ""
            ])).strip()

        # If still empty, try to read from explicit fields
        if not candidate_name:
            candidate_name = f.get('CandidateName') or ""

        person_number = f.get('person_number', '') or f.get('personNumber', '')
        person_id = f.get('person_id', '') or f.get('personId', '')

        # Always capture status fields (can be SKIPPED, numeric codes, 201 etc)
        workers_status = f.get('workers_status', '')
        workers_error_message = f.get('workers_error_message', '') or f.get('workers_error', '')

        salaries_status = f.get('salaries_status', '')
        salaries_error_message = f.get('salaries_error_message', '') or f.get('salaries_error', '')

        formatted = {
            'Index': f.get('row_index', ''),
            'ID': str(uuid.uuid4()),
            'CandidateId': candidate_id,
            'Candidate Name': candidate_name,
            'Employee PersonNumber': person_number,
            'Employee Person ID': person_id,
            'Workers API Status Code': workers_status,
            'Workers API Error Details': workers_error_message,
            'Salary API Status Code': salaries_status,
            'Salary API Error Details': salaries_error_message,
        }
        logger.debug(f"[log_failed_post] Formatted record: {formatted}")
        formatted_failures.append(formatted)

    # Build dataframe
    df = pd.DataFrame(formatted_failures)
    logger.info(f"[log_failed_post] Built DataFrame with {len(df)} rows.")

    # Append summary rows as before
    summary = pd.DataFrame([
        {
            'Index': '',
            'ID': '',
            'CandidateId': '',
            'Candidate Name': '',
            'Employee PersonNumber': '',
            'Employee Person ID': '',
            'Workers API Status Code': '',
            'Workers API Error Details': '',
            'Salary API Status Code': '',
            'Salary API Error Details': ''
        },
        {
            'Index': f'Total {api_type} API calls',
            'ID': total_calls,
            'CandidateId': 'Succeeded',
            'Candidate Name': df_success_count,
            'Employee PersonNumber': '',
            'Employee Person ID': '',
            'Workers API Status Code': 'Failed',
            'Workers API Error Details': len(failures) if api_type.startswith('Workers') else '',
            'Salary API Status Code': '',
            'Salary API Error Details': len(failures) if api_type.startswith('Salaries') else ''
        }
    ])
    df_out = pd.concat([df, summary], ignore_index=True)

    # Print types and sample for debugging (keeps previous behavior)
    logger.info(f"[log_failed_post] Output df dtypes:\n{df_out.dtypes}")
    logger.info(f"[log_failed_post] Output df head:\n{df_out.head(20).to_string(index=False)}")

    # Save to s3 if needed (commented out in your current code)
    try:
        csv_buffer = StringIO()
        df_out.to_csv(csv_buffer, index=False)
        #print(df.head(20))
        # s3 = boto3.client('s3')
        # s3.put_object(Bucket=bucket, Key=key, Body=csv_buffer.getvalue())
        logger.info(f"[log_failed_post] Prepared CSV for upload to s3://{bucket}/{key} (upload is optional/commented).")
    except Exception as e:
        logger.error(f"[log_failed_post] Failed to prepare/upload CSV to S3: {e}")

    return df_out


In [24]:
def pre_validation_logic(row, pre_validation_rules, jwt_token, idx, workers_failures):
    """
    Run pre-validation in order:
      1) WPEmployeeID
      2) SSN
      3) Email/Phone
      4) Name+DOB
    Skip rules when required inputs are blank.
    """
    row_dict = row.to_dict()
    logger.info(f"[pre_validation] Row {idx} start: TaleoCandidateID={row_dict.get('TaleoCandidateID')}")

    found_not_active = False

    ordered_rule_names = [
        "Check by WPEmployeeID",
        "Check by SSN",
        "Check by Email/Phone",
        "Check by Name+DOB"
    ]
    name_to_rule = {r.get('name'): r for r in pre_validation_rules}
    rules_to_run = [name_to_rule[n] for n in ordered_rule_names if n in name_to_rule]

    for rule in rules_to_run:
        rule_name = rule.get("name")

        # --- Skip validation if required inputs are blank ---
        if rule_name == "Check by WPEmployeeID" and not row_dict.get("WPEmployeeID"):
            logger.info(f"[pre_validation] Row {idx} skipping {rule_name} (no WPEmployeeID)")
            continue
        if rule_name == "Check by SSN" and not row_dict.get("SSN"):
            logger.info(f"[pre_validation] Row {idx} skipping {rule_name} (no SSN)")
            continue
        if rule_name == "Check by Email/Phone" and not (
            row_dict.get("Email") or row_dict.get("MobilePhone") or row_dict.get("HomePhone")
        ):
            logger.info(f"[pre_validation] Row {idx} skipping {rule_name} (no Email/Phone)")
            continue
        if rule_name == "Check by Name+DOB" and not (
            row_dict.get("FirstName") and row_dict.get("LastName") and row_dict.get("DateOfBirth")
        ):
            logger.info(f"[pre_validation] Row {idx} skipping {rule_name} (missing Name or DOB)")
            continue

        # --- Build query ---
        try:
            query = rule["query_template"].format(
                WPEmployeeID=str(row_dict.get("WPEmployeeID", "")).strip(),
                SSN=str(row_dict.get("SSN", "")).strip(),
                Email=str(row_dict.get("Email", "")).strip(),
                Phone=str(row_dict.get("MobilePhone", "") or row_dict.get("HomePhone", "") or "").strip(),
                FirstName=str(row_dict.get("FirstName", "")).strip(),
                FirstInitial=(str(row_dict.get("FirstName", "")).strip()[:1] if row_dict.get("FirstName") else ""),
                LastName=str(row_dict.get("LastName", "")).strip(),
                DateOfBirth=str(row_dict.get("DateOfBirth", "")).strip()
            )
        except Exception as e:
            logger.error(f"[pre_validation] Row {idx} error formatting query for {rule_name}: {e}")
            continue

        params = {"q": query}
        if rule.get("expand"):
            params["expand"] = rule["expand"]

        logger.info(f"[pre_validation] Row {idx} executing {rule_name}, q={query}")

        # --- Call API ---
        try:
            resp = call_hcm_api_get(workers_endpoint, params, jwt_token)
        except Exception as e:
            logger.error(f"[pre_validation] Row {idx} API call failed for {rule_name}: {e}")
            continue

        if resp.get("statusCode") != 200:
            logger.warning(f"[pre_validation] Row {idx} {rule_name} returned status {resp.get('statusCode')}")
            continue

        try:
            data = json.loads(resp.get("body", "{}") or "{}")
        except Exception as e:
            logger.error(f"[pre_validation] Row {idx} failed parsing response for {rule_name}: {e}")
            continue

        items = data.get("items", [])
        if not items:
            logger.info(f"[pre_validation] Row {idx} {rule_name} no matches.")
            continue

        # --- Process items ---
        for item in items:
            person_id = item.get("PersonId", "")
            person_number = item.get("PersonNumber", "")

            for wr in item.get("workRelationships", []):
                for a in wr.get("assignments", []):
                    status = a.get("AssignmentStatusType", "")
                    if status == rule["active_status"]:
                        skip_msg = rule.get("skip_error_message", "Employee already exists")
                        logger.info(f"[pre_validation] Row {idx} SKIP due to ACTIVE assignment.")
                        workers_failures.append({
                            "row_index": idx,
                            "CandidateId": row_dict.get("TaleoCandidateID", ""),
                            "CandidateName": " ".join(filter(None, [row_dict.get("FirstName", ""), row_dict.get("LastName", "")])),
                            "workers_status": "SKIPPED",
                            "workers_error_message": skip_msg,
                            "salaries_status": "",
                            "salaries_error_message": "",
                            "payload_json": json.dumps(row_dict),
                            "person_id": person_id,
                            "person_number": person_number
                        })
                        return None
                    else:
                        found_not_active = True

        if found_not_active:
            break  # Stop further rules if non-ACTIVE found

    # --- Final decision ---
    action = "ADD_PWK_EMP" if found_not_active else "ADD_PEN_WKR"
    logger.info(f"[pre_validation] Row {idx} ActionCode={action}")
    return action


In [26]:
# ---------- main processing (replace your existing for loop block) ----------
df = read_csv_from_s3(S3_BUCKET, S3_KEY)
logger.info(f"Loaded input CSV from S3 {S3_BUCKET}/{S3_KEY}; rows={len(df)}")
logger.info(f"Columns: {list(df.columns)}")

workers_failures = []
salaries_failures = []
df_workers_success_count = 0
df_salaries_success_count = 0
jwt_token = None

for idx, row in df.iterrows():
    row_dict = row.to_dict()
    logger.info(f"=== Processing row {idx} TaleoCandidateID={row_dict.get('TaleoCandidateID')} WPEmployeeID={row_dict.get('WPEmployeeID')} ===")

    # 1) Pre-validation
    try:
        action_code = pre_validation_logic(row, pre_validation_rules, jwt_token, idx, workers_failures)
    except Exception as e:
        logger.error(f"[Main] pre_validation_logic raised for row {idx}: {e}", exc_info=True)
        # log as worker failure and skip
        workers_failures.append({
            'row_index': idx,
            'CandidateId': row_dict.get("TaleoCandidateID", ""),
            'CandidateName': " ".join(filter(None, [row_dict.get("FirstName", ""), row_dict.get("LastName", "")])),
            'workers_status': 'PREVALIDATION_ERROR',
            'workers_error_message': str(e),
            'salaries_status': '',
            'salaries_error_message': '',
            'payload_json': json.dumps(row_dict)
        })
        continue

    if not action_code:
        logger.info(f"[Main] Row {idx} skipped by pre_validation (already logged in workers_failures).")
        continue

    logger.info(f"[Main] Row {idx} pre-validation returned ActionCode={action_code}")

    # 2) Lookup values
    try:
        lookup_results = lookup_values(
            row_dict=row_dict,
            lookup_rules=metadata.get("lookup_rules", []),
            call_hcm_api_get=call_hcm_api_get,
            host_url=metadata.get("host_url"),
            jwt_token=jwt_token
        )
        logger.info(f"[Main] Row {idx} lookup_results keys: {list(lookup_results.keys()) if isinstance(lookup_results, dict) else lookup_results}")
    except Exception as e:
        logger.error(f"[Main] Row {idx} lookup_values failed: {e}", exc_info=True)
        # proceed (lookup failure may still allow post), but record a worker failure if post fails later

    # 3) Build worker payload and set ActionCode
    workers_flat_payload = map_csv_row_to_flat_json(row_dict, workers_payload_json)
    workers_flat_payload["workRelationships|0|assignments|0|ActionCode"] = action_code
    workers_flat_payload = apply_lookup_values_to_payload(workers_flat_payload, lookup_results)
    workers_flat_payload = transform_special_fields(workers_flat_payload, field_transformations)
    workers_flat_payload = apply_conditional_field_rules_generic(
        flat_payload=workers_flat_payload,
        row_dict=row_dict,
        conditional_config=metadata.get("conditional_field_rules", {})
    )
    workers_nested_payload = flatten_to_nested(workers_flat_payload)
    logger.info(f"[Main] Row {idx} prepared workers payload. ActionCode={action_code}")

    # 4) POST Workers
    try:
        logger.info(f"=== [{idx+1}] workers nested JSON Payload ===")
        logger.info(json.dumps(workers_nested_payload, indent=2))
        workers_response_obj = call_hcm_api_post(
            workers_endpoint, workers_query_param, workers_nested_payload, jwt_token, host_url
        )
    except Exception as e:
        logger.exception(f"[Main] Row {idx} call_hcm_api_post(workers) raised: {e}")
        workers_failures.append({
            'row_index': idx,
            'CandidateId': row_dict.get("TaleoCandidateID", ""),
            'CandidateName': " ".join(filter(None, [row_dict.get("FirstName", ""), row_dict.get("LastName", "")])),
            'workers_status': 'POST_EXCEPTION',
            'workers_error_message': str(e),
            'salaries_status': '',
            'salaries_error_message': '',
            'payload_json': json.dumps(workers_nested_payload)
        })
        continue

    workers_status = workers_response_obj.get("statusCode")
    workers_body_str = workers_response_obj.get("body", "")
    logger.info(f"[Main] Row {idx} Workers API returned status {workers_status}")

    # If worker succeeded (201) -> try parse body to extract PersonId/PersonNumber
    person_id = ""
    person_number = ""
    if workers_status == 201:
        df_workers_success_count += 1
        try:
            wbody = json.loads(workers_body_str or "{}")
            # Some responses nest data — try direct keys first
            person_id = wbody.get("PersonId") or wbody.get("personId") or ""
            person_number = wbody.get("PersonNumber") or wbody.get("personNumber") or ""
            logger.info(f"[Main] Row {idx} Worker created PersonId={person_id} PersonNumber={person_number}")
        except Exception as e:
            logger.warning(f"[Main] Row {idx} Failed to parse worker response JSON: {e}")
            wbody = {}
    else:
        # Worker failed — capture into workers_failures and skip salary
        logger.warning(f"[Main] Row {idx} Worker POST failed status={workers_status} body={workers_body_str}")
        workers_failures.append({
            'row_index': idx,
            'CandidateId': row_dict.get("TaleoCandidateID", ""),
            'CandidateName': " ".join(filter(None, [row_dict.get("FirstName", ""), row_dict.get("LastName", "")])),
            'workers_status': workers_status,
            'workers_error_message': workers_body_str or "No response body received",
            'salaries_status': '',
            'salaries_error_message': '',
            'payload_json': json.dumps(workers_nested_payload),
            'person_id': person_id,
            'person_number': person_number
        })
        continue  # do not attempt salary for failed worker

    # 5) Build salary payload, use same action_code and set AssignmentId = person_id (if present)
    salaries_flat_payload = map_csv_row_to_flat_json(row_dict, salaries_payload_json)
    salaries_flat_payload["Salaries|0|ActionCode"] = action_code
    if person_id:
        salaries_flat_payload["Salaries|0|AssignmentId"] = person_id
    salaries_flat_payload = apply_lookup_values_to_payload(salaries_flat_payload, lookup_results)
    salaries_flat_payload = transform_special_fields(salaries_flat_payload, field_transformations)
    salaries_nested_payload = flatten_to_nested(salaries_flat_payload)
    logger.info(f"[Main] Row {idx} prepared salaries payload. ActionCode={action_code} AssignmentId={person_id}")

    # 6) POST salary
    try:
        logger.info(f"=== [{idx+1}] salaries nested JSON Payload ===")
        logger.info(json.dumps(salaries_nested_payload, indent=2))
        salaries_response_obj = call_hcm_api_post(
            salaries_endpoint, salaries_query_param, salaries_nested_payload, jwt_token, host_url
        )
    except Exception as e:
        logger.exception(f"[Main] Row {idx} call_hcm_api_post(salaries) raised: {e}")
        salaries_failures.append({
            'row_index': idx,
            'CandidateId': row_dict.get("TaleoCandidateID", ""),
            'CandidateName': " ".join(filter(None, [row_dict.get("FirstName", ""), row_dict.get("LastName", "")])),
            'workers_status': workers_status,
            'workers_error_message': '',
            'salaries_status': 'POST_EXCEPTION',
            'salaries_error_message': str(e),
            'payload_json': json.dumps(salaries_nested_payload),
            'person_id': person_id,
            'person_number': person_number
        })
        continue

    salaries_status = salaries_response_obj.get("statusCode")
    salaries_body_str = salaries_response_obj.get("body", "")
    logger.info(f"[Main] Row {idx} Salaries API returned status {salaries_status}")

    if salaries_status == 201:
        df_salaries_success_count += 1
        logger.info(f"[Main] Row {idx} Salary posted successfully.")
    else:
        logger.warning(f"[Main] Row {idx} Salary POST failed status={salaries_status}, body={salaries_body_str}")
        salaries_failures.append({
            'row_index': idx,
            'CandidateId': row_dict.get("TaleoCandidateID", ""),
            'CandidateName': " ".join(filter(None, [row_dict.get("FirstName", ""), row_dict.get("LastName", "")])),
            'workers_status': workers_status,
            'workers_error_message': '',
            'salaries_status': salaries_status,
            'salaries_error_message': salaries_body_str,
            'payload_json': json.dumps(salaries_nested_payload),
            'person_id': person_id,
            'person_number': person_number
        })

# End for loop

# Build totals for reporting
total_worker_calls = df_workers_success_count + len(workers_failures)
total_salary_calls = df_salaries_success_count + len(salaries_failures)
logger.info(f"[Main] Completed processing. Worker successes: {df_workers_success_count}, Worker failures: {len(workers_failures)}")
logger.info(f"[Main] Salary successes: {df_salaries_success_count}, Salary failures: {len(salaries_failures)}")

# Combine both failure lists into a single aggregated list for a single error report
combined_failures = []
# We ensure CandidateId/Name present on each record
for rec in workers_failures:
    rec.setdefault('CandidateId', rec.get('TaleoCandidateID', ''))
    rec.setdefault('CandidateName', rec.get('CandidateName', ''))
    combined_failures.append(rec)
for rec in salaries_failures:
    rec.setdefault('CandidateId', rec.get('TaleoCandidateID', ''))
    rec.setdefault('CandidateName', rec.get('CandidateName', ''))
    combined_failures.append(rec)

# Single final CSV/DF of all failures
FAILED_LOG_KEY = 'INT024/failed_post_log.csv'
final_failures_df = log_failed_post(
    combined_failures, S3_BUCKET, FAILED_LOG_KEY,
    df_workers_success_count + df_salaries_success_count,
    total_worker_calls + total_salary_calls,
    api_type='Workers/Salaries'
)

logger.info("[Main] Final failure dataframe prepared.")


ProposedPersonType       string[python]
TaleoCandidateID         string[python]
WPEmployeeID             string[python]
FirstName                string[python]
MiddleName               string[python]
LastName                 string[python]
Email                    string[python]
Address                  string[python]
Address2                 string[python]
Country                  string[python]
State                    string[python]
County                   string[python]
City                     string[python]
ZipCode                  string[python]
HomePhone                string[python]
MobilePhone              string[python]
SSN                      string[python]
Birthdate                string[python]
Department               string[python]
WorkAssignmentManager    string[python]
PrimaryLocationName      string[python]
LegalEmployer            string[python]
JobGrade                 string[python]
GlobalJobTitle           string[python]
StartDate                string[python]
