In [None]:
# prompt: python program
# 1. Simulate a basic CCS process in Python where one process performs an action (a) and
# transitions to the next state.

import time

class CCSProcess:
    def __init__(self, initial_state):
        self.state = initial_state

    def perform_action(self, action):
        if self.state == "State1" and action == "a":
            print("Performing action 'a' in State1")
            time.sleep(1) # Simulate some work
            self.state = "State2"
            print("Transitioned to State2")
        elif self.state == "State2" and action == "b":
            print("Performing action 'b' in State2")
            time.sleep(1)
            self.state = "State3"
            print("Transitioned to State3")
        else:
            print(f"Invalid action '{action}' in state {self.state}")

# Example usage:
my_process = CCSProcess("State1")
my_process.perform_action("a") # Valid action
my_process.perform_action("b") # Valid action
my_process.perform_action("a") # Invalid action


Performing action 'a' in State1
Transitioned to State2
Performing action 'b' in State2
Transitioned to State3
Invalid action 'a' in state State3


In [None]:
# prompt: Python program
# Model and simulate a parallel composition of two CCS processes in Python, where both
# processes execute concurrently.

import time
import threading

class CCSProcess:
    def __init__(self, initial_state, process_id):
        self.state = initial_state
        self.process_id = process_id

    def perform_action(self, action):
        if self.state == "State1" and action == "a":
            print(f"Process {self.process_id}: Performing action 'a' in State1")
            time.sleep(1)  # Simulate some work
            self.state = "State2"
            print(f"Process {self.process_id}: Transitioned to State2")
        elif self.state == "State2" and action == "b":
            print(f"Process {self.process_id}: Performing action 'b' in State2")
            time.sleep(1)
            self.state = "State3"
            print(f"Process {self.process_id}: Transitioned to State3")
        else:
            print(f"Process {self.process_id}: Invalid action '{action}' in state {self.state}")


def process_thread(process, actions):
    for action in actions:
        process.perform_action(action)


# Example usage with parallel composition:
process1 = CCSProcess("State1", 1)
process2 = CCSProcess("State1", 2)

actions1 = ["a", "b"]
actions2 = ["a", "b"]

thread1 = threading.Thread(target=process_thread, args=(process1, actions1))
thread2 = threading.Thread(target=process_thread, args=(process2, actions2))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print("Both processes finished.")


In [None]:
# prompt: 3. Implement Pi-Calculus communication in Python by simulating a sender process that sends a
# message over a channel and a receiver process that receives it.

import threading
import time

class Channel:
    def __init__(self):
        self.message = None
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)

    def send(self, message):
        with self.condition:
            while self.message is not None:
                self.condition.wait()  # Wait if the channel is occupied
            self.message = message
            print(f"Sent message: {message}")
            self.condition.notify()  # Notify the receiver

    def receive(self):
        with self.condition:
            while self.message is None:
                self.condition.wait()  # Wait if the channel is empty
            message = self.message
            self.message = None
            print(f"Received message: {message}")
            self.condition.notify()  # Notify the sender
            return message


def sender(channel, message):
    time.sleep(1) # Simulate some delay
    channel.send(message)


def receiver(channel):
    message = channel.receive()
    # Process the message
    print(f"Processed message: {message}")


if __name__ == "__main__":
    channel = Channel()
    message = "Hello from sender"

    sender_thread = threading.Thread(target=sender, args=(channel, message))
    receiver_thread = threading.Thread(target=receiver, args=(channel,))

    sender_thread.start()
    receiver_thread.start()

    sender_thread.join()
    receiver_thread.join()

    print("Communication complete.")


In [None]:
`# prompt: python code
#  Write a Python program to verify synchronization between two CCS processes using
# complementary actions (a and ā).

import time
import threading

class CCSProcess:
    def __init__(self, initial_state, process_id):
        self.state = initial_state
        self.process_id = process_id
        self.lock = threading.Lock()

    def perform_action(self, action):
        with self.lock:
            if self.state == "State1" and action == "a":
                print(f"Process {self.process_id}: Performing action 'a' in State1")
                time.sleep(0.1)  # Simulate some work
                self.state = "State2"
                print(f"Process {self.process_id}: Transitioned to State2")
            elif self.state == "State2" and action == "ā": # Complementary action
                print(f"Process {self.process_id}: Performing action 'ā' in State2")
                time.sleep(0.1)
                self.state = "State3"
                print(f"Process {self.process_id}: Transitioned to State3")
            else:
                print(f"Process {self.process_id}: Invalid action '{action}' in state {self.state}")


def process_thread(process, actions):
    for action in actions:
        process.perform_action(action)


# Example usage with synchronization using complementary actions:
process1 = CCSProcess("State1", 1)
process2 = CCSProcess("State1", 2)

actions1 = ["a"]
actions2 = ["a"]

thread1 = threading.Thread(target=process_thread, args=(process1, actions1))
thread2 = threading.Thread(target=process_thread, args=(process2, actions2))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

actions1 = ["ā"]
actions2 = ["ā"]

thread1 = threading.Thread(target=process_thread, args=(process1, actions1))
thread2 = threading.Thread(target=process_thread, args=(process2, actions2))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print("Both processes finished synchronization.")


In [None]:
# prompt: 5. Simulate a basic producer-consumer system using Python, ensuring mutual exclusion and
# correct handling of shared resources.

import threading
import time
import random

# Shared resources
buffer = []
buffer_size = 5
buffer_lock = threading.Lock()
buffer_not_full = threading.Condition(buffer_lock)
buffer_not_empty = threading.Condition(buffer_lock)

def producer():
  for i in range(10):
    with buffer_not_full:
      while len(buffer) == buffer_size:
        print("Buffer full, producer waiting")
        buffer_not_full.wait()

      item = random.randint(1,100)
      buffer.append(item)
      print(f"Produced {item}, Buffer size: {len(buffer)}")
      buffer_not_empty.notify() # Notify consumer that buffer is not empty
    time.sleep(random.uniform(0.1,0.5)) # Simulate some work


def consumer():
  for _ in range(10):
    with buffer_not_empty:
      while not buffer:
        print("Buffer empty, consumer waiting")
        buffer_not_empty.wait()

      item = buffer.pop(0)
      print(f"Consumed {item}, Buffer size: {len(buffer)}")
      buffer_not_full.notify() # Notify producer that buffer is not full
    time.sleep(random.uniform(0.2,0.8)) # Simulate some work


if __name__ == "__main__":
  producer_thread = threading.Thread(target=producer)
  consumer_thread = threading.Thread(target=consumer)

  producer_thread.start()
  consumer_thread.start()

  producer_thread.join()
  consumer_thread.join()

  print("Producer-consumer simulation complete.")
