-
Notifications
You must be signed in to change notification settings - Fork 3
/
common.py
227 lines (173 loc) · 7.85 KB
/
common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import functools
import logging
import re
import peewee
import youtube_dl
# set up the logger
log = logging.getLogger('ddmbot.database')
# database object
_database = peewee.SqliteDatabase(None, pragmas=[('journal_mode', 'WAL'), ('foreign_keys', 'ON')])
class DdmBotSchema(peewee.Model):
class Meta:
database = _database
# Class to store timestamp in the database
class CreditTimestamp(DdmBotSchema):
last = peewee.DateTimeField()
# Class representing a song table in the database
class Song(DdmBotSchema):
# we use integer primary keys to represent songs in the database
id = peewee.PrimaryKeyField()
# song unique URI for consistent lookup and search
uuri = peewee.CharField(index=True, unique=True)
# title can be changed eventually
title = peewee.CharField()
# constrains
duration = peewee.IntegerField()
is_blacklisted = peewee.BooleanField(default=False)
# overplaying protection
last_played = peewee.DateTimeField()
credit_count = peewee.IntegerField()
# automatic playlist
listener_count = peewee.IntegerField(default=0)
skip_vote_count = peewee.IntegerField(default=0)
has_failed = peewee.BooleanField(default=False)
# song may be duplicated using multiple sources
duplicate = peewee.ForeignKeyField('self', null=True)
# we will need this to resolve a foreign key loop
DeferredUser = peewee.DeferredRelation()
DeferredLink = peewee.DeferredRelation()
# Table for storing playlists, as many as user wants
class Playlist(DdmBotSchema):
id = peewee.PrimaryKeyField()
# playlist is owned by a user
user = peewee.ForeignKeyField(DeferredUser)
# for an identifier, we choose a "nice enough" name
name = peewee.CharField()
# the first song of the playlist
head = peewee.ForeignKeyField(DeferredLink, null=True, default=None)
# playlist may be set to repeat itself, this is default except to implicit one
repeat = peewee.BooleanField(default=True)
class Meta:
# we want the couple (user, name) to be unique (so no user has two playlists with the same name)
constraints = [peewee.SQL('UNIQUE(user_id, name)')]
# Table for storing songs in playlist -- linked list approach
class Link(DdmBotSchema):
id = peewee.PrimaryKeyField()
playlist = peewee.ForeignKeyField(Playlist)
song = peewee.ForeignKeyField(Song)
next = peewee.ForeignKeyField('self', null=True)
DeferredLink.set_model(Link)
# Finally, table for storing information about users
class User(DdmBotSchema):
# we will re-use discord snowflakes (64-bit integers) as primary keys
id = peewee.BigIntegerField(primary_key=True)
# not everyone has to have a playlist
active_playlist = peewee.ForeignKeyField(Playlist, null=True, default=None)
play_count = peewee.IntegerField(default=0)
listen_count = peewee.IntegerField(default=0)
# for checking if the user should be ignored by the ddmbot
is_ignored = peewee.BooleanField(default=False)
DeferredUser.set_model(User)
# Model to retrieve failed foreign key constrains
class ForeignKeyCheckModel(DdmBotSchema):
table = peewee.CharField()
rowid = peewee.BigIntegerField()
parent = peewee.CharField()
fkid = peewee.IntegerField()
class DBInterface:
def __init__(self, loop):
if _database.is_closed():
raise RuntimeError('Database must be initialized and opened before instantiating interfaces')
self._loop = loop
self._database = _database
# decorator for DBInterface methods
def in_executor(method):
def wrapped_method(self, *args, **kwargs):
func = functools.partial(method, self, *args, **kwargs)
return self._loop.run_in_executor(None, func)
return wrapped_method
class DBSongUtil:
# some class (static) constant variables
_yt_regex = re.compile(r'^(https?://)?(www\.)?youtu(\.be/|be.com/.+?[?&]v=)(?P<id>[a-zA-Z0-9_-]+)')
_sc_regex = re.compile(r'^(https?://)?soundcloud.com/(?P<artist>[^/]+)/(?P<track>[^/?]+)')
_bc_regex = re.compile(r'^(https?://)?(?P<artist>[^.]+).bandcamp.com/track/(?P<track>[^/?]+)')
_list_regex = re.compile(
r'^(https?://)?(www\.youtube\.com/.*[?&]list=.+|soundcloud\.com/[^/]+/sets/.+|[^.:/]+\.bandcamp.com/album/.+)$')
_url_base = {'yt': 'https://www.youtube.com/watch?v={}',
'sc': 'https://soundcloud.com/{}/{}',
'bc': 'https://{}.bandcamp.com/track/{}'}
_ytdl = youtube_dl.YoutubeDL({'extract_flat': 'in_playlist', 'format': 'bestaudio/best', 'quiet': True,
'no_color': True})
@staticmethod
def _make_url(song_uuri):
uuri_parts = song_uuri.split(':')
return DBSongUtil._url_base[uuri_parts[0]].format(*uuri_parts[1:])
@staticmethod
def _is_list(input_url):
return DBSongUtil._list_regex.match(input_url) is not None
@staticmethod
def _make_uuri(song_url):
# makes unique URI from URLs suitable for database storage
# method will return URI in one of the following formats:
# yt:<youtube_id> for youtube video
# sc:<artist>:<track> for soundcloud
# bc:<artist>:<track> for bandcamp
match = DBSongUtil._yt_regex.match(song_url)
if match:
return 'yt:{}'.format(match.group('id'))
match = DBSongUtil._sc_regex.match(song_url)
if match:
return 'sc:{}:{}'.format(match.group('artist'), match.group('track'))
match = DBSongUtil._bc_regex.match(song_url)
if match:
return 'bc:{}:{}'.format(match.group('artist'), match.group('track'))
return None
class DBPlaylistUtil:
_playlist_regex = re.compile(r'^[a-zA-Z0-9_-]{1,32}$')
@staticmethod
def _get_playlist(user_id, playlist_name):
try:
playlist = Playlist.select().where(Playlist.user == user_id, Playlist.name == playlist_name).get()
except Playlist.DoesNotExist as e:
raise KeyError('You don\'t have a playlist called {}'.format(playlist_name)) from e
return playlist
@staticmethod
def _get_playlist_ex(user_id, *, playlist_name=None, create_default=False):
created = False
# if the name is given, we can get the playlist in a typical way
if playlist_name is not None:
return DBPlaylistUtil._get_playlist(user_id, playlist_name), created
# else we're gonna try to get an active playlist and possibly create it
with _database.atomic():
try:
playlist = Playlist.select(Playlist).join(User, on=(User.active_playlist == Playlist.id)) \
.where(User.id == user_id).get()
except Playlist.DoesNotExist as e:
if create_default and Playlist.select().where(Playlist.user == user_id).count() == 0:
playlist = Playlist.create(user=user_id, name='default', repeat=False)
User.update(active_playlist=playlist.id).where(User.id == user_id).execute()
created = True
else:
raise LookupError('You don\'t have an active playlist') from e
return playlist, created
#
# Function to initialize and open database connection to a given file
#
# Integrity check is performed.
#
def initialize(filename):
if not _database.is_closed():
raise RuntimeError('Database is opened already')
_database.init(filename)
_database.connect()
_database.create_tables([CreditTimestamp, Song, Playlist, Link, User], safe=True)
# check for the failed foreign key constrains
failed_query = ForeignKeyCheckModel.raw('PRAGMA foreign_key_check;')
if len(failed_query.execute()):
_database.close()
raise RuntimeError('Foreign key constrains check failed, database is corrupted and needs to be fixed')
#
# Function taking care of properly closing database
#
def close():
_database.close()