In [1]:
from main import generate_estimation
from math import ceil
import time
import json
from datetime import datetime
# _, numOfCars = generate_estimation()
# print(_)
# numOfCars
# greenTime = min(numOfCars * 2 + 3, 120)
# print(numOfCars, greenTime)

In [2]:
class TrafficLight:
    def __init__(self, trafficID, lanes):
        self.trafficID = trafficID
        self.lanes = lanes
        self.totalCars = 0
        self.carsPerLane = 0
        self.greenTime = 0

    def update_traffic(self, totalCars):
        self.totalCars = totalCars
        self.carsPerLane = ceil(totalCars / self.lanes)
        # self.greenTime = min(self.carsPerLane * 2 + 3, 50)
        self.greenTime = max(min(ceil((self.totalCars * 2.5)/(self.lanes +1)), 50),5)
    def getGreenTime(self):
        return self.greenTime
    def getTrafficID(self):
        return self.trafficID
    def resetGreenTime(self):
        self.greenTime = 0
    def __str__(self):
        return (f"Traffic Light {self.trafficID} -> Lanes: {self.lanes}, Total Cars: {self.totalCars}, "
                f"Cars per Lane: {self.carsPerLane}, Green Time: {self.greenTime}s")
       # totalCars = 20  # Simulated number of cars

class Intersection:
    def __init__(self, intersectionID, trafficLights):
        self.intersectionID = intersectionID
        self.trafficLights = trafficLights
    def getIntersectionID(self):
        return self.intersectionID
    def getTrafficLights(self):
        return self.trafficLights
    def getSignalOn(self):
        for light in self.trafficLights:
            if light.greenTime > 0:
                return light.getTrafficID
        return None


class CircularTrafficLights:
    def __init__(self, lights):
        self.lights = lights
        self.index = 0

    def __iter__(self):
        return self

    def __next__(self):
        if not self.lights:
            raise StopIteration
        light = self.lights[self.index]
        self.index = (self.index + 1) % len(self.lights)
        return light

    # def update_light(self, trafficID, totalCars, carsPerLane, greenTime):
    #     for light in self.lights:
    #         if light.trafficID == trafficID:
    #             light.update_traffic(totalCars, carsPerLane, greenTime)
    #             print(f"Updated Traffic Light {trafficID}")

def save_to_database(data, filename='test.json'):
    try:
        with open(filename, 'r') as json_file:
            existing_data = json.load(json_file)
            if not isinstance(existing_data, list):
                existing_data = []
    except (FileNotFoundError, json.JSONDecodeError):
        existing_data = []

    
    data_dict = {
        f"Cycle {data[0]}": {
            "trafficID": data[1],
            "num_cars": data[2],
            "weatherStamp:": data[3],
            "timeStamp": data[4],
            "dateStamp": data[5],
            "greenTime": data[6]
        }
    }

    existing_data.append(data_dict)  # Use append to add the dictionary to the list

    with open(filename, 'w') as json_file:
        json.dump(existing_data, json_file, indent=4)

In [None]:
def procesIntersectionWithThermal(intersection):
    i = 0
    cycle = 0
    for light in intersection:
        if(i % len(intersection) == 0):
            cycle += 1
        if(cycle == 11):
            break
        i += 1
        time.sleep(2)
        while True:
            try:
                history, numOfCars = generate_estimation()
                break  # Exit the loop if the function succeeds
            except Exception as e:
                print(f"Error occurred: {e}. Retrying...")

        light.update_traffic(numOfCars)
        weatherStamp = history[0][2]
        timeStamp = history[0][3]
        dateStamp = history[0][4]

        data = [
            cycle, light.trafficID, numOfCars,
            weatherStamp, timeStamp, dateStamp,
            light.getGreenTime()
        ]
        save_to_database(data)
        print(light)
        time.sleep(light.getGreenTime())
        light.resetGreenTime()


def processIntersectionWithSensors(intersection):

    i = 0
    cycle = 0
    for light in intersection:
        if(i % len(intersection) == 0):
            cycle += 1
        if(cycle == 11):
            break
        i += 1
        time.sleep(2)
        
    

In [3]:
# Example usage:

intersection = [
    TrafficLight(1, 4), # trafficID, lanes
    TrafficLight(2, 4),
    TrafficLight(3, 4),
    TrafficLight(4, 4)
]

network = [Intersection('A',intersection), Intersection('B',intersection), Intersection('C',intersection), Intersection('D',intersection)]
# traffic_circle = []
# for i, intersection in enumerate(network):
#     traffic_circle.append(CircularTrafficLights(intersection))

traffic_circle1 = CircularTrafficLights(network[0].getTrafficLights())
traffic_circle2 = CircularTrafficLights(network[1].getTrafficLights())
traffic_circle3 = CircularTrafficLights(network[2].getTrafficLights())
traffic_circle4 = CircularTrafficLights(network[3].getTrafficLights())

# Simulating updates and looping through them
i = 0
cycle = 0
for light in traffic_circle:
    if(i % len(intersection) == 0):
        cycle += 1
    if(cycle == 11):
        break
    i += 1
    time.sleep(2)

    while True:
        try:
            history, numOfCars = generate_estimation()
            break  # Exit the loop if the function succeeds
        except Exception as e:
            print(f"Error occurred: {e}. Retrying...")

    light.update_traffic(numOfCars)
    data = [
        cycle, light.trafficID, numOfCars,
        history[0][2], history[0][3], history[0][4],
        light.getGreenTime()
    ]
    save_to_database(data)
    print(light)
    time.sleep(light.getGreenTime())
    light.resetGreenTime()

Traffic Light 1 -> Lanes: 4, Total Cars: 34, Cars per Lane: 9, Green Time: 17s


KeyboardInterrupt: 

In [5]:
import threading
import time

def function1(arg):
    for i in range(5):
        print(f"Thread {arg} iteration {i}")
        # time.sleep(1)

# Create threads targeting the same function but with different arguments
thread1 = threading.Thread(target=function1, args=("1",))
thread2 = threading.Thread(target=function1, args=("2",))

# Start threads
thread1.start()
thread2.start()

# Wait for both threads to complete
thread1.join()
thread2.join()

print("Both threads have completed their execution")


Thread 1 iteration 0
Thread 1 iteration 1
Thread 1 iteration 2
Thread 1 iteration 3
Thread 1 iteration 4
Thread 2 iteration 0
Thread 2 iteration 1
Thread 2 iteration 2
Thread 2 iteration 3
Thread 2 iteration 4
Both threads have completed their execution
