# WebScrapping ATC/DDD data from WHO Collaborating Centre for Drug Statistics

In [3]:
import importlib
import subprocess
import sys

# Function to check if a library is installed, and install it if not
def install_and_import(package):
    try:
        importlib.import_module(package)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])
    finally:
        globals()[package] = importlib.import_module(package)

# List of required packages
required_packages = ['os', 'requests', 'pandas', 'bs4', 'datetime', 'lxml']

# Install and import required packages
for package in required_packages:
    install_and_import(package)

import os
import requests
import pandas as pd
from bs4 import BeautifulSoup
from datetime import datetime

# Ensure directory exists
def create_directory_if_not_exists(*paths):
    dir_path = os.path.join(*paths)
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    return dir_path

# Create output directories if they do not exist
output_directory = create_directory_if_not_exists('output')
cache_directory = create_directory_if_not_exists(output_directory, 'cache')

# Function to wrap RDS equivalent in Python
def cache_or_generate_data(var_name, func, *args, **kwargs):
    cache_file = os.path.join(cache_directory, f'{var_name}.pkl')
    if os.path.exists(cache_file):
        print(f"Reading '{var_name}' from file '{cache_file}'... ")
        var_val = pd.read_pickle(cache_file)
    else:
        print(f'Building {var_name}.')
        var_val = func(*args, **kwargs)
        print(f"{var_name} completed. Saving to file '{cache_file}'... ")
        var_val.to_pickle(cache_file)
    return var_val

# Function to get RDS equivalent in Python
def load_cached_data(var_name):
    cache_file = os.path.join(cache_directory, f'{var_name}.pkl')
    if os.path.exists(cache_file):
        print(f"Reading '{var_name}' from file '{cache_file}'... ")
        return pd.read_pickle(cache_file)
    else:
        raise FileNotFoundError(f'Unable to find file {cache_file}.')

# Scrape data from WHO ATC website
def scrape_atc_data(root_atc_code):
    if not isinstance(root_atc_code, str) or len(root_atc_code) != 1:
        raise ValueError('scrape_atc_data() only accepts single objects, not vectors. Please provide a single valid ATC code as input.')
    
    web_address = f'https://www.whocc.no/atc_ddd_index/?code={root_atc_code}&showdescription=no'
    print(f'Scraping {web_address}.')
    atc_code_length = len(root_atc_code)
    response = requests.get(web_address)
    html_data = BeautifulSoup(response.content, 'html.parser')
    
    if atc_code_length < 5:
        scraped_strings = html_data.select_one("#content > p:nth-of-type(2n)").get_text().split('\n')
        scraped_strings = list(filter(len, scraped_strings))
        
        if not scraped_strings:
            return None
        
        tval = pd.concat([pd.concat([pd.DataFrame({'atc_code': [s.split()[0]], 'atc_name': [' '.join(s.split()[1:])]}), scrape_atc_data(s.split()[0])], ignore_index=True) for s in scraped_strings], ignore_index=True)
        
        if atc_code_length == 1:
            root_atc_code_name = html_data.select("#content a")[2].get_text()
            return pd.concat([pd.DataFrame({'atc_code': [root_atc_code], 'atc_name': [root_atc_code_name]}), tval], ignore_index=True)
        else:
            return tval
    else:
        table = html_data.select_one("ul > table")
        if table is None:
            return None
        
        df = pd.read_html(str(table), header=0)[0]
        df.columns = ['atc_code', 'atc_name', 'ddd', 'uom', 'adm_r', 'note']
        df = df.applymap(lambda x: None if x == '' else x)
        
        for i in range(1, len(df)):
            if pd.isna(df.atc_code[i]):
                df.atc_code[i] = df.atc_code[i-1]
                df.atc_name[i] = df.atc_name[i-1]
        
        return df

# List of root ATC codes to scrape
atc_root_codes = ['A', 'B', 'C', 'D', 'G', 'H', 'J', 'L', 'M', 'N', 'P', 'R', 'S', 'V']

# Process each ATC root code individually
for atc_root in atc_root_codes:
    print(f'Processing ATC root: {atc_root}')
    cache_or_generate_data(f'who_atc_{atc_root}', scrape_atc_data, atc_root)

# Read the files produced by scrape_atc_data()
combined_atc_data = pd.concat([load_cached_data(f'who_atc_{atc_root}') for atc_root in atc_root_codes if load_cached_data(f'who_atc_{atc_root}') is not None], ignore_index=True)

# Write them to a CSV file. Generate file name from current date in year-month-day format.
output_file_name = os.path.join(output_directory, f'WHO ATC-DDD {datetime.now().strftime("%Y-%m-%d")}.csv')
print(f'Writing results to {output_file_name}.')
if os.path.exists(output_file_name):
    print('Warning: file already exists. Will be overwritten.')
combined_atc_data.to_csv(output_file_name, index=False)

# Finish execution
print('Script execution completed.')


Processing ATC root: A
Building who_atc_A.
Scraping https://www.whocc.no/atc_ddd_index/?code=A&showdescription=no.


ValueError: scrape_atc_data() only accepts single objects, not vectors. Please provide a single valid ATC code as input.

In [None]:
import os
import requests
import pandas as pd
from bs4 import BeautifulSoup
from datetime import datetime

# Ensure directory exists
def create_directory_if_not_exists(*paths):
    dir_path = os.path.join(*paths)
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    return dir_path

# Create output directories if they do not exist
output_directory = create_directory_if_not_exists('output')
cache_directory = create_directory_if_not_exists(output_directory, 'cache')

# Function to wrap RDS equivalent in Python
def cache_or_generate_data(var_name, func, *args, **kwargs):
    cache_file = os.path.join(cache_directory, f'{var_name}.pkl')
    if os.path.exists(cache_file):
        print(f"Reading '{var_name}' from file '{cache_file}'... ")
        var_val = pd.read_pickle(cache_file)
    else:
        print(f'Building {var_name}.')
        var_val = func(*args, **kwargs)
        print(f"{var_name} completed. Saving to file '{cache_file}'... ")
        var_val.to_pickle(cache_file)
    return var_val

# Function to get RDS equivalent in Python
def load_cached_data(var_name):
    cache_file = os.path.join(cache_directory, f'{var_name}.pkl')
    if os.path.exists(cache_file):
        print(f"Reading '{var_name}' from file '{cache_file}'... ")
        return pd.read_pickle(cache_file)
    else:
        raise FileNotFoundError(f'Unable to find file {cache_file}.')

# Scrape data from WHO ATC website
def scrape_atc_data(root_atc_code):
    if not isinstance(root_atc_code, str) or len(root_atc_code) != 1:
        raise ValueError('scrape_atc_data() only accepts single objects, not vectors. Please provide a single valid ATC code as input.')
    
    web_address = f'https://www.whocc.no/atc_ddd_index/?code={root_atc_code}&showdescription=no'
    print(f'Scraping {web_address}.')
    atc_code_length = len(root_atc_code)
    response = requests.get(web_address)
    html_data = BeautifulSoup(response.content, 'html.parser')
    
    if atc_code_length < 5:
        scraped_strings = html_data.select_one("#content > p:nth-of-type(2n)").get_text().split('\n')
        scraped_strings = list(filter(len, scraped_strings))
        
        if not scraped_strings:
            return None
        
        tval = pd.concat([pd.concat([pd.DataFrame({'atc_code': [s.split()[0]], 'atc_name': [' '.join(s.split()[1:])]}), scrape_atc_data(s.split()[0])], ignore_index=True) for s in scraped_strings], ignore_index=True)
        
        if atc_code_length == 1:
            root_atc_code_name = html_data.select("#content a")[2].get_text()
            return pd.concat([pd.DataFrame({'atc_code': [root_atc_code], 'atc_name': [root_atc_code_name]}), tval], ignore_index=True)
        else:
            return tval
    else:
        table = html_data.select_one("ul > table")
        if table is None:
            return None
        
        df = pd.read_html(str(table), header=0)[0]
        df.columns = ['atc_code', 'atc_name', 'ddd', 'uom', 'adm_r', 'note']
        df = df.applymap(lambda x: None if x == '' else x)
        
        for i in range(1, len(df)):
            if pd.isna(df.atc_code[i]):
                df.atc_code[i] = df.atc_code[i-1]
                df.atc_name[i] = df.atc_name[i-1]
        
        return df

# List of root ATC codes to scrape
atc_root_codes = ['A', 'B', 'C', 'D', 'G', 'H', 'J', 'L', 'M', 'N', 'P', 'R', 'S', 'V']

# Process each ATC root code individually
for atc_root in atc_root_codes:
    print(f'Processing ATC root: {atc_root}')
    cache_or_generate_data(f'who_atc_{atc_root}', scrape_atc_data, str(atc_root))

# Read the files produced by scrape_atc_data()
combined_atc_data = pd.concat([load_cached_data(f'who_atc_{atc_root}') for atc_root in atc_root_codes if load_cached_data(f'who_atc_{atc_root}') is not None], ignore_index=True)

# Write them to a CSV file. Generate file name from current date in year-month-day format.
output_file_name = os.path.join(output_directory, f'WHO ATC-DDD {datetime.now().strftime("%Y-%m-%d")}.csv')
print(f'Writing results to {output_file_name}.')
if os.path.exists(output_file_name):
    print('Warning: file already exists. Will be overwritten.')
combined_atc_data.to_csv(output_file_name, index=False)

# Finish execution
print('Script execution completed.')

In [None]:
import os
import requests
import pandas as pd
from bs4 import BeautifulSoup
from datetime import datetime

# Ensure directory exists
def create_directory_if_not_exists(*paths):
    dir_path = os.path.join(*paths)
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    return dir_path

# Create output directories if they do not exist
output_directory = create_directory_if_not_exists('output')
cache_directory = create_directory_if_not_exists(output_directory, 'cache')

# Function to wrap RDS equivalent in Python
def cache_or_generate_data(var_name, func, *args, **kwargs):
    cache_file = os.path.join(cache_directory, f'{var_name}.pkl')
    if os.path.exists(cache_file):
        print(f"Reading '{var_name}' from file '{cache_file}'... ")
        var_val = pd.read_pickle(cache_file)
    else:
        print(f'Building {var_name}.')
        var_val = func(*args, **kwargs)
        print(f"{var_name} completed. Saving to file '{cache_file}'... ")
        var_val.to_pickle(cache_file)
    return var_val

# Function to get RDS equivalent in Python
def load_cached_data(var_name):
    cache_file = os.path.join(cache_directory, f'{var_name}.pkl')
    if os.path.exists(cache_file):
        print(f"Reading '{var_name}' from file '{cache_file}'... ")
        return pd.read_pickle(cache_file)
    else:
        raise FileNotFoundError(f'Unable to find file {cache_file}.')

# Scrape data from WHO ATC website
def scrape_atc_data(root_atc_code):
    if not isinstance(root_atc_code, str) or len(root_atc_code) != 1:
        raise ValueError('scrape_atc_data() only accepts single objects, not vectors. Please provide a single valid ATC code as input.')
    
    web_address = f'https://www.whocc.no/atc_ddd_index/?code={root_atc_code}&showdescription=no'
    print(f'Scraping {web_address}.')
    atc_code_length = len(root_atc_code)
    response = requests.get(web_address)
    html_data = BeautifulSoup(response.content, 'html.parser')
    
    if atc_code_length < 5:
        scraped_strings = html_data.select_one("#content > p:nth-of-type(2n)").get_text().split('\n')
        scraped_strings = list(filter(len, scraped_strings))
        
        if not scraped_strings:
            return None
        
        tval = pd.concat([pd.concat([pd.DataFrame({'atc_code': [s.split()[0]], 'atc_name': [' '.join(s.split()[1:])]}), scrape_atc_data(s.split()[0])], ignore_index=True) for s in scraped_strings], ignore_index=True)
        
        if atc_code_length == 1:
            root_atc_code_name = html_data.select("#content a")[2].get_text()
            return pd.concat([pd.DataFrame({'atc_code': [root_atc_code], 'atc_name': [root_atc_code_name]}), tval], ignore_index=True)
        else:
            return tval
    else:
        table = html_data.select_one("ul > table")
        if table is None:
            return None
        
        df = pd.read_html(str(table), header=0)[0]
        df.columns = ['atc_code', 'atc_name', 'ddd', 'uom', 'adm_r', 'note']
        df = df.applymap(lambda x: None if x == '' else x)
        
        for i in range(1, len(df)):
            if pd.isna(df.atc_code[i]):
                df.atc_code[i] = df.atc_code[i-1]
                df.atc_name[i] = df.atc_name[i-1]
        
        return df

# List of root ATC codes to scrape
atc_root_codes = ['A', 'B', 'C', 'D', 'G', 'H', 'J', 'L', 'M', 'N', 'P', 'R', 'S', 'V']

# Process each ATC root code individually
for atc_root in atc_root_codes:
    print(f'Processing ATC root: {atc_root}')
    cache_or_generate_data(f'who_atc_{atc_root}', scrape_atc_data, str(atc_root))

# Read the files produced by scrape_atc_data()
combined_atc_data = pd.concat([load_cached_data(f'who_atc_{atc_root}') for atc_root in atc_root_codes if load_cached_data(f'who_atc_{atc_root}') is not None], ignore_index=True)

# Write them to a CSV file. Generate file name from current date in year-month-day format.
output_file_name = os.path.join(output_directory, f'WHO ATC-DDD {datetime.now().strftime("%Y-%m-%d")}.csv')
print(f'Writing results to {output_file_name}.')
if os.path.exists(output_file_name):
    print('Warning: file already exists. Will be overwritten.')
combined_atc_data.to_csv(output_file_name, index=False)

# Finish execution
print('Script execution completed.')