# Wikipedia Access & Album Parsing Code

In [1]:
import os
import re
from urllib.parse import urljoin
import datetime as dt

from bs4 import BeautifulSoup
import pandas as pd

In [3]:
class WikipediaAlbumInfo:
    """Locate Wikipedia's per-year "List of <year> albums" pages.

    ``url_cache`` is any object exposing ``get(url)`` that returns a mapping
    whose ``'content'`` entry holds the HTML of the requested page.
    """

    WP_ALBUM_YEAR_INDEX_URL = 'https://en.wikipedia.org/wiki/Category:Lists_of_albums_by_release_date'

    # Matches link titles such as "List of 1999 albums" and captures the year.
    YEAR_LIST_TITLE_RE = re.compile(r'List of (\d+) albums')

    # Column layout of the frame returned by get_year_urls().
    YEAR_LIST_COLUMNS = ['year', 'title', 'url']

    def __init__(self, url_cache):
        self.url_cache = url_cache

    def _get_year_urls_raw(self):
        """Return the HTML of the category index page, fetched via the cache."""
        r = self.url_cache.get(self.WP_ALBUM_YEAR_INDEX_URL)
        return r['content']

    @classmethod
    def _to_year_frame(cls, records):
        """Build the year-list DataFrame from ``records``, newest year first.

        The columns are passed explicitly so that an empty ``records`` list
        still produces a frame with a ``year`` column; without them
        ``sort_values(by='year')`` raises ``KeyError`` on an empty frame.
        """
        frame = pd.DataFrame(records, columns=cls.YEAR_LIST_COLUMNS)
        return frame.sort_values(by='year', ascending=False)

    def _parse_year_list_urls(self, html_text):
        """Extract the per-year album list links from the index page HTML.

        Every ``<a>`` carrying both ``href`` and ``title`` attributes whose
        title matches "List of <year> albums" contributes one record.

        Returns:
            DataFrame with columns ``year`` (int), ``title`` and ``url``
            (absolute, resolved against the index URL), sorted by year in
            descending order. Empty (but with those columns) when nothing
            matches.
        """
        # Name the parser explicitly: otherwise bs4 picks whichever parser is
        # installed and emits a warning about the guess.
        html_doc = BeautifulSoup(html_text, 'html.parser')
        results = []
        for a_tag in html_doc.find_all('a'):
            href = a_tag.attrs.get('href')
            title = a_tag.attrs.get('title')
            if href is None or title is None:
                continue
            m = self.YEAR_LIST_TITLE_RE.search(title)
            if m:
                results.append({
                    'year': int(m.group(1)),
                    'title': title,
                    'url': urljoin(self.WP_ALBUM_YEAR_INDEX_URL, href),
                })
        return self._to_year_frame(results)

    def get_year_urls(self):
        """Fetch the index page and return its parsed year-list DataFrame."""
        html_text = self._get_year_urls_raw()
        return self._parse_year_list_urls(html_text)
    
class TableExtractor:
    """Convert HTML ``<table>`` elements into pandas DataFrames.

    Cells are first read into plain dicts (``type``, ``text``, ``Links`` and
    optional ``rowspan`` / ``colspan``), spans are expanded into repeated
    cells, and the first row whose leading cell is a ``<th>`` supplies the
    column names.
    """

    @staticmethod
    def get_dataframes(h):
        """Return one DataFrame per ``<table>`` found in ``h``.

        Args:
            h: a ``BeautifulSoup`` document, or markup to be parsed into one.

        Tables that fail to convert are reported with a ``[WARN]`` line on
        stdout and skipped, so a single bad table does not abort the page.
        """
        if not isinstance(h, BeautifulSoup):
            # Explicit parser: avoids bs4's guessed-parser warning and keeps
            # results independent of which optional parsers are installed.
            h = BeautifulSoup(h, 'html.parser')

        dfs = []
        for tag_table in h.find_all('table'):
            try:
                dfs.append(TableExtractor.to_dataframe(tag_table))
            except Exception as e:
                # Deliberate best effort - report and carry on.
                print(f"[WARN] Could not extract table - {e}")
        return dfs

    @staticmethod
    def to_dataframe(tag_table):
        """Convert a single ``<table>`` tag into a DataFrame."""
        rows = TableExtractor.extract_rows(tag_table)
        return pd.DataFrame(TableExtractor.to_table_dict(rows))

    @staticmethod
    def _parse_span(value):
        """Return a ``rowspan``/``colspan`` attribute as a positive int.

        Returns ``None`` for values that are not integers or are below 1, so
        that a malformed attribute is ignored instead of discarding the whole
        table (``ValueError``) or deleting the cell (``colspan="0"``).
        """
        try:
            span = int(value)
        except (TypeError, ValueError):
            return None
        return span if span >= 1 else None

    @staticmethod
    def extract_cell(tag):
        """Describe a ``<td>``/``<th>`` tag as a dict.

        Keys: ``type`` (tag name), ``text`` (tag text), ``Links`` (a list of
        ``{'href', 'text'}`` dicts, one per contained ``<a>`` that has an
        ``href``), plus ``rowspan`` / ``colspan`` when the tag carries a
        usable value for them.
        """
        cell = {
            'type': tag.name,
            'text': tag.text
        }
        for span_attr in ('rowspan', 'colspan'):
            if span_attr in tag.attrs:
                span = TableExtractor._parse_span(tag.attrs[span_attr])
                if span is not None:
                    cell[span_attr] = span

        cell['Links'] = [
            {'href': tag_a.attrs['href'], 'text': tag_a.text}
            for tag_a in tag.find_all('a')
            if "href" in tag_a.attrs
        ]
        return cell

    @staticmethod
    def extract_row(tag_tr):
        """Return the cell dicts for the ``<td>``/``<th>`` children of a row."""
        return [
            TableExtractor.extract_cell(tag)
            for tag in tag_tr.children
            if tag.name in {'td', 'th'}
        ]

    @staticmethod
    def extract_rows(tag_table):
        """Return all rows of ``tag_table`` with col/row spans expanded.

        NOTE: ``find_all('tr')`` searches all descendants, so rows belonging
        to tables nested inside ``tag_table`` are included as well.
        """
        rows = [TableExtractor.extract_row(tag_tr) for tag_tr in tag_table.find_all('tr')]
        return TableExtractor.apply_expansions(rows)

    @staticmethod
    def apply_expansions(rows):
        """Expand column spans first, then row spans.

        Column spans go first so that the column index used when copying a
        row-spanning cell downwards already reflects the widened rows.
        """
        rows = TableExtractor.apply_colspan(rows)
        rows = TableExtractor.apply_rowspan(rows)
        return rows

    @staticmethod
    def apply_rowspan(rows):
        """Return a copy of ``rows`` with every ``rowspan`` cell repeated.

        A spanning cell is copied (without its ``rowspan`` key) into the same
        column index of each following row it covers; rows past the end of
        the table are ignored. The input lists and dicts are not modified.
        """
        n_rows = [row.copy() for row in rows]
        total = len(n_rows)

        for row_no, cur_row in enumerate(n_rows):
            for col_no, cell in enumerate(cur_row):
                if 'rowspan' not in cell:
                    continue
                rowspan = cell['rowspan']
                cell = cell.copy()
                del cell['rowspan']
                cur_row[col_no] = cell
                # Only later rows are touched, so iterating cur_row is safe.
                for target_no in range(row_no + 1, min(row_no + rowspan, total)):
                    n_rows[target_no].insert(col_no, cell.copy())

        return n_rows

    @staticmethod
    def apply_colspan(rows):
        """Return new rows in which each ``colspan`` cell is repeated in place.

        The repeated cells do not carry the ``colspan`` key. A span below 1
        still yields one cell, so an invalid value never drops the cell and
        shifts the remaining columns.
        """
        n_rows = []
        for row in rows:
            n_row = []
            for cell in row:
                if 'colspan' not in cell:
                    n_row.append(cell)
                    continue
                template = {k: v for k, v in cell.items() if k != 'colspan'}
                n_row.extend(template.copy() for _ in range(max(cell['colspan'], 1)))
            n_rows.append(n_row)
        return n_rows

    @staticmethod
    def find_header_row_no(rows):
        """Return the index of the first row starting with a ``th`` cell.

        Returns ``None`` when no such row exists.
        """
        for row_no, row in enumerate(rows):
            if len(row) > 0 and row[0]['type'] == 'th':
                return row_no
        return None

    @staticmethod
    def to_table_dict(rows):
        """Turn expanded rows into a list of ``{column: text}`` records.

        Column names come from the header row (see ``find_header_row_no``);
        without one they are ``Unnamed:<i>``, sized by the first row, and all
        rows count as data. Rows at or above the header row are dropped.

        Each record also gets a ``Links`` entry mapping column name to that
        cell's link list (only for cells that contain links). It is assigned
        last, so it replaces the text of a column that is itself named
        ``Links``.

        Returns an empty list for a table without rows.
        """
        if not rows:
            # rows[0] below would raise IndexError for an empty table.
            return []

        hrow_no = TableExtractor.find_header_row_no(rows)
        if hrow_no is not None:
            row_start = hrow_no + 1
            col_names = [cell['text'].strip() for cell in rows[hrow_no]]
        else:
            row_start = 0
            col_names = [f"Unnamed:{i}" for i in range(0, len(rows[0]))]

        data_rows = []
        for row in rows[row_start:]:
            data_row = {}
            links = {}
            # zip() stops at the shorter side: short rows leave columns unset,
            # surplus cells beyond the known columns are ignored.
            for h_n, cell in zip(col_names, row):
                data_row[h_n] = cell['text'].strip()
                if cell.get('Links'):
                    links[h_n] = cell['Links']
            data_row['Links'] = links
            data_rows.append(data_row)

        return data_rows

class AlbumListExtractor:
    """Extract albums from pages that present them as ``<ul>`` lists.

    Each list is titled by the closest preceding heading tag; lists whose
    title mentions "album" are parsed item by item as
    "<album> <dash> <artist>" entries.
    """

    # Tag-name pattern used to find the heading that precedes a list.
    HEADER_RE = re.compile(r'h[1-7]')

    @staticmethod
    def get_album_dataframe(h):
        """Return a DataFrame with one row per parsable album list item.

        Args:
            h: a ``BeautifulSoup`` document, or markup to be parsed into one.

        The frame has ``Album``, ``Artist`` and ``Links`` columns; it is
        completely empty (no columns) when no item could be parsed.
        """
        albums = []
        for ls in AlbumListExtractor.get_album_lists(h):
            for item in ls['items']:
                a = AlbumListExtractor.parse_list_album_item(item)
                if a is not None:
                    albums.append(a)
        return pd.DataFrame(albums)

    @staticmethod
    def parse_list_album_item(item):
        """Parse one list item dict into an album record.

        ``item['text']`` is split on en dashes / hyphens; the first piece is
        the album and the second the artist. Items without a separator are
        reported with a ``[WARN]`` line on stdout and yield ``None``.

        Links from ``item['Links']`` are attached when their text equals the
        album or artist name: an album link whose href contains "album" is
        preferred over one that does not, and artist links whose href
        contains "album" are ignored.

        Returns:
            ``{'Album', 'Artist', 'Links'}`` dict, or ``None``.
        """
        # maxsplit is passed by keyword: positional use is deprecated as of
        # Python 3.13.
        parts = re.split(r'[\u2013\-]', item['text'], maxsplit=2)
        if len(parts) < 2:
            print(f"[WARN] - {item}")
            return None

        album = {
            'Album': parts[0].strip(),
            'Artist': parts[1].strip(),
        }

        album_link = None
        artist_link = None
        for link in item.get('Links', []):
            t = link['text'].strip()
            href_mentions_album = "album" in link['href'].lower()
            if t == album['Album'] and (album_link is None or href_mentions_album):
                album_link = link
            if t == album['Artist'] and not href_mentions_album:
                artist_link = link

        album['Links'] = {}
        if album_link is not None:
            album['Links']['Album'] = [album_link]
        if artist_link is not None:
            album['Links']['Artist'] = [artist_link]

        return album

    @staticmethod
    def _is_album_list(ls):
        """Return True when the list's title mentions "album".

        Lists without a preceding heading have a ``None`` title and are
        rejected here instead of failing on ``None.lower()``.
        """
        title = ls['title']
        return title is not None and "album" in title.lower()

    @staticmethod
    def get_album_lists(h):
        """Return the lists in ``h`` whose title mentions "album"."""
        ls = AlbumListExtractor.get_all_lists(h)
        return [l for l in ls if AlbumListExtractor._is_album_list(l)]

    @staticmethod
    def get_all_lists(h):
        """Return a ``{'title', 'items'}`` dict for every ``<ul>`` in ``h``."""
        if not isinstance(h, BeautifulSoup):
            # Explicit parser: avoids bs4's guessed-parser warning and keeps
            # results independent of which optional parsers are installed.
            h = BeautifulSoup(h, 'html.parser')

        return [AlbumListExtractor.get_list(ul_tag) for ul_tag in h.find_all('ul')]

    @staticmethod
    def get_list(list_tag):
        """Describe a list tag as ``{'title': ..., 'items': [...]}``.

        ``title`` is the text of the nearest preceding tag matching
        ``HEADER_RE``, or ``None`` when there is none. Each item holds the
        ``<li>`` text and, when the ``<li>`` contains links with an ``href``,
        a ``Links`` list of ``{'href', 'text'}`` dicts.
        """
        prev_header = list_tag.find_previous(AlbumListExtractor.HEADER_RE)
        list_title = prev_header.text if prev_header is not None else None

        items = []
        for li_tag in list_tag.find_all('li'):
            item = {
                'text': li_tag.text
            }
            links = [
                {'href': a_tag.attrs['href'], 'text': a_tag.text}
                for a_tag in li_tag.find_all('a')
                if 'href' in a_tag.attrs
            ]
            if links:
                item['Links'] = links

            items.append(item)

        return { 'title': list_title, 'items': items }
            
    
class WikiListPageAlbumExtractor:
    """Collect album records from Wikipedia "list of albums" pages.

    ``url_cache`` is any object exposing ``get(url)`` that returns a mapping
    whose ``'content'`` entry holds the HTML of the requested page.
    """

    # Columns (and their order) of the frame returned by get_all_albums().
    ALBUM_COLUMNS = ['Artist', 'Album', 'Genre', 'Label', 'Year', 'Links', 'Source']

    def __init__(self, url_cache):
        self.url_cache = url_cache

    def get_albums(self, url):
        """Return the album DataFrames found on the page at ``url``.

        Tables with an ``Album`` column are used when present; otherwise the
        page's bulleted lists are parsed instead. Every returned frame gets a
        ``Source`` column holding ``url``. The result is an empty list when
        neither approach yields any album.
        """
        r = self.url_cache.get(url)
        # Explicit parser: avoids bs4's guessed-parser warning and keeps
        # results independent of which optional parsers are installed.
        h_doc = BeautifulSoup(r['content'], 'html.parser')

        dfs = [df for df in TableExtractor.get_dataframes(h_doc) if "Album" in df.columns]

        # If no albums were found - fall back to list extractor
        if not dfs:
            df = AlbumListExtractor.get_album_dataframe(h_doc)
            # The list extractor returns an empty frame (never None) when it
            # finds nothing; do not report that as a result.
            if df is not None and not df.empty:
                dfs = [df]

        for df in dfs:
            df['Source'] = url

        return dfs

    def get_all_dataframes(self, year_lists):
        """Return the album frames of every page listed in ``year_lists``.

        Args:
            year_lists: DataFrame with at least ``year`` and ``url`` columns.

        Each returned frame gets a ``Year`` column set to its row's ``year``.
        """
        all_album_dfs = []
        for _, row in year_lists.iterrows():
            for album_df in self.get_albums(row['url']):
                album_df['Year'] = row['year']
                all_album_dfs.append(album_df)
        return all_album_dfs

    def get_all_albums(self, year_lists):
        """Return one combined DataFrame of albums for all ``year_lists`` rows.

        The result always has exactly the ``ALBUM_COLUMNS`` columns: columns
        missing from the source frames (e.g. ``Genre`` / ``Label`` for albums
        taken from bulleted lists) are filled with NaN instead of raising
        ``KeyError``, and an empty frame is returned when no albums were
        found instead of ``pd.concat`` raising on an empty list. Link hrefs
        are made absolute relative to each row's ``Source``.
        """
        frames = [df for df in self.get_all_dataframes(year_lists) if "Album" in df.columns]
        if not frames:
            return pd.DataFrame(columns=self.ALBUM_COLUMNS)

        album_df = pd.concat(frames, ignore_index=True).reindex(columns=self.ALBUM_COLUMNS)

        for source_url, links in zip(album_df['Source'], album_df['Links']):
            self.clean_links(source_url, links)

        return album_df

    def clean_links(self, source_url, links):
        """Resolve every link href in ``links`` against ``source_url`` in place.

        Args:
            source_url: URL of the page the links were extracted from.
            links: mapping of category name to a list of ``{'href', ...}``
                dicts. Anything that is not a dict (e.g. a NaN placeholder)
                is ignored.
        """
        if not isinstance(links, dict):
            return
        for cat_links in links.values():
            for link in cat_links:
                link['href'] = urljoin(source_url, link['href'])
        