-
Notifications
You must be signed in to change notification settings - Fork 1
/
app.py
100 lines (76 loc) · 2.69 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
from time import time
from pytube import YouTube
from threading import Thread
from urllib.error import URLError
from cleaner import clear_directory
from loggers import error_logger, info_logger
from pytube.exceptions import RegexMatchError, AgeRestrictedError
from flask import Flask, render_template, request, send_file, redirect, url_for
# App instance
app = Flask(__name__)
app.secret_key = b'_5#y2L"F4Q8z\n\xec]/'
# Automatically clean the /temp
cleaner_thread = Thread(target=clear_directory)
# Index
@app.route('/')
def index():
message = request.args.get('message') or ""
return render_template('index.html', message=message)
# Get download streams
def get_streams(link: str):
# Instance pytube
pytube = YouTube(link)
# Video Streams
video_streams = pytube.streams.filter(type='video', progressive=True).order_by("resolution").desc()
# Audio Streams
audio_streams = pytube.streams.filter(only_audio=True, subtype='mp4').order_by("abr").desc()
# Return streams
return video_streams, audio_streams
# Stream select endpoint
@app.route('/streams', methods=['POST'])
def streams():
# Video link
link = request.form['link']
try:
# Get streams
video_streams, audio_streams = get_streams(link)
return render_template('streams.html', link=link, video_streams=video_streams, audio_streams=audio_streams)
# Input problem
except RegexMatchError:
message = 'Invalid link!'
# Connection Problem
except URLError:
error_logger.exception('URLError: Connection problem!')
message = 'Connection problem!'
# Age Restricted Video
except AgeRestrictedError:
message = 'Age restricted videos cannot be downloaded!'
return redirect(url_for('index', message=message))
# Download endpoint (Local)
@app.route('/download', methods=['POST'])
def download():
# Stream index
itag = int(request.form['stream'])
# Video link
link = request.form['link']
# Get the stream
stream = YouTube(link).streams.get_by_itag(itag)
# Generate filename
filename = f'temp{round(time() * 1000)}.{stream.subtype if stream.type == "video" else "mp3"}'
# Download
stream.download(output_path='temp', filename=filename)
# Redirect
return render_template('download.html', filename=filename)
# Download endpoint (Browser)
@app.route('/download/<filename>')
def downloadFile(filename):
# Download the file (browser)
return send_file( f'temp/{filename}', as_attachment=True)
# Scope verification
if __name__ == "__main__":
# Start the cleaner
cleaner_thread.start()
# Start the app
app.run()
# Join cleaner before end
cleaner_thread.join()