Skip to content


Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Fetching contributors…

Cannot retrieve contributors at this time

executable file 241 lines (184 sloc) 8.112 kb
#!/usr/bin/env python
# coding: utf-8
import os
import re
import sys
from subprocess import Popen, PIPE
from flask import (Flask, request, render_template, jsonify, abort, redirect,
Response, url_for)
from flask.ext.login import (LoginManager, UserMixin, current_user,
login_required, login_user)
from flask.ext.browserid import BrowserID
from wsgi_utils import PipeWrapper
from models import Track, Album, db
app = Flask(__name__)

# Insecure, from the Flask manual - for testing and development only.
# Checked below to make sure it's not the default in production.
# NOTE(review): the literal for this constant is absent from this copy of the
# file; the value below is the example key from the Flask quickstart --
# confirm it against version control.
DEFAULT_SECRET_KEY = 'A0Zr98j/3yX R~XHH!jmN]LWX/,?RT'

# Environment values that switch a boolean setting on.
_TRUE_VALUES = ('1', 'True')

# Every setting is taken from the environment and stored in app.config,
# because the rest of this module reads them back as app.config[...].
app.config.update(
    DEBUG=os.environ.get('DEBUG') in _TRUE_VALUES,
    NO_LOGIN=os.environ.get('NO_LOGIN') in _TRUE_VALUES,
    PORT=int(os.environ.get('PORT', 5000)),
    SQLALCHEMY_DATABASE_URI=os.environ.get('DB_URI', 'sqlite:///tracks.db'),
    MUSIC_DIR=os.environ.get('MUSIC_DIR', 'static/music'),
    ADMIN_EMAIL=os.environ.get('ADMIN_EMAIL', None),
)
app.secret_key = os.environ.get('SECRET_KEY', DEFAULT_SECRET_KEY)
class User(UserMixin):
    """ A logged-in user, identified by `user_id`.

    `admin` is True when `user_id` equals the configured ADMIN_EMAIL.
    """

    def __init__(self, user_id):
        self.user_id = user_id
        admin_email = app.config['ADMIN_EMAIL']
        self.admin = (user_id == admin_email)

    def get_id(self):
        """ Return the user's id as a unicode string. """
        return unicode(self.user_id)
def get_user_by_id(user_id):
    """ Return a User object for `user_id`; no lookup is performed. """
    user = User(user_id)
    return user
def get_user(resp):
    """ Turn a BrowserID response into a User, or None if access is denied.

    `resp` is indexed by 'status' and, for a successful login, 'email'.
    """
    # Anything other than an 'okay' status means the login failed.
    if resp['status'] != 'okay':
        return None

    admin_email = app.config['ADMIN_EMAIL']
    email = resp['email']

    # With an admin email configured, only that address may log in.
    if admin_email and admin_email != email:
        return None

    # Either this is the admin, or no restriction is configured.
    return User(email)
# Authentication wiring. Flask-Login needs a user loader and must be bound to
# the app before login_required / current_user can work; Flask-BrowserID
# needs its own loader to turn a verification response into a user.
login_manager = LoginManager()
login_manager.user_loader(get_user_by_id)
login_manager.init_app(app)
# Endpoint that unauthenticated users are redirected to.
login_manager.login_view = "login_view"

browser_id = BrowserID()
browser_id.user_loader(get_user)
browser_id.init_app(app)
def search_results():
    """ Perform a general search encompassing artist, track, albums.

    Reads the search term from the `q` query parameter, splits it on
    whitespace and requires every token to appear in the title or artist of
    a track (up to 30 matches) or of an album (up to 10 matches).

    Returns a JSON object with three keys:
      'albums'         - serialized albums that matched, plus the albums of
                         every matching track;
      'tracks'         - serialized tracks that matched, plus every track of
                         every matching album;
      'search_results' - the direct hits as ['album', id] / ['track', id]
                         pairs, albums first.
    """
    search_term = request.args.get('q', '')
    # split search term into up to 10 tokens (anything further is ignored)
    tokens = [tok for tok in re.split(r'\s+', search_term) if tok][:10]

    track_filters = [Track.title.contains(token) |
                     Track.artist.contains(token) for token in tokens]
    track_results = Track.query.filter(*track_filters).limit(30).all()

    album_filters = [Album.title.contains(token) |
                     Album.artist.contains(token) for token in tokens]
    album_results = Album.query.filter(*album_filters).limit(10).all()

    tracks_to_include = set([t.id for t in track_results])
    albums_to_include = set([t.album_id for t in track_results])
    albums_to_include.discard(None)  # Whoops, some tracks aren't on an album.

    albums_to_include |= set([a.id for a in album_results])
    tracks_to_include |= set(
        [t for a in album_results for t in a.serialize['track_ids']])

    # NOTE(review): the second element of each pair was missing from this
    # copy of the file; the record id is the presumed value -- confirm
    # against the client code that consumes 'search_results'.
    results = [['album', a.id] for a in album_results]
    results += [['track', t.id] for t in track_results]

    response = {
        'albums': [Album.query.filter(Album.id == a).one().serialize
                   for a in albums_to_include],
        'tracks': [Track.query.filter(Track.id == t).one().serialize
                   for t in tracks_to_include],
        'search_results': results,
    }
    return jsonify(response)
def get_artists():
    """ Return a JSON list of artist names, for paging through artists.

    Query parameters:
      start - only artists lexicographically after this value are returned
              (default: '', i.e. from the beginning);
      limit - maximum number of artists to return (default: 30; a value that
              is not an integer falls back to the default).
    """
    # TODO: Come up with a solution for artists who only have non-album
    # tracks. For now, this only returns artists who have albums.
    start = request.args.get('start', '')
    limit = request.args.get('limit', 30, type=int)
    # Ordering by artist keeps pages stable, so the last name of one page
    # can be passed as `start` for the next.
    albums = (Album.query.filter(Album.artist > start)
              .group_by(Album.artist)
              .order_by(Album.artist)
              .limit(limit))
    return jsonify(objects=[a.artist for a in albums])
def get_artist_albums(artist):
    """ Return a JSON list of the serialized albums by `artist`.

    The artist name must match exactly; an unknown artist yields an empty
    list.
    """
    # TODO: Come up with a solution for surfacing non-album tracks as well.
    # NOTE(review): the tail of this query was cut off in this copy of the
    # file; an ordering clause may have followed -- confirm.
    albums = Album.query.filter(Album.artist == artist)
    return jsonify(objects=[a.serialize for a in albums])
def get_album(album_id):
    """ Given an album ID, return its info, with a "tracks" attribute added
    that lists all the tracks.

    Responds with 404 if no album has that ID.
    """
    album = Album.query.filter_by(id=album_id).first()
    if album is None:
        abort(404)
    # NOTE(review): this query ended in a line continuation whose next line
    # is missing from this copy of the file (probably an ordering clause) --
    # confirm.
    tracks = Track.query.filter_by(album_id=album_id)
    response = album.serialize
    response['tracks'] = [t.serialize for t in tracks]
    return jsonify(response)
def get_album_art(album_id):
    """ Redirect to the static cover-art file of an album.

    Responds with 404 if the album does not exist or has no cover art.
    """
    album = Album.query.filter_by(id=album_id).first()
    if album is None or album.cover_art is None:
        abort(404)
    # NOTE(review): the final path component was cut off in this copy of the
    # file; album.cover_art is presumably a path relative to MUSIC_DIR --
    # confirm.
    return redirect(os.path.join('/' + app.config['MUSIC_DIR'],
                                 album.cover_art))
def get_track(track_id):
    """ Return the serialized track with the given ID as JSON.

    Responds with 404 if no track has that ID.
    """
    track = Track.query.filter_by(id=track_id).first()
    if track is None:
        abort(404)
    return jsonify(track.serialize)
def get_track_audio(track_id, wanted_formats):
    """ Get a track's audio.

    If `wanted_formats` (a comma-separated list) includes the file's actual
    format, a redirect is sent (so the static file can be handled as such).
    Otherwise, if `wanted_formats` includes ogg, it's transcoded on the fly.

    Responds with 404 if the track does not exist, or if none of the wanted
    formats can be served.
    """
    TRANSCODABLE_FORMATS = ['mp3', 'ogg', 'flac', 'm4a', 'wav']
    wanted_formats = wanted_formats.split(',')

    track = Track.query.filter_by(id=track_id).first()
    if track is None:
        abort(404)

    # The format is the filename's extension. A filename without one has no
    # known format and falls through to the "can't transcode" branch.
    extension = re.search(r'\.([^.]+)$', track.filename)
    actual_format = extension.group(1) if extension else None

    if actual_format in wanted_formats:
        # No need to transcode. Just redirect to the static file.
        return redirect(os.path.join('/' + app.config['MUSIC_DIR'],
                                     track.filename))

    if (actual_format not in TRANSCODABLE_FORMATS
            or 'ogg' not in wanted_formats):
        # Can't transcode this. We only go from TRANSCODABLE_FORMATS to ogg.
        # NOTE(review): the status code used here was missing from this copy
        # of the file; 404 matches the other handlers -- confirm.
        abort(404)

    # Transcode to ogg.
    # Note that track.filename came out of the DB and is *not* user-specified
    # (through the web interface), so can be trusted.
    command = ['avconv', '-v', 'quiet',
               '-i', os.path.join(app.config['MUSIC_DIR'], track.filename),
               '-f', 'ogg', '-acodec', 'libvorbis', '-aq', '5', '-']
    pipe = Popen(command, stdout=PIPE)
    # The encoder writes to stdout ('-'); its pipe is handed to the response
    # through PipeWrapper.
    return Response(PipeWrapper(pipe),
                    mimetype='audio/ogg', direct_passthrough=True)
def front_page():
    """ Render the main application page.

    The template receives the NO_LOGIN setting as `nologin`.
    """
    nologin = app.config['NO_LOGIN']
    return render_template('app.html', nologin=nologin)
# Login page handler: users who are already authenticated are redirected to
# the front page; everyone else gets the login template.
def login_view():
if app.config['NO_LOGIN']:
# Log the user in as a fake user object, to bypass the actual login
# screen.
# FIXME: If you restart the server without NO_LOGIN, we should
# invalidate "" users — currently they can still
# reuse the session.
# NOTE(review): no statement follows this branch in this copy of the file,
# and the quoted user id in the FIXME above is empty. The comments describe
# logging in a fake user, so a login_user(...) call presumably belonged
# here -- restore it (and the id) from version control.
if current_user.is_authenticated():
return redirect(url_for('front_page'))
return render_template('login.html')
def check_secret_key():
    """ Refuse to run outside DEBUG mode with the default secret key.

    The default key is public, so sessions signed with it can be forged.
    Writes an error to stderr and exits with status 1 in that case;
    otherwise returns None.
    """
    if app.secret_key == DEFAULT_SECRET_KEY and not app.config['DEBUG']:
        sys.stderr.write("Error: You need to specify a SECRET_KEY\n")
        sys.exit(1)
# Script entry point.
# NOTE(review): the body of this guard is missing from this copy of the file
# (what follows is web-page residue, not code). It presumably called
# check_secret_key() and started the app -- restore it from version control.
if __name__ == '__main__':
Jump to Line
Something went wrong with that request. Please try again.