Skip to content

Commit

Permalink
added log level config entry
Browse files Browse the repository at this point in the history
reworked uploader to use less persistence calls and get notified if the crawler adds an image
removed unused function
  • Loading branch information
markusressel committed Mar 10, 2020
1 parent a5f0bcd commit 1d54c9f
Show file tree
Hide file tree
Showing 13 changed files with 77 additions and 49 deletions.
1 change: 1 addition & 0 deletions infinitewisdom-example.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---

InfiniteWisdom:
log_level: debug
telegram:
admin_usernames:
- "myadminuser"
Expand Down
1 change: 0 additions & 1 deletion infinitewisdom/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import threading

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


class RegularIntervalWorker:
Expand Down
1 change: 0 additions & 1 deletion infinitewisdom/analysis/tesseract.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from infinitewisdom.stats import TESSERACT_FIND_TEXT_TIME

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


class Tesseract(ImageAnalyser):
Expand Down
1 change: 0 additions & 1 deletion infinitewisdom/analysis/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
download_image_bytes

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


class AnalysisWorker(RegularIntervalWorker):
Expand Down
2 changes: 0 additions & 2 deletions infinitewisdom/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@
from infinitewisdom.stats import INSPIRE_TIME, INLINE_TIME, START_TIME, CHOSEN_INLINE_RESULTS, format_metrics
from infinitewisdom.util import send_photo, send_message, cryptographic_hash

logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


def requires_image_reply(func):
Expand Down
12 changes: 11 additions & 1 deletion infinitewisdom/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import logging
import os
import re

from container_app_conf import ConfigBase
from container_app_conf.entry.bool import BoolConfigEntry
Expand All @@ -41,7 +42,6 @@ class AppConfig(ConfigBase):
"""

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

def __new__(cls, *args, **kwargs):
yaml_source = YamlSource(CONFIG_FILE_NAME)
Expand All @@ -51,6 +51,16 @@ def __new__(cls, *args, **kwargs):
]
return super(AppConfig, cls).__new__(cls, data_sources=data_sources)

LOG_LEVEL = StringConfigEntry(
description="Log level",
key_path=[
CONFIG_NODE_ROOT,
"log_level"
],
regex=re.compile(f" {'|'.join(logging._nameToLevel.keys())}", flags=re.IGNORECASE),
default="DEBUG",
)

TELEGRAM_BOT_TOKEN = StringConfigEntry(
key_path=[
CONFIG_NODE_ROOT,
Expand Down
8 changes: 6 additions & 2 deletions infinitewisdom/crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
from infinitewisdom.persistence import ImageDataPersistence
from infinitewisdom.persistence.sqlalchemy import Image, _session_scope
from infinitewisdom.stats import CRAWLER_TIME
from infinitewisdom.uploader import TelegramUploader
from infinitewisdom.util import download_image_bytes, create_hash

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


class Crawler(RegularIntervalWorker):
Expand All @@ -37,14 +37,16 @@ class Crawler(RegularIntervalWorker):
"""
URL_CACHE = {}

def __init__(self, config: AppConfig, persistence: ImageDataPersistence, image_analysers: [ImageAnalyser]):
def __init__(self, config: AppConfig, persistence: ImageDataPersistence,
telegram_uploader: TelegramUploader, image_analysers: [ImageAnalyser]):
"""
Creates a crawler instance.
:param persistence: crawled data is added here
"""
super().__init__(config.CRAWLER_INTERVAL.value)
self._persistence = persistence
self._image_analysers = image_analysers
self._telegram_uploader = telegram_uploader

@CRAWLER_TIME.time()
def _run(self):
Expand Down Expand Up @@ -72,11 +74,13 @@ def _add_image_url_to_pool(self, session) -> str or None:
existing.url, url, image_hash))
existing.url = url
self._persistence.update(session, existing, image_data)
self._telegram_uploader.add_image_to_queue(existing.id)
self.URL_CACHE[url] = True
return None

entity = Image(url=url, created=time.time())
self._persistence.add(session, entity, image_data)
self._telegram_uploader.add_image_to_queue(entity.id)
LOGGER.debug('Added image #{} with URL: "{}"'.format(self._persistence.count(session), url))

self.URL_CACHE[url] = True
Expand Down
8 changes: 5 additions & 3 deletions infinitewisdom/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

if __name__ == '__main__':
from prometheus_client import start_http_server
Expand All @@ -40,6 +39,9 @@

config = AppConfig()

log_level = logging._nameToLevel.get(str(config.LOG_LEVEL.value).upper(), config.LOG_LEVEL.default)
logging.getLogger("infinitewisdom").setLevel(log_level)

LOGGER.debug("Config:\n{}".format(config.print(TomlFormatter())))

persistence = ImageDataPersistence(config)
Expand All @@ -61,9 +63,9 @@
start_http_server(config.STATS_PORT.value)

wisdom_bot = InfiniteWisdomBot(config, persistence, image_analysers)
crawler = Crawler(config, persistence, image_analysers)
analysis_worker = AnalysisWorker(config, persistence, image_analysers)
telegram_uploader = TelegramUploader(config, persistence, wisdom_bot._updater.bot)
crawler = Crawler(config, persistence, telegram_uploader, image_analysers)
analysis_worker = AnalysisWorker(config, persistence, image_analysers)

crawler.start()
analysis_worker.start()
Expand Down
14 changes: 11 additions & 3 deletions infinitewisdom/persistence/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
from typing import List

from sqlalchemy.orm import Session

Expand All @@ -27,7 +28,6 @@
from infinitewisdom.util import create_hash

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


class ImageDataPersistence:
Expand Down Expand Up @@ -141,13 +141,13 @@ def find_without_image_data(self, session: Session) -> [Image]:
"""
return self._database.find_without_image_data(session)

def find_not_uploaded(self, session: Session, bot_token: str) -> Image or None:
def get_not_uploaded_image_ids(self, session: Session, bot_token: str) -> List[int]:
"""
Finds the ids of all images that have not yet been uploaded to telegram servers
:param bot_token: the bot token
:return: list of image ids (empty if every image has already been uploaded)
"""
return self._database.find_not_uploaded(session, bot_token)
return self._database.get_not_uploaded_image_ids(session, bot_token)

def count(self, session) -> int:
"""
Expand All @@ -156,6 +156,14 @@ def count(self, session) -> int:
"""
return self._database.count(session)

def get_image(self, session: Session, entity_id: int):
"""
Get an image entity by id
:param entity_id: entity id
:return:
"""
return self._database.get(session, entity_id)

def update(self, session: Session, entity: Image, image_data: bytes or None = None) -> None:
"""
Updates the given entity
Expand Down
1 change: 0 additions & 1 deletion infinitewisdom/persistence/image_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from infinitewisdom.util import create_hash

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

lock = Lock()

Expand Down
44 changes: 29 additions & 15 deletions infinitewisdom/persistence/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import time
from collections import Counter
from contextlib import contextmanager
from datetime import datetime
from typing import List

from sqlalchemy import create_engine, Column, Integer, String, Float, func, and_, ForeignKey, Table, or_
from sqlalchemy import create_engine, Column, Integer, String, Float, func, and_, ForeignKey, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship

from infinitewisdom.const import DEFAULT_SQL_PERSISTENCE_URL
from infinitewisdom.util import cryptographic_hash

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)

Base = declarative_base()

Expand Down Expand Up @@ -148,7 +149,7 @@ def __init__(self, url: str or None = None):
self._migrate_db(url)

global _sessionmaker
engine = create_engine(url, echo=False)
engine = create_engine(url)
_sessionmaker.configure(bind=engine)

with _session_scope() as session:
Expand Down Expand Up @@ -177,6 +178,10 @@ def get_or_add_bot_token(session: Session, bot_token: str) -> BotToken:
session.refresh(bot_token_entity)
return bot_token_entity

@staticmethod
def get(session: Session, entity_id: int):
return session.query(Image).get(entity_id)

@staticmethod
def get_all(session: Session) -> [Image]:
return session.query(Image).order_by(Image.created.desc()).all()
Expand Down Expand Up @@ -250,22 +255,31 @@ def find_without_image_data(session: Session) -> Image or None:
Image.created).all()

@staticmethod
def get_not_uploaded_image_ids(session: Session, bot_token: str) -> List[int]:
    """
    Collects the ids of all images that have no telegram file id
    associated with the given bot token.

    :param session: database session used for the queries
    :param bot_token: the (unhashed) bot token
    :return: list of image ids, in the order the database returned them
    """
    hashed_bot_token = cryptographic_hash(bot_token)

    # Querying a single column yields one-element rows, not plain values,
    # so unpack them; otherwise the ids never compare equal to the integer
    # image_id values collected below and nothing gets filtered out.
    all_image_ids = [row[0] for row in session.query(Image.id).all()]

    bot_token_entity = session.query(BotToken).filter_by(hashed_token=hashed_bot_token).first()
    if bot_token_entity is None:
        # This token has never been stored, so nothing can have been
        # uploaded with it yet.
        return all_image_ids

    uploaded_image_ids = {file_id.image_id for file_id in bot_token_entity.telegram_file_ids}

    # The subtraction is done on the client because the previous pure SQL
    # filter put a huge load on the postgres process.
    # TODO: this still takes about 30 seconds and shifts part of the load to the client,
    #  but it's the best we have now
    return [image_id for image_id in all_image_ids if image_id not in uploaded_image_ids]

@staticmethod
def count(session: Session) -> int:
return session.query(Image.id).count()
return session.query(func.count(Image.id)).scalar()

@staticmethod
def update(session: Session, image: Image) -> None:
Expand Down
16 changes: 14 additions & 2 deletions infinitewisdom/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from infinitewisdom.util import send_photo, download_image_bytes

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


class TelegramUploader(RegularIntervalWorker):
Expand All @@ -40,16 +39,29 @@ def __init__(self, config: AppConfig, persistence: ImageDataPersistence, bot: Bo
self._bot = bot
self._chat_id = config.UPLOADER_CHAT_ID.value

with _session_scope() as session:
self._not_uploaded_ids = set(self._persistence.get_not_uploaded_image_ids(session, self._bot.token))

def start(self):
if self._chat_id is None:
LOGGER.debug("No chat id configured, not starting uploader.")
return
super().start()

def add_image_to_queue(self, image_entity_id: int):
self._not_uploaded_ids.add(image_entity_id)

@UPLOADER_TIME.time()
def _run(self):
with _session_scope() as session:
entity = self._persistence.find_not_uploaded(session, self._bot.token)
if len(self._not_uploaded_ids) <= 0:
# sleep for a longer time period to reduce load
time.sleep(60)
return

image_id = self._not_uploaded_ids.pop()

entity = self._persistence.get_image(session, image_id)
if entity is None:
# sleep for a longer time period to reduce load
time.sleep(60)
Expand Down
17 changes: 0 additions & 17 deletions infinitewisdom/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from infinitewisdom.const import TELEGRAM_CAPTION_LENGTH_LIMIT, REQUESTS_TIMEOUT

LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.DEBUG)


def download_image_bytes(url: str) -> bytes:
Expand Down Expand Up @@ -91,22 +90,6 @@ def select_best_available_analyser(session, analysers: [ImageAnalyser], persiste
return sorted(available, key=lambda x: (-x.get_quality(), -remaining_capacity(session, x, persistence)))[0]


def parse_telegram_command(text: str) -> (str, [str]):
"""
Parses the given message to a command and its arguments
:param text: the text to parse
:return: the command and its argument list
"""
if text is None or len(text) <= 0:
return None, [0]

if " " not in text:
return text[1:], None
else:
command, rest = text.split(" ", 1)
return command[1:], rest


def send_photo(bot: Bot, chat_id: str, file_id: int or None = None, image_data: bytes or None = None,
caption: str = None) -> [str]:
"""
Expand Down

0 comments on commit 1d54c9f

Please sign in to comment.