In [1]:
%matplotlib inline
from IPython.display import display, HTML
import requests 
from lxml import html
import pandas as pd
import numpy as np
from datetime import date, datetime, time
from os import path, mkdir
import re


In [2]:
# Directory layout used by the notebook.  Every sub-directory below lives
# directly under cache_dir, so cache_dir must be created first.
cache_dir = './cache'
playlist_cache_dir = path.join(cache_dir, 'playlists')
a2z_cache_dir = path.join(cache_dir, 'a2z')
a2z70s_cache_dir = path.join(cache_dir, 'a2z70s')
a2z80s_cache_dir = path.join(cache_dir, 'a2z80s')
xpn2020_cache_dir = path.join(cache_dir, 'xpn2020')
musicbrainz_cache_dir = path.join(cache_dir, 'musicbrainz')
data_dir = './data'

# Create any directory that does not exist yet.  xpn2020_cache_dir is
# included so that every directory named above is actually available.
for d in (cache_dir, playlist_cache_dir, a2z_cache_dir, a2z70s_cache_dir,
          a2z80s_cache_dir, xpn2020_cache_dir, musicbrainz_cache_dir,
          data_dir):
    if not path.exists(d):
        mkdir(d)


In [5]:
def fetch_daily_playlist(day, cache_dir=None, verbose = False):
    """
    Fetches the XPN playlist for a given date

    If cache_dir is given and already holds a CSV for that day, the CSV is
    returned and no request is made.  Otherwise the playlist page is
    requested, parsed, de-duplicated and (when cache_dir is given) written
    to the cache.

    Args:
        day (datetime.date) : The day to fetch the playlist for
        cache_dir (string)  : Path to the cache directory, or None to avoid caching
        verbose (bool)      : Print progress messages when True

    Returns:
        DataFrame containing Artist and Title as Strings and Air Time as Timestamp
    """
    columns = ['Artist', 'Title', 'Air Time']

    # cache_file stays None when caching is disabled, so the checks below
    # are safe for the default cache_dir=None call.
    cache_file = None
    if cache_dir is not None:
        cache_file = path.join(cache_dir, "%04d-%02d-%02d.csv" %
                               (day.year, day.month, day.day))

    if cache_file is not None and path.exists(cache_file):
        songs = pd.read_csv(cache_file)
        songs['Air Time'] = pd.to_datetime(songs['Air Time'], errors='coerce')
        if verbose:
            print("Got %d rows from %s" % (len(songs), cache_file))
        return songs

    day_s = '%02d-%02d-%04d' % (day.month, day.day, day.year)
    page = requests.post('https://xpn.org/playlists/xpn-playlist',
                         data = {'playlistdate': day_s})
    if verbose:
        print("fetching %s returned status %s" % (day_s, page.status_code))

    # play list pages claim to be utf-8, but the rare non-ascii character
    # is always latin-1
    tree = html.fromstring(page.content)
    tracks = tree.xpath('//h3/a/text()')

    # not all rows are tracks, some are membership callouts
    # but real tracks start with times and are formatted
    # HH:MM [am|pm] Artist - Title
    # Note that I've seen titles with embedded dashes,
    # but so far no artist names with them.  This may be luck.
    # Special programs like World Cafe, Echoes, ...
    # also start with an air time, but don't have useful track info
    # but those list the program inside bars
    # eg |World Cafe| -  "Wednesday 11-2-2016 Hour 2, Part 7"
    time_regex = re.compile(r"^\d{2}:\d{2}\s")
    rows = []
    line_count = 0
    for track in tracks:
        line_count += 1
        if not time_regex.match(track) or track[9:10] == '|':
            continue
        # Split on the first ' - ' only, so dashes inside titles survive.
        pieces = track[9:].split(' - ', 1)
        if len(pieces) != 2:
            # A timestamped line without the separator is not a track;
            # skip it instead of aborting the whole day.
            if verbose:
                print("skipping unparseable line %r" % track)
            continue
        artist, title = pieces
        try:
            clock = datetime.strptime(track[:8], '%I:%M %p')
        except ValueError:
            if verbose:
                print("skipping unparseable line %r" % track)
            continue
        air_time = datetime.combine(day, clock.time())
        if verbose:
            print("adding %s %s %s" % (artist, title, air_time))
        rows.append({'Artist': artist, 'Title': title, 'Air Time': air_time})
        if verbose:
            print("size = %d" % len(rows))

    if verbose:
        print('read %d line and added %d tracks' % (line_count, len(rows)))

    # Build the frame once (appending row by row is quadratic), then
    # drop any duplicates, which are not uncommon
    songs = pd.DataFrame(rows, columns=columns).drop_duplicates()
    if cache_file is not None:
        songs.to_csv(cache_file, index=False, encoding='utf-8')
        if verbose:
            print('write %d rows to %s' % (len(songs), cache_file))

    return songs

In [6]:
def fetch_playlist(start, end, cache_dir=None):
    """
    Fetch all the playlist entries for a range of time.

    Each calendar day from start's date through end's date is fetched with
    fetch_daily_playlist, then the combined rows are trimmed to
    [start, end), de-duplicated and sorted by air time.

    Args:
        start (datetime.datetime) : The inclusive start time to fetch entries for
        end (datetime.datetime)   : The exclusive end time to fetch entries for
        cache_dir (string)        : path to the cache directory, or None to avoid caching

    Returns:
        Dataframe containing Artist and Title as strings, and Air Time as timestamp
    """
    daily = [fetch_daily_playlist(day, cache_dir)
             for day in pd.date_range(start.date(), end.date())]
    if not daily:
        # end falls on an earlier day than start: nothing to fetch
        return pd.DataFrame(None, columns=['Artist', 'Title', 'Air Time'])

    # One concat instead of growing the frame day by day.
    songs = pd.concat(daily, ignore_index=True)
    in_window = (songs['Air Time'] >= start) & (songs['Air Time'] < end)
    songs = songs[in_window]
    # sometimes the playlist entries are duplicated
    songs = songs.drop_duplicates()
    return songs.sort_values(by = 'Air Time')

In [7]:
# Load every playlist entry from 8:00 on 2020-12-10 up to the moment the
# cell runs, using the playlist cache directory.
xpn2020 = fetch_playlist(datetime(2020, 12, 10, 8, 0), datetime.now(),
                         playlist_cache_dir)
# Single-argument print(...) parses on both Python 2 and Python 3.
print("got %d rows" % len(xpn2020))

got 1052 rows


In [8]:
# Render the first five rows of the playlist as an HTML table.
HTML(xpn2020.iloc[:5].to_html())

Unnamed: 0,Artist,Title,Air Time
194,Booker T. & The MG's,Time Is Tight,2020-12-10 08:02:00
193,AC/DC,T.N.T.,2020-12-10 08:05:00
192,Peter Frampton,Show Me the Way,2020-12-10 08:11:00
191,The Drifters,Under The Boardwalk,2020-12-10 08:16:00
190,Adele,Rumor Has It,2020-12-10 08:19:00


In [9]:
# Render the last five rows of the playlist as an HTML table.
HTML(xpn2020.iloc[-5:].to_html())

Unnamed: 0,Artist,Title,Air Time
884,The Moody Blues,Question,2020-12-13 19:32:00
883,Lucinda Williams,Right In Time,2020-12-13 19:39:00
882,Four Tops,Bernadette,2020-12-13 19:44:00
881,Foghat,Slow Ride,2020-12-13 19:47:00
880,Faces,Stay With Me,2020-12-13 19:56:00


In [10]:
def first_char(s):
    """
    Return the first alphabetic character of s, upper-cased.

    If s contains no alphabetic character, its first character is returned
    unchanged; an empty string yields an empty string.

    Args:
        s (string) : text to inspect

    Returns:
        A string of at most one character
    """
    for c in s:
        if isinstance(c, str) and c.isalpha():
            return c.upper()
    # s[:1] equals s[0] for non-empty input but does not raise on ''
    return s[:1]
    

# Add a 'Letter' column holding the first letter of each title.  The column
# is selected by label rather than by position, and assign() replaces an
# existing 'Letter' column, so the cell can be re-run safely.
xpn2020 = xpn2020.assign(Letter=xpn2020['Title'].apply(first_char))

In [12]:
from nltk.tokenize import RegexpTokenizer
# A token is either a run of word characters, apostrophes and hyphens, or a
# single character that is none of those and not whitespace.  The raw string
# keeps the backslashes literal without relying on unknown-escape handling.
custom_tokenize = RegexpTokenizer(r"[\w'\-]+|[^\w'\s\-]").tokenize

# Add a 'First Word' column with the first token of each title.  A title
# that yields no tokens gets '' instead of raising IndexError, and assign()
# keeps the cell re-runnable.
xpn2020 = xpn2020.assign(**{
    'First Word': xpn2020['Title'].apply(
        lambda title: (custom_tokenize(title) or [''])[0])
})

In [13]:
def estimate_durations(playlist, end_time=None):
    """
    Estimate the song durations, in whole minutes.

    A song's duration is taken to be the gap between its air time and the
    air time of the next row.  The one exception is a Friday row that airs
    in the 11 o'clock hour and is followed by a row in the 12 o'clock hour:
    it is assumed to end on the hour.  The last row gets the gap up to
    end_time, or 0 when end_time is None.

    The playlist is modified in place and also returned.

    Args:
        playlist (DataFrame): playlist with minimally an 'Air Time' attribute,
            assumed to be in air-time order
        end_time (datetime): end time of the play list, or None if still going
    Return:
        modified DataFrame with 'Duration' attribute added.
    """
    playlist['Duration'] = 0

    previous = None
    last_idx = None
    # Iterating the column directly yields one timestamp per row.
    for idx, air_time in zip(playlist.index, playlist['Air Time']):
        if previous is not None:
            if air_time.date().weekday() == 4 and previous.hour == 11 and air_time.hour == 12:
                # We just fell into a free at noon
                minutes = 60 - previous.minute
            else:
                # total_seconds() keeps whole days that .seconds would drop;
                # floor division keeps the value an integer on Python 2 and 3
                minutes = int((air_time - previous).total_seconds() // 60)
            playlist.loc[last_idx, 'Duration'] = minutes
        previous = air_time
        last_idx = idx

    # fixup the last row; an empty playlist has no last row to fix
    if end_time is not None and last_idx is not None:
        playlist.loc[last_idx, 'Duration'] = int((end_time - previous).total_seconds() // 60)

    return playlist

In [14]:
def _earliest_release(recordings, min_year):
    """Return (year, release title) of the earliest dated release that is not before min_year, else (None, None)."""
    best_year = None
    best_album = None
    for recording in recordings:
        # recordings without a release list contribute nothing
        for release in recording.get('release-list') or []:
            date_text = release.get('date') or ''
            try:
                year = int(date_text.split('-')[0])
            except ValueError:
                # missing or non-numeric date
                continue
            if year < min_year:
                # assume years before min_year are typos
                continue
            if best_year is None or year < best_year:
                best_year = year
                best_album = release.get('title')
    return best_year, best_album


def add_musicbrainz_data(playlist, min_year = 1900, cache_file = None):
    """
    Add data from the musicbrainz database: the first year of publication
    and the title of the release that year came from.
    The input data frame should contain at least Title and Artist fields
    and the resulting dataframe will have new Year and Album fields.
    The cache file if used, should have been generated by a previous run of
    this function.
    Using a cache is strongly encouraged,
    as the MusicBrainz search interface is rate limited to one search per second
    so this can be very slow for large playlists.

    musicbrainzngs is imported only when at least one row has no cached
    year, so a fully cached playlist needs neither the package nor a lookup.

    Args:
        playlist (Dataframe) : playlist to update
        min_year (int)       : minimum year to consider
        cache_file (string)  : path to cache file

    Returns:
        Dataframe containing the augmented playlist
    """
    year_columns = ['Title', 'Artist', 'Year', 'Album']

    # keep a list of artists named differently
    # at MusicBrainz than XPN, so we can 'fix' them
    artist_names = {
        "R. E. M.": "REM",
        "Run-DMC": "Run-D.M.C.",
        "The Ramones": "Ramones"
    }

    # load the cache if we have one
    if cache_file is not None and path.exists(cache_file):
        years = pd.read_csv(cache_file)
        # Tolerate a cache file that has no Album column.
        if 'Album' not in years.columns:
            years['Album'] = None
        years = years.drop_duplicates()
    else:
        years = pd.DataFrame(None, columns=year_columns)

    augmented = playlist.merge(years, how = 'left')

    # Lookup any unaugmented rows, once per distinct Title/Artist pair so a
    # repeated song does not cost a second rate-limited search.
    missing = augmented.loc[augmented['Year'].isnull(),
                            ['Title', 'Artist']].drop_duplicates()
    new_mb_rows = []
    if len(missing) > 0:
        import musicbrainzngs as mb
        mb.set_useragent('xpn-a2z', '0.1','https://github.com/asudell/a2z')
        for title, artist in zip(missing['Title'], missing['Artist']):
            result = mb.search_recordings(title,
                                          artist = artist_names.get(artist, artist),
                                          status = 'official',
                                          strict = True,
                                          limit = 25)
            rel_year, album_name = _earliest_release(
                result.get('recording-list', []), min_year)
            if rel_year is not None:
                # store the XPN spelling of the artist so the merge matches
                new_mb_rows.append([title, artist, rel_year, album_name])

    # if we found new data, resave the cache and rebuild the augmented data
    if new_mb_rows:
        new_years = pd.DataFrame(new_mb_rows, columns=year_columns)
        if years.empty:
            years = new_years
        else:
            years = pd.concat([years, new_years], ignore_index=True)
        years = years.drop_duplicates()
        if cache_file is not None:
            years.to_csv(cache_file, index=False, encoding='utf-8')
        augmented = playlist.merge(years, how = 'left')

    return augmented