forked from mopidy/mopidy
/
playlists.py
133 lines (110 loc) · 4.8 KB
/
playlists.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from __future__ import absolute_import, division, unicode_literals
import glob
import logging
import os
import sys
from mopidy import backend
from mopidy.models import Playlist
from .translator import local_playlist_uri_to_path, path_to_local_playlist_uri
from .translator import parse_m3u
logger = logging.getLogger(__name__)
class LocalPlaylistsProvider(backend.PlaylistsProvider):
def __init__(self, *args, **kwargs):
super(LocalPlaylistsProvider, self).__init__(*args, **kwargs)
self._media_dir = self.backend.config['local']['media_dir']
self._playlists_dir = self.backend.config['local']['playlists_dir']
self.refresh()
def create(self, name):
playlist = self._save_m3u(Playlist(name=name))
old_playlist = self.lookup(playlist.uri)
if old_playlist is not None:
index = self._playlists.index(old_playlist)
self._playlists[index] = playlist
else:
self._playlists.append(playlist)
logger.info('Created playlist %s', playlist.uri)
return playlist
def delete(self, uri):
playlist = self.lookup(uri)
if not playlist:
logger.warn('Trying to delete unknown playlist %s', uri)
return
path = local_playlist_uri_to_path(uri, self._playlists_dir)
if os.path.exists(path):
os.remove(path)
else:
logger.warn('Trying to delete missing playlist file %s', path)
self._playlists.remove(playlist)
def lookup(self, uri):
# TODO: store as {uri: playlist}?
for playlist in self._playlists:
if playlist.uri == uri:
return playlist
def refresh(self):
playlists = []
encoding = sys.getfilesystemencoding()
for path in glob.glob(os.path.join(self._playlists_dir, b'*.m3u')):
relpath = os.path.basename(path)
name = os.path.splitext(relpath)[0].decode(encoding)
uri = path_to_local_playlist_uri(relpath)
tracks = []
for track in parse_m3u(path, self._media_dir):
tracks.append(track)
playlist = Playlist(uri=uri, name=name, tracks=tracks)
playlists.append(playlist)
self.playlists = playlists
logger.info(
'Loaded %d local playlists from %s',
len(playlists), self._playlists_dir)
def save(self, playlist):
assert playlist.uri, 'Cannot save playlist without URI'
uri = playlist.uri
# TODO: require existing (created) playlist - currently, this
# is a *should* in https://docs.mopidy.com/en/latest/api/core/
try:
index = self._playlists.index(self.lookup(uri))
except ValueError:
logger.warn('Saving playlist with new URI %s', uri)
index = -1
playlist = self._save_m3u(playlist)
if index >= 0 and uri != playlist.uri:
path = local_playlist_uri_to_path(uri, self._playlists_dir)
if os.path.exists(path):
os.remove(path)
else:
logger.warn('Trying to delete missing playlist file %s', path)
if index >= 0:
self._playlists[index] = playlist
else:
self._playlists.append(playlist)
return playlist
def _write_m3u_extinf(self, file_handle, track):
title = track.name.encode('latin-1', 'replace')
runtime = track.length // 1000 if track.length else -1
file_handle.write('#EXTINF:' + str(runtime) + ',' + title + '\n')
def _sanitize_m3u_name(self, name, encoding=sys.getfilesystemencoding()):
name = name.encode(encoding, errors='replace')
name = os.path.basename(name)
name = name.decode(encoding)
return name
def _save_m3u(self, playlist, encoding=sys.getfilesystemencoding()):
if playlist.name:
name = self._sanitize_m3u_name(playlist.name, encoding)
uri = path_to_local_playlist_uri(name.encode(encoding) + b'.m3u')
path = local_playlist_uri_to_path(uri, self._playlists_dir)
elif playlist.uri:
uri = playlist.uri
path = local_playlist_uri_to_path(uri, self._playlists_dir)
name, _ = os.path.splitext(os.path.basename(path).decode(encoding))
else:
raise ValueError('M3U playlist needs name or URI')
extended = any(track.name for track in playlist.tracks)
with open(path, 'w') as file_handle:
if extended:
file_handle.write('#EXTM3U\n')
for track in playlist.tracks:
if extended and track.name:
self._write_m3u_extinf(file_handle, track)
file_handle.write(track.uri + '\n')
# assert playlist name matches file name/uri
return playlist.copy(uri=uri, name=name)