-
Notifications
You must be signed in to change notification settings - Fork 43
/
userbot.py
173 lines (144 loc) · 5.64 KB
/
userbot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import asyncio
import logging
import os
import pathlib
import time

from telethon import TelegramClient, events

from utils import write_chat_members, load_chat_members, update_userbot_admin_id
from database import DBSession, Message, User, Chat
# Path of the Telethon session file; run_once() refuses to log in again while it exists.
SESSION_FILE = './config/anon.session'
# Marker file that run_once() creates before logging in and removes afterwards;
# run_telethon() keeps waiting while it is present.
SESSION_LOCK_FILE = './config/anon.session.lock'
def get_enabled_chat_ids():
    """Return the ids of all chats whose ``enable`` attribute is truthy.

    Returns:
        list: The ``id`` of every enabled ``Chat`` row.
    """
    session = DBSession()
    try:
        return [chat.id for chat in session.query(Chat) if chat.enable]
    finally:
        # Always release the session, even when the query raises.
        session.close()
def insert_message(msg_id, msg_link, msg_text, from_id, from_chat, date):
    """Persist a new text message.

    The media columns (video, photo, audio, voice) and the category are
    stored as empty strings and the type is fixed to ``'text'``.

    Args:
        msg_id: Message id.
        msg_link: Link to the message.
        msg_text: Text content of the message.
        from_id: Id of the sending user.
        from_chat: Id of the chat the message was sent in.
        date: Date of the message.

    Raises:
        Exception: Whatever the database layer raises on ``add``/``commit``;
            the transaction is rolled back before the error propagates.
    """
    new_msg = Message(id=msg_id, link=msg_link, text=msg_text, video='', photo='', audio='',
                      voice='', type='text', category='', from_id=from_id, from_chat=from_chat, date=date)
    session = DBSession()
    try:
        session.add(new_msg)
        session.commit()
    except Exception:
        # Leave no half-finished transaction behind before re-raising.
        session.rollback()
        raise
    finally:
        session.close()
def update_message(from_chat, from_id, msg_id, msg_text):
    """Replace the stored text of a message.

    Args:
        from_chat: Id of the chat the message belongs to.
        from_id: Id of the sending user. Accepted for interface
            compatibility; it is not used to select the row.
        msg_id: Id of the message to update.
        msg_text: New text content.

    Raises:
        Exception: Whatever the database layer raises; the transaction is
            rolled back before the error propagates.
    """
    session = DBSession()
    try:
        # Compare values with ``==``; ``is_()`` renders SQL ``IS``, which is
        # meant for NULL/boolean checks and is not portable for plain values.
        session.query(Message) \
            .filter(Message.from_chat == from_chat) \
            .filter(Message.id == msg_id) \
            .update({"text": msg_text})
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
def insert_or_update_user(user_id, fullname, username):
    """Create the user row, or refresh its name fields when they changed.

    Nothing is written when the stored ``fullname`` and ``username`` already
    match the given values.

    Args:
        user_id: Primary key of the user.
        fullname: Display name to store.
        username: Username to store.

    Raises:
        Exception: Whatever the database layer raises; the transaction is
            rolled back before the error propagates.
    """
    session = DBSession()
    try:
        target_user = session.query(User).get(user_id)
        if target_user is None:
            session.add(User(id=user_id, fullname=fullname, username=username))
            session.commit()
        elif target_user.fullname != fullname or target_user.username != username:
            target_user.fullname = fullname
            target_user.username = username
            session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        # Release the session on every path, including failures.
        session.close()
async def handle_new_message(event, client):
    """Store an incoming group message and refresh the related metadata.

    The message is ignored unless its chat has a ``title`` and the chat id
    is returned by ``get_enabled_chat_ids()``. For accepted chats the member
    list is rewritten on every message. The message and its sender are then
    stored, unless the sender is a bot, the message was sent via an inline
    bot, or no user-like sender can be resolved.

    Args:
        event: The new-message event delivered by the client.
        client: Client used to fetch the chat participants.
    """
    current_chat = await event.get_chat()
    # Skip messages that do not come from a group.
    if not hasattr(current_chat, 'title'):
        return

    chat_id = current_chat.id
    listen_chat_ids = get_enabled_chat_ids()
    if chat_id > 0:
        # The enabled list may hold the id in its "-100"-prefixed form.
        fixed_id = int('-100' + str(chat_id))
        if fixed_id in listen_chat_ids:
            chat_id = fixed_id
    # Skip messages from groups that are not being listened to.
    if chat_id not in listen_chat_ids:
        return

    # Refresh the group's member list; it serves as the query whitelist.
    chat_members = load_chat_members()
    members = await client.get_participants(current_chat)
    chat_members[str(chat_id)] = {
        'title': current_chat.title,
        'members': [member.id for member in members if not member.deleted],
    }
    write_chat_members(chat_members)

    sender = await event.get_sender()
    # Anonymous or channel senders resolve to None or to an object without
    # user attributes; there is no user to record for those messages.
    if sender is None or not hasattr(sender, 'bot'):
        return
    # Exclude bot and inline (via-bot) messages.
    if sender.bot or event.via_bot_id:
        return

    sender_username = sender.username
    # Join only the name parts that are present so no stray space is stored.
    sender_fullname = ' '.join(
        part for part in (sender.first_name, sender.last_name) if part)
    if not sender_fullname:
        sender_fullname = sender_username

    # Take the id from the resolved sender; event.from_id can be None.
    user_id = from_id = sender.id
    msg_id = event.id
    # Drops the first four characters of the id (presumably the "-100" prefix).
    msg_link = 'https://t.me/c/{}/{}'.format(str(chat_id)[4:], event.id)
    msg_text = event.message.message
    msg_date = event.date
    logging.debug('new_message: chat%s user%s "%s"', chat_id, from_id, msg_text)

    # Store the message.
    insert_message(msg_id, msg_link, msg_text, from_id, chat_id, msg_date)
    # Update the user's information.
    insert_or_update_user(user_id, sender_fullname, sender_username)
async def handle_edit_message(event):
    """Apply an edit to the stored text of a recently sent group message.

    The edit is ignored unless its chat has a ``title``, the chat id is
    returned by ``get_enabled_chat_ids()``, and the edit happened at most
    120 seconds after the message was sent.

    Args:
        event: The message-edited event delivered by the client.
    """
    max_edit_age_seconds = 120

    current_chat = await event.get_chat()
    # Skip messages that do not come from a group.
    if not hasattr(current_chat, 'title'):
        return

    chat_id = current_chat.id
    listen_chat_ids = get_enabled_chat_ids()
    if chat_id > 0:
        # The enabled list may hold the id in its "-100"-prefixed form.
        fixed_id = int('-100' + str(chat_id))
        if fixed_id in listen_chat_ids:
            chat_id = fixed_id
    # Skip messages from groups that are not being listened to.
    if chat_id not in listen_chat_ids:
        return

    # The message as it looks after the edit.
    edited_message = event.message
    edit_date = edited_message.edit_date
    # Without an edit timestamp the age of the edit cannot be checked.
    if edit_date is None:
        return
    # total_seconds() covers the whole interval; ``.seconds`` drops full days
    # and would let an edit made a day later slip through.
    if (edit_date - edited_message.date).total_seconds() > max_edit_age_seconds:
        return

    msg_id = edited_message.id
    # sender_id stays usable when from_id is None (anonymous senders).
    from_id = event.sender_id
    msg_text = edited_message.message
    logging.debug('edit_message: chat%s user%s "%s"', chat_id, from_id, msg_text)
    # update_message expects (from_chat, from_id, msg_id, msg_text).
    update_message(chat_id, from_id, msg_id, msg_text)
async def run_telethon():
    """Run the userbot client until it disconnects.

    Waits until the session file exists and no login lock file is present,
    then builds the client from the ``USER_BOT_API_ID`` and
    ``USER_BOT_API_HASH`` environment variables, registers the new-message
    and message-edited handlers, records the logged-in account as the admin
    user and blocks until the client disconnects.
    """
    while not os.path.exists(SESSION_FILE) or os.path.exists(SESSION_LOCK_FILE):
        logging.info('尚未登陆,等待10秒重试...')
        # Yield to the event loop while waiting; a blocking time.sleep()
        # here would stall every other task on the same loop.
        await asyncio.sleep(10)

    api_id = int(os.getenv("USER_BOT_API_ID"))
    api_hash = os.getenv("USER_BOT_API_HASH")
    client = TelegramClient(SESSION_FILE, api_id, api_hash)

    # Listen for and handle new messages.
    client.add_event_handler(lambda event: handle_new_message(
        event, client), events.NewMessage)
    # Listen for and handle message edits.
    client.add_event_handler(handle_edit_message, events.MessageEdited)

    # Start the client.
    await client.start()

    # Save the logged-in user locally as the admin user (userbot mode only).
    me = await client.get_me()
    admin_id = me.id
    update_userbot_admin_id(admin_id)

    await client.run_until_disconnected()
def run_once():
    """Perform the one-off login that creates the session file.

    When the session file already exists only a hint is printed. Otherwise a
    lock file is created, the client is started with the credentials from
    the ``USER_BOT_API_ID`` / ``USER_BOT_API_HASH`` environment variables,
    and the lock file is removed five seconds after the start returned.
    """
    if os.path.exists(SESSION_FILE):
        print('session已存在,如需重新登陆,删除配置文件夹下的anon.session文件')
        return

    # Signal that a login is in progress for as long as this file exists.
    lock_path = pathlib.Path(SESSION_LOCK_FILE)
    lock_path.touch()

    api_id = int(os.getenv("USER_BOT_API_ID"))
    api_hash = os.getenv("USER_BOT_API_HASH")
    client = TelegramClient(SESSION_FILE, api_id, api_hash)

    # Start the client.
    # NOTE(review): if anything above or start() itself raises, the lock
    # file is left behind.
    client.start()
    time.sleep(5)
    lock_path.unlink()
# When executed as a script, perform the one-off login.
if __name__ == '__main__':
    run_once()