-
Notifications
You must be signed in to change notification settings - Fork 0
/
danmu_server.py
340 lines (310 loc) · 12.4 KB
/
danmu_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
from __future__ import unicode_literals
import socket
import time
import re
import requests
from bs4 import BeautifulSoup
import json
import pytz
from datetime import datetime
import sys
import asyncio
import websockets
import queue
from queue import Queue
import threading
from concurrent.futures import ThreadPoolExecutor
import sqlite3
import os
import io
import traceback
import ssl
import pathlib
import logging
logger = logging.getLogger('websockets')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())
in_q = Queue()
connected = set()
history_barrages = list()
HISTORY_BARRAGES_MAX_SIZE = 24
IN_QUEUE_MAX_SIZE = 20
TABLE_NAME="barrage"
isExit = False
# 获取用户昵称及弹幕信息的正则表达式
danmu = re.compile(b'type@=chatmsg.*?/uid@=(.*?)/nn@=(.*?)/txt@=(.*?)/.*?/level@=(.*?)/.*?/cst@=(.*?)/bnn@=(.*?)/bl@=(.*?)/brid@=(.*?)/')
keeplivePattern = re.compile(b'type@=keeplive.*')
# 获取中国-上海时区
tz = pytz.timezone('Asia/Shanghai')
def sendmsg(client, msgstr):
'''
客户端向服务器发送请求的函数,集成发送协议头的功能
msgHead: 发送数据前的协议头,消息长度的两倍,及消息类型、加密字段和保密字段
使用while循环发送具体数据,保证将数据都发送出去
'''
msg = msgstr.encode('utf-8')
data_length = len(msg) + 8
code = 689
msgHead = int.to_bytes(data_length, 4, 'little') \
+ int.to_bytes(data_length, 4, 'little') + int.to_bytes(code, 4, 'little')
client.send(msgHead)
sent = 0
while sent < len(msg):
tn = client.send(msg[sent:])
sent = sent + tn
def opendb(roomid):
conn = sqlite3.connect('barrage_{0}.db'.format(roomid))
cursor = conn.cursor()
# cursor.execute('''drop table barrage''')
cursor.execute('''SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name = "barrage";''')
if cursor.fetchone()[0] == 0:
cursor.execute('''CREATE TABLE barrage
(id INTEGER PRIMARY KEY NOT NULL,
uid INTEGER NOT NULL,
nickname TEXT NOT NULL,
content TEXT NOT NULL,
level INTEGER,
bnn TEXT,
bl INTEGER,
brid INTEGER,
cst INTEGER);''')
cursor.execute('''create index idx_nickname on barrage (nickname)''')
text_file = 'barrage_{0}.txt'.format(roomid)
if os.path.exists(text_file):
with open(text_file, "r", encoding='utf-8') as f:
for line in f:
dmDict = json.loads(line)
cursor.execute('''INSERT INTO barrage (uid,nickname,content,level,bnn,bl,brid,cst) VALUES(?,?,?,?,?,?,?,?)''',
(cast_to_int(dmDict['uid']), dmDict['nickname'], dmDict['content'], cast_to_int(dmDict['level']), dmDict['bnn'], cast_to_int(dmDict['bl']), cast_to_int(dmDict['brid']), dmDict['cst']))
conn.commit()
return (conn, cursor)
def cast_to_int(str_value):
try:
return int(str_value)
except:
return 0
def start(client, roomid, in_q):
'''
发送登录验证请求后,获取服务器返回的弹幕信息,同时提取昵称及弹幕内容
登陆请求消息及入组消息末尾要加入\0
'''
global history_barrages
global isExit
msg = 'type@=loginreq/roomid@={}/\0'.format(roomid)
sendmsg(client, msg)
msg_more = 'type@=joingroup/rid@={}/gid@=-9999/\0'.format(roomid)
sendmsg(client, msg_more)
print('---------------欢迎连接到{}的直播间---------------'.format(get_name(roomid)))
try:
conn, cursor = opendb(roomid)
cursor.execute("select * from barrage order by id desc limit ?", (HISTORY_BARRAGES_MAX_SIZE,))
history_list = cursor.fetchall()
history_barrages = []
for i in range(len(history_list)-1,-1,-1):
v = history_list[i]
dmDict = {'id':v[0], 'uid':v[1], 'nickname':v[2], 'content':v[3], 'level':v[4], 'bnn':v[5], 'bl':v[6], 'brid':v[7], 'cst':v[8]}
history_barrages.append(json.dumps(dmDict, ensure_ascii=False))
except Exception as e:
isExit = True
traceback.print_exc()
return
recvdata_time = time.time()
try:
dataCache = []
while not isExit:
try:
data = client.recv(4096)
if not data:
print("error: no data")
isExit = True
break
#if keeplivePattern.search(data) != None:
recvdata_time = time.time()
dataCache.append(data)
if len(dataCache) >= 10:
debug_log_filename = "debug_barrage_{0}.txt".format(roomid)
with io.open(debug_log_filename, "w", encoding='utf-8') as f:
for xdata in dataCache:
f.write(xdata[12:].decode(encoding='utf-8', errors='ignore') + "\n")
dataCache = []
danmu_more = danmu.findall(data)
if danmu_more != None:
try:
for i in danmu_more:
msg_content = data.decode(encoding='utf-8', errors='ignore').replace("@S", "/").replace("@A=", ":").replace("@=", ":")
dmDict={}
idx = 0
dmDict['uid'] = cast_to_int(i[idx].decode(encoding='utf-8', errors='ignore'))
idx += 1
dmDict['nickname'] = i[idx].decode(encoding='utf-8', errors='ignore')
idx += 1
dmDict['content'] = i[idx].decode(encoding='utf-8', errors='ignore')
idx += 1
dmDict['level'] = cast_to_int(i[idx].decode(encoding='utf-8', errors='ignore'))
idx += 1
dmDict['cst'] = cast_to_int(i[idx].decode(encoding='utf-8', errors='ignore'))
idx += 1
dmDict['bnn'] = i[idx].decode(encoding='utf-8', errors='ignore')
idx += 1
dmDict['bl'] = cast_to_int(i[idx].decode(encoding='utf-8', errors='ignore'))
idx += 1
dmDict['brid'] = cast_to_int(i[idx].decode(encoding='utf-8', errors='ignore'))
#override cst
dmDict['cst'] = round(time.time() * 1000)
# time_str = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
# print("<" + time_str + "> " + dmDict['nickname'] + ": "+ dmDict['content'])
cursor.execute('''INSERT INTO barrage (uid,nickname,content,level,bnn,bl,brid,cst) VALUES(?,?,?,?,?,?,?,?)''',
(dmDict['uid'], dmDict['nickname'], dmDict['content'], dmDict['level'], dmDict['bnn'], dmDict['bl'], dmDict['brid'], dmDict['cst']))
dmDict['id'] = cursor.lastrowid
dmJsonStr = json.dumps(dmDict, ensure_ascii=False)+'\n'
if in_q.qsize() > IN_QUEUE_MAX_SIZE:
try:
# remove one
in_q.get(False)
except queue.Empty:
# consume speed is too high!
pass
in_q.put(dmJsonStr)
# print(dmJsonStr)
conn.commit()
except Exception as e:
traceback.print_exc()
continue
except BlockingIOError:
time.sleep(0.1)
if time.time() - recvdata_time > 40:
print("error: keeplive timeout")
isExit = True
break
except:
isExit = True
traceback.print_exc()
return
finally:
conn.close()
def sleep(duration):
count = 0
global isExit
while not isExit and count < duration:
time.sleep(1)
count += 1
def keeplive(client):
'''
发送心跳信息,维持TCP长连接
心跳消息末尾加入\0
'''
global isExit
sleep(5)
while not isExit:
msg = 'type@=keeplive/tick@=' + str(int(time.time())) + '/\0'
# print("sendmsg...", msg)
sendmsg(client, msg)
# print("sendmsg end",)
sleep(20)
def get_name(roomid):
'''
利用BeautifulSoup获取直播间标题
'''
r = requests.get("http://www.douyu.com/" + roomid)
soup = BeautifulSoup(r.text, 'lxml')
find_res = soup.find('a', {'class', 'Title-anchorName'})
if find_res:
return find_res.string
else:
return "unknow"
async def async_producer():
while True:
try:
return in_q.get(False) #doesn't block
except queue.Empty: #raised when queue is empty
return
async def producer_handler(websocket, path):
global history_barrages
global isExit
#print("connected", websocket, path)
# Register.
connected.add(websocket)
#print("connected count: ", len(connected))
try:
#send history
for message in history_barrages:
await websocket.send(message)
if len(connected) == 1:
while len(connected) > 0 and not isExit:
try:
message = await async_producer()
if message:
history_barrages.append(message)
if len(history_barrages) > HISTORY_BARRAGES_MAX_SIZE:
history_barrages.pop(0)
await asyncio.wait([ws.send(message) for ws in connected if not ws.closed])
if websocket.closed:
#print("closed", websocket, path)
connected.remove(websocket)
else:
await asyncio.sleep(0.1)
except websockets.exceptions.ConnectionClosed:
continue
except:
continue
else:
while True:
try:
await asyncio.sleep(1)
if websocket.closed:
break
except websockets.exceptions.ConnectionClosed:
break
except websockets.exceptions.ConnectionClosed:
time_str = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
print("excepttion occurred at " + time_str)
traceback.print_exc()
finally:
#print("closed", websocket, path)
# Unregister.
if websocket in connected:
connected.remove(websocket)
async def wakeup():
while True:
await asyncio.sleep(1)
if isExit:
time_str = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
print("exit at " + time_str)
exit(1)
break
def connect_to_openbarrage():
# 配置socket的ip和端口
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# host = socket.gethostbyname("openbarrage.douyutv.com")
host = socket.gethostbyname("124.95.155.51")
port = 8601
client.connect((host, port))
client.settimeout(0)
return client
# 启动程序
if __name__ == '__main__':
# room_id = input('请输入房间ID: ')
room_id = sys.argv[1]
client = connect_to_openbarrage()
thread1 = threading.Thread(target=start, args=(client, room_id, in_q,))
thread2 = threading.Thread(target=keeplive, args=(client,), daemon=True)
thread1.start()
thread2.start()
try:
#ssl
#ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
#localhost_pem = pathlib.Path("cert/lovehanser.live.pem")
#ssl_context.load_cert_chain(localhost_pem)
start_server = websockets.serve(producer_handler, '', 8765)
loop = asyncio.get_event_loop()
loop.run_until_complete(start_server)
loop.create_task(wakeup())
loop.run_forever()
except KeyboardInterrupt as e:
print("KeyboardInterrupt")
isExit = True
except:
isExit = True
time_str = datetime.now(tz).strftime('%Y-%m-%d %H:%M:%S')
print("excepttion occurred at " + time_str)
traceback.print_exc()