In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
from datetime import datetime, timedelta

In [2]:
# 合併北、中、南、坪2022事故資料生成df_2022_raw

parent_dir = "./2022年國道事故簡訊通報資料/"
locations = ["北區", "中區", "南區", "坪控"]
months = [i for i in range(202201, 202213)]

ready_to_concat = []
for loc in locations:
    for month in months:
        df = pd.read_excel(os.path.join(parent_dir, f"{loc}/交通事故簡訊通報狀況及事故型態{month}.xlsx"))
        ready_to_concat.append(df)

df_2022_raw = pd.concat(ready_to_concat, axis=0).reset_index(drop=True)

date = []
weekday = []
for i in range(df_2022_raw.shape[0]):
    year = df_2022_raw["年"].iloc[i]
    month = df_2022_raw["月"].iloc[i]
    day = df_2022_raw["日"].iloc[i]
    YYYYMMDD = f"{year}" + f"{month:02d}" + f"{day:02d}"
    YYYYMMDD = datetime.strptime(YYYYMMDD, "%Y%m%d")
    date.append(YYYYMMDD)
    weekday.append(YYYYMMDD.isoweekday())
    
    # 特殊處理: 處理時間為空值或處理時間不合理(初步設定處理時間超過半天為不合理)
    if pd.isnull(df_2022_raw["處理分鐘"].iloc[i]) == True:
        start = datetime.strptime(df_2022_raw["事件發生"].iloc[i], "%H:%M")
        end = datetime.strptime(df_2022_raw["事件排除"].iloc[i], "%H:%M")
        df_2022_raw["處理分鐘"].iloc[i] = (end-start).total_seconds() // 60
        
df_2022_raw.insert(0, "Weekday", weekday)
df_2022_raw.insert(0, "Date", date)

# 處理分鐘過長視為不合理(此處將處理時間超過半天者視為不合理)
filt = (df_2022_raw["處理分鐘"] <= 720)
df_2022_raw = df_2022_raw.loc[filt].reset_index(drop=True)

# 某些國道方向寫的很奇怪就排除
direct_filter = (df_2022_raw["方向"] != "雙向") & (df_2022_raw["方向"].notna()) & (df_2022_raw["國道名稱"] != "台66") &\
                (df_2022_raw["國道名稱"] != "國道") & (df_2022_raw["國道名稱"].str.startswith("國2往") == False)
df_2022_raw = df_2022_raw.loc[direct_filter].reset_index(drop=True)

# 有些資料的事故發生時間記成了事件結束時間(開頭為1899/12/30的)，所以要更正
for i in range(df_2022_raw.shape[0]):
    if type(df_2022_raw["事件發生"].iloc[i]) == str:
        if df_2022_raw["事件發生"].iloc[i].startswith("1899/12/30 上午"):
            occur_time = datetime.strptime(df_2022_raw["事件發生"].iloc[i], "%Y/%m/%d 上午 %H:%M:%S")
            if occur_time.hour == 12:
                occur_time -= timedelta(hours=12)
            df_2022_raw["時"].iloc[i] = occur_time.hour
            df_2022_raw["分"].iloc[i] = occur_time.minute
        elif df_2022_raw["事件發生"].iloc[i].startswith("1899/12/30 下午"):
            occur_time = datetime.strptime(df_2022_raw["事件發生"].iloc[i], "%Y/%m/%d 下午 %H:%M:%S")
            if occur_time.hour != 12:
                occur_time += timedelta(hours=12)
            df_2022_raw["時"].iloc[i] = occur_time.hour
            df_2022_raw["分"].iloc[i] = occur_time.minute
            
        if df_2022_raw["時"].iloc[i] >= 24:
            occur_time = datetime.strptime(df_2022_raw["事件發生"].iloc[i], "%H:%M")
            df_2022_raw["時"].iloc[i] = occur_time.hour
            df_2022_raw["分"].iloc[i] = occur_time.minute

# 將整理好的資料儲存
df_2022_raw.to_excel("df_2022_raw.xlsx", index=False)

In [3]:
# 接著有些資料用Pandas整理不會比較快，所以要手動整理...

# 這邊記錄一下手動整理的欄位:
# 1.車種: 有時候會有亂碼產生，或是有莫名其妙的空格(e.g.大 客車)，又或是有字體不一致的
# 2.有些事故處理時間是0分鐘的，確認一下是否為人工誤植
# 3.然後df_2022的第290行的國道名稱會讀不到，應該是文字不知道哪邊出了問題，已修正
# 4.有些資料沒有註記車道位置，已人工檢核手動修正，但仍有29筆資料無法辨識佔用車道

# 至於肇事車輛的部分，因為我們最後是用involve_car這個暫存list的長度去看車輛數，所以有些沒有填到肇事車輛的資料就可以不用去理會~

In [4]:
# 接著經過手動整理後，我們會另存成df_2022.xlsx
# 這裡讀取我們剛剛手動整理的xlsx檔
df_2022 = pd.read_excel("df_2022.xlsx", parse_dates=["Date"])

# 發現車輛中有些欄位前後會有多餘空格導致後面判斷有誤，要整理一下
df_2022["車輛1"] = df_2022["車輛1"].str.strip()
df_2022["車輛2"] = df_2022["車輛2"].str.strip()
df_2022["車輛3"] = df_2022["車輛3"].str.strip()
df_2022["車輛4"] = df_2022["車輛4"].str.strip()
df_2022["車輛5"] = df_2022["車輛5"].str.strip()
df_2022["車輛6"] = df_2022["車輛6"].str.strip()
df_2022["車輛7"] = df_2022["車輛7"].str.strip()
df_2022["車輛8"] = df_2022["車輛8"].str.strip()
df_2022["車輛9"] = df_2022["車輛9"].str.strip()
df_2022["車輛10"] = df_2022["車輛10"].str.strip()
df_2022["車輛11"] = df_2022["車輛11"].str.strip()
df_2022["車輛12"] = df_2022["車輛12"].str.strip()

In [5]:
# 這個資料庫可能是由人工手動key資料的，所以名稱並沒有統一
# 比方說小客車就有小客車、小自客、自小客，甚至計程車又是一種車種類別
# 或是國1、國道1號兩種寫法...等
# 因此這裡我們先觀察資料後建立名稱集合

# 國道名稱集合建立
# 縱向國道
N1 = {"國1", "國道1號"}
N1H = {"國1高架"}
N3 = {"國3", "國道3號"}
N3A = {"國3甲", "國道3甲"}
N3N = {"南港連絡道"}
N3K = {"港西聯外道路"}
N5 = {"國5", "國道5號"}
# 橫向國道
N2 = {"國2", "國道2號", "(更正)國2"}
N4 = {"國4", "國道4號"}
N6 = {"國6", "國道6號"}
N8 = {"國8", "國道8號"}
N10 = {"國10", "國道10號"}

# 事故型態
colliTypes = {"追撞", "擦撞", "自撞", "翻", "火", "護欄", "邊坡"}

# 車種集合建立
passenger_car = {"小客車", "小自客", "小自", "自小客", "自營客", "小營客", "計程車", "救護車"}
small_truck = {"小貨車", "小貨"}
bus = {"大客車", "大客", "小巴", "中巴"}
big_truck = {"大貨車", "大貨"}
connect_truck = {"聯結車", "拖板車", "砂石車"}
container_truck = {"貨櫃車"}
special_truck  ={"油罐車", "槽車", "化學槽車"}
construction_vehs = {"施工", "事故", "警戒車", "緩撞車", "工程車"}

# 新增變數：連假期間 / 一般日期間
holiday = set()
new_year_111 = set(map(str, pd.date_range("2022-01-01", "2022-01-02")))
holiday.update(new_year_111)
luner = set(map(str, pd.date_range("2022-01-29", "2022-02-06")))
holiday.update(luner)
peace = set(map(str, pd.date_range("2022-02-26", "2022-02-28")))
holiday.update(peace)
tomb = set(map(str, pd.date_range("2022-04-02", "2022-04-05")))
holiday.update(tomb)
dragon = set(map(str, pd.date_range("2022-06-03", "2022-06-05")))
holiday.update(dragon)
moon = set(map(str, pd.date_range("2022-09-09", "2022-09-11")))
holiday.update(moon)
double10 = set(map(str, pd.date_range("2022-10-08", "2022-10-10")))
holiday.update(double10)
new_year_112 = set(map(str, pd.date_range("2022-12-30", "2022-12-31")))
holiday.update(new_year_112)

# 週一 / 週五 / 平日 / 假日
Mon = {1}
Fri = {5}
workday = {2, 3, 4}
weekend = {6, 7}

# 肇事分析系統資料庫中，肇事車輛數欄位最多可以填到12輛
num_cars_cols = ["車輛1", "車輛2", "車輛3", "車輛4", "車輛5", "車輛6",
                 "車輛7", "車輛8", "車輛9", "車輛10", "車輛11", "車輛12"]

# 定義我們要的資料集欄位
dataset = {
    "Date": [], "Weekday": [], "Day/Night": [], "Holiday": [],
    "Highway/Dir": [], "Center": [], "Lane": [],
    "AccidentType": [], "CarType": [], "NumCars": [],
    "Duration": []
}

In [6]:
# 接著就要把資料丟到我們創建的dataframe了
for i in range(df_2022.shape[0]):
    # 日期: 已經整理過了，直接丟進去
    dataset["Date"].append(df_2022["Date"].iloc[i])
    
    # 週一 / 週五 / 平日 / 假日
    if df_2022["Weekday"].iloc[i] in Mon:
        dataset["Weekday"].append("週一")
    elif df_2022["Weekday"].iloc[i] in Fri:
        dataset["Weekday"].append("週五")
    elif df_2022["Weekday"].iloc[i] in workday:
        dataset["Weekday"].append("平日")
    elif df_2022["Weekday"].iloc[i] in weekend:
        dataset["Weekday"].append("假日")
        
    # 日間 / 夜間
    if 6 <= df_2022["時"].iloc[i] < 18:
        dataset["Day/Night"].append("日間")
    else:
        dataset["Day/Night"].append("夜間")
        
    # 連假期間
    if str(df_2022["Date"].iloc[i]) in holiday:
        dataset["Holiday"].append("T")
    else:
        dataset["Holiday"].append("F")
    
    # 國道方向
    if df_2022["國道名稱"].iloc[i] in N1:
        if df_2022["方向"].iloc[i].startswith("北"):
            dataset["Highway/Dir"].append("國1北向")
        elif df_2022["方向"].iloc[i].startswith("南"):
            dataset["Highway/Dir"].append("國1南向")
    
    elif df_2022["國道名稱"].iloc[i] in N1H:
        if df_2022["方向"].iloc[i].startswith("北"):
            dataset["Highway/Dir"].append("國1高架北向")
        elif df_2022["方向"].iloc[i].startswith("南"):
            dataset["Highway/Dir"].append("國1高架南向")
    
    elif df_2022["國道名稱"].iloc[i] in N3:
        if df_2022["方向"].iloc[i].startswith("北"):
            dataset["Highway/Dir"].append("國3北向")
        elif df_2022["方向"].iloc[i].startswith("南"):
            dataset["Highway/Dir"].append("國3南向")
    
    elif df_2022["國道名稱"].iloc[i] in N3A:
        if df_2022["方向"].iloc[i].startswith("東"):
            dataset["Highway/Dir"].append("國3甲東向")
        elif df_2022["方向"].iloc[i].startswith("西"):
            dataset["Highway/Dir"].append("國3甲西向")
    
    elif df_2022["國道名稱"].iloc[i] in N3N:
        if df_2022["方向"].iloc[i].startswith("北"):
            dataset["Highway/Dir"].append("南港連絡道北向")
        elif df_2022["方向"].iloc[i].startswith("南"):
            dataset["Highway/Dir"].append("南港連絡道南向")
    
    elif df_2022["國道名稱"].iloc[i] in N3K:
        if df_2022["方向"].iloc[i].startswith("北"):
            dataset["Highway/Dir"].append("港西聯外道北向")
        elif df_2022["方向"].iloc[i].startswith("南"):
            dataset["Highway/Dir"].append("港西聯外道南向")
    
    elif df_2022["國道名稱"].iloc[i] in N5:
        if df_2022["方向"].iloc[i].startswith("北"):
            dataset["Highway/Dir"].append("國5北向")
        elif df_2022["方向"].iloc[i].startswith("南"):
            dataset["Highway/Dir"].append("國5南向")
            
    elif df_2022["國道名稱"].iloc[i] in N2:
        if df_2022["方向"].iloc[i].startswith("東"):
            dataset["Highway/Dir"].append("國2東向")
        elif df_2022["方向"].iloc[i].startswith("西"):
            dataset["Highway/Dir"].append("國2西向")
            
    elif df_2022["國道名稱"].iloc[i] in N4:
        if df_2022["方向"].iloc[i].startswith("東"):
            dataset["Highway/Dir"].append("國4東向")
        elif df_2022["方向"].iloc[i].startswith("西"):
            dataset["Highway/Dir"].append("國4西向")
            
    elif df_2022["國道名稱"].iloc[i] in N6:
        if df_2022["方向"].iloc[i].startswith("東"):
            dataset["Highway/Dir"].append("國6東向")
        elif df_2022["方向"].iloc[i].startswith("西"):
            dataset["Highway/Dir"].append("國6西向")
            
    elif df_2022["國道名稱"].iloc[i] in N8:
        if df_2022["方向"].iloc[i].startswith("東"):
            dataset["Highway/Dir"].append("國8東向")
        elif df_2022["方向"].iloc[i].startswith("西"):
            dataset["Highway/Dir"].append("國8西向")
            
    elif df_2022["國道名稱"].iloc[i] in N10:
        if df_2022["方向"].iloc[i].startswith("東"):
            dataset["Highway/Dir"].append("國10東向")
        elif df_2022["方向"].iloc[i].startswith("西"):
            dataset["Highway/Dir"].append("國10西向")
            
    # 通報單位
    if df_2022["簡訊內容"].iloc[i].find("北控") != -1:
        dataset["Center"].append("北控")
    elif df_2022["簡訊內容"].iloc[i].find("中控") != -1:
        dataset["Center"].append("中控")
    elif df_2022["簡訊內容"].iloc[i].find("南控") != -1:
        dataset["Center"].append("南控")
    elif df_2022["簡訊內容"].iloc[i].find("坪控") != -1:
        dataset["Center"].append("坪控")
        
    # 車道位置
    lane_loc = []
    if df_2022["內路肩"].iloc[i] == 1:
        lane_loc.append("內路肩")
    if df_2022["內車道"].iloc[i] == 1:
        lane_loc.append("內車道")
    if df_2022["中內車道"].iloc[i] == 1:
        lane_loc.append("中內車道")
    if df_2022["中車道"].iloc[i] == 1:
        lane_loc.append("中車道")
    if df_2022["中外車道"].iloc[i] == 1:
        lane_loc.append("中外車道")
    if df_2022["外車道"].iloc[i] == 1:
        lane_loc.append("外車道")
    if df_2022["外路肩"].iloc[i] == 1:
        lane_loc.append("外路肩")
    if df_2022["匝道"].iloc[i] == 1:
        lane_loc.append("匝道")
    dataset["Lane"].append(lane_loc)
    
    # 事故型態
    accitype = []
    if df_2022["簡訊內容"].iloc[i].find("追撞") != -1:
        accitype.append("追撞")
    if df_2022["簡訊內容"].iloc[i].find("擦撞") != -1:
        accitype.append("擦撞")
    if df_2022["簡訊內容"].iloc[i].find("自撞") != -1:
        if df_2022["簡訊內容"].iloc[i].find("小自撞") != -1:
            pass    # 如果有那種「1小自撞護欄」的，不算自撞事故，要排除
        else:
            accitype.append("自撞")
    if df_2022["簡訊內容"].iloc[i].find("翻") != -1:
        accitype.append("翻覆")
    if df_2022["簡訊內容"].iloc[i].find("火") != -1:
        accitype.append("火燒車")
    if df_2022["簡訊內容"].iloc[i].find("護欄") != -1:
        accitype.append("撞護欄")
    if df_2022["簡訊內容"].iloc[i].find("邊坡") != -1:
        accitype.append("衝出邊坡")
    if len(accitype) == 0:    # 如果皆非以上事故，填入其他
        accitype.append("其他")
        dataset["AccidentType"].append(accitype)
    else:
        dataset["AccidentType"].append(accitype)
    
    # 車種
    involve_car = []
    while True:
        for col in num_cars_cols:
            if pd.notna(df_2022[col].iloc[i]):
                if df_2022[col].iloc[i] in passenger_car:
                    involve_car.append("小客車")
                elif df_2022[col].iloc[i] in small_truck:
                    involve_car.append("小貨車")
                elif df_2022[col].iloc[i] in bus:
                    involve_car.append("大客車")
                elif df_2022[col].iloc[i] in big_truck:
                    involve_car.append("大貨車")
                elif df_2022[col].iloc[i] in connect_truck:
                    involve_car.append("聯結車")
                elif df_2022[col].iloc[i] in container_truck:
                    involve_car.append("貨櫃車")
                elif df_2022[col].iloc[i] in special_truck:
                    involve_car.append("危險品運送車")
                else:
                    # 新增: 判別施工車輛
                    for veh in construction_vehs:
                        if df_2022[col].iloc[i].startswith(veh):
                            involve_car.append("施工車輛")
                            break
                        else:
                            involve_car.append("其他車")
                            break
            else:
                break
        break
    dataset["CarType"].append(involve_car)
    
    # 肇事車輛數
    if len(involve_car) == 1:
        dataset["NumCars"].append("1")
    elif len(involve_car) == 2:
        dataset["NumCars"].append("2")
    elif len(involve_car) == 3:
        dataset["NumCars"].append("3")
    elif len(involve_car) >= 4:
        dataset["NumCars"].append("4以上")
    
    # 處理時間
    dataset["Duration"].append(df_2022["處理分鐘"].iloc[i])
        
new_dataframe = pd.DataFrame(dataset).sort_values(by=["Date"])
new_dataframe = new_dataframe.astype({"Lane": "string", "AccidentType": "string", "CarType": "string"})
new_dataframe

Unnamed: 0,Date,Weekday,Day/Night,Holiday,Highway/Dir,Center,Lane,AccidentType,CarType,NumCars,Duration
10048,2022-01-01,假日,夜間,T,國1北向,中控,"['內車道', '中車道']","['追撞', '翻覆']","['小客車', '小客車']",2,60
104,2022-01-01,假日,夜間,T,國3北向,北控,['匝道'],['追撞'],"['小客車', '小客車', '小客車']",3,12
103,2022-01-01,假日,夜間,T,國3北向,北控,['外車道'],['追撞'],"['小客車', '小客車']",2,15
102,2022-01-01,假日,夜間,T,國3北向,北控,['內車道'],['追撞'],"['小客車', '小客車']",2,2
101,2022-01-01,假日,夜間,T,國1南向,北控,['內車道'],['追撞'],"['小客車', '小客車']",2,22
...,...,...,...,...,...,...,...,...,...,...,...
9944,2022-12-31,假日,日間,T,國1高架北向,北控,['中車道'],['追撞'],"['小客車', '小客車']",2,9
9943,2022-12-31,假日,日間,T,國2西向,北控,['外車道'],['追撞'],"['小客車', '小貨車']",2,20
9942,2022-12-31,假日,日間,T,國3南向,北控,['匝道'],['追撞'],"['小客車', '小客車']",2,4
9956,2022-12-31,假日,日間,T,國1北向,北控,['外車道'],['自撞'],['小客車'],1,39


In [7]:
# 如果要用貝氏分類的方法分類，勢必所有變數都只能有一個類別
# 但通常事故的發生，可能會有兩台以上不同的肇事車種，
# 佔用車道位置也可能會橫跨到兩車道、三車道、甚至全車道
# 甚至事故型態可能也很多元...

# 要如何決定這起事故是屬於哪個車種、佔用哪個車道，屬於哪種事故型態
# 我們這裡利用各車種、各車道、以及各事故型態的事故處理時間中位數來排序
# 以該車種/車道/事故型態最高者作為該起事故的代表

# 比方說有一起事故共有貨櫃車、小客車、小貨車三種車種
# 因為貨櫃車的事故處理時間中位數最高，因此我們直接將該起事故的肇事車種判定為貨櫃車

In [8]:
# 計算事故佔用車道位置之事故處理時間中位數是否相同
# 故定義: 內路肩 > 中內車道 > 外路肩 > 中外車道 > 內車道 = 中車道 = 外車道 > 匝道
lane_locs = ["內路肩", "內車道", "中內車道", "中車道", "中外車道", "外車道", "外路肩", "匝道"]
all_data = []
for position in lane_locs:
    filted_df = new_dataframe.loc[new_dataframe["Lane"].str.contains(position)]
    data = np.array(filted_df["Duration"])
    print(f"{position}之事故處理時間中位數: {np.median(data)}")
    all_data.append(data)

內路肩之事故處理時間中位數: 27.0
內車道之事故處理時間中位數: 14.0
中內車道之事故處理時間中位數: 19.0
中車道之事故處理時間中位數: 14.0
中外車道之事故處理時間中位數: 15.0
外車道之事故處理時間中位數: 14.0
外路肩之事故處理時間中位數: 17.0
匝道之事故處理時間中位數: 13.0


In [9]:
# 計算事故發生之車種之事故處理時間中位數是否相同
# 故定義: 貨櫃車 > 危險品運送車 > 聯結車 > 其他車 > 大貨車 > 施工車輛 > 大客車 > 小貨車 > 小客車
veh_types = ["小客車", "小貨車", "大客車", "大貨車", "聯結車", "貨櫃車", "危險品運送車", "施工車輛", "其他車"]
all_data = []
for veh in veh_types:
    filted_df = new_dataframe.loc[new_dataframe["CarType"].str.contains(veh)]
    data = np.array(filted_df["Duration"])
    print(f"{veh}之事故處理時間中位數: {np.median(data)}")
    all_data.append(data)

小客車之事故處理時間中位數: 13.0
小貨車之事故處理時間中位數: 16.0
大客車之事故處理時間中位數: 17.0
大貨車之事故處理時間中位數: 18.0
聯結車之事故處理時間中位數: 26.0
貨櫃車之事故處理時間中位數: 130.0
危險品運送車之事故處理時間中位數: 56.0
施工車輛之事故處理時間中位數: 32.0
其他車之事故處理時間中位數: 24.0


In [10]:
# 計算事故型態之事故處理時間中位數是否相同
# 故定義: 衝出邊坡 > 火燒車 > 翻覆 > 自撞 > 其他 > 撞護欄 > 擦撞 > 追撞
accitypes = ["追撞", "擦撞", "自撞", "翻覆", "火燒車", "撞護欄", "衝出邊坡", "其他"]
all_data = []
for accident in accitypes:
    filted_df = new_dataframe.loc[new_dataframe["AccidentType"].str.contains(accident)]
    data = np.array(filted_df["Duration"])
    print(f"{accident}之事故處理時間中位數: {np.median(data)}")
    all_data.append(data)

追撞之事故處理時間中位數: 12.0
擦撞之事故處理時間中位數: 17.0
自撞之事故處理時間中位數: 24.0
翻覆之事故處理時間中位數: 42.0
火燒車之事故處理時間中位數: 63.0
撞護欄之事故處理時間中位數: 22.0
衝出邊坡之事故處理時間中位數: 65.0
其他之事故處理時間中位數: 23.0


In [11]:
# 以下就是將肇事資料集中的車種/車道/事故型態給予明確分類(依據上面的中位數大小)
# 但還是有些資料，在檢視過簡訊內容後仍無法判定佔用車道位置，所以直接剔除
new_dataframe = new_dataframe.loc[new_dataframe["Lane"].str.startswith("[]") == False]

for i in range(new_dataframe.shape[0]):    
    # 車道位置
    positions = ["內路肩", "中內車道", "外路肩", "中外車道", "內車道", "中車道", "外車道", "匝道"]
    idx = 0
    while idx < len(positions):
        if new_dataframe["Lane"].iloc[i].find(positions[idx]) == -1:
            idx += 1
        else:
            new_dataframe["Lane"].iloc[i] = positions[idx]
            break
        
    # 車種
    cartypes = ["貨櫃車", "危險品運送車", "聯結車", "其他車", "大貨車", "施工車輛", "大客車", "小貨車", "小客車"]
    idx = 0
    while idx < len(cartypes):
        if new_dataframe["CarType"].iloc[i].find(cartypes[idx]) == -1:
            idx += 1
        else:
            new_dataframe["CarType"].iloc[i] = cartypes[idx]
            break
        
    # 事故型態
    accitypes = ["衝出邊坡", "火燒車", "翻覆", "自撞", "其他", "撞護欄", "擦撞", "追撞"]
    idx = 0
    while idx < len(accitypes):
        if new_dataframe["AccidentType"].iloc[i].find(accitypes[idx]) == -1:
            idx += 1
        else:
            new_dataframe["AccidentType"].iloc[i] = accitypes[idx]
            break
        
new_dataframe

Unnamed: 0,Date,Weekday,Day/Night,Holiday,Highway/Dir,Center,Lane,AccidentType,CarType,NumCars,Duration
10048,2022-01-01,假日,夜間,T,國1北向,中控,內車道,翻覆,小客車,2,60
104,2022-01-01,假日,夜間,T,國3北向,北控,匝道,追撞,小客車,3,12
103,2022-01-01,假日,夜間,T,國3北向,北控,外車道,追撞,小客車,2,15
102,2022-01-01,假日,夜間,T,國3北向,北控,內車道,追撞,小客車,2,2
101,2022-01-01,假日,夜間,T,國1南向,北控,內車道,追撞,小客車,2,22
...,...,...,...,...,...,...,...,...,...,...,...
9944,2022-12-31,假日,日間,T,國1高架北向,北控,中車道,追撞,小客車,2,9
9943,2022-12-31,假日,日間,T,國2西向,北控,外車道,追撞,小貨車,2,20
9942,2022-12-31,假日,日間,T,國3南向,北控,匝道,追撞,小客車,2,4
9956,2022-12-31,假日,日間,T,國1北向,北控,外車道,自撞,小客車,1,39


In [12]:
# 存檔
# 後面用不到日期這個欄位的，因此刪掉
new_dataframe.drop(columns="Date", inplace=True)
new_dataframe.to_excel("dataset.xlsx", index=False, encoding="utf_8_sig")

In [13]:
# 接著就來檢視模式的分類效果


class BayesClassifier:
    def __init__(self, dataset, space, max_minutes):
        self.original_dataframe = pd.read_excel(dataset)
        self.space = space
        self.max_minutes = max_minutes
        
        if self.space == 5:
            self.label = ["<5", "5-10", "10-15", "15-20", "20-25", "25-30",
                          "30-35", "35-40", "40-45", "45-50", "50-55", "55-60",
                          "60-65", "65-70", "70-75", "75-80", "80-85", "85-90", "90+"]
        elif self.space == 10:
            self.label = ["<10", "10-20", "20-30", "30-40", "40-50", "50-60",
                          "60-70", "70-80", "80-90", "90+"]
        elif self.space == 15:
            self.label = ["<15", "15-30", "30-45", "45-60", "60-75", "75-90", "90+"]
            
        self.dataframe = self.classified_duration(self.original_dataframe.copy(deep=True))
    
    def classified_duration(self, df):
        for i in range(df.shape[0]):
            try:
                df["Duration"].iloc[i] = self.label[df["Duration"].iloc[i]//self.space]
            except IndexError:
                df["Duration"].iloc[i] = self.label[-1]
        return df
    
    def get_prob_distr(self, df):
        matrix = np.zeros((self.max_minutes+self.space+1) // self.space)
        for i in range(df.shape[0]):
            try:
                matrix[int(df["Duration"].iloc[i] // self.space)] += 1
            except IndexError:
                matrix[-1] += 1
        return matrix
    
    def compute_probs(self):
        print("... computing ...")
        predict_results = []
        minutes_list = [i for i in range(self.space, self.max_minutes+self.space+1, self.space)]
        correct_predict = 0
        for i in range(self.original_dataframe.shape[0]):
            condition = (
                (self.original_dataframe["Weekday"] == self.original_dataframe["Weekday"].iloc[i]) & \
                (self.original_dataframe["Day/Night"] == self.original_dataframe["Day/Night"].iloc[i]) & \
                (self.original_dataframe["Holiday"] == self.original_dataframe["Holiday"].iloc[i]) & \
                (self.original_dataframe["Highway/Dir"] == self.original_dataframe["Highway/Dir"].iloc[i]) & \
                (self.original_dataframe["Center"] == self.original_dataframe["Center"].iloc[i]) & \
                (self.original_dataframe["Lane"] == self.original_dataframe["Lane"].iloc[i]) & \
                (self.original_dataframe["AccidentType"] == self.original_dataframe["AccidentType"].iloc[i]) & \
                (self.original_dataframe["CarType"] == self.original_dataframe["CarType"].iloc[i]) & \
                (self.original_dataframe["NumCars"] == self.original_dataframe["NumCars"].iloc[i])
            )
            filted_dataframe = self.original_dataframe.loc[condition].reset_index(drop=True)
            prob_matrix = self.get_prob_distr(filted_dataframe) / filted_dataframe.shape[0]
            
            ith = 0
            while ith < len(prob_matrix):
                if prob_matrix[ith] == max(prob_matrix):
                    if ith > 0:
                        predict_results.append(((0.5 * (minutes_list[ith]+minutes_list[ith-1])), prob_matrix[ith]))
                    elif ith == 0:
                        predict_results.append(((0.5 * (minutes_list[ith])), prob_matrix[ith]))
                    
                    if self.original_dataframe["Duration"].iloc[i] < self.max_minutes:
                        if abs(predict_results[i][0] - self.original_dataframe["Duration"].iloc[i]) <= self.space:
                            correct_predict += 1
                    elif self.original_dataframe["Duration"].iloc[i] >= self.max_minutes:
                        if predict_results[i][0] > self.max_minutes:
                            correct_predict += 1
                    break
                else:
                    ith += 1
        
        print("... done!\n")
        print("... evaluating ...")
        print("... done!")
        return predict_results, correct_predict / self.original_dataframe.shape[0]
    
    def predict(self, accidata):
        condition = (
            (self.original_dataframe["Weekday"] == accidata["Weekday"]) & \
            (self.original_dataframe["Day/Night"] == accidata["Day/Night"]) & \
            (self.original_dataframe["Holiday"] == accidata["Holiday"]) & \
            (self.original_dataframe["Highway/Dir"] == accidata["Highway/Dir"]) & \
            (self.original_dataframe["Center"] == accidata["Center"]) & \
            (self.original_dataframe["Lane"] == accidata["Lane"]) & \
            (self.original_dataframe["AccidentType"] == accidata["AccidentType"]) & \
            (self.original_dataframe["CarType"] == accidata["CarType"]) & \
            (self.original_dataframe["NumCars"] == accidata["NumCars"])
        )
        filted_dataframe = self.original_dataframe.loc[condition].reset_index(drop=True)
        prob_matrix = self.get_prob_distr(filted_dataframe) / filted_dataframe.shape[0]
            
        ith = 0
        while ith < len(prob_matrix):
            if prob_matrix[ith] == max(prob_matrix):
                predict_interval = self.label[ith]
                break
            else:
                ith += 1
        
        return predict_interval

In [14]:
# 我們直接拿整個資料集，利用貝氏定理計算各資料的事故處理時間，並驗證準確率
if __name__ == "__main__":
    print("Start:", str(datetime.now())[:-7])
    classifier = BayesClassifier(dataset="dataset.xlsx", space=15, max_minutes=90)
    predict_results, correct_rate = classifier.compute_probs()
    print(f"\nCorrect Rate: {correct_rate:.2%}\n")
    print("End:", str(datetime.now())[:-7])

Start: 2023-05-07 17:43:21
... computing ...
... done!

... evaluating ...
... done!

Correct Rate: 85.68%

End: 2023-05-07 17:50:15


In [15]:
# 後續使用時，只要將所需欄位填入資料，即可給出一個預估處理時間的區間
data = {
    "Weekday": "假日",
    "Day/Night": "夜間",
    "Holiday": "T",
    "Highway/Dir": "國1北向",
    "Center": "中控",
    "Lane": "內車道",
    "AccidentType": "翻覆",
    "CarType": "小客車",
    "NumCars": "2"
}
predict_interval = classifier.predict(data)
print(f"預估事故處理時間約{predict_interval}分鐘")

預估事故處理時間約60-75分鐘
