In [10]:
import json
from enum import Enum
from pathlib import Path
from typing import Dict, List

In [11]:
def save_rule_to_json(rule_data: Dict, filename: str = 'rules.json'):
    """
    Insert or update a rule in a JSON rules file.

    The file holds an object of the form ``{"rules": [...]}``. A missing or
    zero-length file is treated as containing no rules, and a document without
    a "rules" key gets an empty list. If a stored rule has the same ``ruleId``
    as ``rule_data`` it is replaced in place; otherwise ``rule_data`` is
    appended to the end of the list.

    Args:
        rule_data (Dict): Rule data in dictionary format; its ``ruleId`` is
            compared against the stored rules.
        filename (str): Name of the JSON file to read and rewrite.

    Raises:
        KeyError: If a stored rule (or ``rule_data``, when it is compared
            against stored rules) has no ``ruleId``.
        json.JSONDecodeError: If the existing non-empty file is not valid JSON.
        TypeError, ValueError: If the resulting document cannot be serialized
            to JSON. The file on disk is left untouched in that case.
    """
    file_path = Path(filename)

    # Start from an empty document unless the file already has content.
    data = {"rules": []}
    if file_path.exists() and file_path.stat().st_size > 0:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

    rules = data.setdefault("rules", [])

    # Replace the rule carrying the same ID, or append when none is found.
    for index, existing in enumerate(rules):
        if existing["ruleId"] == rule_data["ruleId"]:
            rules[index] = rule_data
            break
    else:
        rules.append(rule_data)

    # Serialize BEFORE opening the file for writing: opening with 'w' truncates
    # immediately, so a serialization error raised mid-write would otherwise
    # wipe out every rule that was already stored.
    payload = json.dumps(data, indent=2)
    file_path.write_text(payload, encoding='utf-8')


In [27]:
# Sample rule 3: three conditions, one each for airline, origin and cabin class.
rule = {
    "ruleId": 3,
    "conditions": [
        {"ruleTypeKey": "airline", "airlineKey": "All", "operatorKey": "equalto"},
        {
            "ruleTypeKey": "origin",
            "originKey": "LHR",
            "operatorKey": "equalto",
            "secondLayer": "airportcode",
        },
        {
            "ruleTypeKey": "cabinclass",
            "cabinclassKey": "economy",
            "operatorKey": "equalto",
            "firstInput": "cabin class name",
        },
    ],
}

In [28]:
# Persist the rule to the default 'rules.json': a stored rule with the same
# ruleId is replaced, otherwise the rule is appended.
save_rule_to_json(rule)

In [30]:
class MatchType(Enum):
    """
    Comparison operators a rule condition can name in its ``operatorKey``.

    Each member's value is the operator string as it appears in the rule
    data, so ``MatchType(condition['operatorKey'])`` converts the raw string
    into a member (and raises ``ValueError`` for an unknown string).
    """
    # Positive membership tests.
    EQUAL = 'equalto'
    # Negative membership tests.
    NOT_EQUAL = 'notequalto'
    IS_IN = 'isin'
    IS_NOT_IN = 'isnotin'

# Lookup tables that translate an airport code into a coarser location code.
# Each table is written grouped by its target code and inverted into an
# airport -> target mapping; the resulting key order is the listing order.
AIRPORT_TO_COUNTRY = {
    airport: country
    for country, airports in (
        ('UK', ('LHR', 'LGW', 'MAN')),
        ('HK', ('HKG',)),
        ('US', ('JFK', 'LAX')),
    )
    for airport in airports
}

AIRPORT_TO_CITY = {
    airport: city
    for city, airports in (
        ('LON', ('LHR', 'LGW')),
        ('HKG', ('HKG',)),
        ('NYC', ('JFK',)),
    )
    for airport in airports
}

def match_airline(pnr_value: str, condition: Dict) -> bool:
    """
    Decide whether a PNR's airline satisfies one airline condition.

    The comparison is case-insensitive. ``airlineKey`` may hold a single code
    or a comma-separated list; whitespace around each code is ignored, so
    ``"BA, CX"`` behaves like ``"BA,CX"``. The wildcard ``"All"`` counts as
    containing every airline: it satisfies the positive operators
    (``equalto``/``isin``) and fails the negative ones.

    Args:
        pnr_value (str): Airline code taken from the PNR.
        condition (Dict): Rule condition; ``airlineKey`` and ``operatorKey``
            are read from it.

    Returns:
        bool: True when the condition holds for ``pnr_value``.

    Raises:
        KeyError: If ``airlineKey`` or ``operatorKey`` is missing.
        ValueError: If ``operatorKey`` is not a ``MatchType`` value.
    """
    pnr_airline = pnr_value.strip().upper()
    rule_value = condition['airlineKey'].strip().upper()
    operator = MatchType(condition['operatorKey'])

    # Strip each entry so a list written as "BA, CX" still matches "CX".
    rule_values = {token.strip() for token in rule_value.split(',')}

    # 'ALL' is a wildcard: every PNR value is considered a member.
    is_member = rule_value == 'ALL' or pnr_airline in rule_values

    # EQUAL/IS_IN are the same membership test, as are their negations.
    if operator in (MatchType.EQUAL, MatchType.IS_IN):
        return is_member
    if operator in (MatchType.NOT_EQUAL, MatchType.IS_NOT_IN):
        return not is_member

    return False

def match_origin(pnr_value: str, condition: Dict) -> bool:
    """
    Decide whether a PNR's origin satisfies one origin condition.

    The PNR origin is an airport code. When the condition's ``secondLayer``
    is ``'country'`` or ``'city'`` the code is first translated through
    ``AIRPORT_TO_COUNTRY`` / ``AIRPORT_TO_CITY`` (codes missing from the table
    are kept as they are); any other ``secondLayer`` leaves it untouched.

    The comparison is case-insensitive. ``originKey`` may hold a single code
    or a comma-separated list; whitespace around each code is ignored. The
    wildcard ``"All"`` counts as containing every origin: it satisfies the
    positive operators (``equalto``/``isin``) and fails the negative ones.

    Args:
        pnr_value (str): Origin airport code taken from the PNR.
        condition (Dict): Rule condition; ``originKey``, ``operatorKey`` and
            the optional ``secondLayer`` are read from it.

    Returns:
        bool: True when the condition holds for ``pnr_value``.

    Raises:
        KeyError: If ``originKey`` or ``operatorKey`` is missing.
        ValueError: If ``operatorKey`` is not a ``MatchType`` value.
    """
    pnr_origin = pnr_value.strip().upper()
    rule_value = condition['originKey'].strip().upper()
    operator = MatchType(condition['operatorKey'])
    second_layer = condition.get('secondLayer')

    # Translate the airport code to the granularity the rule is written in.
    if second_layer == 'country':
        pnr_origin = AIRPORT_TO_COUNTRY.get(pnr_origin, pnr_origin)
    elif second_layer == 'city':
        pnr_origin = AIRPORT_TO_CITY.get(pnr_origin, pnr_origin)

    # Strip each entry so a list written as "UK, US" still matches "US".
    rule_values = {token.strip() for token in rule_value.split(',')}

    # 'ALL' is a wildcard: every PNR value is considered a member.
    is_member = rule_value == 'ALL' or pnr_origin in rule_values

    # EQUAL/IS_IN are the same membership test, as are their negations.
    if operator in (MatchType.EQUAL, MatchType.IS_IN):
        return is_member
    if operator in (MatchType.NOT_EQUAL, MatchType.IS_NOT_IN):
        return not is_member

    return False

def match_cabinclass(pnr_value: str, condition: Dict) -> bool:
    """
    Decide whether a PNR's cabin class satisfies one cabin-class condition.

    The comparison is case-insensitive (values are lower-cased).
    ``cabinclassKey`` may hold a single name or a comma-separated list;
    whitespace around each name is ignored, so ``"economy, business"``
    behaves like ``"economy,business"``. The wildcard ``"all"`` counts as
    containing every cabin class: it satisfies the positive operators
    (``equalto``/``isin``) and fails the negative ones.

    Args:
        pnr_value (str): Cabin class taken from the PNR.
        condition (Dict): Rule condition; ``cabinclassKey`` and
            ``operatorKey`` are read from it.

    Returns:
        bool: True when the condition holds for ``pnr_value``.

    Raises:
        KeyError: If ``cabinclassKey`` or ``operatorKey`` is missing.
        ValueError: If ``operatorKey`` is not a ``MatchType`` value.
    """
    pnr_cabin = pnr_value.strip().lower()
    rule_value = condition['cabinclassKey'].strip().lower()
    operator = MatchType(condition['operatorKey'])

    # Strip each entry so a list written as "economy, business" still matches.
    rule_values = {token.strip() for token in rule_value.split(',')}

    # 'all' is a wildcard: every PNR value is considered a member.
    is_member = rule_value == 'all' or pnr_cabin in rule_values

    # EQUAL/IS_IN are the same membership test, as are their negations.
    if operator in (MatchType.EQUAL, MatchType.IS_IN):
        return is_member
    if operator in (MatchType.NOT_EQUAL, MatchType.IS_NOT_IN):
        return not is_member

    return False

def match_condition(pnr_data: Dict, condition: Dict) -> bool:
    """
    Route condition to appropriate matching function
    """
    rule_type = condition['ruleTypeKey']
    
    match rule_type:
        case 'airline':
            return match_airline(pnr_data.get('airline', ''), condition)
        case 'origin':
            return match_origin(pnr_data.get('origin', ''), condition)
        case 'cabinclassname':
            return match_cabinclass(pnr_data.get('cabinclass', ''), condition)
    
    return False

def find_matching_rules(pnr_data: Dict, rules_file: str = 'rules.json') -> List[int]:
    """
    Find every stored rule whose conditions all hold for a PNR.

    Rules are read from the ``"rules"`` list of the JSON document in
    ``rules_file``. A document without a ``"rules"`` key is treated as
    holding no rules, the same way ``save_rule_to_json`` treats it. A rule
    matches when ``match_condition`` is true for each of its conditions;
    evaluation of a rule stops at its first failing condition, and a rule
    with an empty condition list matches every PNR.

    Args:
        pnr_data (Dict): PNR fields passed through to ``match_condition``.
        rules_file (str): Path of the JSON rules file.

    Returns:
        List[int]: ``ruleId`` of each matching rule, in file order.

    Raises:
        FileNotFoundError: If ``rules_file`` does not exist.
        json.JSONDecodeError: If ``rules_file`` is not valid JSON.
        KeyError: If a rule lacks ``conditions``, or a matching rule lacks
            ``ruleId``.
    """
    with open(rules_file, 'r', encoding='utf-8') as f:
        rules_data = json.load(f)

    return [
        rule['ruleId']
        for rule in rules_data.get('rules', [])
        # all() short-circuits on the first condition that fails.
        if all(match_condition(pnr_data, condition) for condition in rule['conditions'])
    ]



Matching Rule IDs: [3]


In [34]:
# Example usage: evaluate one sample PNR against the rules stored in the
# default rules file and show the IDs of the rules that match.
pnr_data = dict(airline="CX", origin="LHR", cabinclass="economy")

matching_rules = find_matching_rules(pnr_data)
print(f"Matching Rule IDs: {matching_rules}")

Matching Rule IDs: [2, 3]
