The code below scrapes the list of zipped PGN files from PGN Mentor, downloads them to the current directory (`.`), and extracts them to `./pgns/`.

In [1]:
import os
import json
import time
import shutil
import requests

from time import time, sleep
from pprint import pprint
from zipfile import ZipFile, BadZipFile
from datetime import timedelta
from multiprocessing import Pool

import bs4
from bs4 import BeautifulSoup as BS

In [2]:
# Base URL of the PGN Mentor site; the relative hrefs scraped from the
# listing page are appended to it to build download URLs.
PGN_SRC_URL = 'https://www.pgnmentor.com'
# Page whose <a> links are scraped for .zip archives.
PGN_FILES_URL = f'{PGN_SRC_URL}/files.html'

#PGN_SRC_CACHE_FNAME = 'main_png_site.html'

In [32]:
class WebFile:
    """A file on disk that mirrors a web URL.

    On construction the file is downloaded if it is missing from disk, if it
    is older than ``max_age``, or if ``force`` is set. Otherwise the copy
    already on disk is used and no network access happens.

    Parameters
    ----------
    url : str
        Web URL to download from.
    max_age : datetime.timedelta or None
        Maximum tolerated age of the on-disk copy (ex: ``timedelta(days=20)``).
        ``None`` means the copy never expires. ``timedelta(0)`` means any
        existing copy is considered stale.
    filename : str or None
        Name of the file to write on disk; derived from ``url`` when empty.
    force : bool
        Download even if a fresh copy is already on disk.
    """

    # Seconds to wait on the server before a request is abandoned, so that a
    # stalled connection cannot hang a bulk download forever.
    TIMEOUT = 30

    def __init__(self, url, max_age=None, filename=None, force=False):
        self.url = url
        self.max_age = max_age
        self.contents = None  # filled lazily by read()
        self.file = filename if filename else self.url_to_filename(url)

        self.initialize_file(force)

    def __str__(self):
        # NOTE: getAge() raises if the file is not on disk.
        return f'{self.file} ({self.getAge()}/{self.max_age})'

    def url_to_filename(self, url):
        """Convert a url to a file name.

        The last path component of the url is used. If it has no dot in it,
        '.html' is appended; this is guess work and may occasionally pick the
        wrong extension.
        """
        basename = os.path.basename(url)
        if '.' not in basename:
            basename = f'{basename}.html'
        return basename

    def initialize_file(self, force=False):
        """Make sure the file is on disk and up to date, fetching if needed."""
        if force or not os.path.exists(self.file) or self.isOutOfDate():
            self.fetch()

    def fetch(self):
        """Download the url to ``self.file``.

        The body is streamed into a temporary ``<file>.part`` and only moved
        over the real file once the download completed, so a failed or
        interrupted request never replaces a good copy with a truncated one.

        Raises whatever ``requests`` raises for connection problems, timeouts
        and (via ``raise_for_status``) HTTP error statuses. Without the status
        check an error page would be saved under the target name, e.g. as a
        '.zip' that cannot be unzipped later.
        """
        partial = f'{self.file}.part'
        try:
            with requests.get(self.url, stream=True, timeout=self.TIMEOUT) as r:
                r.raise_for_status()
                with open(partial, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=1024):
                        if chunk:  # skip empty keep-alive chunks
                            f.write(chunk)
            os.replace(partial, self.file)
        finally:
            # Only still present if something above failed.
            if os.path.exists(partial):
                os.remove(partial)

        # Drop the cached text so read() reflects the new download.
        self.contents = None

    def getAge(self):
        """Return the time since the file's last modification as a timedelta.

        Raises an OSError (FileNotFoundError) if the file isn't on disk.
        """
        return timedelta(seconds=time() - os.path.getmtime(self.file))

    def isOutOfDate(self):
        """Return True if the file on disk is older than ``max_age``."""
        # Compare against None explicitly: timedelta(0) is falsy but is a
        # legitimate "always refresh" limit.
        if self.max_age is None:
            return False
        return self.getAge() > self.max_age

    def read(self):
        """Return the whole file as text, reading it from disk only once."""
        # 'is None' rather than truthiness, so an empty file is cached too.
        if self.contents is None:
            with open(self.file) as f:
                self.contents = f.read()
        return self.contents

    def _prettify(self):
        """Overwrite the file with prettified html output (not general purpose)."""
        # Name the parser explicitly; otherwise bs4 picks whichever parser is
        # installed, so results vary between machines (and it warns).
        soup = BS(self.read(), 'html.parser')
        self.contents = soup.prettify()
        with open(self.file, 'w') as f:
            f.write(self.contents)

In [33]:
class PGNMentor:
    """Facilitates downloading and extracting zipped PGN files from PGNMentor.

    The constructor loads the site's file listing page (re-downloading it
    when the on-disk copy is older than 5 days) and parses it. The ``load_*``
    methods download the linked .zip archives into the current directory and
    record them in ``self.files``; ``extract_all`` unzips the .pgn members
    into ``EXTRACT_DIR``.
    """

    # Directory (relative to the working directory) that receives .pgn files.
    EXTRACT_DIR = 'pgns'

    def __init__(self):
        wf = WebFile(PGN_FILES_URL, max_age=timedelta(days=5))
        # Name the parser so the scrape does not depend on which optional
        # parsers happen to be installed.
        self.soup = BS(wf.read(), 'html.parser')

        self.files = []  # WebFile objects, populated by the load_* methods

    def refreshOne(self, name):
        """Force a re-download of the archive whose on-disk name is ``name``.

        WARNING: if nothing has been loaded yet, every archive is loaded
        first, because that is currently the only way to map names to urls.

        Raises FileNotFoundError if no loaded archive has that name.
        """
        if not self.files:
            self.load_all()

        for wf in self.files:
            if wf.file == name:
                wf.fetch()
                return
        raise FileNotFoundError(name)

    def get_all_base_urls(self):
        """Return the set of hrefs found on the page, ex: {'players/Philidor.zip', ...}.

        Anchors without an href attribute are ignored.
        """
        hrefs = (tag.get('href') for tag in self.soup.find_all('a'))
        return {href for href in hrefs if href is not None}

    def _zip_urls(self, prefix=''):
        """Return full urls of the .zip links whose href starts with ``prefix``."""
        return {
            f'{PGN_SRC_URL}/{url}'
            for url in self.get_all_base_urls()
            if url.startswith(prefix) and url.endswith('.zip')
        }

    def get_zip_urls(self):
        """Return the full urls of every .zip linked from the page."""
        return self._zip_urls()

    def get_opening_urls(self):
        """Return the full urls of the .zip links under 'openings'."""
        return self._zip_urls('openings')

    def get_player_urls(self):
        """Return the full urls of the .zip links under 'players'."""
        return self._zip_urls('players')

    def _load_urls(self, urls, sleep_len=2, quiet=True):
        """Download files from the site if they're not on disk or if they are old.

        @urls: iterable of full urls
        @sleep_len: seconds to pause after each access to the web site
        @quiet: suppress the progress dots and the elapsed-time summary

        A url that fails to download is reported and skipped so that one bad
        link does not abort the whole batch. Files already recorded in
        ``self.files`` are not recorded a second time.
        """
        start_time = time()
        known = {wf.file for wf in self.files}

        for url in urls:
            try:
                wfile = WebFile(url, max_age=timedelta(days=90))
            except requests.RequestException as exc:
                print(f'ERROR with: {url} ({exc})')
                sleep(sleep_len)  # the site was contacted; stay polite
                continue

            # Sleep if we accessed the web and updated the file. A very
            # recent modification time is used as the sign of a download.
            if wfile.getAge() < timedelta(seconds=10):
                sleep(sleep_len)

            if wfile.file not in known:
                known.add(wfile.file)
                self.files.append(wfile)
            if not quiet:
                print('.', end='')
        if not quiet:
            print()

        diff = time() - start_time
        if not quiet:
            print(f'Elapsed time: {int(diff)}s')

    def load_all(self, **kwargs):
        """Download every linked .zip; kwargs are passed to _load_urls."""
        return self._load_urls(self.get_zip_urls(), **kwargs)

    def load_openings(self, **kwargs):
        """Download the openings archives; kwargs are passed to _load_urls."""
        return self._load_urls(self.get_opening_urls(), **kwargs)

    def load_players(self, **kwargs):
        """Download the players archives; kwargs are passed to _load_urls."""
        return self._load_urls(self.get_player_urls(), **kwargs)

    @staticmethod
    def _unzip(zip_filename):
        """Extract the .pgn members of one archive into EXTRACT_DIR.

        Returns the number of .pgn files extracted, or 0 if the archive could
        not be processed (an error line is printed in that case).
        """
        try:
            with ZipFile(zip_filename) as pgn_zip:
                # Extract only the members we want rather than calling
                # extractall on an archive we did not create.
                pgn_files = [fname for fname in pgn_zip.namelist()
                             if fname.endswith('.pgn')]
                for pgn_file in pgn_files:  # should only be 1 file tbh
                    pgn_zip.extract(pgn_file, path=PGNMentor.EXTRACT_DIR)
        except Exception:
            # Don't raise because there are just bad files sometimes.
            # 'Exception' (not a bare except) keeps Ctrl-C working.
            print(f'ERROR with: {zip_filename}')
            return 0

        return len(pgn_files)

    def extract_all(self, cpus=1, quiet=True, file_list=None):
        """Extract all files listed on the website or in the current directory.

        WARNING: Silently overwrites existing files.

        @cpus: number of parallel unzips to run
        @quiet: suppress the elapsed-time / file-count summary
        @file_list: extract from this list (self.files otherwise)
        """
        os.makedirs(PGNMentor.EXTRACT_DIR, exist_ok=True)

        start_time = time()

        # Get list of file names
        if not file_list:
            file_list = [f.file for f in self.files]

        # Nothing has been loaded yet, so just extract the archives that sit
        # directly in the current directory (no recursion).
        if not file_list:
            file_list = sorted(
                name for name in os.listdir('.')
                if name.endswith('.zip') and not os.path.isdir(name)
            )

        # The context manager shuts the worker processes down when done.
        with Pool(cpus) as pool:
            res = pool.map(self._unzip, file_list)

        if not quiet:
            diff = time() - start_time
            print(f'Elapsed time: {int(diff)}s')
            print(f'Extracted {sum(res)} .pgn files.')


In [40]:
# Build the scraper; the constructor loads and parses the file listing page.
p = PGNMentor()
p.load_openings() # populates p.files and refreshes old files
# Unzip using 4 worker processes and print the timing / file-count summary.
p.extract_all(quiet=False, cpus=4)

ERROR with: ScotchGambit.zip
Elapsed time: 7s
Extracted 231 .pgn files.


In [64]:
# Fresh scraper instance, so p.files starts out empty for this run.
p = PGNMentor()
# Download every .zip linked from the listing page (openings, players, ...).
p.load_all()
# Uncomment to force a re-download of a single archive by its file name.
#p.refreshOne('ScotchGambit.zip')
# Unzip using 4 worker processes and print the timing / file-count summary.
p.extract_all(quiet=False, cpus=4)

ERROR with: ScotchGambit.zip
Elapsed time: 8s
Extracted 476 .pgn files.
