In [4]:
# Import the classes.
import someip_timing_analysis as someip

# For plotting the network.
import networkx as nx

In [5]:
# Set the logging level.
someip.analysis.set_logger_level(someip.analysis.logging.ERROR)


AttributeError: module 'someip_timing_analysis' has no attribute 'analysis'

# Network Support

In [None]:
class Device(Node):
    """The device extends node from the graph library.
    """
    def __init__(self, id: int, entities: List[Entity], base_delay: float = 0):
        """It takes as input the unique id, and the list of entities.
        Args:
            id (int): The unique id.
            entities (List[Entity]): The list of entities.
            base_delay (_type_, optional): Base software delay. Defaults to 0.
        """
        Node.__init__(self, id)
        self.entities = entities
        self.base_delay = base_delay

    def device_delay(self) -> float:
        return len(self.entities) * self.base_delay

def device_to_device_delay(graph: Graph, source: Device, target: Device) -> float:
    """Get the delay between every node.

    Args:
        source (Device): Source device.
        target (Device): Target device.

    Returns:
        float: The delay between the nodes.
    """
    if source == target:
        return 0
    (weight, _) = graph.find_shortest_path(source, target)
    return source.device_delay() + weight + target.device_delay()


# Scenario (SOME/IP)

In [None]:
# Load the services.
S0 = Service("S0", boot_del=2, init_del=4, rep_del=1, rep_max=3, cyc_del=2, ans_del=1, offer_mode=True)
S1 = Service("S1", boot_del=1, init_del=3, rep_del=1, rep_max=3, cyc_del=2, ans_del=1, offer_mode=True)
S2 = Service("S2", boot_del=4, init_del=6, rep_del=1, rep_max=3, cyc_del=2, ans_del=1, offer_mode=True)
S3 = Service("S3", boot_del=2, init_del=4, rep_del=1, rep_max=3, cyc_del=2, ans_del=1, offer_mode=True)
S4 = Service("S4", boot_del=1, init_del=3, rep_del=1, rep_max=3, cyc_del=2, ans_del=1, offer_mode=True)

# Load the clients.
C0 = Client("C0", boot_del=3, init_del=2, rep_del=1, rep_max=3, find_mode=True)
C1 = Client("C1", boot_del=3, init_del=2, rep_del=1, rep_max=3, find_mode=True)
C2 = Client("C2", boot_del=3, init_del=2, rep_del=1, rep_max=3, find_mode=True)
C3 = Client("C3", boot_del=3, init_del=2, rep_del=1, rep_max=3, find_mode=True)
C4 = Client("C4", boot_del=3, init_del=2, rep_del=1, rep_max=3, find_mode=True)
C5 = Client("C5", boot_del=3, init_del=2, rep_del=1, rep_max=3, find_mode=True)


# Scenario (Graph)

In [None]:
# Place services and clients inside nodes.
n0 = Device(id = 0, entities = [S0, C0, C1], base_delay = 0.05)
n1 = Device(id = 1, entities = [S1, S2],     base_delay = 0.025)
n2 = Device(id = 2, entities = [S3, C2, C3], base_delay = 0.075)
n3 = Device(id = 3, entities = [S4, C4],     base_delay = 0.045)
n4 = Device(id = 4, entities = [C5],         base_delay = 0.076)

# Create the graph, using weights computed using the following formula:
#   (frame_length * 8) / link_speed
# 
G = Graph(
    connections = [
        (n0, n1, lambda graph, node0, node1: ( 256 * 8) / 1e09),
        (n1, n2, lambda graph, node0, node1: ( 256 * 8) / 1e06),
        (n1, n3, lambda graph, node0, node1: (1024 * 8) / 1e03),
        (n3, n4, lambda graph, node0, node1: ( 256 * 8) / 1e09),
        (n2, n3, lambda graph, node0, node1: ( 256 * 8) / 1e06),
        (n2, n4, lambda graph, node0, node1: (1024 * 8) / 1e03)
    ],
    directed = False
)


# Visualize the graph

In [None]:
if G.directed:
    graph = nx.DiGraph()
else:
    graph = nx.Graph()

# List of edges.
edge_list = [(source.id, target.id) for (source, target) in G.get_edge_list()]

# List of edge labels.
edge_labels = {}
for (source, target) in G.get_edge_list():
    edge_labels[source.id, target.id] = str(G.get_weight(source, target))

# Add the edges.
graph.add_edges_from(edge_list)
# Positions for all nodes.
pos = nx.spring_layout(graph, seed=1, k=0.15, iterations=20)
# Add the nodes.
nx.draw_networkx_nodes(graph, pos, node_color="tab:blue")
# Draw the node labels.
nx.draw_networkx_labels(graph, pos, font_family="sans-serif")
# Draw the edges.
nx.draw_networkx_edges(graph, pos, edgelist=edge_list, width=1.5, edge_color="tab:gray")
# Draw the edges weights.
nx.draw_networkx_edge_labels(graph, pos, edge_labels)
# Draw the graph.
plt.tight_layout()
plt.show()

# Compute timing analysis

In [None]:
# Check where the SOME/IP entities are located.
containing_node = {}
for node in G.graph:
    for entity in node.entities:
        containing_node[entity] = node

def create_relation(graph: Graph, e0: Entity, e1: Entity) -> Relation:
    """Creates a relation between the pair of entities.

    Args:
        graph (Graph): The graph.
        e0 (Entity): The first entity.
        e1 (Entity): The second entity.

    Returns:
        Relation: A relation between the entities.
    """
    return Relation(e0, e1, device_to_device_delay(graph, containing_node[e0], containing_node[e1]))

# Define the relations between clients and services.
system = [
    create_relation(G, C0, S3),
    create_relation(G, C0, S4),
    create_relation(G, C1, S0),
    create_relation(G, C2, S4),
    create_relation(G, C3, S4),
    create_relation(G, C3, S0),
    create_relation(G, C4, S1),
    create_relation(G, C5, S0)
]

# Compute the worst-case discovery time.
analysis.timing_analysis_system(system)


# Graph to CSV

In [None]:
# We save the current graph.
Graph.write_to_csv(G, "test.csv")
# We load the graph from file.
G1 = Graph.read_from_csv("test.csv")
print(G)
print(G1)