## covid-19-au-health-aged-care.ipynb

Downloads pdf and Word docx files from the sub-pages below a targetted page into a local directory (datadir). Only downloads if local file does not exist.  
Reads the text from each local pdf file, extracting key fields. 
Reads the local Word docx files (dated after a specified date), extracting all table data.

Writes the collected data out as an Excel file:
- sheet: Treatments - data on treatments scraped from pdf files
- sheet: National Snapshot - data from 1st table in Word docx files
- sheet: Active Outbreak Summary - data from 2nd table in Word docx files
- sheet: Workforce Resources - data from 3rd table in Word docx files
- sheet: Vaccinations - data from 4th table in Word docx files
- sheet: Regulatory Activities - data from 5th table in Word docx files
- sheet: Active Outbreaks - data from 6th table in Word docx files

In all sheets, the source_file_name and source_file_date (derived from the file name) are added as columns. Rows are sorted by source_file_date (descending).

In the sheets sourced from Word docx tables, the column headers are not promoted so the columns are numbered from 0 instead. This is to avoid a sparse table if column headings change among files.  The column headers will appear repeated for each source file, which any downstream analysis can filter out.

In [1]:

from bs4 import BeautifulSoup
import datetime
import dateutil # pip install python-dateutil
from docx import Document
import pymupdf
import os
import pandas
import random
import re
import requests
import time

site = "https://www.health.gov.au"
main_url = "https://www.health.gov.au/resources/collections/covid-19-outbreaks-in-australian-residential-aged-care-facilities"
links_to_check = ["2025"]

datadir = 'c:/dev/covid-19-au-vaccinations/health-aged-care/'
output_filename = datadir + "health-aged-care.xlsx"

In [2]:
def download_pdf(pdf_url, local_dir):
    # Get the filename from the URL
    filename = os.path.basename(pdf_url)
    local_path = os.path.join(local_dir, filename)

    # Check if the file already exists in the local directory
    if not os.path.exists(local_path):
        # sleep for a random time before downloading 
        time.sleep(2 + ( random.randrange( 0, 30) / 10 ) )
        # Download the PDF file
        response = requests.get(pdf_url)

        # Save the PDF file to the local directory
        with open(local_path, "wb") as file:
            file.write(response.content)
        print(f"Downloaded {filename} to {local_dir}")
    else:
        print(f"{filename} already exists in {local_dir}")


In [3]:
def extract_data_from_pdf(pdf_file):

# open a pdf file, search for the key fields and return them
    
    with pymupdf.open(pdf_file) as doc:  # open document
        text = chr(12).join([page.get_text() for page in doc])

        lagevrio_treatment_courses = 0
        lagevrio_prescriptions = 0
        paxlovid_prescriptions = 0
        end_date = ''

        # search for: and up to DD MMMM YYYY (allowing for extra spaces around the month)
        pattern = r'Lagevrio.*?up\s+to\s+(\d+)(.*?)(\d{4})'
        match = re.search(pattern, text, re.DOTALL)
        if match:
            # assign result, removing excess whitespace around the month portion
            end_date = match.group(1) + ' ' + match.group(2) + ' ' + match.group(3)
            # print(f"DEBUG: End dates: {end_date}")

        # search for: deployed NNN treatment courses of Lagevrio
        pattern = r"deployed\s*(\d+(?:,\d+)?)\s*treatment\s+courses\s+of\s+Lagevrio"
        match = re.search(pattern, text, re.DOTALL)
        if match:
            # assign result
            lagevrio_treatment_courses = match.group(1)
            # print(f"DEBUG: Text: {text}")
            # print(f"DEBUG: Lagevrio treatment courses: {lagevrio_treatment_courses}")

        # search for: NNN prescriptions for Lagevrio
        pattern = r"(\d+(?:,\d+)?)\s*prescriptions\s+for\s+Lagevrio"
        match = re.search(pattern, text, re.DOTALL)
        if match:
            # assign result
            lagevrio_prescriptions = match.group(1)
            # print(f"DEBUG: Lagevrio prescriptions: {lagevrio_prescriptions}")

        # search for: NNN prescriptions for Paxlovid 
        pattern = r"further\s*(\d+(?:,\d+)?)\s*prescriptions\s+for\s+Paxlovid"
        match = re.search(pattern, text, re.DOTALL)
        if match:
            # assign result 
            paxlovid_prescriptions = match.group(1)
            # print(f"DEBUG: Paxlovid prescriptions: {paxlovid_prescriptions}")

        return end_date, lagevrio_treatment_courses, lagevrio_prescriptions, paxlovid_prescriptions

#### Get the main source page, make a list of links to check

In [None]:
response = requests.get(main_url)
soup = BeautifulSoup(response.content, "html.parser")

# Find and extract the links with text (wrapped in a span tag) containing with "COVID-19 outbreaks in Australian residential aged care facilities"
soup_page_select = "a:has(span:-soup-contains('COVID-19 outbreaks in Australian residential aged care'))"
links_year_pages = [link.get("href") for link in soup.select(soup_page_select)]
len ( links_year_pages ) 

#### Get each sub-page and download it's pdf and Word docx files 

In [None]:
# browse through the list of year pages. See if the link URL partially matches the links_to_check list (recent years).
link_files_to_download = ['PDF', 'Word']
for each_link_year_page_candidate in links_year_pages:
    each_link_year_page_list = [each_link_year_page_candidate for sub_string in links_to_check if(sub_string in each_link_year_page_candidate)]
    if len(each_link_year_page_list) > 0:
        # sleep for a random time before getting the sub-page 
        time.sleep(1 + ( random.randrange( 0, 20) / 10 ) )
        # get and process the sub-page from each qualifying link
        each_link_year_page = each_link_year_page_list[0]
        year_page_url = site + each_link_year_page
        year_page_response = requests.get(year_page_url)
        year_page_soup = BeautifulSoup(year_page_response.content, "html.parser")

        links = [link.get("href") for link in year_page_soup.select(soup_page_select)]

        # browse through the list of links. See if the link URL partially matches the links_to_check list (recent years).
        for each_link_candidate in links:
            each_link_list = [each_link_candidate for sub_string in links_to_check if(sub_string in each_link_candidate)]
            if len(each_link_list) > 0:
                # sleep for a random time before getting the sub-page 
                time.sleep(1 + ( random.randrange( 0, 20) / 10 ) )
                # get and process the sub-page from each qualifying link
                each_link = each_link_list[0]
                sub_page_url = site + each_link
                sub_page_response = requests.get(sub_page_url)
                sub_page_soup = BeautifulSoup(sub_page_response.content, "html.parser")

                # Find and extract the links to files
                for each_link_file_to_download in link_files_to_download:
                    soup_select = "a:has(span:-soup-contains('" + each_link_file_to_download + "'))"
                    file_links = [file_links.get("href") for file_links in sub_page_soup.select(soup_select)]

                    # downloading the first pdf file link to the local directory (if it doesnt already exist)
                    download_pdf(site + file_links[0], datadir)

#### Process local files

In [11]:
start_source_file_date_for_table_output = datetime.date(2024, 4, 1) # process files dated 1 April 2024 onwards (March 2024 files show a format change)

output_treatments_df = pandas.DataFrame(columns=['source_file_name', 'source_file_date', 'end_date', 'lagevrio_courses', 'lagevrio_prescriptions', 'paxlovid_prescriptions'])
  
# Initialize an empty list to store dataframes from Word tables
dfs = [pandas.DataFrame() for _ in range(6)]

for file in os.listdir(datadir):
    filename = os.fsdecode(file)
    # try to derive the source_file_date from the end of the file name
    try:
        filename_for_source_file_date = filename.split('.')[0]
        # handle file names that end with _0, _1 etc
        re_does_filename_for_source_file_date_end_in_underscore_number = re.search(r'_\d$', filename_for_source_file_date)
        if re_does_filename_for_source_file_date_end_in_underscore_number is not None:
            filename_for_source_file_date = filename_for_source_file_date[:len(filename_for_source_file_date) - 2]
        source_file_date_str = ' '.join(filename_for_source_file_date.split('-')[-3:])
        source_file_date = dateutil.parser.parse(source_file_date_str, default=datetime.date(2000, 1, 1))
    except:
        source_file_date = datetime.date(2000, 1, 1)
    
    # browse through all the local pdf files, gathering the search results into a dataframe for output
    if filename.endswith('.pdf'):
        pdf_file = datadir + filename
        end_date, lagevrio_courses, lagevrio_prescriptions, paxlovid_prescriptions  = extract_data_from_pdf(pdf_file)
        # print(f"DEBUG: End date: {end_date}")
        # print(f"DEBUG: Lagevrio treatment courses: {lagevrio_courses}")
        # print(f"DEBUG: Lagevrio prescriptions: {lagevrio_prescriptions}")
        # print(f"DEBUG: Paxlovid prescriptions: {paxlovid_prescriptions}")

        # construct the output row and add it to the dataframe
        output_row = [filename, source_file_date, end_date, lagevrio_courses, lagevrio_prescriptions, paxlovid_prescriptions]
        output_treatments_df.loc[len(output_treatments_df.index)] = output_row

    # browse through all the local docx files, gathering the tables into dataframes for output
    if filename.endswith('.docx') and source_file_date >= start_source_file_date_for_table_output:
        # Load the Word document
        doc = Document(datadir + filename)
        table_counter = -1

        # Iterate through each table in the document
        for table in doc.tables:
            table_counter = table_counter + 1

            # Create a DataFrame structure with empty strings, sized by the number of rows and columns in the table
            df = [['' for _ in range(len(table.columns))] for _ in range(len(table.rows))]
            
            # Iterate through each row in the current table
            for i, row in enumerate(table.rows):
                # Iterate through each cell in the current row
                for j, cell in enumerate(row.cells):
                    # If the cell has text, store it in the corresponding DataFrame position
                    if cell.text:
                        df[i][j] = cell.text.replace('\n',' ')
            
            # Convert the list of lists (df) to a pandas DataFrame and add it to the tables list
            table_df = pandas.DataFrame(df)
            table_df['source_file_name'] = filename
            table_df['source_file_date'] = source_file_date
            dfs[table_counter] = pandas.concat([dfs[table_counter], table_df])



#### Gather result dataframes and write out to Excel sheets.

In [12]:

# sort each df by source_file_date desc
output_treatments_df = output_treatments_df.sort_values(['source_file_date'], ascending=[False]).reset_index(drop=True)
for df_index, each_df in enumerate(dfs.copy()):
   each_df.index.name = 'row_index_per_file'
   dfs[df_index] = each_df.sort_values(['source_file_date', 'row_index_per_file'], ascending=[False, True]).reset_index()

# write all the output...dfs to an Excel file with Sheet names
dfs.insert(0,output_treatments_df)

writer = pandas.ExcelWriter(output_filename, engine='xlsxwriter')
sheet_names = ["Treatments", "National Snapshot", "Active Outbreak Summary", "Workforce Resources", "Vaccinations","Regulatory Activities","Active Outbreaks"]
for df_index, frame in enumerate(dfs):
   frame.to_excel(writer, sheet_name = sheet_names[df_index])
writer.close()


#### Debug dataframe outputs

In [None]:
output_treatments_df

In [None]:
dfs[1] # National Snapshot

In [None]:
dfs[2] # Active Outbreaks

In [None]:
dfs[3] # Workforce Resources

In [None]:
dfs[4] # Vaccinations

In [None]:
dfs[5] # Regulatory Activities

In [None]:
dfs[6] # Active Outbreaks