-
Notifications
You must be signed in to change notification settings - Fork 112
/
ratebypass.py
449 lines (377 loc) · 15.2 KB
/
ratebypass.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# -*- coding: utf-8 -*-
"""
Copyright (C) 2012-2021 https://github.com/pytube/pytube/
SPDX-License-Identifier: Unlicense
Copyright (C) 2021 plugin.video.youtube
SPDX-License-Identifier: GPL-2.0-only
See LICENSES/GPL-2.0-only for more information.
"""
import re
from copy import copy
try:
    from ....kodion import logger
except Exception:
    # ``Exception`` instead of a bare ``except`` so that KeyboardInterrupt and
    # SystemExit are never swallowed, while still covering every error type a
    # failed package-relative import can raise.
    class logger:
        """Fallback exposing the ``log_debug`` call used throughout this module."""

        @staticmethod
        def log_debug(txt):
            """Print ``txt`` to standard output."""
            print(txt)
def throttling_reverse(arr):
    """Reverse the list ``arr`` in place.

    The list object passed by the caller must itself be modified, so the
    reversed sequence is written back into ``arr`` through a full-slice
    assignment; nothing is returned.
    """
    arr[:] = arr[::-1]
def throttling_push(d, e):
    """Append ``e`` to the end of the list ``d`` in place.

    :param list d:
        List that receives the new element.
    :param e:
        Element to add.
    """
    d.append(e)
def throttling_mod_func(d, e):
    """Wrap ``e`` into the index range of ``d``.

    Direct translation of the javascript expression used by the throttling
    array functions:
        e = (e % d.length + d.length) % d.length

    :param list d:
        Sequence whose length is the modulus.
    :param int e:
        Value to wrap; may be negative or larger than ``len(d)``.
    :returns:
        The wrapped value. An empty ``d`` raises ZeroDivisionError.
    """
    size = len(d)
    return (e % size + size) % size
def throttling_unshift(d, e):
    """Rotate the list ``d`` to the right by ``e`` positions, in place.

    Javascript equivalent:
        for(e=(e%d.length+d.length)%d.length;e--;)d.unshift(d.pop())

    :param list d:
        List to rotate; the same list object is updated.
    :param int e:
        Rotation amount, wrapped into the index range of ``d`` first.
    """
    shift = throttling_mod_func(d, e)
    # The last ``shift`` elements move to the front. With shift == 0 the
    # first slice is the whole list and the second is empty: nothing moves.
    d[:] = d[-shift:] + d[:-shift]
def throttling_cipher_function(d, e):
    """Cipher the characters of ``d`` in place, using ``e`` as the key.

    Javascript equivalent:
        var h = [A-Za-z0-9-_], f = 96;  // simplified from switch-case loop
        d.forEach(
            function(l,m,n){
                this.push(
                    n[m]=h[
                        (h.indexOf(l)-h.indexOf(this[m])+m-32+f--)%h.length
                    ]
                )
            },
            e.split("")
        )

    :param list d:
        List of single characters from the cipher alphabet; every position
        is overwritten with its ciphered character.
    :param str e:
        Key string. Its characters seed the running key, and every produced
        character is appended to that key so later positions can use it.
    """
    alphabet = list('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_')
    # Plays the role of "this" in the javascript callback.
    running_key = list(e)
    offset = 96
    # Walk a snapshot so that writing into ``d`` cannot disturb the iteration.
    for position, char in enumerate(list(d)):
        index = (
            alphabet.index(char)
            - alphabet.index(running_key[position])
            + position - 32 + offset
        ) % len(alphabet)
        ciphered = alphabet[index]
        running_key.append(ciphered)
        d[position] = ciphered
        offset -= 1
def throttling_nested_splice(d, e):
    """Apply the nested splice operation from the throttling js to ``d``.

    Javascript equivalent:
        function(d,e){
            e=(e%d.length+d.length)%d.length;
            d.splice(
                0,
                1,
                d.splice(
                    e,
                    1,
                    d[0]
                )[0]
            )
        }

    In testing this only ever swapped elements 0 and e, but both splice
    steps are kept in case an edge case was overlooked.

    :param list d:
        List modified in place.
    :param int e:
        Index of the second element, wrapped into the index range of ``d``.
    """
    index = throttling_mod_func(d, e)
    # Inner splice: put a copy of d[0] at ``index`` and collect what was there.
    displaced = js_splice(d, index, 1, d[0])
    # Outer splice: put the displaced element at position 0.
    js_splice(d, 0, 1, displaced[0])
def throttling_prepend(d, e):
    """Move the last ``e`` elements of the list ``d`` to its front, in place.

    Javascript equivalent:
        function(d,e){
            e=(e%d.length+d.length)%d.length;
            d.splice(-e).reverse().forEach(
                function(f){
                    d.unshift(f)
                }
            )
        }

    :param list d:
        List to reorder; its length never changes.
    :param int e:
        Number of trailing elements to move, wrapped into the index range
        of ``d`` first.
    """
    count = throttling_mod_func(d, e)
    # Tail first, then the remainder, written back into the same list object.
    d[:] = d[-count:] + d[:-count]
def throttling_swap(d, e):
    """Exchange, in place, element 0 of ``d`` with the element at index ``e``.

    :param list d:
        List to modify.
    :param int e:
        Index of the second element, wrapped into the index range of ``d``.
    """
    index = throttling_mod_func(d, e)
    d[0], d[index] = d[index], d[0]
def js_splice(arr, start, delete_count=None, *items):
    """Implementation of javascript's splice function.

    :param list arr:
        Array to splice; it is modified in place.
    :param int start:
        Index at which to start changing the array. Values beyond the end
        are clamped to ``len(arr)``; negative values count back from the end
        (clamped to 0); non-comparable values are treated as 0.
    :param int delete_count:
        Number of elements to delete from the array. ``None`` (argument
        omitted) deletes everything from ``start`` onwards, zero or negative
        values delete nothing, and values larger than the number of
        remaining elements are clamped.
    :param *items:
        Items to insert at ``start``.
    :returns:
        List of the deleted elements.

    Reference: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/splice  # noqa:E501
    """
    length = len(arr)

    # Normalise the start index the way javascript does.
    try:
        if start > length:
            start = length
        elif start < 0:
            # Negative start counts backwards from the end, never below 0.
            start = max(length + start, 0)
    except TypeError:
        # Non-integer start values are treated as 0 in js
        start = 0

    # Normalise the delete count. Only an omitted count means "to the end";
    # an explicit 0 must delete nothing so that pure insertions work.
    remaining = length - start
    if delete_count is None or delete_count >= remaining:
        delete_count = remaining
    elif delete_count < 0:
        delete_count = 0

    stop = start + delete_count
    deleted_elements = arr[start:stop]

    # Replace the removed span with the new items, keeping the list object.
    arr[start:stop] = items

    return deleted_elements
def throttling_splice(d, e):
    """Remove one element from ``d`` at the wrapped index ``e``, in place.

    Javascript equivalent:
        function(d,e){e=(e%d.length+d.length)%d.length;d.splice(e,1)}

    :param list d:
        List to modify.
    :param int e:
        Index of the element to remove, wrapped into the index range of ``d``.
    :returns:
        The list of removed elements, as returned by ``js_splice``.
    """
    index = throttling_mod_func(d, e)
    removed = js_splice(d, index, 1)
    return removed
class CalculateN:
    """Replay the throttling function of the player's "base.js" in Python.

    On construction the throttling function is located in the javascript
    source and two things are extracted from it: the "throttling plan" (the
    sequence of calls to perform) and the "c" array (the values and functions
    those calls refer to). ``calculate_n`` then executes the plan to turn an
    initial ``n`` value into the calculated one.
    """

    # References:
    # https://github.com/ytdl-org/youtube-dl/issues/29326#issuecomment-894619419
    # https://github.com/pytube/pytube/blob/fc9aec5c35829f2ebb4ef8dd599b14a666850d20/pytube/cipher.py

    # To maintainers: it might be necessary to add more function patterns (and implementations)
    # in the future as the "base.js" player code gets changed and updated.
    # Each entry pairs a regex that recognises a javascript function body with
    # the Python function implementing it; the first matching entry wins.
    MAPPING_FUNC_PATTERNS = (
        (r"{for\(\w=\(\w%\w\.length\+\w\.length\)%\w\.length;\w--;\)\w\.unshift\(\w.pop\(\)\)}", throttling_unshift),  # noqa:E501
        (r"{\w\.reverse\(\)}", throttling_reverse),
        (r"{\w\.push\(\w\)}", throttling_push),
        (r";var\s\w=\w\[0\];\w\[0\]=\w\[\w\];\w\[\w\]=\w}", throttling_swap),
        (r"case\s\d+", throttling_cipher_function),
        (r"\w\.splice\(0,1,\w\.splice\(\w,1,\w\[0\]\)\[0\]\)", throttling_nested_splice),  # noqa:E501
        (r";\w\.splice\(\w,1\)}", throttling_splice),
        (r"\w\.splice\(-\w\)\.reverse\(\)\.forEach\(function\(\w\){\w\.unshift\(\w\)}\)", throttling_prepend),  # noqa:E501
        (r"for\(var \w=\w\.length;\w;\)\w\.push\(\w\.splice\(--\w,1\)\[0\]\)}", throttling_reverse),  # noqa:E501
    )

    def __init__(self, js):
        """Parse the throttling plan and the "c" array out of ``js``.

        :param str js:
            The contents of the base.js asset file.
        """
        # Shared MUTABLE list holding the characters of the current "n" value.
        # References to it are stored inside self.throttling_array.
        self.initial_n_list = []

        raw_code = self.get_throttling_function_code(js).replace('\n', '')
        self.throttling_plan = self.get_throttling_plan(raw_code)
        self.throttling_array = self.get_throttling_function_array(self.initial_n_list, raw_code)

    @staticmethod
    def get_throttling_function_code(js):
        """Extract the raw code for the throttling function.

        :param str js:
            The contents of the base.js asset file.
        :rtype: str
        :returns:
            The full string code of the function, in the form "...=function(...){...};",
            or an empty string when it cannot be located.
            Note: might include whitespace in the body of the function.
        """
        # Only present in the throttle function code.
        fiduciary_index = js.find('enhanced_except_')
        if fiduciary_index == -1:
            logger.log_debug('ratebypass: fiduciary_index not found')
            return ''

        # Walk back to the "=function(" that opens the throttling function and
        # from there to the semicolon that ends the previous statement.
        function_index = js.rfind('=function(', 0, fiduciary_index)
        if function_index == -1:
            # Without this guard the -1 would be used as a slice end below and
            # an unrelated semicolon could be picked up.
            logger.log_debug('ratebypass: function code start not found')
            return ''
        start_index = js.rfind(';', 0, function_index)
        if start_index == -1:
            logger.log_debug('ratebypass: function code start not found')
            return ''
        # Skip the semicolon that this index points to.
        start_index += 1

        end_pattern = '};'
        end_index = js.find(end_pattern, fiduciary_index)
        if end_index == -1:
            logger.log_debug('ratebypass: function code end not found')
            return ''
        # Skip ahead to include the end pattern in the output string.
        end_index += len(end_pattern)

        return js[start_index:end_index].strip()

    @staticmethod
    def get_throttling_plan(raw_code):
        """Extract the "throttling plan".

        The "throttling plan" is the list of commands executed to unscramble the n value.
        These commands reference items from the "c" array.

        :param str raw_code:
            The response from get_throttling_function_code(js).
        :returns:
            A list of tuples, where the first element of each tuple is the
            index of the function to call, the remaining elements are arguments to
            pass to that function. ``None`` when the command block cannot be
            located.
        :raises ValueError:
            When a command contains something other than integer indices.
        """
        plan_start_pattern = r"try{"
        plan_start_index = raw_code.find(plan_start_pattern)
        if plan_start_index == -1:
            logger.log_debug('ratebypass: command block start not found')
            return None
        # Skip the whole start pattern, it's not needed.
        plan_start_index += len(plan_start_pattern)

        plan_end_pattern = '}'
        plan_end_index = raw_code.find(plan_end_pattern, plan_start_index)
        if plan_end_index == -1:
            logger.log_debug('ratebypass: command block end not found')
            return None

        plan_raw = raw_code[plan_start_index:plan_end_index]

        # So far, these commands are either c[x](c[y]) or c[x](c[y],c[z]).
        plan = []
        for command in plan_raw.split('),'):
            # Turn a single command of the form "c[x](c[y], c[z], ...)" into
            # "x,y,z,...". The last character (a "]" or the final ")") is
            # dropped before the remaining brackets are stripped.
            indices = (command[:-1]
                       .replace('c[', '')
                       .replace(']', '')
                       .replace('(', ','))
            plan.append(tuple(map(int, indices.split(','))))
        return plan

    @staticmethod
    def array_reverse_split_gen(raw_array):
        """Iterate the comma-split pieces of the stringified list in reverse,
        joining pieces that are part of the same longer object that might
        have comma characters inside.

        :param str raw_array:
            The "c" array string, without enclosing brackets.
        :returns:
            Generates the elements of the stringified array in REVERSE order.
            The caller is responsible for reversing it back to normal.
        """
        openers = ('function', '"', "'")
        closers = ('}', '"', "'")

        accumulator = None
        for piece in reversed(raw_array.split(',')):
            # str.startswith / str.endswith with a tuple are safe on empty
            # pieces, unlike indexing piece[0] / piece[-1].
            if piece.startswith(openers):
                # The piece opens a function or a quoted string: together with
                # whatever has been accumulated it forms one whole element.
                if accumulator:
                    yield piece + ',' + accumulator
                    accumulator = None
                else:
                    yield piece
            elif piece.endswith(closers):
                # The piece closes something it did not open. Start
                # accumulating, or keep accumulating when an outer object is
                # already being collected (e.g. a nested function body), so
                # that no text of the element is lost.
                if accumulator:
                    accumulator = piece + ',' + accumulator
                else:
                    accumulator = piece
            elif accumulator:
                # Middle piece of an element that is being collected.
                accumulator = piece + ',' + accumulator
            else:
                yield piece

    @classmethod
    def get_throttling_function_array(cls, initial_n_list, raw_code):
        """Extract the "c" array that comes with values and functions
        used to unscramble the initial n value.

        :param list initial_n_list:
            MUTABLE list of the characters of the "initial n" parameter.
        :param str raw_code:
            The response from get_throttling_function_code(js).
        :returns:
            The array of various integers, arrays, and functions. Empty when
            the array cannot be located.
        """
        array_start_pattern = ",c=["
        array_start_index = raw_code.find(array_start_pattern)
        if array_start_index == -1:
            logger.log_debug('ratebypass: "c" array pattern not found')
            return []
        array_start_index += len(array_start_pattern)

        array_end_pattern = '];'
        array_end_index = raw_code.rfind(array_end_pattern)
        if array_end_index == -1:
            logger.log_debug('ratebypass: "c" array end not found')
            return []

        raw_array = raw_code[array_start_index:array_end_index]
        str_array = tuple(cls.array_reverse_split_gen(raw_array))[::-1]

        converted_array = []
        for el in str_array:
            try:
                converted_array.append(int(el))
                continue
            except ValueError:
                # Not an integer value.
                pass

            if el == 'null':
                # As per the JavaScript, replace null elements in this array
                # with a reference to the array itself.
                converted_array.append(converted_array)
                continue

            if len(el) >= 2 and el[0] == el[-1] and el[0] in ('"', "'"):
                # Convert e.g. '"abcdef"' to the bare string 'abcdef'. Only the
                # enclosing pair of quotes is removed, so quote characters that
                # belong to the content are preserved.
                converted_array.append(el[1:-1])
                continue

            if el.startswith('function'):
                mapped = next(
                    (fn for pattern, fn in cls.MAPPING_FUNC_PATTERNS
                     if re.search(pattern, el)),
                    None
                )
                if mapped is not None:
                    converted_array.append(mapped)
                    continue
                logger.log_debug('ratebypass: mapping function not yet listed: {unknown_func}'
                                 .format(unknown_func=el))
                # Fall through so the array keeps one entry per element and
                # the indices used by the plan stay aligned.

            # Probably the single "b" references (references to the initial 'n'
            # value as a MUTABLE list of characters). Append a reference to the
            # list sent by the caller.
            converted_array.append(initial_n_list)

        return converted_array

    def calculate_n(self, initial_n):
        """Run the throttling plan on ``initial_n``.

        :param initial_n:
            The characters of the initial "n" value (a string or any iterable
            of single-character strings).
        :rtype: str
        :returns:
            The calculated "n" value, or an empty string when the throttling
            plan or the throttling array is not available.
        """
        if not self.throttling_plan or not self.throttling_array:
            return ''

        # Materialise once, so a one-shot iterator is not exhausted by the
        # logging below before its characters are used.
        initial_chars = list(initial_n)

        logger.log_debug('Attempting to calculate `n` from initial: {initial_n}'
                         .format(initial_n=''.join(initial_chars)))

        # Clear (in-place) and refill this list with the characters from 'initial_n'.
        # Note that references to this list is already in self.throttling_array.
        self.initial_n_list[:] = initial_chars

        # The plan may reorder the "c" array itself through its self-reference
        # entries. Run it on a per-call working copy (with the self-references
        # redirected to that copy) so the parsed array stays pristine and
        # repeated calls all start from the same state.
        parsed_array = self.throttling_array
        working_array = []
        for item in parsed_array:
            working_array.append(working_array if item is parsed_array else item)

        # For each step in the plan, get the first item of the step as the
        # index of the function to call, and use the remaining items as
        # arguments.
        for step in self.throttling_plan:
            step_iter = iter(step)
            curr_func = working_array[next(step_iter)]
            args = [working_array[i] for i in step_iter]
            curr_func(*args)

        calculated_n = ''.join(self.initial_n_list)
        logger.log_debug('Calculated `n`: {calculated_n}'
                         .format(calculated_n=calculated_n))
        return calculated_n