In [2]:
import sys
import random
import logging
import numpy as np 

sys.path.append("../interfaces/")
sys.path.append("../db_functions/")
sys.path.append("../../services/")

from unittest.mock import patch
import datetime

from connection import connect_to_db
from ParkingSpaceInterface import ParkingSpaceInterface as PSI
import service_functions as serv_func
import parking_space_functions as ps_func
from gen_test_data import reset_all_ps 

In [3]:
connect_to_db()

True

In [3]:
def park_car_at_time(space_id, car_id, time):
    @patch("parking_space_functions.now")
    def park(mock_now):
        mock_now.return_value = time
        return serv_func.park_car(space_id, car_id)

    return park()


def leave_car_at_time(car_id, time):
    @patch("parking_space_functions.now")
    def leave(mock_now):
        mock_now.return_value = time
        return serv_func.leave_car(car_id)

    return leave()


def generate_license_plate():
    import string

    letters = "".join(random.choices(string.ascii_uppercase, k=3))
    numbers = "".join(random.choices(string.digits, k=4))
    return letters + numbers

In [4]:
def gen_ps_history():
    logging.basicConfig(level=logging.DEBUG)

    park_cnt = {}  # 停車車次
    flow = {}

    # 清空所有停車格
    logging.debug("reset_all_ps")
    reset_all_ps()
    logging.debug("reset_all_ps done")

    # 設定產生資料的時間範圍
    start_date = datetime.datetime(2023, 12, 18)
    end_date = datetime.datetime(2023, 12, 21)

    # 產生車牌號碼
    car_ids = [generate_license_plate() for i in range(3000)]
    car_ids = list(set(car_ids))

    # 取得所有停車格的 id
    pss = PSI.read_all_ps()
    spaces_ids = [ps["space_id"] for ps in pss]

    # 紀錄每台車停在哪個停車格
    park_at = {}
    
    # 要測試警告的車位
    warning_test_space_ids = ["1001, 2001, 3001, 4001, 5001"]

    # probability
    probability = {
        "park": np.array([1, 1, 1, 1, 1, 5, 8, 15, 15, 15, 7, 3, 5, 4, 3, 2, 2, 2, 2, 2, 2, 1, 1, 1]),
        "leave": np.array([1, 1, 1, 1, 1, 3, 2, 2, 2, 2, 7, 16, 4, 4, 1, 4, 5, 6, 8, 8, 9, 5, 5, 2]),
    }

    probability["park"] = probability["park"] / 50
    probability["leave"] = probability["leave"] / 50

    # 開始模擬
    current_date = start_date
    while current_date <= end_date:
        park_cnt[current_date] = 0

        for hour in range(6, 22):
            if current_date == end_date and hour >10:
                continue
            
            current_hour = current_date + datetime.timedelta(hours=hour)
            flow[current_hour] = len(park_at)

            for sec in range(0, 3600, 5):
                random_num = random.random()
                park_prob = probability["park"][hour]
                leave_prob = probability["leave"][hour] + park_prob

                if random_num < park_prob:
                    park_cnt[current_date] += 1

                    # 選出一個沒有停車的車子和一個沒有被佔用的停車格
                    not_parked_car_ids = [car_id for car_id in car_ids if car_id not in park_at.keys()]
                    not_occupied_space_ids = [space_id for space_id in spaces_ids if space_id not in park_at.values()]
                    if len(not_occupied_space_ids) == 0 or len(not_parked_car_ids) == 0:
                        continue
                    car_id = random.choice(not_parked_car_ids)
                    space_id = random.choice(not_occupied_space_ids)

                    # 停車
                    current_time = current_hour + datetime.timedelta(seconds=sec)
                    park_at[car_id] = space_id
                    success, msg = park_car_at_time(space_id, car_id, current_time)

                    # 印出停車資訊
                    logging.debug(f"{current_time}: {msg}")

                elif random_num < leave_prob:
                    # 選出一個已經停車的車子
                    parking_car_ids = [car_id for car_id in car_ids if car_id in park_at.keys()]
                    if len(parking_car_ids) == 0:
                        continue
                    car_id = random.choice(parking_car_ids)
                    
                    # 如果是要測試警告的車位，就不要離場
                    if park_at[car_id] in warning_test_space_ids:
                        continue

                    # 車子離場
                    current_time = current_hour + datetime.timedelta(seconds=sec)
                    park_at.pop(car_id)
                    success, msg = leave_car_at_time(car_id, current_time)

                    # 印出離場資訊
                    logging.debug(f"{current_time}: {msg}")

        current_date += datetime.timedelta(days=1)
        ps_func.check_pss_status()

    return park_cnt, park_at, flow

In [1]:
gen_ps_history()

NameError: name 'gen_ps_history' is not defined