# 1. DrugBank 数据集处理
## 1.1 数据集介绍
## 1.2 数据集解析

In [37]:
import xmltodict

# Ignore the XML namespace: the DrugBank namespace URI is mapped to None so
# that (per xmltodict's namespace handling) parsed keys carry no prefix.
namespaces = {
    'http://www.drugbank.ca': None
}

# Read the whole DrugBank export into memory as text.
with open('data/drugbank.xml', 'r', encoding="utf-8") as file:
    xml_data = file.read()

# Convert the XML document into nested dicts/lists.
xml_dict = xmltodict.parse(xml_data, process_namespaces=True, namespaces=namespaces)

## 1.3 数据集切片测试

In [38]:
import copy

# All <drug> records found under the <drugbank> root element.
drugs: list[dict] = xml_dict.get('drugbank').get('drug')
# Alternative working set for quick experiments: only the records from index 15001 on.
# drug_test= drugs[15001:]
drug_test= drugs
# Independent deep copy of the working set, taken before any processing.
drug_source = copy.deepcopy(drug_test)

## 1.4 数据集基础信息处理

In [39]:
import re

def potential_str_to_list(drug: dict, val: str|list, property_str: str) -> None:
    """Convert the potential str to a list and assign it to the property."""
    drug[property_str] = [val] if type(val) == str else val

def process_drug(drug: dict) -> None:
    """Run the base-information steps on ``drug`` in place.

    The steps run in this order: metadata attributes, primary id extraction,
    then flattening of the single-property lists.
    """
    for step in (process_meta, process_without_old_ids, process_list):
        step(drug)

def process_meta(drug: dict) -> None:
    """Rename the ``@type``/``@created``/``@updated`` attributes and ensure ``state`` exists.

    Each ``@``-prefixed key is removed and its value stored under the same
    name without the prefix. An attribute that is absent is stored as None
    instead of raising KeyError.

    Args:
        drug: Record to update in place.
    """
    for attribute in ('type', 'created', 'updated'):
        # pop() with a default tolerates a missing attribute; the original
        # ``del`` raised KeyError in that case.
        drug[attribute] = drug.pop('@' + attribute, None)
    # Fill in the attribute that may be missing so every record has the key.
    drug.setdefault('state', None)
    
def process_with_old_ids(drug: dict) -> None:
    """Replace ``drugbank-id`` with ``id`` plus one ``<prefix>_id`` key per legacy id.

    A dict entry supplies the ``id`` value through its ``#text`` field. Each
    plain-string entry is stored under a key built from its leading letters,
    lower-cased, with ``_id`` appended (``'BTD00024'`` -> ``btd_id``). Several
    strings sharing a prefix overwrite each other; the last one wins.

    Args:
        drug: Record to update in place; ``drugbank-id`` is removed.
    """
    ids = drug.pop('drugbank-id', None)
    if ids is None:
        return
    # A lone entry (dict or plain string) is handled like a one-element list;
    # iterating a bare string would otherwise walk its characters.
    if not isinstance(ids, list):
        ids = [ids]
    for drugbank_id in ids:
        if not isinstance(drugbank_id, str):
            if drugbank_id is not None:
                drug['id'] = drugbank_id.get('#text')
            continue
        matched = re.match(r'([A-Za-z]+)(\d+)', drugbank_id)
        if matched is None:
            # Not of the letters+digits form: no column name can be derived.
            continue
        drug[matched.group(1).lower() + '_id'] = drugbank_id

def process_without_old_ids(drug: dict) -> None:
    """Replace ``drugbank-id`` with a single ``id`` key, discarding legacy ids.

    ``id`` is taken from the ``#text`` field of the id entry when there is
    only one, or of the first mapping entry when there is a list. Plain-string
    entries are ignored. When no mapping entry exists, ``id`` is not set.

    Args:
        drug: Record to update in place; ``drugbank-id`` is removed.
    """
    ids = drug.pop('drugbank-id', None)
    # isinstance() also accepts dict subclasses such as OrderedDict, which
    # ``type(ids) == dict`` rejected (the id was then silently lost).
    if isinstance(ids, dict):
        drug['id'] = ids.get('#text')
    elif isinstance(ids, list):
        for drugbank_id in ids:
            if isinstance(drugbank_id, dict):
                drug['id'] = drugbank_id.get('#text')
                break

def process_list(drug: dict) -> None:
    """Flatten the single-property wrapper elements into plain lists.

    For ``groups``, ``affected-organisms`` and ``food-interactions`` the
    wrapper mapping (e.g. ``{'group': ...}``) is replaced by its child value,
    with a lone string turned into a one-element list. A wrapper that is
    missing, empty, or already flattened is left untouched, so a record
    without ``groups`` no longer raises and the function can be re-applied.

    Args:
        drug: Record to update in place.
    """
    wrappers = (
        ('groups', 'group'),
        ('affected-organisms', 'affected-organism'),
        ('food-interactions', 'food-interaction'),
    )
    for wrapper_key, child_key in wrappers:
        wrapper = drug.get(wrapper_key)
        # Only a non-empty mapping still needs flattening.
        if wrapper and isinstance(wrapper, dict):
            potential_str_to_list(drug, wrapper.get(child_key), wrapper_key)

## 1.5 数据集部分属性独立处理

In [40]:
from typing import Dict, List, Set


def iterate_references_to_module(drug: dict, reference_set: Set[str], rel_list: List[dict]) -> None:
    """Move the drug's ``general-references`` into the shared reference tables.

    Every article, textbook, link and attachment yields two things: a relation
    row ``{'drug_id', 'ref_id', 'citation'}`` appended to ``rel_list``, and a
    JSON-encoded reference ``{'ref_id', 'type', 'core_sign'}`` added to
    ``reference_set`` (strings, so identical references collapse).

    Args:
        drug: Record to update in place; ``general-references`` is removed.
        reference_set: Receives the JSON-encoded references.
        rel_list: Receives the drug-to-reference relation rows.
    """
    # (container key, item key which is also the reference type,
    #  field used as the relation's citation, field used as core_sign)
    layout = (
        ('articles', 'article', 'citation', 'pubmed-id'),
        ('textbooks', 'textbook', 'citation', 'isbn'),
        ('links', 'link', 'url', 'title'),
        ('attachments', 'attachment', 'url', 'title'),
    )
    drug_id = drug.get('id')
    # pop() with a default: a record without the element (or with an empty
    # one) is skipped instead of raising.
    origin_references = drug.pop('general-references', None)
    if not origin_references:
        return

    references_temp: Set[str] = set()
    relationship_temp: List[dict] = []
    for container_key, ref_type, citation_key, core_sign_key in layout:
        container = origin_references.get(container_key)
        entries = container.get(ref_type) if container else None
        if entries is None:
            continue
        # A single child element is parsed as a bare mapping, not a list.
        if not isinstance(entries, list):
            entries = [entries]
        for entry in entries:
            ref_id = entry.get('ref-id')
            relationship_temp.append({
                'drug_id': drug_id,
                'ref_id': ref_id,
                'citation': entry.get(citation_key),
            })
            references_temp.add(json.dumps({
                'ref_id': ref_id,
                'type': ref_type,
                'core_sign': entry.get(core_sign_key),
            }))
    reference_set.update(references_temp)
    rel_list.extend(relationship_temp)

# Packagers and manufacturers are merged into one source table; ``generic``
# is kept as an attribute of the relation rows.
def iterate_source_to_module(drug: dict, source_set: Set[str], packager_rel_list: List[dict], manufacturer_rel_list: List[dict]) -> None:
    """Move the drug's packagers and manufacturers into the shared source tables.

    Each packager or manufacturer is added to ``source_set`` as a JSON-encoded
    ``{'name', 'url'}`` string. Relation rows ``{'drug_id', 'generic', 'name'}``
    go to the matching relation list. ``generic`` is always None for packagers;
    for manufacturers it is 1 when the ``@generic`` attribute equals ``'true'``
    and 0 otherwise.

    Args:
        drug: Record to update in place; ``packagers`` and ``manufacturers``
            are removed (absent keys are tolerated).
        source_set: Receives the JSON-encoded sources.
        packager_rel_list: Receives the drug-to-packager rows.
        manufacturer_rel_list: Receives the drug-to-manufacturer rows.
    """
    sources_temp: Set[str] = set()
    packager_rel_temp: List[dict] = []
    manufacturer_rel_temp: List[dict] = []
    drug_id = drug.get('id')
    # pop() with a default: the reads already tolerate a missing element, so
    # the removal must not raise KeyError either.
    packagers = drug.pop('packagers', None)
    manufacturers = drug.pop('manufacturers', None)

    packager_items = packagers.get('packager') if packagers else None
    if packager_items is None:
        packager_items = []
    elif not isinstance(packager_items, list):
        packager_items = [packager_items]
    for packager in packager_items:
        sources_temp.add(json.dumps({
            'name': packager.get('name'),
            'url': packager.get('url')
        }))
        packager_rel_temp.append({
            'drug_id': drug_id,
            'generic': None,
            'name': packager.get('name'),
        })

    manufacturer_items = manufacturers.get('manufacturer') if manufacturers else None
    if manufacturer_items is None:
        manufacturer_items = []
    elif not isinstance(manufacturer_items, list):
        manufacturer_items = [manufacturer_items]
    for manufacturer in manufacturer_items:
        # An element without attributes is parsed as a bare string; treat it
        # as a name with no url and no generic flag.
        if isinstance(manufacturer, str):
            manufacturer = {'#text': manufacturer}
        sources_temp.add(json.dumps({
            'name': manufacturer.get('#text'),
            'url': manufacturer.get('@url')
        }))
        manufacturer_rel_temp.append({
            'drug_id': drug_id,
            'generic': 1 if manufacturer.get('@generic')=='true' else 0,
            'name': manufacturer.get('#text'),
        })

    source_set.update(sources_temp)
    packager_rel_list.extend(packager_rel_temp)
    manufacturer_rel_list.extend(manufacturer_rel_temp)

def iterate_price_to_module(drug: dict, price_list: List[dict], rel_list: List[dict], id_list: List[int]) -> None:
    """Move the drug's prices into the price table, assigning sequential ids.

    Each price becomes a row ``{'id', 'cost', 'currency', 'unit',
    'description'}`` in ``price_list`` and a row ``{'drug_id', 'price_id'}``
    in ``rel_list``.

    Args:
        drug: Record to update in place; ``prices`` is removed (an absent key
            is tolerated).
        price_list: Receives the price rows.
        rel_list: Receives the drug-to-price rows.
        id_list: One-element list used as a mutable counter: ``id_list[0]`` is
            the next id to hand out and is incremented once per price.
    """
    prices_temp: List[dict] = []
    rel_temp: List[dict] = []
    drug_id = drug.get('id')
    drug_prices = drug.pop('prices', None)

    entries = drug_prices.get('price') if drug_prices else None
    if entries is None:
        entries = []
    elif not isinstance(entries, list):
        entries = [entries]

    next_id = id_list[0]
    for drug_price in entries:
        cost = drug_price.get('cost')
        # <cost> without a currency attribute is parsed as a bare string, and
        # a missing <cost> as None; normalise both to the mapping shape.
        if not isinstance(cost, dict):
            cost = {'#text': cost}
        prices_temp.append({
            'id': next_id,
            'cost': cost.get('#text'),
            'currency': cost.get('@currency'),
            'unit': drug_price.get('unit'),
            'description': drug_price.get('description')
        })
        rel_temp.append({
            'drug_id': drug_id,
            'price_id': next_id
        })
        next_id += 1
    id_list[0] = next_id
    price_list.extend(prices_temp)
    rel_list.extend(rel_temp)

# The unique product id is taken from ndc-product-code, dpd-id or
# ema-ma-number, whichever is the first non-empty one.
def iterate_product_to_module(drug:dict, product_set: Set[str], rel_list: List[dict]) -> None:
    """Move the drug's products into the shared product table.

    Each product is copied, given an ``id`` key (the first truthy value among
    ``ndc-product-code``, ``dpd-id`` and ``ema-ma-number``, or None when all
    are empty) and added to ``product_set`` as a JSON string. A row
    ``{'drug_id', 'product_id'}`` is appended to ``rel_list``.

    Args:
        drug: Record to update in place; ``products`` is removed (an absent
            key is tolerated).
        product_set: Receives the JSON-encoded products.
        rel_list: Receives the drug-to-product rows.
    """
    products_temp: Set[str] = set()
    rel_temp: List[dict] = []
    drug_id = drug.get('id')
    drug_products = drug.pop('products', None)

    entries = drug_products.get('product') if drug_products else None
    if entries is None:
        entries = []
    elif not isinstance(entries, list):
        entries = [entries]

    for product in entries:
        product_id = (product.get('ndc-product-code') or
                      product.get('dpd-id') or
                      product.get('ema-ma-number'))
        # Work on a copy so the parsed product itself is left untouched.
        products_temp.add(json.dumps({**product, 'id': product_id}))
        rel_temp.append({
            'drug_id': drug_id,
            'product_id': product_id
        })
    product_set.update(products_temp)
    rel_list.extend(rel_temp)

# An interaction can be modelled as a relation between two drugs whose
# attribute is the interaction's description.
def iterate_drug_interaction_to_module(drug: dict, interaction_list: List[dict]) -> None:
    """Move the drug's interactions into the interaction relation table.

    Each interaction becomes a row ``{'drug_id', 'interaction_drug_id',
    'description'}`` appended to ``interaction_list``.

    Args:
        drug: Record to update in place; ``drug-interactions`` is removed (an
            absent key is tolerated).
        interaction_list: Receives the drug-to-drug interaction rows.
    """
    drug_id = drug.get('id')
    drug_interactions = drug.pop('drug-interactions', None)

    entries = drug_interactions.get('drug-interaction') if drug_interactions else None
    if entries is None:
        entries = []
    elif not isinstance(entries, list):
        # A single child element is parsed as a bare mapping, not a list.
        entries = [entries]

    interaction_list.extend(
        {
            'drug_id': drug_id,
            'interaction_drug_id': interaction.get('drugbank-id'),
            'description': interaction.get('description')
        }
        for interaction in entries
    )

# The atc-code itself serves as the primary key.
def iterate_atc_code_to_module(drug: dict, atc_code_set: Set[str], rel_list: List[dict]) -> None:
    """Move the drug's ATC codes into the shared ATC table.

    Each code is added to ``atc_code_set`` as a JSON string holding the code
    plus the code and description of four levels, and a row ``{'drug_id',
    'atc_code'}`` is appended to ``rel_list``. The ``level`` list is read by
    position: index 3 is stored as the first level and index 0 as the fourth.

    Args:
        drug: Record to update in place; ``atc-codes`` is removed (an absent
            key is tolerated).
        atc_code_set: Receives the JSON-encoded ATC codes.
        rel_list: Receives the drug-to-ATC-code rows.

    Raises:
        IndexError: If a code carries fewer than four ``level`` entries.
    """
    atc_codes_temp: Set[str] = set()
    rel_temp: List[dict] = []
    drug_id = drug.get('id')
    drug_atc_codes = drug.pop('atc-codes', None)

    entries = drug_atc_codes.get('atc-code') if drug_atc_codes else None
    if entries is None:
        entries = []
    elif not isinstance(entries, list):
        entries = [entries]

    # Output name prefix paired with the position it is read from.
    level_positions = (('first', 3), ('second', 2), ('third', 1), ('fourth', 0))
    for atc_code in entries:
        level: List[dict] = atc_code.get('level')
        code = atc_code.get('@code')
        record = {'code': code}
        for ordinal, position in level_positions:
            record[ordinal + '_level'] = level[position].get('@code')
            record[ordinal + '_level_description'] = level[position].get('#text')
        atc_codes_temp.add(json.dumps(record))
        rel_temp.append({
            'drug_id': drug_id,
            'atc_code': code
        })
    atc_code_set.update(atc_codes_temp)
    rel_list.extend(rel_temp)

# The same brand name can occur with different companies, so brands get a
# generated id instead of being keyed by name.
def iterate_brand_to_module(drug: dict, brand_list: List[dict],rel_list: List[dict], id_list: List[int]) -> None:
    """Move the drug's international brands into the brand table.

    Each brand becomes a row ``{'id', 'name', 'company'}`` in ``brand_list``
    and a row ``{'drug_id', 'brand_id'}`` in ``rel_list``.

    Args:
        drug: Record to update in place; ``international-brands`` is removed
            (an absent key is tolerated).
        brand_list: Receives the brand rows.
        rel_list: Receives the drug-to-brand rows.
        id_list: One-element list used as a mutable counter: ``id_list[0]`` is
            the next id to hand out and is incremented once per brand.
    """
    brands_temp: List[dict] = []
    rel_temp: List[dict] = []
    drug_id = drug.get('id')
    drug_international_brands = drug.pop('international-brands', None)

    entries = (drug_international_brands.get('international-brand')
               if drug_international_brands else None)
    if entries is None:
        entries = []
    elif not isinstance(entries, list):
        entries = [entries]

    for brand_id, brand in enumerate(entries, start=id_list[0]):
        brands_temp.append({
            'id': brand_id,
            'name': brand.get('name'),
            'company': brand.get('company')
        })
        rel_temp.append({
            'drug_id': drug_id,
            'brand_id': brand_id,
        })
    # Advance the shared counter past the ids handed out above.
    id_list[0] += len(entries)
    brand_list.extend(brands_temp)
    rel_list.extend(rel_temp)
    
# The name attribute serves as the primary key; note that it must be treated
# as case-sensitive.
def iterate_synonym_to_module(drug: dict, synonym_set: Set[str], rel_list: List[dict]) -> None:
    """Move the drug's synonyms into the shared synonym table.

    Each synonym is added to ``synonym_set`` as a JSON-encoded ``{'name',
    'language', 'coder'}`` string and a row ``{'drug_id', 'name'}`` is
    appended to ``rel_list``.

    Args:
        drug: Record to update in place; ``synonyms`` is removed (an absent
            key is tolerated).
        synonym_set: Receives the JSON-encoded synonyms.
        rel_list: Receives the drug-to-synonym rows.
    """
    synonyms_temp: Set[str] = set()
    rel_temp: List[dict] = []
    drug_id = drug.get('id')
    drug_synonyms = drug.pop('synonyms', None)

    entries = drug_synonyms.get('synonym') if drug_synonyms else None
    if entries is None:
        entries = []
    elif not isinstance(entries, list):
        entries = [entries]

    for synonym in entries:
        # A synonym without attributes is parsed as a bare string; treat it
        # as a name with no language and no coder.
        if isinstance(synonym, str):
            synonym = {'#text': synonym}
        name = synonym.get('#text')
        synonyms_temp.add(json.dumps({
            'name': name,
            'language': synonym.get('@language'),
            'coder': synonym.get('@coder')
        }))
        rel_temp.append({
            'drug_id': drug_id,
            'name': name
        })
    synonym_set.update(synonyms_temp)
    rel_list.extend(rel_temp)

# The category name serves as the primary key.
def iterate_category_to_module(drug: dict, category_set: Set[str], rel_list: List[dict]) -> None:
    """Move the drug's categories into the shared category table.

    Each category is added to ``category_set`` as a JSON-encoded
    ``{'category', 'mesh_id'}`` string and a row ``{'drug_id', 'category'}``
    is appended to ``rel_list``.

    Args:
        drug: Record to update in place; ``categories`` is removed (an absent
            key is tolerated).
        category_set: Receives the JSON-encoded categories.
        rel_list: Receives the drug-to-category rows.
    """
    categories_temp: Set[str] = set()
    rel_temp: List[dict] = []
    drug_id = drug.get('id')
    drug_categories = drug.pop('categories', None)

    entries = drug_categories.get('category') if drug_categories else None
    if entries is None:
        entries = []
    elif not isinstance(entries, list):
        # A single child element is parsed as a bare mapping, not a list.
        entries = [entries]

    for category in entries:
        category_name = category.get('category')
        categories_temp.add(json.dumps({
            'category': category_name,
            'mesh_id': category.get('mesh-id')
        }))
        rel_temp.append({
            'drug_id': drug_id,
            'category': category_name
        })
    category_set.update(categories_temp)
    rel_list.extend(rel_temp)

# TODO: assign a generated id as the primary key, and remember to de-duplicate.
def iterate_classification_to_module(drug: dict, classification_list: List[dict], alternative_parent_list: List[dict], substituent_list: List[dict]) -> None:
    """Move the drug's classification into three separate tables.

    The classification fields other than ``substituent`` and
    ``alternative-parent`` are copied into one row, extended with ``drug_id``,
    and appended to ``classification_list``. Every substituent and alternative
    parent becomes its own row ``{'drug_id', 'substituent'}`` /
    ``{'drug_id', 'alternative_parent'}``.

    Args:
        drug: Record to update in place; ``classification`` is removed, also
            when it is present but empty.
        classification_list: Receives the per-drug classification row.
        alternative_parent_list: Receives the alternative-parent rows.
        substituent_list: Receives the substituent rows.
    """
    drug_id = drug.get('id')
    drug_classification: dict|None = drug.pop('classification', None)
    if not drug_classification:
        return

    multi_valued = ('substituent', 'alternative-parent')
    # Fresh row holding only the single-valued fields; the parsed mapping
    # itself is not modified.
    summary = {key: value for key, value in drug_classification.items()
               if key not in multi_valued}
    summary['drug_id'] = drug_id
    classification_list.append(summary)

    for source_key, column, target in (
        ('substituent', 'substituent', substituent_list),
        ('alternative-parent', 'alternative_parent', alternative_parent_list),
    ):
        values = drug_classification.get(source_key)
        if not values:
            continue
        # A single child element is parsed as a bare string; iterating it
        # directly would emit one row per character.
        if not isinstance(values, list):
            values = [values]
        target.extend({'drug_id': drug_id, column: value} for value in values)

## 1.6 数据集属性分离(基础信息与学术性信息)

In [41]:
from typing import Tuple


def copy_dict(source:dict, target:dict, property_str: str) -> None:
    """Copy ``source[property_str]`` into ``target``; a missing key is stored as None."""
    value = source.get(property_str)
    target[property_str] = value

def separate_academic(drug: dict) -> Tuple[dict,dict]:
    """Split ``drug`` into its basic part and a new academic record.

    Every academic property is copied into the new record (None when absent)
    and removed from ``drug``. ``name``, ``id`` and ``type`` are copied too
    but stay on ``drug``.

    Returns:
        ``(drug, academic)``: the same ``drug`` object, now stripped of the
        academic properties, and the newly built academic dict.
    """
    moved_keys = (
        'cas-number', 'unii', 'average-mass', 'monoisotopic-mass',
        'synthesis-reference', 'pharmacodynamics',
        'mechanism-of-action', 'toxicity', 'metabolism',
        'absorption', 'half-life', 'protein-binding',
        'route-of-elimination', 'volume-of-distribution',
        'clearance', 'ahfs-codes', 'pdb-entries', 'transporters',
        'carriers', 'enzymes', 'targets',
        'snp-adverse-drug-reactions', 'snp-effects',
        'reactions', 'pathways', 'external-links',
        'external-identifiers', 'experimental-properties',
        'calculated-properties', 'msds', 'salts', 'mixtures',
        'dosages', 'fda-label', 'patents', 'sequences',
    )
    # Present in both halves so the academic record can be matched to its drug.
    shared_keys = ('name', 'id', 'type')

    academic = {}
    for key in moved_keys:
        copy_dict(drug, academic, key)
        drug.pop(key, None)
    for key in shared_keys:
        copy_dict(drug, academic, key)

    return drug, academic

## 1.7 数据集属性输出(csv 或 json)

In [42]:
import csv
import json
def module_to_csv(file_path: str, data: List[dict]):
    """Write ``data`` to ``file_path`` as a UTF-8 CSV file with a header row.

    The columns are the union of all row keys in first-seen order, so a row
    may omit keys (written as empty cells) or introduce new ones without
    raising. List values are joined with ``;`` and dict values are written as
    JSON with ``,`` replaced by ``|``. The rows in ``data`` are not modified.

    When ``data`` is empty a message is printed and no file is written.

    Args:
        file_path: Destination path; an existing file is overwritten.
        data: Rows to write, one dict per CSV line.
    """
    if not data:
        print('data is empty, file_path: ',file_path)
        return
    # dict.fromkeys keeps first-seen order while dropping duplicates.
    fieldnames = list(dict.fromkeys(key for row in data for key in row))
    with open(file_path,'w',newline='',encoding='utf-8') as f:
        writer = csv.DictWriter(f,fieldnames=fieldnames)
        writer.writeheader()
        for row in data:
            # Build a converted copy instead of rewriting the caller's row.
            writer.writerow({key: _csv_cell(value) for key, value in row.items()})


def _csv_cell(value):
    """Return the CSV representation of one cell value."""
    if isinstance(value, list):
        # Join list items with semicolons; non-string items are JSON-encoded
        # (None becomes an empty string) so the join cannot fail.
        return ';'.join(
            item if isinstance(item, str)
            else '' if item is None
            else json.dumps(item, ensure_ascii=False)
            for item in value
        )
    if isinstance(value, dict):
        # Serialise dicts as JSON, with commas swapped for pipes.
        return json.dumps(value, ensure_ascii=False).replace(',','|')
    return value
def module_to_json(file_path: str, data: List[dict]):
    """Write ``data`` to ``file_path`` as UTF-8 JSON, indented by four spaces.

    Non-ASCII characters are written as-is rather than escaped.
    """
    encoder = json.JSONEncoder(ensure_ascii=False, indent=4)
    with open(file_path, 'w', encoding='utf-8') as out:
        # Stream the encoded chunks straight into the file.
        for chunk in encoder.iterencode(data):
            out.write(chunk)

## 1.8 数据集处理结果测试

In [43]:
# Accumulators filled by the iterate_*_to_module functions.
# Set[str] tables hold JSON-encoded rows, so identical rows are stored once.
# The *_rel lists hold the rows that link a drug to an entry of a table.

# General references (articles, textbooks, links, attachments).
references: Set[str] = set()
references_rel: List[dict] = []

# Packagers and manufacturers share one source table.
sources: Set[str] = set()
packager_rel : List[dict] = []
manufacturer_rel : List[dict] = []

# Prices (rows carry a generated id).
prices: List[dict] = []
price_rel: List[dict] = []

# Products.
products: Set[str] = set()
product_rel: List[dict] = []

# Drug-to-drug interactions.
interactions: List[dict] = []

# ATC codes.
atc_codes: Set[str] = set()
atc_code_rel: List[dict] = []

# International brands (rows carry a generated id).
brands: List[dict] = []
brand_rel: List[dict] = []

# Synonyms.
synonyms: Set[str] = set()
synonym_rel: List[dict] = []

# Categories.
categories: Set[str] = set()
category_rel: List[dict] = []

# Classification, with its multi-valued parts in separate tables.
classifications: List[dict] = []
substituents: List[dict] = []
alternative_parents: List[dict] = []

# The two halves of every drug record after separate_academic().
drug_basic: List[dict] = []
drug_academic: List[dict] = []

# One-element lists used as mutable id counters, starting at 1.
prices_id = [1]
brands_id = [1]

def json_to_dict(str_data: Set[str]) -> List[dict]:
    """Decode each JSON string in ``str_data`` and return the results as a list."""
    return list(map(json.loads, str_data))

# Process every drug in place: normalise the base fields, move each
# multi-valued section into its own table, then split what is left into the
# basic and the academic record.
for drug_item in drug_test:
    process_drug(drug_item)
    iterate_references_to_module(drug_item,references,references_rel)
    iterate_source_to_module(drug_item,sources,packager_rel,manufacturer_rel)
    iterate_price_to_module(drug_item,prices,price_rel,prices_id)
    iterate_product_to_module(drug_item,products,product_rel)
    iterate_drug_interaction_to_module(drug_item,interactions)
    iterate_atc_code_to_module(drug_item,atc_codes,atc_code_rel)
    iterate_brand_to_module(drug_item,brands,brand_rel,brands_id)
    iterate_synonym_to_module(drug_item,synonyms,synonym_rel)
    iterate_category_to_module(drug_item,categories,category_rel)
    iterate_classification_to_module(drug_item,classifications,alternative_parents,substituents)
    basic_drug, academic_drug = separate_academic(drug_item)
    drug_basic.append(basic_drug)
    drug_academic.append(academic_drug)


# Write every table to CSV; the JSON-encoded sets are decoded back to dicts first.
module_to_csv('data/processed/references.csv',json_to_dict(references))
module_to_csv('data/processed/references_rel.csv',references_rel)
module_to_csv('data/processed/sources.csv',json_to_dict(sources))
module_to_csv('data/processed/packager_rel.csv',packager_rel)
module_to_csv('data/processed/manufacturer_rel.csv',manufacturer_rel)
module_to_csv('data/processed/prices.csv',prices)
module_to_csv('data/processed/price_rel.csv',price_rel)
module_to_csv('data/processed/products.csv',json_to_dict(products))
module_to_csv('data/processed/product_rel.csv',product_rel)
module_to_csv('data/processed/interactions.csv',interactions)
module_to_csv('data/processed/atc_codes.csv',json_to_dict(atc_codes))
module_to_csv('data/processed/atc_code_rel.csv',atc_code_rel)
module_to_csv('data/processed/brands.csv',brands)
module_to_csv('data/processed/brand_rel.csv',brand_rel)
module_to_csv('data/processed/synonyms.csv',json_to_dict(synonyms))
module_to_csv('data/processed/synonym_rel.csv',synonym_rel)
module_to_csv('data/processed/categories.csv',json_to_dict(categories))
module_to_csv('data/processed/category_rel.csv',category_rel)

# The classification tables are collected above but their export is disabled.
# module_to_csv('data/processed/classifications.csv',classifications)
# module_to_csv('data/processed/substituents.csv',substituents)
# module_to_csv('data/processed/alternative_parents.csv',alternative_parents)
module_to_csv('data/processed/drug_basic.csv',drug_basic)
# TODO: the dict-valued attributes in drug_academic cannot be handled effectively yet.
# module_to_csv('data/processed/drug_academic.csv',drug_academic)