Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ALEXSSS committed May 25, 2021
1 parent ba70b67 commit 20c4b5e
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 40 deletions.
7 changes: 4 additions & 3 deletions PROPERTY.py
Expand Up @@ -7,10 +7,11 @@
class PROPS:
api_id = 123
api_hash = 'hash'
client_account_manager = TelegramClient('client_account_manager', api_id, api_hash)
client_extractor = TelegramClient('extractor_session', api_id, api_hash)
client_bot = TelegramClient('bot_session', api_id, api_hash)
sleep_time_approaches = 15*60
client_account_manager = TelegramClient('client_account_manager', api_id, api_hash)
client_bot = TelegramClient('bot_session_with_bot_as_user', api_id, api_hash)
client_notifier = TelegramClient('client_notifier_bot', api_id, api_hash)
sleep_time_approaches = 15 * 60
sleep_time_channel = 3


Expand Down
35 changes: 20 additions & 15 deletions bot/bot.py
Expand Up @@ -11,7 +11,7 @@
from PROPERTY import ONO_BOT_PROPS, PROPS
from dao_layer import retrieve_all_channels, add_anchor, add_user_channel_row, retrieve_all_messages_with_channel, \
delete_user_channel_row, retrieve_all_channels_for_user
from index import InvertedIndex
from index import InvertedIndex, rebuild_index
from properties_bot import EMOJI

from telethon import TelegramClient, events
Expand Down Expand Up @@ -55,7 +55,7 @@ async def send_welcome(event):
f"\n\n{EMOJI.POINT_RIGHT} Напишите :"
"\n/search <набор слов, любое выражение> "
"\nи я поищу новости этому отвечающие в ваших выбранных ранние каналах."

f"\n\n{EMOJI.POINT_RIGHT} Напишите :"
"\n/gsearch <набор слов, любое выражение> "
"\nи я поищу новости этому отвечающие глобально среди всех мной отслеживаемых."
Expand All @@ -67,6 +67,18 @@ async def send_welcome(event):
"\nто есть /status можно писать #status (и остальные команды)"

f"\n\nЖелаем вам приятного поиска!"

f"\n\n----------------------------"
f"\n\n{EMOJI.POINT_RIGHT} более искушенные пользователи могут воспользоваться нашей notify функциональностью"
f"\nпросто напишите /notify и набор слов в свободной форме (NOTIFY РАБОТАЕТ ТОЛЬКО ПО ВАШИМ КАНАЛАМ) /notifyg по всем каналам"
f"\nили используйте /snotify и набор слов в ввиде логических выражений, который позволит вам лучше специфицировать ваш запрос"
f"\nпример /snotify иван иванов and криптовалюта или пример для бьюти блоггера /snotify наташа голубой глаз and тени"
f"\nтакже можно использовать операцию or и () скобочки для объяденения поисковых запросов, /snotifyg чтобы писать запросы по всем каналам"

f"\n\n{EMOJI.POINT_RIGHT} используйте #set_email <email> команду, чтобы записать ваш текущий емаил для уведомлений,"
f"\nпо умолчанию результаты поиска и нотификации приходят только в личные сообщения от бота"

f"\n\n{EMOJI.POINT_RIGHT} используйте #set_size <number> команду, чтобы поменять дефолтный размер выборки поиска."
)


Expand Down Expand Up @@ -113,8 +125,10 @@ async def status(event):

@bot.on(events.NewMessage(pattern='^(#|/)search\s(\w|\W)+'))
async def search(event):
global index, last_time_query
index, last_time_query = rebuild_index(index, last_time_query)

query = event.message.message[len('#search'):].strip()
rebuild_index()
available_channels = {channel[0] for channel in retrieve_all_channels_for_user(event.sender_id)}
if len(available_channels) == 0:
await event.reply(f"Вы еще не подписались ни на один канал, попробуйте #gsearch, чтобы поискать глобально!")
Expand Down Expand Up @@ -143,8 +157,10 @@ async def search(event):

@bot.on(events.NewMessage(pattern='^(#|/)gsearch\s(\w|\W)+'))
async def gsearch(event):
global index, last_time_query
index, last_time_query = rebuild_index(index, last_time_query)

query = event.message.message[len('#gsearch'):].strip()
rebuild_index()
result = [res for res in index.search_phrase(query, limit=100)][:10]
for ((channel_name, msg_id), match) in result:
msg = (await bot.get_messages(entity=channel_name, ids=int(msg_id)))
Expand All @@ -166,17 +182,6 @@ async def gsearch(event):
# ))
await event.reply(f"Нашли результат \n{result}!")

def rebuild_index():
log("rebuild_index is started!")
global index, last_time_query
curr_time = time.time()
if index is None or curr_time - last_time_query > PROPS.sleep_time_approaches:
temp = InvertedIndex()
temp.create_index(retrieve_all_messages_with_channel())
index = temp
last_time_query = curr_time
log("rebuild_index is ended!")


async def subscribe_if_not_subscribed(channel_to_check, client):
async for dialog in client.iter_dialogs():
Expand Down
37 changes: 17 additions & 20 deletions dao_layer.py
Expand Up @@ -65,6 +65,16 @@ def retrieve_all_channels_for_user(cursor, user_id):
return list(cursor.fetchall())


@with_postgres_cursor
def retrieve_all_users_notifies(cursor):
cursor.execute(
"""
SELECT USER_ID, GLOBALLY, QUERY_TEXT FROM USER_NOTIFY;
"""
)
return list(cursor.fetchall())


@with_postgres_cursor
def retrieve_all_messages_with_ids(cursor, ids=[]):
cursor.execute(
Expand Down Expand Up @@ -103,7 +113,7 @@ def delete_user_channel_row(cursor, channel, user_id):


@with_postgres_cursor
def insert_messages(cursor, channel, messages):
def insert_messages_tg(cursor, channel, messages):
for message in messages:
# message.chat.id
# message.chat.title
Expand All @@ -127,7 +137,8 @@ def insert_messages(cursor, channel, messages):
SENDER_ID,
PUBLISH_DATE,
CONTENT
CONTENT,
SOURCE_ID
) VALUES (
'{}',
Expand All @@ -138,25 +149,11 @@ def insert_messages(cursor, channel, messages):
'{}',
'{}',
'{}',
'{}',
'{}'
);
""".format(message.id, message.chat.id, channel, message.chat.title if hasattr(message.chat, 'title') else "",
message.sender.username, message.sender.id, message.date, message.text)
""".format(message.id, message.chat.id, channel,
message.chat.title if hasattr(message.chat, 'title') else "",
message.sender.username, message.sender.id, message.date, message.text, "TG")
)

# CREATE TABLE MESSAGE
# (
# MESSAGE_ID TEXT,
#
# CHANNEL_ID TEXT,
# CHANNEL_NAME TEXT,
# CHANNEL_TITLE TEXT,
#
# SENDER_NAME TEXT,
# SENDER_USERNAME TEXT,
#
# PUBLISH_DATE TEXT,
# CONTENT TEXT NOT NULL,
# PRIMARY KEY (MESSAGE_ID, CHANNEL_NAME)
# );
4 changes: 2 additions & 2 deletions extractor/extractor.py
@@ -1,7 +1,7 @@
import time

from PROPERTY import PROPS
from dao_layer import retrieve_all_channels, initialise_anchor, insert_messages
from dao_layer import retrieve_all_channels, initialise_anchor, insert_messages_tg
from logger import log


Expand All @@ -22,7 +22,7 @@ def aggregate_massages():
messages = [*tg_client.iter_messages(entity=channel, min_id=int(last_message) + 1)]
if len(messages) > 0:
initialise_anchor(channel, max([message.id for message in messages]))
insert_messages(channel, messages)
insert_messages_tg(channel, messages)
for item in messages:
log(f"Taken from the chat {channel},\n text: {item.text}")
time.sleep(PROPS.sleep_time_approaches)
Expand Down
22 changes: 22 additions & 0 deletions index.py
@@ -1,7 +1,11 @@
import re
import time

from nltk.stem.snowball import SnowballStemmer

from PROPERTY import PROPS
from dao_layer import retrieve_all_messages_with_channel
from logger import log
from russian_extrac_configs import russian_stop_words, punctuation


Expand Down Expand Up @@ -78,6 +82,24 @@ def search_phrase(self, query, limit=3):
result = sorted(result.items(), key=lambda x: x[1], reverse=True)[:limit]
return result


def rebuild_index(index, last_time_query):
log("rebuild_index is started!")
try:
curr_time = time.time()
if index is None or curr_time - last_time_query > PROPS.sleep_time_approaches:
temp = InvertedIndex()
temp.create_index(retrieve_all_messages_with_channel())
return temp, curr_time
return index, last_time_query
finally:
log("rebuild_index is ended!")


class FullSet(set):
def __contains__(self, item):
return True

# index = InvertedIndex()
# rows = [
# ["Григорий пошёл домой на селиваново!", "1", "канал1"],
Expand Down
42 changes: 42 additions & 0 deletions notifier/notifier.py
@@ -0,0 +1,42 @@
import asyncio
import time

from dao_layer import retrieve_all_users_notifies, retrieve_all_channels_for_user
from index import InvertedIndex, rebuild_index, FullSet
from logger import log
from PROPERTY import PROPS, ONO_BOT_PROPS

client = PROPS.client_notifier
client.start(bot_token=ONO_BOT_PROPS.token)

last_time_query = 0
index: InvertedIndex = None


async def notify_users():
global index, last_time_query, client

while True:
index, last_time_query = rebuild_index(index, last_time_query)

notifies = retrieve_all_users_notifies()

for notify in notifies:
user_id = notify[0]
globally = notify[1]
query = notify[2]

entity = await client.get_entity(user_id)

available_channels = {channel[0] for channel in
retrieve_all_channels_for_user(user_id)} if not globally else FullSet()

result = [res for res in index.search_phrase(query, limit=100) if res[0][0] in available_channels][:10]
await client.send_message(entity=entity, message=str(result))
time.sleep(PROPS.sleep_time_approaches * 2)


if __name__ == '__main__':
log("Start user notification!")
client.loop.run_until_complete(notify_users())

13 changes: 13 additions & 0 deletions sql_scripts/init_scripts/SEED_BASE.sql
Expand Up @@ -99,6 +99,19 @@ INSERT INTO ONO_ANCHORS(CHANEL_NAME) VALUES ('stalin_gulag');
-- INSERT INTO ONO_ANCHORS(CHANEL_NAME) VALUES ('sns_internships');
-- INSERT INTO ONO_ANCHORS(CHANEL_NAME) VALUES ('startupneversleeps');

ALTER TABLE MESSAGE ADD SOURCE_ID TEXT;

DROP TABLE IF EXISTS USER_NOTIFY;
CREATE TABLE USER_NOTIFY
(
USER_ID TEXT,
GLOBALLY BOOLEAN,
QUERY_TEXT TEXT,
PRIMARY KEY (USER_ID, QUERY_TEXT)
);




-- ADD READ USER
CREATE USER DIMA WITH PASSWORD 'ono_dima';
Expand Down

0 comments on commit 20c4b5e

Please sign in to comment.