-
Notifications
You must be signed in to change notification settings - Fork 0
/
twitch_archiver.py
171 lines (154 loc) · 7.26 KB
/
twitch_archiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import asyncio, time, os, logging
from streamlink import Streamlink, PluginError
class Stream:
    """Record one live broadcast of a channel to a file through streamlink.

    Intended call order (see ``mainloop``): construct, ``await setStream()``,
    ``setFilepath(config)``, then run ``setTitle()`` and ``checkIsLive()`` as
    background tasks while ``writeToFile()`` is awaited in a loop for as long
    as ``is_live`` stays true.

    Relies on the module-level names ``config`` (dict of settings), ``log``
    (logger), ``Streamlink`` and ``PluginError``.
    """

    _url = None        # channel URL handed to streamlink
    _session = None    # streamlink session created by setSession()
    _stream = None     # opened "best" quality stream (file-like, has read()/close())
    _title = None      # sanitised broadcast title once resolved
    _filepath = None   # current output path; ends in '.live' until a title is applied
    is_live = False    # recording loop runs while this is true

    def __init__(self, url) -> None:
        self._url = url
        self._session = self.setSession()

    def setSession(self):
        """Create a streamlink session configured from the global ``config``."""
        session = Streamlink()
        if config["oauth_token"] != "":
            session.set_option("api-header", {"Authorization": f'OAuth {config["oauth_token"]}'})
        # session.set_plugin_option("twitch", "record-reruns", config["record_reruns"])
        # ^ currently accidentally removed from streamlink twitch plugin
        # NOTE(review): the values below come straight from the config file and
        # are therefore strings; confirm streamlink interprets them as intended.
        session.set_option("disable-hosting", config["disable_hosting"])
        session.set_option("disable-ads", config["disable_ads"])
        return session

    async def setStream(self):
        """Poll every 5 seconds until a "best" stream exists, then open it."""
        log.info("waiting for stream to go live")
        streamformats = self._session.streams(self._url)
        # keep waiting until "best" is actually present; a non-empty mapping
        # without "best" must not fall through to the lookup below
        while streamformats.get("best") is None:
            await asyncio.sleep(5)
            streamformats = self._session.streams(self._url)
        self._stream = streamformats["best"].open()
        self.is_live = True
        log.info("stream is live")

    async def setTitle(self):
        """Poll every 5 seconds until the title resolves, then rename the file."""
        log.info("attempting to resolve stream title")
        # .resolve_url() instantiates a new plugin.Twitch class, returns a tuple(str, type(Plugin), str)
        # .get_title() returns the (re?)initialised title metadata from the (new) plugin.Twitch class
        while (
            title := self._session.resolve_url(self._url)[1](self._session, self._url).get_title()
        ) is None:
            await asyncio.sleep(5)
        self._title = self._sanitiseString(title)
        log.info("resolved stream title")
        self._updateFilepath()

    def updateTitle(self, title):
        """Force ``title`` as the title and rename the recording accordingly.

        Meant as the fallback when setTitle() did not succeed, so it is
        expected to run at most once per instance.
        """
        self._title = title
        self._updateFilepath()

    def setFilepath(self, config):
        """Set the temporary path ``<out_dir>/<streamer>_<date>.live``.

        Reads ``out_dir``, ``streamer`` and ``time_format`` from ``config``.
        """
        date = self._sanitiseString(time.strftime(config["time_format"]))
        # os.path.join inserts the separator only when 'out_dir' lacks one, so
        # directories configured with a trailing slash produce the same path as before
        self._filepath = os.path.join(config["out_dir"], f'{config["streamer"]}_{date}.live')

    def _updateFilepath(self):
        """Rename the recording to ``<stem>_<title>.ts`` and track the new path.

        If the target already exists, the current time is appended to the
        name and the rename is retried. If nothing has been written yet, only
        the tracked path changes so later writes land in the final file.
        """
        source = self._filepath
        if source.endswith('.live'):
            stem = source[:-5]  # drop the temporary '.live' suffix
        else:
            # path was already renamed once; strip its extension rather than
            # chopping five arbitrary characters off the name
            stem = os.path.splitext(source)[0]
        new_filepath = f'{stem}_{self._title}.ts'
        while True:
            try:
                if os.path.exists(new_filepath):
                    raise FileExistsError(f"'{new_filepath}' already exists")
                if os.path.exists(source):
                    os.rename(source, new_filepath)
                    log.info("renamed '%s' to '%s'", source, new_filepath)
                else:
                    # title resolved before the first chunk reached the disk
                    log.info("'%s' not written yet, recording to '%s'", source, new_filepath)
                break
            except FileExistsError as message:
                log.warning(message)
                log.info("appending current time to filepath")
                new_filepath = f'{stem}_{self._title}_{time.strftime("%H-%M-%S")}.ts'
        self._filepath = new_filepath

    def _sanitiseString(self, input) -> str:
        """Return ``input`` without filename-hostile characters, stripped of outer whitespace."""
        forbiddenchars = r'<>:"/\|!?*'
        return "".join(char for char in input if char not in forbiddenchars).strip()

    async def checkIsLive(self, timeout):
        """Refresh ``is_live`` every ``timeout`` seconds; runs until cancelled."""
        while True:
            await asyncio.sleep(timeout)
            try:
                self.is_live = len(self._session.streams(self._url)) != 0
            except PluginError as message:
                # a failed lookup says nothing about the broadcast itself:
                # keep the previous state and try again on the next tick
                log.warning(message)

    async def writeToFile(self):
        """Append the next chunk (up to 1024 bytes) of the stream to the recording.

        Raises:
            SystemExit: with code 0 when the OS reports errno 12.
        """
        try:
            data = self._stream.read(1024)
            with open(self._filepath, "ab") as vod:
                vod.write(data)
        except OSError as message:
            # OSError '[Errno 12] not enough space' will crash system if allowed to loop
            # forcing an exit with success code 0 allows systemd services to restart only if exiting on failure
            if message.errno == 12 or "[Errno 12]" in str(message):
                log.critical(message)
                raise SystemExit(0)
            # self._stream.read() raises `OSError("Read Timeout")` rarely on stream ended
            log.error(message)
            self.is_live = False
def setConfig(config_file):
    """Parse a ``key = value`` settings file into a dict of strings.

    Blank lines and lines whose first non-blank character is ``#`` are
    ignored. Only the first ``=`` separates key from value, so values may
    themselves contain ``=``. Keys and values are stripped of surrounding
    whitespace.

    Args:
        config_file: path of the file to read.

    Returns:
        dict mapping each option name to its (string) value.

    Raises:
        ValueError: if a non-comment line contains no ``=``.
    """
    config = {}
    with open(config_file, 'r') as f:
        for number, raw in enumerate(f, start=1):
            line = raw.strip()
            if not line or line.startswith('#'):
                continue
            key, separator, value = line.partition('=')
            if not separator:
                raise ValueError(f"{config_file}:{number}: expected 'key = value', got {line!r}")
            config[key.strip()] = value.strip()
    return config
# Load the settings, configure logging from them, echo them to the log (with
# the OAuth token masked) and refuse to start when required settings are unusable.
config = setConfig(r'./twitch_archiver.config')
logging.basicConfig(level=config["log_level"], format='%(asctime)s [%(name)s] [%(levelname)s] %(message)s', datefmt="%Y-%m-%d %H:%M:%S")
log = logging.getLogger()
for option, value in config.items():
    # select the token by its key: comparing values by identity could mask
    # (or fail to mask) an unrelated option that happens to share the object
    if option == "oauth_token":
        # show at most the first six characters, mask the remainder
        value = f'{value[:6]}{"*" * max(0, len(value) - 6)}'
    log.info("config.%s=%s", option, value)
if not os.path.isdir(config["out_dir"]):
    message = "'out_dir' in twitch_archiver.config is not a directory or does not exist"
    log.critical(message)
    raise Exception(message)
if config["streamer"] == "":
    message = "'streamer' not set in twitch_archiver.config"
    log.critical(message)
    raise Exception(message)
url = f'https://twitch.tv/{config["streamer"]}'
async def mainloop():
    """Record every broadcast of the configured channel, one file each, forever.

    Each iteration builds a fresh ``Stream``, waits for the channel to go
    live, records until ``Stream.is_live`` turns false, then shuts down the
    helper tasks and applies a fallback title when none was resolved.
    """
    while True:
        stream = Stream(url)
        # delayed initialisation that doesn't fit neatly into Stream.__init__() due to async shenanigans,
        # once the `Stream._stream` file object property has been set, parallel tasks are utilised to set
        # additional properties without 'blocking' the event handler, this allows writeToFile() to start
        # recording as soon as possible in relation to the start of the stream
        try:
            await stream.setStream()
        except PluginError as message:
            # setStream() raises PluginError on reconnect from internet failure, very strange behaviour
            log.warning(message)
            log.info("reinitialising 'Stream' class")
            # brief pause so a persistent failure does not turn into a tight retry loop
            await asyncio.sleep(5)
            continue
        stream.setFilepath(config)
        fetch_title = asyncio.create_task(stream.setTitle())
        fetch_is_live = asyncio.create_task(stream.checkIsLive(30))
        log.info("writing stream to '%s'", stream._filepath)
        while stream.is_live:
            await stream.writeToFile()
            await asyncio.sleep(0)
            # asyncio runs on a single thread so without the previous line writeToFile() would always have the
            # highest priority in the event handler, effectively blocking other tasks from executing
        log.info("stream ended")
        stream._stream.close()
        # task handling once the stream has concluded to prevent current loop's Stream class properties
        # from interacting with the next loop's as of yet unset properties; gathering the tasks waits
        # for the cancellation to land and retrieves any exception they finished with
        fetch_is_live.cancel()
        fetch_title.cancel()
        is_live_outcome, title_outcome = await asyncio.gather(
            fetch_is_live, fetch_title, return_exceptions=True
        )
        stream.is_live = False
        if isinstance(is_live_outcome, Exception):
            log.error("live-status task failed: %r", is_live_outcome)
        if isinstance(title_outcome, BaseException):
            if not isinstance(title_outcome, asyncio.CancelledError):
                log.error("title task failed: %r", title_outcome)
            # only fall back when no title was ever applied; a title that was
            # resolved but could not be applied is left alone
            if stream._title is None:
                log.error("unable to retrieve stream title")
                try:
                    stream.updateTitle("title-error")
                except OSError as message:
                    # a failed rename must not stop future recordings
                    log.error(message)
# start recording only when executed as a script, not when imported
if __name__ == "__main__":
    asyncio.run(mainloop())