# Лабораторна №3 Варіант №4

In [13]:
def safe_print(*args, **kwargs):
    print(*args, **kwargs, flush=True)

<img src="1.png">

In [14]:
class PetriNet:
    def __init__(self):
        # Множини
        self.places = []  # Позиції
        self.transitions = []  # Переходи
        self.arcs = []  # Дуги
        
        # Початкова розмітка
        self.initial_marking = {}
        
    def add_place(self, place_id, tokens=0):
        """Додати позицію"""
        if place_id not in self.places:
            self.places.append(place_id)
            self.initial_marking[place_id] = tokens
    
    def add_transition(self, transition_id):
        """Додати перехід"""
        if transition_id not in self.transitions:
            self.transitions.append(transition_id)
    
    def add_arc(self, source, target, weight=1):
        """Додати дугу (source -> target)"""
        self.arcs.append({
            'source': source,
            'target': target,
            'weight': weight
        })
    
    def get_input_places(self, transition):
        """Отримати вхідні позиції для переходу"""
        inputs = {}
        for arc in self.arcs:
            if arc['target'] == transition and arc['source'] in self.places:
                inputs[arc['source']] = arc['weight']
        return inputs
    
    def get_output_places(self, transition):
        """Отримати вихідні позиції для переходу"""
        outputs = {}
        for arc in self.arcs:
            if arc['source'] == transition and arc['target'] in self.places:
                outputs[arc['target']] = arc['weight']
        return outputs
    
    def is_enabled(self, marking, transition):
        """Перевірити чи перехід активний при даній розмітці"""
        inputs = self.get_input_places(transition)
        for place, weight in inputs.items():
            if marking.get(place, 0) < weight:
                return False
        return True
    
    def fire_transition(self, marking, transition):
        """Спрацювання переходу - повертає нову розмітку"""
        if not self.is_enabled(marking, transition):
            return None
        
        new_marking = marking.copy()
        
        # Забрати маркери з вхідних позицій
        inputs = self.get_input_places(transition)
        for place, weight in inputs.items():
            new_marking[place] -= weight
        
        # Додати маркери у вихідні позиції
        outputs = self.get_output_places(transition)
        for place, weight in outputs.items():
            new_marking[place] = new_marking.get(place, 0) + weight
        
        return new_marking
    
    def marking_to_tuple(self, marking):
        """Перетворити розмітку у кортеж для використання як ключ"""
        return tuple(marking.get(p, 0) for p in sorted(self.places))
    
    def tuple_to_marking(self, marking_tuple):
        """Перетворити кортеж назад у розмітку"""
        return {p: marking_tuple[i] for i, p in enumerate(sorted(self.places))}
    
    def build_reachability_graph(self):
        """Побудувати граф досяжних розміток"""
        # Результуючий граф: {розмітка: {перехід: нова_розмітка}}
        graph = {}
        
        # Черга для BFS
        queue = deque([self.initial_marking])
        visited = {self.marking_to_tuple(self.initial_marking)}
        
        while queue:
            current_marking = queue.popleft()
            current_tuple = self.marking_to_tuple(current_marking)
            
            if current_tuple not in graph:
                graph[current_tuple] = {}
            
            # Перевірити всі переходи
            for transition in self.transitions:
                if self.is_enabled(current_marking, transition):
                    new_marking = self.fire_transition(current_marking, transition)
                    new_tuple = self.marking_to_tuple(new_marking)
                    
                    # Додати до графу
                    graph[current_tuple][transition] = new_tuple
                    
                    # Якщо нова розмітка ще не відвідана, додати до черги
                    if new_tuple not in visited:
                        visited.add(new_tuple)
                        queue.append(new_marking)
        
        return graph
    
    def print_reachability_graph(self, graph):
        """Вивести граф досяжних розміток"""
        print("=== ГРАФ ДОСЯЖНИХ РОЗМІТОК ===\n")
        print(f"Кількість досяжних розміток: {len(graph)}\n")
        
        for i, (marking_tuple, transitions) in enumerate(graph.items(), 1):
            marking = self.tuple_to_marking(marking_tuple)
            print(f"Розмітка M{i}: {marking}")
            
            if transitions:
                for transition, next_tuple in transitions.items():
                    next_marking = self.tuple_to_marking(next_tuple)
                    print(f"  --[{transition}]--> {next_marking}")
            else:
                print("  (тупикова розмітка)")
            print()

In [15]:
def from_diagram():
    """
    Реалізація мережі Петрі з діаграми.
    Позначення:
    - P1, P2, P3, P4 - позиції (кола)
    - T1, T2 - переходи (прямокутники)
    """
    
    pn = PetriNet()
    
    # Додаємо позиції (кількість позицій визначаємо з діаграми)
    pn.add_place('P1', tokens=1)  # Початкова позиція з 1 маркером
    pn.add_place('P2', tokens=0)
    pn.add_place('P3', tokens=0)
    pn.add_place('P4', tokens=0)
    
    # Додаємо переходи
    pn.add_transition('T1')
    pn.add_transition('T2')
    
    # Додаємо дуги (на основі діаграми)
    # Від P1 до T1
    pn.add_arc('P1', 'T1', weight=1)
    
    # Від T1 до P2 і P3 (розгалуження)
    pn.add_arc('T1', 'P2', weight=1)
    pn.add_arc('T1', 'P3', weight=1)
    
    # Від P2 і P3 до T2 (синхронізація)
    pn.add_arc('P2', 'T2', weight=1)
    pn.add_arc('P3', 'T2', weight=1)
    
    # Від T2 до P4
    pn.add_arc('T2', 'P4', weight=1)
    
    # Від P4 назад до P1 (цикл)
    pn.add_arc('P4', 'P1', weight=1)
    
    return pn


# Запуск програми
if __name__ == "__main__":
    print("Побудова графу досяжних розміток для мережі Петрі\n")
    
    # Створення мережі Петрі з діаграми
    petri_net = from_diagram()
    
    print("Початкова розмітка:", petri_net.initial_marking)
    print("Позиції:", petri_net.places)
    print("Переходи:", petri_net.transitions)
    print()
    
    # Побудова графу досяжних розміток
    reachability_graph = petri_net.build_reachability_graph()
    
    # Виведення результату
    petri_net.print_reachability_graph(reachability_graph)
    
    # Додаткова статистика
    print("=== СТАТИСТИКА ===")
    print(f"Загальна кількість досяжних розміток: {len(reachability_graph)}")
    
    # Пошук тупикових розміток
    deadlocks = [m for m, trans in reachability_graph.items() if not trans]
    if deadlocks:
        print(f"Кількість тупикових розміток: {len(deadlocks)}")
        for dl in deadlocks:
            print(f"  - {petri_net.tuple_to_marking(dl)}")
    else:
        print("Тупикових розміток немає")

Побудова графу досяжних розміток для мережі Петрі

Початкова розмітка: {'P1': 1, 'P2': 0, 'P3': 0, 'P4': 0}
Позиції: ['P1', 'P2', 'P3', 'P4']
Переходи: ['T1', 'T2']

=== ГРАФ ДОСЯЖНИХ РОЗМІТОК ===

Кількість досяжних розміток: 3

Розмітка M1: {'P1': 1, 'P2': 0, 'P3': 0, 'P4': 0}
  --[T1]--> {'P1': 0, 'P2': 1, 'P3': 1, 'P4': 0}

Розмітка M2: {'P1': 0, 'P2': 1, 'P3': 1, 'P4': 0}
  --[T2]--> {'P1': 0, 'P2': 0, 'P3': 0, 'P4': 1}

Розмітка M3: {'P1': 0, 'P2': 0, 'P3': 0, 'P4': 1}
  (тупикова розмітка)

=== СТАТИСТИКА ===
Загальна кількість досяжних розміток: 3
Кількість тупикових розміток: 1
  - {'P1': 0, 'P2': 0, 'P3': 0, 'P4': 1}


4. Перетворення задачі 3 для випадку, коли запрошення до столу необов'язкові для розгляду 
філософом.

In [16]:
class PetriNet:
    def __init__(self):
        self.places = {}  # {place_name: token_count}
        self.transitions = {}  # {transition_name: {'input': [(place, weight)], 'output': [(place, weight)]}}
        self.lock = threading.Lock()
    
    def add_place(self, name, tokens=0):
        self.places[name] = tokens
    
    def add_transition(self, name, inputs, outputs):
        """
        inputs: [(place_name, weight), ...]
        outputs: [(place_name, weight), ...]
        """
        self.transitions[name] = {'input': inputs, 'output': outputs}
    
    def is_enabled(self, transition_name):
        """Перевірка чи дозволений перехід"""
        trans = self.transitions[transition_name]
        for place, weight in trans['input']:
            if self.places.get(place, 0) < weight:
                return False
        return True
    
    def fire(self, transition_name):
        """Спрацювання переходу"""
        with self.lock:
            if not self.is_enabled(transition_name):
                return False
            
            trans = self.transitions[transition_name]
            
            # Забрати фішки з вхідних позицій
            for place, weight in trans['input']:
                self.places[place] -= weight
            
            # Додати фішки до вихідних позицій
            for place, weight in trans['output']:
                self.places[place] = self.places.get(place, 0) + weight
            
            return True
    
    def get_marking(self):
        """Отримати поточну розмітку"""
        return dict(self.places)
    
    def print_state(self):
        safe_print("Поточна розмітка:", self.get_marking())

In [18]:
class DiningPhilosophers4:
    def __init__(self, n=5):
        self.n = n
        self.pn = PetriNet()
        
        self.pn.add_place("invitation", 1)
        
        for i in range(n):
            self.pn.add_place(f"thinking_{i}", 1)
            self.pn.add_place(f"invited_{i}", 0)
            self.pn.add_place(f"eating_{i}", 0)
            self.pn.add_place(f"fork_{i}", 1)
        
        for i in range(n):
            left_fork = i
            right_fork = (i + 1) % n
            
            # Отримати запрошення
            self.pn.add_transition(
                f"get_invitation_{i}",
                [(f"thinking_{i}", 1), ("invitation", 1)],
                [(f"invited_{i}", 1)]
            )
            
            # Взяти виделки після запрошення
            self.pn.add_transition(
                f"take_forks_invited_{i}",
                [(f"invited_{i}", 1), (f"fork_{left_fork}", 1), (f"fork_{right_fork}", 1)],
                [(f"eating_{i}", 1)]
            )
            
            # Взяти виделки БЕЗ запрошення (якщо філософ ігнорує запрошення)
            self.pn.add_transition(
                f"take_forks_direct_{i}",
                [(f"thinking_{i}", 1), (f"fork_{left_fork}", 1), (f"fork_{right_fork}", 1)],
                [(f"eating_{i}", 1)]
            )
            
            # Покласти виделки після запрошеної їжі
            self.pn.add_transition(
                f"release_invited_{i}",
                [(f"eating_{i}", 1)],
                [(f"thinking_{i}", 1), (f"fork_{left_fork}", 1), (f"fork_{right_fork}", 1), ("invitation", 1)]
            )
            
            # Покласти виделки після прямої їжі
            self.pn.add_transition(
                f"release_direct_{i}",
                [(f"eating_{i}", 1)],
                [(f"thinking_{i}", 1), (f"fork_{left_fork}", 1), (f"fork_{right_fork}", 1)]
            )
    
    def philosopher(self, phil_id):
        for _ in range(3):
            safe_print(f"Філософ {phil_id} роздумує...")
            time.sleep(random.uniform(0.1, 0.3))
            
            # Філософ вирішує: чекати запрошення чи їсти без нього
            wait_invitation = random.choice([True, False])
            
            if wait_invitation:
                while not self.pn.fire(f"get_invitation_{phil_id}"):
                    time.sleep(0.05)
                safe_print(f"Філософ {phil_id} отримав запрошення")
                
                while not self.pn.fire(f"take_forks_invited_{phil_id}"):
                    time.sleep(0.05)
                safe_print(f"Філософ {phil_id} їсть (з запрошенням)")
                
                time.sleep(random.uniform(0.1, 0.2))
                self.pn.fire(f"release_invited_{phil_id}")
                safe_print(f"Філософ {phil_id} закінчив їсти та повернув запрошення")
            else:
                while not self.pn.fire(f"take_forks_direct_{phil_id}"):
                    time.sleep(0.05)
                safe_print(f"Філософ {phil_id} їсть (БЕЗ запрошення)")
                
                time.sleep(random.uniform(0.1, 0.2))
                self.pn.fire(f"release_direct_{phil_id}")
                safe_print(f"Філософ {phil_id} закінчив їсти")
    
    def run(self):
        threads = [threading.Thread(target=self.philosopher, args=(i,)) for i in range(self.n)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

dp4 = DiningPhilosophers4(5)
dp4.run()

Філософ 0 роздумує...
Філософ 1 роздумує...
Філософ 2 роздумує...
Філософ 3 роздумує...
Філософ 4 роздумує...
Філософ 3 їсть (БЕЗ запрошення)
Філософ 2 отримав запрошення
Філософ 3 закінчив їсти
Філософ 3 роздумує...
Філософ 2 їсть (з запрошенням)
Філософ 2 закінчив їсти та повернув запрошення
Філософ 2 роздумує...
Філософ 0 отримав запрошення
Філософ 0 їсть (з запрошенням)
Філософ 2 їсть (БЕЗ запрошення)
Філософ 0 закінчив їсти та повернув запрошення
Філософ 0 роздумує...
Філософ 4 отримав запрошення
Філософ 4 їсть (з запрошенням)
Філософ 2 закінчив їсти
Філософ 2 роздумує...
Філософ 4 закінчив їсти та повернув запрошення
Філософ 4 роздумує...
Філософ 0 їсть (БЕЗ запрошення)
Філософ 3 отримав запрошення
Філософ 3 їсть (з запрошенням)
Філософ 3 закінчив їсти та повернув запрошення
Філософ 3 роздумує...
Філософ 2 їсть (БЕЗ запрошення)
Філософ 1 отримав запрошення
Філософ 0 закінчив їсти
Філософ 0 роздумує...
Філософ 2 закінчив їсти
Філософ 1 їсть (з запрошенням)
Філософ 3 їсть (БЕЗ запр