# Webscrape AEMO data from nemweb

M1 APPLIED ECONOMETRICS, Spring 2024

Applied Econometrics - Master TSE 1 - 2023/2024

> Exploring the Influence of Daylight Saving Time on CO2 Emissions 
> and Electricity Consumption in Australia's Electricity Grid

LAST MODIFIED: 29/02/2024 

LAST MODIFIED BY: Matthew Davis

Script duration: multiple days!

Disk storage requirement: 300GB

Bandwidth: Fast download speed makes a big difference here. e.g. at 10MBi/s this script will take 2 days.

Memory requirement: negligible. Any modern laptop/desktop will be sufficient.

--------------------

This script downloads data files from https://www.nemweb.com.au/REPORTS/ and https://www.nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM/ .

1. Crawl those pages to find a list of URLs for CSV and .zip files
2. Filter that list. e.g. delete large files we don't need
3. Download those files. (Surprisingly hard. The server is unreliable: it throttles us, gives corrupt files etc.)

Note that this playbook takes about 1 day to run and produces about 140GB of data, even though the dataset we care about is only a few hundred MB. This is because AEMO mixes the data we want with a lot of data we don't want, and it's quite difficult to map their SQL 'tables' to the original files on their website. For most tables we can guess which source files have the data. But for some (in particular, CO2 emissions intensity per generator) I can't find which particular file has that data. So we download almost everything, process it, then choose just the tables we want (we choose in the next script, not this playbook). The only files we skip are ones we definitely know don't have the data we want, e.g. gas market data. That still leaves a lot we're unsure about.

The code may seem more complex than you expected. That's because:

* I want to do multiprocessing to speed it up. (Some laptops apparently can't handle multiprocessing. So if `use_multiprocessing=False` then we do a `for` loop instead.)
* Our downloads get throttled, so we need to sleep and retry.
* The whole thing takes so long that we want to be able to stop the kernel and restart, without the state ending up broken (e.g. writing the same data 1.5 times to an output file).
* Sometimes you can download 1000 files, and then file 1001 fails. I want to keep processing the remainder, gather up all the successes/failures, and then compare them. So there's some try/except stuff to handle that.
 
When faced with a decision between making this code fast vs understandable, I tried to focus on understandable. (Except for multiprocessing)
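
As an aside on the restart-safety point above: one common way to make sure an interrupted write never leaves a half-written file is to write to a temporary file and rename it into place. This is only a minimal sketch of that general technique, not what this playbook does (the playbook compares file sizes instead). The helper name `atomic_write_bytes` and the `.part` suffix are made up for illustration.

```python
import os
import tempfile

def atomic_write_bytes(path: str, data: bytes) -> None:
    """Write data to path so that path never holds a partial file.

    Hypothetical helper for illustration, it is not part of utils.py.
    """
    directory = os.path.dirname(os.path.abspath(path))
    # the temporary file must be on the same filesystem as the target,
    # otherwise os.replace cannot rename it atomically
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix='.part')
    try:
        with os.fdopen(fd, 'wb') as f:
            f.write(data)
        # atomic rename. Overwrites any older file at path.
        os.replace(tmp_path, path)
    except BaseException:
        # interrupted or failed: remove the partial temporary file, then re-raise
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
```

If the kernel is stopped halfway through, the target path still holds either the old complete file or nothing, so a re-run can start that file from scratch.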

To run this on your laptop, you'll want to change `base_data_dir` to point to a directory with >140GB of space. It does not have to be inside this git repo.

If you have been given a copy of files already downloaded, you can change `base_data_dir` such that `raw_files_path` points to the directory of already-downloaded files. Then this script will avoid re-downloading those files, and will only download new/missing/corrupt files. If you are marking this script based on whether it 'just runs', grab the hard drive with 140GB of already-downloaded stuff, and point the folder variables at that hard drive, then run. Alternatively, if you want to assess that this code 'just runs', set `max_files_per_page=2` to only download a small subset of files.

There are many logs generated while running, to see what's happening. That's in `log_file`. (If this script printed them, it would be overwhelming and hard to scroll.)

## Imports

If you don't have these libraries installed, run `pip install -r requirements.txt`.

In [1]:
%pip install -r requirements.txt

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
from multiprocessing import Pool
import re
import urllib3
import shutil
from urllib.parse import  urljoin, urlparse
from time import sleep, time
from random import randrange, shuffle
import json
import importlib
import sys

# type annotations
# they don't do anything in python. It's just a concise way of documenting function input/output
from typing import Set, List, Dict, Tuple, Optional

from tqdm import tqdm # progress bar animation
import requests
from bs4 import BeautifulSoup # webscraping

# utils is our local utility module
# if we change utils.py, and re-run a normal 'import'
# python won't reload it by default. (Since it's already loaded.)
# So we force a reload
import utils
importlib.reload(utils)

<module 'utils' from '/home/matthew/applied_repo/utils.py'>

In [3]:
# f-strings with '=' (e.g. f"{x=}", used for logging below) need Python 3.8 or newer
assert sys.version_info >= (3, 8), "Python version too low."

## Constants and Configuration

In [4]:
base_data_dir = 'data'

# the list of URLs and file sizes will be saved here
# relative to the repo (because it's small and we want to save this.)
urls_file_path = os.path.join(base_data_dir, '01-urls.json')

# the downloaded files will be saved here
raw_files_path = os.path.join(base_data_dir, '01-A-raw/')

# logs will be written here
# on linux, view this live with `tail -f` on this path (by default `tail -f data/logs/01a.txt`)
log_file = os.path.join(base_data_dir, 'logs/01a.txt')

In [5]:
# set to an int >0 to download only a representative sample of files
# useful for debugging
max_files_per_page = None

In [6]:
# webscrape recursively from each of these pages
start_urls = [
    'https://www.nemweb.com.au/REPORTS/CURRENT/', # last year
    'https://www.nemweb.com.au/REPORTS/ARCHIVE/', # last year
    #'https://www.nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM/2023/MMSDM_2023_10/', # one particular month
    'https://nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM/' # 1-10 years old, in a different folder and zip structure
]

# ignore files other than these
# (case insensitive)
file_suffixes = ['.zip', '.csv']

# security check - don't follow links to other domains
expected_domains = ('www.nemweb.com.au', 'nemweb.com.au')

# ignore any URL containing one of these
# because we definitely don't need them.
# Skipping them will save time and disk space.
# Most of these are about gas instead of electricity,
# or bidding (financial stuff, not relevant for our research).
url_substrings_to_skip = [
    '/NEMDE/', 
    '/Gas_Supply_Guarantee/',
    '/VicGas/',
    '_GAS_',
    '/DWGM/',
    'Predispatch', # predictions (very large)
    'FCAS', # unrelated to our purposes, and quite large
    'BIDPEROFFER', # large, unrelated
    'BIDDAYOFFER', # large, unrelated
    'BIDOFFER',
    'BID_OFFER',
    'BIDMOVE',
    'BIDTYPES',
    'BIDDUIDDETAILS',
    'RESIDUE_PRICE_FUNDS_BID',
    'DISPATCHCONSTRAINT',
    'DAYOFFER',
    'MTPASA_OFFER',
    'MTPASA',
    'PDPASA',
    'STPASA',
    'PRUDENTIAL',
    'MMSDM_CLI_', # not data
    'MMSDM_GUI_', # not data
    'CONSTRAINTSOLUTION', # detailed electrical constraints
]


# process files with multiprocessing, to be faster.
# If set to False, will use a for loop, which gives clearer traceback error messages.
# We found on some laptops multiprocessing didn't work because of Python installation issues.
# Defaulting to False to ensure compatibility.
# Set this to True if you know your python installation can do multiprocessing.
# This script will leave one CPU spare so you can keep using your laptop for other stuff.
use_multiprocessing = False


## Preparation

In [7]:
# decrease the priority of this script given by the OS
# so you can keep using your laptop for other stuff
utils.renice()

In [8]:
logger = utils.Logger(log_file)
logger.info("Initialising Logger")
logger.info(f"Running with config {use_multiprocessing=} {max_files_per_page=}")

## Discover which files to download

We don't actually download the .zip or .csv files yet.
We just get a list of the URLs of the .zip and .csv files.

In [9]:
# create a session object to re-use between requests
# to hopefully speed up downloads by not re-doing the TLS handshake for each HTTP request
# (unsure if this actually speeds things up)
session = requests.Session()

In [10]:
urls = []

progress_bar = tqdm(leave=False)

# nemweb sometimes redirects from https to http if you forget a trailing slash
# undo that.
def force_https(url):
    p = urlparse(url)
    if p.scheme != 'https':
        assert url.count('http://') == 1, f"Strange URL: {url}" # abort, for security reasons
        url = url.replace('http://', 'https://')
    return url

def is_child_of(parent, child):
    p = urlparse(parent)
    c = urlparse(child)
    return (p.hostname in expected_domains) and (c.hostname in expected_domains) \
        and c.path.rstrip('/').startswith(p.path.rstrip('/')) \
        and c.path.rstrip('/') != p.path.rstrip('/')

# unit tests
assert is_child_of('https://www.nemweb.com.au/REPORTS/', 'https://www.nemweb.com.au/REPORTS/CURRENT/')
assert not is_child_of('https://www.nemweb.com.au/REPORTS/CURRENT', 'https://www.nemweb.com.au/REPORTS/')
assert not is_child_of('https://www.nemweb.com.au/REPORTS/CURRENT/', 'https://www.nemweb.com.au/REPORTS/CURRENT/')
assert not is_child_of('https://www.nemweb.com.au/REPORTS/CURRENT/', 'https://www.nemweb.com.au/REPORTS/ARCHIVE/')


# returns a tuple of (url, size)
# where url is a string
# size is number of bytes, as an int. But None if unknown
def extract_links(url) -> List[Tuple[str, Optional[int]]]:
    logger.info(f"Checking for links in {url}")
    if any(ss in url for ss in url_substrings_to_skip):
        # skip this. Don't bother looking inside it.
        logger.info(f"Skipping {url} because of substrings")
        return []
    # this is a web page listing other files or pages of other files
    logger.info(f"Downloading {url}")
    r = session.get(url)
    if r.status_code >= 300:
        sleep(5)
        r = session.get(url)
    r.raise_for_status()
    html = r.text

    # it's called "soup" because the python webscraping library is called "beautiful soup"
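    # name the parser explicitly, so the result doesn't depend on which parsers are installed
    soup = BeautifulSoup(html, 'html.parser')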

    links = []
    files_this_page = 0
    for a in soup.find_all('a', href=True): # skip anchors that have no href
        u = a['href']

        # convert potentially relative URLs to absolute,
        # relative to the page they were found on
        u = urljoin(url, u)

        # ignore links that aren't files on this page or subpages
        # watch out, the domain can sometimes change (www. removed or added)
        if not is_child_of(parent=url, child=u):
            continue # skip the rest of this for loop iteration, go to the next one

        elif u.endswith('/'):
            # this is not a link to a file. This is a link to another page of files.

            # call this function recursively
            links_of_children = extract_links(u)
            links.extend(links_of_children)

        else:
            # this is a link to a file

            if not any(u.lower().endswith(ext.lower()) for ext in file_suffixes):
                logger.debug(f"Ignoring {u} because of file extension")
                continue

            
            # ignore any links which contain the blacklisted substrings
            matching = [ss for ss in url_substrings_to_skip if ss in u]
            if matching:
                # log only the substrings that actually matched
                logger.debug(f"Ignoring {u} because of blacklisted substrings {matching}")
                continue
            
            # Each link should have a string that precedes URL itself
            # e.g. Tuesday, January 2, 2024  8:00 AM       168243
            # let's get this size
            # in this example, 168243
            # use None if we can't get the size successfully
            text = a.previous_sibling
            if not isinstance(text, str):
                logger.error(f"No suitable text preceding URL. Previous sibling not text, is {a.previous_sibling} of type {type(text)}, for url {u} found on page {url}")
                size = None
            else:
                text = text.strip()
                match = re.match(r"\w+, \w+ \d{1,2}, \d{4} +\d{1,2}:\d{1,2} [AP]M\s+(\d+)", text)
                if match:
                    size = match.group(1)
                    try:
                        size = int(size)
                    except ValueError:
                        logger.error(f"Unable to extract file size from text: {text}, for url {u} found on page {url}")
                        size = None
                else:
                    size = None

            # only a few files per page
            # except MMSDM pages, which have one file of each type
            if max_files_per_page and (max_files_per_page > 0) and ('/MMSDM/' not in url) and (files_this_page >= max_files_per_page):
                logger.info(f"Ignoring {u} because of max_files_per_page")
            else:
                links.append((u, size))
                files_this_page += 1

    progress_bar.update()
    logger.info(f"Found {len(links)} links at/under {url}")
    return links

urls = []
for start_url in start_urls:
    urls.extend(extract_links(start_url))

progress_bar.close()

                         

Now we filter the list of URLs, to discard the ones we don't care about. (Large ones, gas ones etc)

Just delete URLs with a string from `url_substrings_to_skip` in them.
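
To make the filtering concrete, here is a small self-contained sketch of the same idea on a toy list. The function name `drop_unwanted` and the `example.invalid` URLs are made up for illustration. The real list comes from the crawl above and is filtered in the cells below.

```python
from typing import List, Optional, Tuple

UrlAndSize = Tuple[str, Optional[int]]

def drop_unwanted(urls: List[UrlAndSize], substrings: List[str]) -> List[UrlAndSize]:
    """Keep only the URLs that contain none of the substrings (case-insensitive)."""
    lowered = [s.lower() for s in substrings]
    return [(u, size) for (u, size) in urls
            if not any(s in u.lower() for s in lowered)]

# made-up URLs, for illustration only
sample = [
    ('https://example.invalid/REPORTS/DISPATCH_1.zip', 100),
    ('https://example.invalid/REPORTS/VicGas/FLOW_1.zip', 200),
    ('https://example.invalid/REPORTS/bidmove_1.csv', None),
]

kept = drop_unwanted(sample, ['/VicGas/', 'BIDMOVE'])
print(kept)  # only the DISPATCH file remains
```

Because the match is case-insensitive, `bidmove_1.csv` is dropped even though the substring is written as `BIDMOVE`.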

In [11]:
# For the older files, each year is available as one big file, or lots of little files
# e.g. https://nemweb.com.au/Data_Archive/Wholesale_Electricity/MMSDM/2017/
# We don't want to download both
# drop the few large files, keep the little ones
urls = [(u, size) for (u, size) in urls if not re.search(r"MMSDM_\d{4}_\d{2}\.zip", u)]

In [12]:
# should have been filtered already, but just in case.
# (Useful if you want to add a substring to skip without re-crawling nemweb)
urls = [(u, size) for (u, size) in urls if not any(s.lower() in u.lower() for s in url_substrings_to_skip)]
len(urls)

86366

In [13]:
# check there are no duplicate files in different locations on the website

# deduplicate based on everything after the last slash
urls = list({u.split('/')[-1]:(u, sz) for (u, sz) in urls}.values())
len(urls)

86244

In [14]:
# sort in descending order of size before writing
# it makes it easier to analyse whether there's any large files we can skip
# later on we'll shuffle the list again
# sizes we couldn't parse are None. Treat them as 0 so the sort doesn't fail.
urls.sort(key=lambda u: u[1] or 0, reverse=True)

In [15]:
print(f"Writing {len(urls)} urls to {urls_file_path}")

with open(urls_file_path, 'w') as f:
    json.dump(urls, f, indent=2)

Writing 86244 urls to /media/matthew/Tux/AppliedEconometrics/data/01-urls.json


## Download the files


Since we're downloading a lot of files, from the other side of the world, over slow wifi, errors are likely.
We don't want to re-download all the files when one has an error. So let's just carry on, then retry the whole lot.
When retrying, this code checks if the file already exists on disk. If not, download it. If yes, it might be an incomplete download. To check, we compare the expected size of the file (obtained earlier) to the size of the file on disk. If they match, do not re-download the file.
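
A size match tells us the download is complete, but not that the contents are intact. For the `.zip` files, a stricter optional check is possible with the standard library. This is only a sketch of an extra check that this playbook does not perform. The function name `zip_is_intact` is made up for illustration, and it does not apply to plain `.csv` files.

```python
import zipfile

def zip_is_intact(path: str) -> bool:
    """Return True if path is a readable zip whose members all pass their CRC check."""
    try:
        with zipfile.ZipFile(path) as z:
            # testzip() returns the name of the first corrupt member, or None if all are fine
            return z.testzip() is None
    except (zipfile.BadZipFile, OSError):
        # not a zip at all, truncated, missing, or unreadable
        return False
```

Note that `testzip()` reads and decompresses every member, so running it over the whole download folder would take a while.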

We get throttled a lot, which appears as 403 errors. Sleeping and retrying fixes that. 
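
The code below sleeps for a short random time between retries. A common alternative when being throttled is exponential backoff with jitter, where each wait is longer than the last. Here is a minimal sketch of that alternative, not the playbook's actual retry logic. The name `retry_with_backoff` and its parameters are made up for illustration.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar('T')

def retry_with_backoff(fn: Callable[[], T], attempts: int = 6,
                       base_delay: float = 1.0,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn until it succeeds, sleeping longer after each failure."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts, let the caller see the error
            # 1x, 2x, 4x ... the base delay, plus random jitter so that
            # parallel workers do not all retry at the same moment
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            sleep(delay)
    raise ValueError("attempts must be at least 1")
```

The `sleep` argument exists only so the waiting can be swapped out (e.g. in a test). In normal use you would leave it as `time.sleep`.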

This takes hours to run. You can stop the playbook, and then just re-run from here onwards. The code will not re-download files it already has. You can even restart the kernel (e.g. if you shut off your laptop) and re-run. In that case you'll have to re-run the imports at the top of the playbook and the subsequent constant definitions, but you don't need to re-run the URL detection section. The next cell loads the saved list of URLs from `urls_file_path` (`01-urls.json`).
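
One small thing to be aware of when reloading the list: JSON has no tuple type, so each `(url, size)` tuple comes back as a two-element list, and an unknown size (`None`) is stored as `null`. A tiny sketch with made-up `example.invalid` URLs:

```python
import json

# made-up entries, for illustration only
saved = [('https://example.invalid/a.zip', 123),
         ('https://example.invalid/b.csv', None)]

restored = json.loads(json.dumps(saved))

# tuples became lists, None survived the round trip
print(restored)

# unpacking works the same way for lists as for tuples
for (url, size) in restored:
    print(url, size)
```

So code that unpacks `(url, size)` from each entry keeps working after a reload, but code that compares entries against tuples would not.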

In [16]:
# if you want to skip re-indexing, run the playbook from here
with open(urls_file_path, 'r') as f:
    urls = json.load(f)

In [17]:
# If you have not got a folder full of pre-downloaded files, this is how many bytes you're about to download.
# If you do have a folder of downloaded files already, and are just re-running to check the script works,
# you may download somewhere between 0 and this amount.  (Probably close to zero.)
# (URLs whose size is unknown are counted as 0 bytes.)
sum(u[1] or 0 for u in urls)

155313288698

In [18]:
# create raw folder if it doesn't exist
utils.create_dir(raw_files_path)

In [19]:
# shuffle, so that large files aren't clumped together.
# this makes the progress bar estimate more accurate (compared to downloading all the big files last/first)
# and if we don't download all the small files consecutively, we're less likely to be throttled
shuffle(urls) # mutates list in place

In [20]:
# using urllib3 instead of requests
# because it's better for streaming large files to disk
# https://stackoverflow.com/a/62075390/5443120
http = urllib3.PoolManager(retries=8)

In [None]:
# perform an HTTP HEAD request to ask the server for the size of the file
# without downloading the whole thing
def get_remote_size(url, retries=6):
    r = http.request('HEAD', url)
    if r.status >= 300:
        if retries > 0:
            logger.warning(f"Retrying after bad status ({r.status}) for HEAD {url}")
            sleep(randrange(5))
            return get_remote_size(url, retries=retries-1)
        else:
            logger.error(f"Not retrying after bad status ({r.status}) for HEAD {url}")
            raise ValueError(f"bad status ({r.status}) for HEAD {url}")
    return int(r.headers['Content-Length'])

# have we already downloaded this url?
# don't just check based on a file with that name existing locally,
# it may be an incomplete download.
# So also check based on size.
# expected_size is the number from the nemweb page, just prior to the URL itself
# it may be None, if we couldn't extract it
# We can send a quick HEAD HTTP request to check file size without downloading a file
def already_downloaded(url, expected_size, local):
    if not os.path.exists(local):
        return False
    else:
        local_size = os.path.getsize(local)
        if (expected_size is not None) and (local_size == expected_size):
            return True
        else:
            return get_remote_size(url) == local_size

# url_and_size is a tuple, like ('https://...', 123)
# a bit awkward to combine it into one, but passing multiple arguments with multiprocessing is tricky
def download(url_and_size, retries=6):
    (url, size) = url_and_size
    fname = url.split('/')[-1]
    local_path = os.path.join(raw_files_path, fname)
    try:
        if already_downloaded(url, size, local_path):
            logger.info(f"Skipping already downloaded {url}")
            return 'skipped'
        else:
            r = http.request('GET', url, preload_content=False)
            if (r.status == 404) and ("nemweb.com.au/REPORTS/CURRENT" in url):
                # this whole playbook takes so long
                # that some daily files get moved by AEMO into monthly files
                # re-run the playbook to grab those new monthly files.
                # But if this happens, it's probably for data that's only 1 week old
                # which is outside the scope of our study.
                # So continue on for now.
                logger.warning(f"File {url} not found. Probably daily file expired")
                return 'skipped'
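            elif r.status >= 300:
                if retries <= 0:
                    # give up. Without this check a persistent bad status would recurse forever.
                    logger.error(f"Not retrying after bad status ({r.status}) for GET {url}")
                    return ValueError(f"bad status ({r.status}) for GET {url}")
                logger.warning(f"Retrying after bad status ({r.status}) for GET {url}")
                sleep(randrange(5))
                return download(url_and_size, retries=retries-1)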
            logger.info(f"Downloading {url} to {local_path}")
            with open(local_path, 'wb') as f:
                shutil.copyfileobj(r, f)
            logger.info(f"Downloaded {url} to {local_path}")
    except (urllib3.exceptions.HTTPError, ValueError) as ex:
        # tidy up partial download
        try:
            os.remove(local_path)
        except OSError:
            pass
        if retries <= 0:
            logger.error(f"Not retrying after error with {url}: {ex}")
            return ex
        else:
            logger.warning(f"Retrying after error with {url}: {ex}")
            sleep(randrange(3))
            return download(url_and_size, retries=retries-1)
    return 'ok'
                


if __name__ == '__main__':
    
    # use multiprocessing to process the files concurrently
    # tqdm for a progress bar
    # list(tqdm(imap())) thing explained here: https://stackoverflow.com/a/41921948/5443120

    # remember that urls is a list of tuples of url and size
    if use_multiprocessing:
        with Pool(utils.num_cpu()) as p:
            statuses = list(tqdm(p.imap(download, urls), total=len(urls)))
    else:
        statuses = [download(url) for url in tqdm(urls)]

    # if these are only HEAD 404s for a few files, that's ok. They've probably moved from /CURRENT to /ARCHIVE (under a different file name)
    # Re-run the notebook from the start up to this point once more, and you'll get new files which contain that data
    # to debug this, use the next cell
    assert all(s in ['ok', 'skipped'] for s in statuses), "some files failed to be downloaded. Re-run whole playbook once."

print('done')

  1%|▏                                  | 566/86244 [11:01<106:18:19,  4.47s/it]