In [59]:
import re
import math
import sys
sys.path.append("../../src/")
import uncertainpy.gradual as grad
import time
from statistics import median
import random
random.seed(0)

In [60]:
# Parse a QBAF file to an adjacency matrix
def parse_qbaf(filename):
    """Parse a QBAF file into a signed adjacency matrix.

    Every non-blank line must match ``kind(<int>, <number>)``:

    * ``arg(i, w)`` declares one argument (only the number of such lines
      is used here, to size the matrix);
    * ``att(i, j)`` records an attack edge, stored as ``matrix[i][j] = -1``;
    * ``sup(i, j)`` records a support edge, stored as ``matrix[i][j] = 1``.

    Lines with any other ``kind`` that still match the pattern are ignored.
    Whitespace-only lines (e.g. a trailing empty line) are skipped.

    Args:
        filename: path of the file to read.

    Returns:
        A ``num_args x num_args`` list of lists with entries in ``{-1, 0, 1}``.

    Raises:
        ValueError: if a non-blank line does not match the expected pattern,
            or an edge endpoint is not an integer.
        IndexError: if an edge refers to an index outside the matrix.
    """
    line_pattern = re.compile(r'(\w+)\((\d+), ([\d.]+)\)')
    num_nodes = 0
    edges = []  # (source, target, sign) in file order; applied once the size is known

    with open(filename, 'r') as file:
        for line in file:
            if not line.strip():
                continue
            match = line_pattern.match(line)
            if match is None:
                raise ValueError(f"can't parse this line: {line}")
            kind, first, second = match.groups()
            if kind == 'arg':
                num_nodes += 1
            elif kind == 'att':
                edges.append((first, second, -1))
            elif kind == 'sup':
                edges.append((first, second, 1))

    adj_matrix = [[0] * num_nodes for _ in range(num_nodes)]
    for source, target, sign in edges:
        adj_matrix[int(source)][int(target)] = sign
    return adj_matrix

In [61]:
# Convert the adjacency matrix to a python dictionary
def adjacency_matrix_to_dict(adj_matrix):
    """Build a ``{row index: set of column indices}`` mapping from a matrix.

    A column index is included for a row whenever the corresponding entry
    is non-zero; the sign of the entry is not kept.
    """
    return {
        src: {dst for dst, entry in enumerate(cells) if entry != 0}
        for src, cells in enumerate(adj_matrix)
    }

In [62]:
# Compute all the paths between two arguments
def find_all_paths_between_two_args(graph, start, end, path=[]):
    """Enumerate every path from ``start`` to ``end`` that repeats no node.

    Args:
        graph: mapping from a node to an iterable of its successors.
        start: node the search currently stands on.
        end: node to reach.
        path: nodes already visited on the way to ``start``.

    Returns:
        A list of paths, each a list of nodes beginning with the first entry
        of ``path`` (or ``start``) and ending with ``end``. When
        ``start == end`` the single path ``path + [start]`` is returned.
    """
    # Concatenation creates a new list, so the shared default is never mutated.
    trail = path + [start]
    if start == end:
        return [trail]
    if start not in graph:
        return []

    found = []
    for successor in graph[start]:
        if successor in trail:
            continue  # already on this path; skipping keeps paths simple
        found.extend(find_all_paths_between_two_args(graph, successor, end, trail))
    return found

def find_all_paths_between_two_args_complete(graph, start, end, path=[]):
    """Like ``find_all_paths_between_two_args`` but with no path from a node to itself.

    Returns an empty list when ``start == end``; otherwise delegates to
    ``find_all_paths_between_two_args``. The ``path`` parameter is accepted
    but never read.
    """
    return [] if start == end else find_all_paths_between_two_args(graph, start, end)

In [63]:
# Compute the (direct or indirect) polarity from one node to another, return an integer
def compute_polarity_between_two_args(graph, start_node, end_node, adj_matrix):
    """Classify the influence of ``start_node`` on ``end_node``.

    The sign of a path is the product of the ``adj_matrix`` entries along
    its consecutive node pairs.

    Returns:
        ``1``  if the two nodes are the same, or every path has sign 1;
        ``-1`` if every path has sign -1;
        ``0``  if the paths disagree (unknown);
        ``-2`` if there is no path at all (neutral).
    """
    paths = find_all_paths_between_two_args_complete(graph, start_node, end_node)

    # A node is considered positive towards itself.
    if start_node == end_node:
        return 1

    # No path: neutral.
    if not paths:
        return -2

    # Sign of each individual path.
    path_signs = []
    for route in paths:
        sign = 1
        for src, dst in zip(route, route[1:]):
            sign *= adj_matrix[src][dst]
        path_signs.append(sign)

    if all(sign == 1 for sign in path_signs):
        return 1   # positive: all the paths are positive
    if all(sign == -1 for sign in path_signs):
        return -1  # negative: all the paths are negative
    return 0       # unknown: mixed signs

In [64]:
# Compute the polarity for all the nodes to the topic argument, return a vector
def compute_polarity_vector(graph, topic_arg, adj_matrix):
    """Return the polarity of nodes ``0 .. len(graph) - 1`` towards ``topic_arg``.

    Entry ``i`` is ``compute_polarity_between_two_args(graph, i, topic_arg, adj_matrix)``.
    """
    return [
        compute_polarity_between_two_args(graph, node, topic_arg, adj_matrix)
        for node in range(len(graph))
    ]

In [65]:
# Compute priority (not polarity) between two arguments
def compute_priority_between_two_args(graph, start, end):
    """Score how close ``start`` is to ``end`` in the directed graph.

    The shortest distance is found with a breadth-first search, so the cost
    is linear in the size of the graph rather than in the number of simple
    paths (which can be exponential).

    Args:
        graph: mapping from a node to an iterable of its successors.
        start: node the search begins at.
        end: node to reach.

    Returns:
        ``3`` when ``start == end``; ``0`` when ``end`` is unreachable;
        otherwise ``1 / d`` where ``d`` is the number of edges on a shortest
        path from ``start`` to ``end``.
    """
    if start == end:
        return 3  # itself

    visited = {start}
    frontier = [start]
    distance = 0
    while frontier:
        distance += 1
        next_frontier = []
        for node in frontier:
            if node not in graph:
                continue
            for successor in graph[node]:
                if successor == end:
                    # First time `end` is seen is at the minimum distance.
                    return 1 / distance
                if successor not in visited:
                    visited.add(successor)
                    next_frontier.append(successor)
        frontier = next_frontier
    return 0  # disconnected

In [66]:
# Compute the priority for all arguments to the topic argument
def compute_priority_vector(graph, topic_arg):
    """Return the priority of nodes ``0 .. len(graph) - 1`` towards ``topic_arg``.

    Entry ``i`` is ``compute_priority_between_two_args(graph, i, topic_arg)``.
    """
    return [
        compute_priority_between_two_args(graph, node, topic_arg)
        for node in range(len(graph))
    ]

In [67]:
# Compute the difference quotient from one argument to the topic argument
def diff_quotient(bag, arg, topic_arg, h, agg_f, inf_f):
    """Finite-difference estimate of the topic strength w.r.t. ``arg``'s initial weight.

    The baseline is the topic strength currently stored on ``bag``; this
    function does not recompute strengths before reading it. The initial
    weight of ``arg`` is then shifted by ``h``, strengths are recomputed, and
    the new topic strength is read. Finally the original weight is put back
    and strengths are recomputed once more.

    Args:
        bag: object whose ``arguments`` mapping is keyed by ``str(topic_arg)``.
        arg: argument whose initial weight is perturbed.
        topic_arg: identifier of the topic argument.
        h: perturbation added to the initial weight (also the divisor).
        agg_f: aggregation function passed to ``computeStrengthValues``.
        inf_f: influence function passed to ``computeStrengthValues``.

    Returns:
        ``(perturbed strength - baseline strength) / h``.
    """
    topic_key = str(topic_arg)
    baseline_strength = bag.arguments[topic_key].strength

    # Perturb, recompute, and read the topic strength again.
    original_weight = arg.get_initial_weight()
    arg.reset_initial_weight(original_weight + h)
    grad.algorithms.computeStrengthValues(bag, agg_f, inf_f)
    perturbed_strength = bag.arguments[topic_key].strength

    # Undo the perturbation so the bag is left with its original weight.
    arg.reset_initial_weight(original_weight)
    grad.algorithms.computeStrengthValues(bag, agg_f, inf_f)

    return (perturbed_strength - baseline_strength) / h

In [68]:
def compute_sparsity(bs1, bs2):
    """Count the keys of ``bs1`` whose value differs from the one in ``bs2``.

    Every key of ``bs1`` is looked up in ``bs2``; a missing key raises
    ``KeyError``.
    """
    return sum(1 for key in bs1 if bs1[key] != bs2[key])

In [69]:
def _collect_base_scores(bag):
    """Return ``{argument name: initial weight}`` for every argument in ``bag``."""
    return {arg.get_name(): arg.get_initial_weight() for arg in bag.arguments.values()}


def compute_potential_cause_acyclic_sparsity(filename, topic_arg, desired_strength, polarity, priority, agg_f, inf_f):
    """Greedily push arguments' base scores to 0.0/1.0 until the topic strength crosses the target.

    Arguments are visited from highest to lowest ``priority`` (ties keep their
    index order). While the current topic strength is still on the same side
    of ``desired_strength`` as the initial strength, the visited argument's
    initial weight is set to 1.0 when doing so should move the topic towards
    the target and to 0.0 otherwise. Only arguments with a definite polarity
    (``1`` or ``-1``) are modified: the values ``0`` (unknown) and ``-2``
    (neutral, no path to the topic) carry no direction, so their base scores
    are left untouched. The loop stops as soon as a step leaves the topic
    strength unchanged.

    Args:
        filename: path of the bag file, loaded with ``grad.BAG``.
        topic_arg: identifier of the topic argument (looked up as ``str(topic_arg)``).
        desired_strength: target strength for the topic argument.
        polarity: per-argument polarity towards the topic, indexed by argument id.
        priority: per-argument priority, indexed by argument id.
        agg_f: aggregation function passed to ``computeStrengthValues``.
        inf_f: influence function passed to ``computeStrengthValues``.

    Returns:
        ``(base_scores, validity)`` where ``base_scores`` maps argument name
        to its final initial weight and ``validity`` is ``True`` when the
        final topic strength equals the target or lies on the other side of
        it from the initial strength.
    """
    bag = grad.BAG(filename)
    grad.algorithms.computeStrengthValues(bag, agg_f, inf_f)

    pre_strength = -1

    initial_strength = bag.arguments[str(topic_arg)].strength
    cur_strength = initial_strength
    print(f"cur_strength:{initial_strength}")

    # Argument indices ordered by descending priority (stable for ties).
    indices_sorted_priority = sorted(range(len(priority)), key=lambda idx: priority[idx], reverse=True)

    # traverse all the arguments from the highest priority to the lowest
    for i in indices_sorted_priority:
        if pre_strength == cur_strength:
            break  # the previous step had no effect on the topic
        gap = desired_strength - cur_strength
        if gap * (desired_strength - initial_strength) > 0:  # still on the initial side of the target

            # Increase positive and decrease negative influences; skip the
            # 0 / -2 sentinels, which must not be read as a sign.
            if polarity[i] in (1, -1):
                if gap * polarity[i] > 0:
                    bag.arguments[str(i)].reset_initial_weight(1.0)
                else:
                    bag.arguments[str(i)].reset_initial_weight(0.0)

            # re-compute the strength
            print(f"desired_strength:{desired_strength}")
            pre_strength = cur_strength
            grad.algorithms.computeStrengthValues(bag, agg_f, inf_f)
            cur_strength = bag.arguments[str(topic_arg)].strength
            print(f"cur_strength:{cur_strength}")

            # print current bs
            print(f"current base scores: {_collect_base_scores(bag)}")

    # print current bs
    current_bs_dict = _collect_base_scores(bag)
    print(f"current base scores: {current_bs_dict}")

    # compute validity
    crossed_target = (desired_strength - cur_strength) * (desired_strength - initial_strength) < 0
    validity = crossed_target or desired_strength == cur_strength

    return current_bs_dict, validity

In [70]:
# priority = [3, 1.0, 1.0]
# sorted_priority = sorted(priority, reverse=True)
# indices_sorted_priority = [index for index, value in sorted(enumerate(priority), key=lambda x: x[1], reverse=True)]
# print("priority:", priority)
# print("priority_sorted:", sorted_priority)
# print("priority_index:", indices_sorted_priority)

In [71]:
def main():
    """Run the sparsity-oriented search on one bag file and print a summary.

    Loads the bag, records its original base scores, derives the polarity and
    priority vectors towards the topic argument, computes a potential cause,
    and prints its validity, its sparsity (number of changed base scores) and
    the elapsed time.
    """
    # perf_counter is monotonic and high-resolution, unlike wall-clock time.time().
    start_time = time.perf_counter()

    # Set parameters
    filename = '../../bags/acyclic_0.bag'
    topic_arg = 0
    desired_strength = 0.5
    agg_f = grad.semantics.modular.SumAggregation()
    inf_f = grad.semantics.modular.QuadraticMaximumInfluence(conservativeness=1)

    # Obtain origin_base_score_dict
    bag = grad.BAG(filename)
    origin_base_score_dict = {}
    for arg in bag.arguments.values():
        origin_base_score_dict[arg.name] = arg.get_initial_weight()

    # Compute polarity, priority
    adj_matrix = parse_qbaf(filename)
    graph_dict = adjacency_matrix_to_dict(adj_matrix)
    polarity = compute_polarity_vector(graph_dict, topic_arg, adj_matrix)
    priority = compute_priority_vector(graph_dict, topic_arg)
    print(f"priority:{priority}")

    # Compute a potential cause
    potential_cause_dict, validity = compute_potential_cause_acyclic_sparsity(filename, topic_arg, desired_strength, polarity, priority, agg_f, inf_f)

    # Compute sparsity
    sparsity = compute_sparsity(origin_base_score_dict, potential_cause_dict)

    # Compute runtime
    end_time = time.perf_counter()
    runtime = end_time - start_time

    # Print out details
    print("================================================")
    print("Summary Results:")
    print(f"Validity:{validity}")
    print(f"Sparsity:{sparsity}")
    print(f"Runtime:{runtime}")

if __name__ == "__main__":
    main()

priority:[3, 1.0, 1.0]
cur_strength:0.13699037793869656
desired_strength:0.5
cur_strength:1.0
current base scores: {'0': 1.0, '1': 0.85, '2': 0.76}
current base scores: {'0': 1.0, '1': 0.85, '2': 0.76}
Summary Results:
Validity:True
Sparsity:1
Runtime:0.0010001659393310547
