In [6]:
import pandas as pd
import numpy as np
import json

def save_to_json(bidder_demand_dict, filename):
    """
    Save a (possibly nested) dictionary to a JSON file, ensuring type compatibility.

    Values are normalised before writing:
    - None, bool, int, float and str are written unchanged
    - NumPy scalars become the matching native Python value; NumPy arrays become nested lists
    - tuples are written as JSON arrays (JSON has no tuple type)
    - any other object falls back to its str() representation

    Dictionary keys that JSON cannot use directly are unwrapped (NumPy scalars)
    or converted with str().

    Parameters:
    - bidder_demand_dict: dict, the data to store (e.g. bidders as keys mapping to demand data or check results)
    - filename: str, the name of the output file (e.g., "bidder_demand_dict.json")

    Notes:
    - A TypeError/ValueError is reported with a printed message instead of being raised.
    - The data is serialised before the file is opened, so a serialisation failure
      does not leave a truncated file behind.
    - OSError from open() (e.g. missing directory) is not caught.
    """
    def convert_key(key):
        """ Return a key that json accepts (str, int, float, bool or None). """
        if isinstance(key, np.generic):
            key = key.item()
        if key is None or isinstance(key, (str, int, float, bool)):
            return key
        return str(key)

    def convert_to_serializable(obj):
        """ Recursively convert non-JSON-serializable types to native Python types. """
        if isinstance(obj, np.generic):
            # NumPy scalar (np.int64, np.bool_, ...) -> native Python value
            obj = obj.item()
        if obj is None or isinstance(obj, (bool, int, float, str)):
            return obj
        if isinstance(obj, dict):
            return {convert_key(k): convert_to_serializable(v) for k, v in obj.items()}
        if isinstance(obj, np.ndarray):
            return convert_to_serializable(obj.tolist())
        if isinstance(obj, (list, tuple)):
            return [convert_to_serializable(item) for item in obj]
        # Last resort for objects with no JSON equivalent (sets, custom classes, ...)
        return str(obj)

    # Convert the dictionary to a serializable format
    serializable_dict = convert_to_serializable(bidder_demand_dict)

    try:
        # Serialise first: if this fails, the target file is left untouched
        payload = json.dumps(serializable_dict, indent=4)
        with open(filename, 'w') as f:
            f.write(payload)
        print(f"Data successfully saved to file: {filename}")
    except (TypeError, ValueError) as e:
        print(f"Failed to save file: {e}")

def load_from_json(filename):
    """
    Read a JSON file and return its parsed content.

    Parameters:
    - filename: str, path of the JSON file to read (e.g., "bidder_demand_dict.json")

    Returns:
    - the parsed JSON content (a dict for the bidder demand file), or None when the
      file does not exist or does not contain valid JSON; in that case the error is printed
    """
    try:
        with open(filename, 'r') as source:
            loaded = json.load(source)
    except (FileNotFoundError, json.JSONDecodeError) as err:
        print(f"Failed to load file: {err}")
        return None
    else:
        print(f"Data successfully loaded from file: {filename}")
        return loaded
    
# Load the bidder demand data used by the checks below. load_from_json prints the
# error and returns None when the file is missing or is not valid JSON.
bidder_demand_dict = load_from_json("../data/processed data/bidder_demand_dict.json")


Data successfully loaded from file: ../data/processed data/bidder_demand_dict.json


In [7]:
def check_warp_violation(bidder_demand_dict):
    """
    Run a pairwise (WARP-style) revealed-preference check for every bidder.

    For each unordered pair of rounds (A, B) the pair is recorded as a violation when
    each round's bundle is strictly cheaper at the other round's prices:

        p_A . q_B < p_A . q_A   and   p_B . q_A < p_B . q_B

    Only direct two-round contradictions are detected; chains through intermediate
    rounds are not examined here.

    Parameters:
    - bidder_demand_dict: dict, bidder ID mapped to a list of
      (round_id, quantity_vector, price_vector, rivals_quantity_vector).
      The fourth element is ignored by this check.

    Returns:
    - dict: bidder ID mapped to a summary with the keys
        - has_violation: bool, True when at least one round pair violates the condition
        - pair_violation: list of {'round_pair': (round_id_a, round_id_b), 'violation': True}
        - violation_count: int, number of violating pairs
        - violation_ratio: violating pairs divided by the number of round pairs
          (0 for bidders with fewer than two rounds)

    Raises:
    - ValueError: for a bidder with at least two rounds, when the price/quantity vectors
      do not all have the same length or contain a non-numeric element
    """
    def expenditure(prices, quantities):
        """ Cost of the bundle `quantities` at `prices`. """
        return sum(p * q for p, q in zip(prices, quantities))

    def validate(rounds):
        """ Check every round once instead of re-checking it for each pair. """
        expected_len = None
        for _, quantities, prices, _ in rounds:
            if expected_len is None:
                expected_len = len(prices)
            if not (len(prices) == len(quantities) == expected_len):
                raise ValueError("Price and quantity vectors must be of the same length for all rounds.")
        for _, quantities, prices, _ in rounds:
            # Iterate the vectors directly: concatenating them with `+` fails for a
            # list/tuple mix and would add element-wise for array types.
            if not all(isinstance(x, (int, float)) for vector in (prices, quantities) for x in vector):
                raise ValueError("Price and quantity vectors must contain only numeric values.")

    # Initialize result dictionary
    warp_violation_summary = {}

    for bidder, round_data in bidder_demand_dict.items():
        n_rounds = len(round_data)
        if n_rounds < 2:
            # With fewer than two rounds there is no pair to compare
            warp_violation_summary[bidder] = {
                'has_violation': False,
                'pair_violation': [],
                'violation_count': 0,
                'violation_ratio': 0
            }
            continue

        validate(round_data)

        # Cost of each round's own bundle at its own prices, computed once per round
        own_cost = [expenditure(prices, quantities) for _, quantities, prices, _ in round_data]

        pair_violation = []
        for i in range(n_rounds):
            round_id_a, quantity_vector_a, price_vector_a, _ = round_data[i]

            for j in range(i + 1, n_rounds):
                round_id_b, quantity_vector_b, price_vector_b, _ = round_data[j]

                b_cheaper_at_a_prices = expenditure(price_vector_a, quantity_vector_b) < own_cost[i]
                a_cheaper_at_b_prices = expenditure(price_vector_b, quantity_vector_a) < own_cost[j]

                if b_cheaper_at_a_prices and a_cheaper_at_b_prices:
                    pair_violation.append({
                        'round_pair': (round_id_a, round_id_b),
                        'violation': True
                    })

        # Calculate violation ratio (n_rounds >= 2 here, so the divisor is positive)
        total_violation_count = len(pair_violation)
        total_comparisons = n_rounds * (n_rounds - 1) / 2
        violation_ratio = total_violation_count / total_comparisons

        # Store results
        warp_violation_summary[bidder] = {
            'has_violation': total_violation_count > 0,
            'pair_violation': pair_violation,
            'violation_count': total_violation_count,
            'violation_ratio': violation_ratio
        }

    return warp_violation_summary


In [8]:
# Assumes bidder_demand_dict has already been generated (loaded in an earlier cell)
warp_violation_results = check_warp_violation(bidder_demand_dict)
save_to_json(warp_violation_results, "../output/rp_checking/warp_violation_results.json")

Data successfully saved to file: ../output/rp_checking/warp_violation_results.json


In [9]:
from collections import defaultdict

def calculate_expenditure(price_vector, quantity_vector):
    """
    Return the cost of a bundle: the sum of price * quantity over paired entries.

    Entries are paired with zip, so trailing entries of the longer vector are ignored.
    """
    per_good_cost = [price * quantity for price, quantity in zip(price_vector, quantity_vector)]
    # Keep the built-in sum so the float summation behaves exactly as before
    return sum(per_good_cost)

def garp_condition(expenditure_a_a, expenditure_a_b, expenditure_b_b, expenditure_b_a):
    """
    Pairwise revealed-preference test for two rounds A and B.

    Returns True (violation) only when both strict inequalities hold:
    expenditure_a_b < expenditure_a_a and expenditure_b_a < expenditure_b_b.
    Returns False otherwise.
    """
    if not expenditure_a_b < expenditure_a_a:
        return False  # first inequality fails: no violation from this pair
    if not expenditure_b_a < expenditure_b_b:
        return False  # second inequality fails: no violation from this pair
    return True  # both inequalities hold: the pair is a violation

def build_preference_graph(round_data):
    """
    Build the directed revealed-preference graph for one bidder.

    Nodes are round IDs. An edge A -> B is added when round B's bundle costs no more
    than round A's own bundle at round A's prices. Rounds without outgoing edges do
    not appear as keys.

    Parameters:
    - round_data: list of (round_id, quantity_vector, price_vector, rivals_quantity_vector)

    Returns:
    - defaultdict(list): round ID mapped to the list of round IDs it points to
    """
    edges = defaultdict(list)

    for src_pos, (src_id, src_quantities, src_prices, _) in enumerate(round_data):
        # Cost of the bundle actually chosen in the source round, at its own prices
        own_cost = calculate_expenditure(src_prices, src_quantities)

        for dst_pos, (dst_id, dst_quantities, _dst_prices, _) in enumerate(round_data):
            if dst_pos == src_pos:
                continue
            if calculate_expenditure(src_prices, dst_quantities) <= own_cost:
                edges[src_id].append(dst_id)

    return edges

def detect_cycle(graph):
    """
    Detect whether a directed graph contains a cycle, using depth-first search (DFS).

    The search is iterative, so long preference chains cannot exhaust the recursion
    limit, and neighbours are looked up with .get(), so the function works with plain
    dicts whose sink nodes are not keys and never inserts keys into a defaultdict.

    Parameters:
    - graph: dict-like, node mapped to an iterable of successor nodes

    Returns:
    - bool: True if some node can reach itself by following edges, False otherwise
    """
    finished = set()  # nodes fully explored without finding a cycle

    for root in list(graph.keys()):
        if root in finished:
            continue

        on_path = {root}  # nodes on the current DFS path
        pending = [(root, iter(graph.get(root, ())))]

        while pending:
            node, successors = pending[-1]
            for successor in successors:
                if successor in on_path:
                    return True  # back edge: cycle detected
                if successor not in finished:
                    # Descend; the partially consumed iterator stays on the stack
                    on_path.add(successor)
                    pending.append((successor, iter(graph.get(successor, ()))))
                    break
            else:
                # All successors explored: leave the node
                pending.pop()
                on_path.remove(node)
                finished.add(node)

    return False

def _validate_round_vectors(round_data):
    """ Raise ValueError unless all price/quantity vectors share one length and are numeric. """
    expected_len = None
    for _, quantity_vector, price_vector, _ in round_data:
        if expected_len is None:
            expected_len = len(price_vector)
        if not (len(price_vector) == len(quantity_vector) == expected_len):
            raise ValueError("Price and quantity vectors must be the same length across all rounds.")
    for _, quantity_vector, price_vector, _ in round_data:
        # Iterate the vectors directly: concatenating them with `+` fails for a
        # list/tuple mix and would add element-wise for array types.
        if not all(isinstance(x, (int, float))
                   for vector in (price_vector, quantity_vector) for x in vector):
            raise ValueError("All elements in price and quantity vectors must be numeric.")


def _has_garp_cycle(expenditure):
    """
    Return True when the expenditure matrix violates GARP.

    expenditure[i][j] is the cost of round j's bundle at round i's prices.
    Round i is directly revealed preferred to round j when
    expenditure[i][j] <= expenditure[i][i], and strictly so when the inequality is strict.
    GARP is violated when i is (possibly indirectly) revealed preferred to j while
    j is strictly directly revealed preferred to i. A cycle made only of equalities,
    e.g. the same bundle chosen in two rounds, is therefore not a violation.
    """
    n_rounds = len(expenditure)

    # reach[i] is a bitmask: bit j is set when round i is revealed preferred to round j
    reach = [0] * n_rounds
    for i in range(n_rounds):
        own_cost = expenditure[i][i]
        for j in range(n_rounds):
            if i != j and expenditure[i][j] <= own_cost:
                reach[i] |= 1 << j

    # Transitive closure (Warshall): if i reaches k, i also reaches everything k reaches
    for k in range(n_rounds):
        via_k = 1 << k
        for i in range(n_rounds):
            if reach[i] & via_k:
                reach[i] |= reach[k]

    # Look for a strict direct preference j over i that contradicts "i reaches j"
    for j in range(n_rounds):
        own_cost = expenditure[j][j]
        for i in range(n_rounds):
            if i != j and expenditure[j][i] < own_cost and (reach[i] >> j) & 1:
                return True
    return False


def check_garp_violation(bidder_demand_dict):
    """
    Check whether each bidder violates revealed-preference consistency, reporting both
    direct pairwise violations and GARP violations through indirect (transitive) chains.

    Parameters:
    - bidder_demand_dict: dict, maps bidder ID to a list of tuples
                          (round_id, quantity_vector, price_vector, rivals_quantity_vector)

    Returns:
    - dict: maps bidder ID to a violation summary including:
        - has_violation: bool, True when some round pair fails garp_condition
        - pair_violation: list of violating round pairs
        - violation_count: int
        - violation_ratio: float (0 for bidders with fewer than two rounds)
        - has_cycle_violation: bool, True when GARP is violated, i.e. a chain of
          weak revealed preferences is contradicted by a strict direct preference

    Raises:
    - ValueError: for a bidder with at least two rounds, when the price/quantity vectors
      do not all have the same length or contain a non-numeric element
    """
    garp_violation_summary = {}

    for bidder, round_data in bidder_demand_dict.items():
        n_rounds = len(round_data)
        if n_rounds < 2:
            # No pairwise comparison needed
            garp_violation_summary[bidder] = {
                'has_violation': False,
                'pair_violation': [],
                'violation_count': 0,
                'violation_ratio': 0,
                'has_cycle_violation': False
            }
            continue

        # Validity checks, once per bidder
        _validate_round_vectors(round_data)

        # expenditure[i][j]: cost of round j's bundle at round i's prices.
        # Computed once and shared by the direct and the transitive check.
        expenditure = [
            [calculate_expenditure(price_vector_i, quantity_vector_j)
             for _, quantity_vector_j, _, _ in round_data]
            for _, _, price_vector_i, _ in round_data
        ]

        # Direct (pairwise) violation detection
        pair_violation = []
        for i in range(n_rounds):
            for j in range(i + 1, n_rounds):
                if garp_condition(expenditure[i][i], expenditure[i][j],
                                  expenditure[j][j], expenditure[j][i]):
                    pair_violation.append({
                        'round_pair': (round_data[i][0], round_data[j][0]),
                        'violation': True
                    })

        # Calculate violation ratio (n_rounds >= 2 here, so the divisor is positive)
        total_violation_count = len(pair_violation)
        total_comparisons = n_rounds * (n_rounds - 1) / 2
        violation_ratio = total_violation_count / total_comparisons

        # Save results
        garp_violation_summary[bidder] = {
            'has_violation': total_violation_count > 0,
            'pair_violation': pair_violation,
            'violation_count': total_violation_count,
            'violation_ratio': violation_ratio,
            'has_cycle_violation': _has_garp_cycle(expenditure)
        }

    return garp_violation_summary


In [10]:
# Run the GARP check for every bidder and write the per-bidder summary to JSON
garp_violation_results = check_garp_violation(bidder_demand_dict)
save_to_json(garp_violation_results, "../output/rp_checking/garp_violation_results.json")

Data successfully saved to file: ../output/rp_checking/garp_violation_results.json
