In [72]:
import os

import pandas as pd
from lxml import etree
from tqdm import tqdm

In [73]:
# Common prefix of every monthly XML dump; the region, year and month parts
# plus the ".xml" extension are appended when the list of files is assembled.
file_path = "C:/Users/Настя/YandexDisk-n4skolesnikova/HSE 4th year/Graduation Thesis/gibdd_parser/storage/ДТП_"

# Region codes to scan (grouped by tens for readability).
regions = [
    1, 3, 4, 5, 7, 8,
    10, 11, 12, 14, 15, 17, 18, 19,
    20, 22, 24, 25, 26, 27, 28, 29,
    30, 32, 33, 34, 35, 36, 37, 38,
    40, 41, 42, 44, 45, 46, 47, 49,
    50, 52, 53, 54, 56, 57, 58,
    60, 61, 63, 64, 65, 66, 67, 68, 69,
    70, 71, 73, 75, 76, 77, 78, 79,
    80, 81, 82, 83, 84, 85, 86, 87, 88, 89,
    90, 91, 92, 93, 94, 95, 96, 97, 98, 99,
    71100, 71140, 10011,
]

# Years 2015..2024 inclusive and zero-padded month numbers '01'..'12'.
years = range(2015, 2025)
months = [f"{month_number:02d}" for month_number in range(1, 13)]

# Quick smoke-test configuration (uncomment to override the full ranges above):
# regions = [42, 45]
# years = [2021, 2023]
# months = ['01', '02']

# Substrings looked up in a vehicle's model string to raise the
# `russian_vehicle` flag.
russians = [
    'VIS', 'Bronto', 'Lada', 'GAZ', 'ГАЗ', 'Газ',
    'Урал', 'ПАЗ', 'КАВЗ', 'ЛиАЗ', 'ГОЛАЗ', 'КАМАЗ',
    'НефАЗ', 'VDL', 'Бравис', 'УАЗ', 'Ssang', 'ЗИЛ',
    'Баз', 'Грейт', 'Opel', 'Volgabus', 'BAW', 'Tar',
    'Lifan', 'Geely', 'Блеск', 'Chery', 'Jianghuai', 'Хавтай',
]

In [75]:
# Every (region, path) pair to try, ordered by region, then year, then month.
# Each path is the common prefix followed by "<region>_<year>_<month>.xml".
file_paths = [
    (region_code, f"{file_path}{region_code}_{year_number}_{month_code}.xml")
    for region_code in regions
    for year_number in years
    for month_code in months
]

# Outlier limits: an accident is dropped when its participant count (KUCH) or
# its vehicle count (KTS) reaches the corresponding value.
MAX_PARTICIPANTS = 10
MAX_VEHICLES = 5

# Container elements: their own text is not stored, only that of the leaf
# elements found anywhere below the accident node.
CONTAINER_TAGS = {'tab', 'infoDtp', 'ts_info', 'ts_uch', 'uchInfo'}

# Source text of <r_rul> / <t_ts> -> name of the counter column it increments.
DRIVE_COLUMNS = {
    'С передним приводом': 'n_front_drive',
    'С задним приводом': 'n_rear_drive',
    'Полноприводные': 'n_4wd',
}
CLASS_COLUMNS = {
    'А-класс (особо малый) до 3,5 м': 'n_class_a',
    'В-класс (малый) до 3,9 м': 'n_class_b',
    'С-класс (малый средний, компактный) до 4,3 м': 'n_class_c',
    'D-класс (средний) до 4,6 м': 'n_class_d',
    'Е-класс (высший средний, бизнес-класс) до 4,9 м': 'n_class_e',
    'S-класс (высший, представительский класс) более 4,9 м': 'n_class_s',
}

data = []           # one dict per kept accident
dropped_rta = 0     # number of accidents discarded as outliers

for region, file_name in tqdm(file_paths, total=len(file_paths), desc="Processing all files"):
    # Files that cannot be opened are skipped. Only the parse call is guarded,
    # so an error in the feature code below is not silently swallowed.
    try:
        tree = etree.parse(file_name)
    except OSError:
        continue
    root = tree.getroot()

    # loop over accidents for the n-th month of the m-th year in the k-th region
    for tab in root.iter('tab'):
        # removing outliers: too many participants
        kuch = tab.find('KUCH')
        if kuch is not None and kuch.text and kuch.text.isdigit():
            if int(kuch.text) >= MAX_PARTICIPANTS:
                dropped_rta += 1
                continue

        # removing outliers: too many vehicles
        kts = tab.find('KTS')       # num of vehicles in accident
        n_vehicles = None           # stays None when KTS is missing or not numeric
        if kts is not None and kts.text and kts.text.isdigit():
            n_vehicles = int(kts.text)
            if n_vehicles >= MAX_VEHICLES:
                dropped_rta += 1
                continue

        # Copy the text of every non-container descendant verbatim. Tags that
        # occur several times (one per vehicle / participant) keep the last
        # value seen.
        record = {'region': region}
        for element in tab.iter():
            if element.tag not in CONTAINER_TAGS:
                record[element.tag] = element.text

        # Engineered features, initialised in the order in which they should
        # appear as columns.
        record.update({
            # binary features
            'vehicle_failure': 0,
            'non_private_vehicle': 0,
            'russian_vehicle': 0,
            'white_vehicle': 0,
            'black_vehicle': 0,
            'colored_vehicle': 0,
            'drunk_driver': 0,
            'female_driver': 0,
            'escaped': 0,
            'no_seatbelt_injury': 0,
            # non-binary features
            'n_drunk': 0,
            'n_children': 0,
            'n_cyclists': 0,
            'n_pedestrians': 0,
            'vehicle_age_min': 0,
            'vehicle_age_max': 0,
            'vehicle_age_avg': 0,
            'n_class_a': 0,
            'n_class_b': 0,
            'n_class_c': 0,
            'n_class_d': 0,
            'n_class_e': 0,
            'n_class_s': 0,
            'n_front_drive': 0,
            'n_rear_drive': 0,
            'n_4wd': 0,
            'n_guilty': 0,
            'guilty_share': 0,
            'n_fatal_violations': 0,
            'violations': None,
            'injury_severity': None,
            'guilty_exp_avg': None,
            'exp_avg': None,
        })

        # auxiliary accumulators
        lst_age = []                    # numeric <g_v> values of the vehicles
        violations = set()              # distinct violation texts (NPDD and SOP_NPDD)
        injuries = set()                # distinct injury texts other than 'Не пострадал'
        n_guilty_vehicles = 0           # vehicles with at least one NPDD violation
        drivers_experience = 0          # sum of all numeric <v_ST> values
        guilty_drivers_experience = 0   # same, restricted to participants with an NPDD violation

        # loop over vehicles in the accident
        for ts_info in tab.iter('ts_info'):

            failures = ts_info.find('t_n')
            if failures is not None and failures.text and failures.text != 'Технические неисправности отсутствуют':
                record['vehicle_failure'] = 1

            ownership = ts_info.find('f_sob')
            if ownership is not None and ownership.text and ownership.text != 'Частная собственность':
                record['non_private_vehicle'] = 1

            year_of_release = ts_info.find('g_v')
            if year_of_release is not None and year_of_release.text and year_of_release.text.isdigit():
                lst_age.append(int(year_of_release.text))

            color = ts_info.find('color')
            if color is not None and color.text:
                if color.text == 'Белый':
                    record['white_vehicle'] = 1
                elif color.text == 'Черный':
                    record['black_vehicle'] = 1
                else:
                    record['colored_vehicle'] = 1

            wheel = ts_info.find('r_rul')
            if wheel is not None:
                drive_column = DRIVE_COLUMNS.get(wheel.text)
                if drive_column is not None:
                    record[drive_column] += 1

            vehicle_class = ts_info.find('t_ts')
            if vehicle_class is not None and vehicle_class.text:
                class_column = CLASS_COLUMNS.get(vehicle_class.text)
                if class_column is not None:
                    record[class_column] += 1

            # The substring test must run on the element's text: `x in element`
            # on an lxml element checks for child elements and is always False
            # for a string.
            # NOTE(review): the list looks like brand names; confirm that <m_ts>
            # (and not <marka_ts>) is the field that carries them.
            model = ts_info.find('m_ts')
            if model is not None and model.text:
                if any(rus in model.text for rus in russians):
                    record['russian_vehicle'] = 1

            is_vehicle_guilty = 0

            # loop over the people inside this vehicle
            for ts_uch in ts_info.iter('ts_uch'):

                drunk = ts_uch.find('ALCO')
                participant = ts_uch.find('k_UCH')
                gender = ts_uch.find('POL')
                # any non-empty <ALCO> text counts as intoxication
                has_alco = drunk is not None and bool(drunk.text)

                if participant is not None:
                    if participant.text == 'Водитель' and has_alco:
                        record['drunk_driver'] = 1
                        record['n_drunk'] += 1
                    elif participant.text == 'Пассажир' and has_alco:
                        record['n_drunk'] += 1
                    elif participant.text == 'Велосипедист':
                        record['n_cyclists'] += 1

                    if gender is not None and gender.text == 'Женский' and participant.text == 'Водитель':
                        record['female_driver'] = 1

                # escaped drivers
                leaving = ts_uch.find('s_SM')
                if leaving is not None and leaving.text and leaving.text != 'Нет (не скрывался)':
                    record['escaped'] = 1

                # any non-empty <s_SEAT_GROUP> text counts as one child
                child = ts_uch.find('s_SEAT_GROUP')
                if child is not None and child.text:
                    record['n_children'] += 1

                safety_belt = ts_uch.find('SAFETY_BELT')
                injury = ts_uch.find('s_T')
                if injury is not None and injury.text and injury.text != 'Не пострадал':
                    injuries.add(injury.text)
                    if safety_belt is not None and safety_belt.text == 'Нет':
                        record['no_seatbelt_injury'] = 1

                # Guilt is tracked per participant, so the experience sum below
                # never depends on a loop variable left over from a previous
                # participant (or undefined when there is no <NPDD> at all).
                is_participant_guilty = False

                # fatal (provides driver's guilty evidence)
                for fatal_violation in ts_uch.iter('NPDD'):
                    if fatal_violation.text and fatal_violation.text != 'Нет нарушений':
                        is_participant_guilty = True
                        is_vehicle_guilty = 1
                        violations.add(fatal_violation.text)

                # concomitant
                for violation in ts_uch.iter('SOP_NPDD'):
                    if violation.text and violation.text != 'Нет нарушений':
                        violations.add(violation.text)

                experience = ts_uch.find('v_ST')
                if experience is not None and experience.text and experience.text.isdigit():
                    drivers_experience += int(experience.text)      # for total
                    if is_participant_guilty:
                        guilty_drivers_experience += int(experience.text)       # for guilty drivers

            record['n_guilty'] += is_vehicle_guilty
            n_guilty_vehicles += is_vehicle_guilty

        if n_guilty_vehicles != 0:
            record['guilty_exp_avg'] = guilty_drivers_experience / n_guilty_vehicles    # guilty drivers
        # Averages over the vehicle count are left as None when KTS is missing,
        # non-numeric or zero instead of raising.
        record['exp_avg'] = drivers_experience / n_vehicles if n_vehicles else None     # total

        # NOTE(review): these statistics are computed on the raw <g_v> values
        # (read as `year_of_release`), so they look like manufacture years
        # rather than ages; confirm how downstream code interprets them.
        if lst_age:
            record['vehicle_age_avg'] = sum(lst_age) / len(lst_age)
            record['vehicle_age_min'] = min(lst_age)
            record['vehicle_age_max'] = max(lst_age)
        else:
            record['vehicle_age_avg'] = None
            record['vehicle_age_min'] = None
            record['vehicle_age_max'] = None

        # loop over pedestrians (outside vehicles)
        guilty_peds = 0
        for uchInfo in tab.iter('uchInfo'):

            # one <uchInfo> element per participant outside a vehicle
            record['n_pedestrians'] += 1

            # escaped pedestrians
            leaving = uchInfo.find('s_SM')
            if leaving is not None and leaving.text and leaving.text != 'Нет (не скрывался)':
                record['escaped'] = 1

            is_ped_guilty = 0

            # fatal
            for violation in uchInfo.iter('NPDD'):
                if violation.text and violation.text != 'Нет нарушений':
                    is_ped_guilty = 1
                    violations.add(violation.text)
            guilty_peds += is_ped_guilty

            # concomitant
            for violation in uchInfo.iter('SOP_NPDD'):
                if violation.text and violation.text != 'Нет нарушений':
                    violations.add(violation.text)

            injury = uchInfo.find('s_T')
            if injury is not None and injury.text and injury.text != 'Не пострадал':
                injuries.add(injury.text)

        record['n_guilty'] += guilty_peds
        # guilty vehicles + guilty pedestrians, relative to the vehicle count
        record['guilty_share'] = record['n_guilty'] / n_vehicles if n_vehicles else None
        record['violations'] = violations
        # NOTE(review): this counts every distinct violation text, including the
        # concomitant (SOP_NPDD) ones, despite the column name; confirm intent.
        record['n_fatal_violations'] = len(violations)

        record['injury_severity'] = injuries if injuries else {'Не пострадал'}

        data.append(record)

print(f"Number of dropped outliers: {dropped_rta}")

# Raw per-vehicle / per-participant tags that were copied into each record;
# the original author marked them as not meaningful at accident level.
raw_detail_columns = [
    'color', 'f_sob', 'g_v', 'm_pov', 'm_ts', 'marka_ts', 'n_ts', 'o_pf',
    'r_rul', 't_n', 't_ts', 'ts_s', 'ALCO', 'INJURED_CARD_ID', 'k_UCH',
    'NPDD', 'n_UCH', 'POL', 'SAFETY_BELT', 'SOP_NPDD', 's_SEAT_GROUP',
    's_SM', 's_T', 'v_ST',
]

# Accident-level columns that are not needed in the final data set.
unneeded_columns = ['district', 'house', 'street', 'rowNum', 'EMTP_NUMBER', 's_dtp']

# Source tag -> readable column name.
new_column_names = {
    'DTPV': 'TYPE',
    'kartId': 'ID',
    'KTS': 'n_VEHICLES',
    'KUCH': 'n_PARTICIPANTS',
    'POG': 'n_DEATHS',
    'RAN': 'n_INJURED',
    'date': 'DATE',
    'time': 'TIME',
    'region': 'REGION',

    'k_ul': 'street_rank',
    'dor': 'road_name',
    'dor_z': 'road_rank',
    'dor_k': 'road_category', # not equal to road_rank
    'km': 'road_km',
    'm': 'road_m',
    'ndu': 'road_defects',
    's_pch': 'road_surface',
    'CHOM': 'traffic_changes',
    'OBJ_DTP': 'adj_objects',
    'sdor': 'site_objects',

    'factor': 'cause_factors',
    'spog': 'weather',
    'osv': 'lighting'
}

# Build the frame once from the list of records, then drop and rename.
# A missing column still raises KeyError, so a schema change is noticed.
df = (
    pd.DataFrame(data)
    .drop(raw_detail_columns, axis=1)
    .drop(unneeded_columns, axis=1)
    .rename(columns=new_column_names)
)

print(f"\nNumber of observations: {df.shape[0]}")
print(f"Number of variables: {df.shape[1]}")

# Create the target folder first: to_csv does not create missing directories,
# and failing here would waste the whole parsing run.
output_csv = 'data/DTP_DATA_2025.csv'
os.makedirs(os.path.dirname(output_csv), exist_ok=True)
df.to_csv(output_csv)

Processing all files: 100%|██████████| 10200/10200 [49:57<00:00,  3.40it/s]    


Number of dropped outliers: 11193

Number of observations: 1475674
Number of variables: 59
