diff --git a/crawler/crawler.py b/crawler/crawler.py index 46c2982..25e61f9 100644 --- a/crawler/crawler.py +++ b/crawler/crawler.py @@ -20,7 +20,7 @@ """This module contains main logic for crawler.""" import asyncio -from datetime import datetime +from datetime import datetime, timedelta from traceback import format_exc from crawler.data_loader import loader @@ -28,24 +28,35 @@ class Crawler: """ - Crawler that checks new prices on various coins/exchanges pairs and update this price in DB. + Crawler that checks new prices on various coins/exchanges pairs, updates this price in DB, and creates + price history. Attributes: - :param coin_map: where keys is coin name (in BD) and values is - with structure like {exchange name (in BD): coin name (on exchange), ...} - :param bd: to interact with BD that contain next functions: - update_exchange(coin, exchange, price), get_coin(coin), add_coin(coin), - get_exchange(coin, exchange, price) add_exchange(coin, exchange, price) + :param coin_map: where keys is coin name (in db) and values is + with structure like {exchange name (in db): coin name (on exchange), ...} + :param db: to interact with DB that contain next functions: + get_coin(coin), add_coin(coin), + get_exchange(coin, exchange), add_exchange(coin, exchange, price), update_exchange(coin, exchange, price), + get_coin_h(coin), add_coin_h(coin), + get_exchange_h(coin, exchange), add_exchange_h(coin, exchange), get_exchange_history(coin, exchange), + add_price_to_exchange_h(coin, exchange, time, price), update_exchange_h(coin, exchange, history) :param logger: crawler's logger - :param timeout: [optional] frequency (in seconds) of requesting new data + :param timeout: [optional=1] frequency (in seconds) of requesting new data + :param history: [optional=True] will write coins price history if True + :param h_update_time: [optional=3600] frequency (in seconds) of history cleaning + :param h_threshold_time: [optional=1] time (in days) that price will exist in history """ - 
def __init__(self, coin_map, bd, logger, timeout=1): + + def __init__(self, coin_map, db, logger, timeout=1, history=True, h_update_time=3600, h_threshold_time=1): self.coin_map = coin_map - self.bd = bd + self.db = db self.timeout = timeout self.loop = asyncio.get_event_loop() self.logger = logger + self.history = history + self.h_update_time = h_update_time + self.h_threshold_time = h_threshold_time async def _load(self, coin, exchange): """ @@ -64,42 +75,52 @@ async def _load(self, coin, exchange): def _update(self, coin, exchange, price): """ - Update price of specific coin on specific exchange in BD + Update price of specific coin on specific exchange in db Args: - :param coin: coin name (in BD) + :param coin: coin name (in db) :param exchange: exchange name :param price: price """ - self.bd.update_exchange(coin=coin, exchange=exchange, price=price) + self.db.update_exchange(coin=coin, exchange=exchange, price=price) + if self.history: + self.db.add_price_to_exchange_h(coin=coin, exchange=exchange, time=datetime.utcnow(), price=price) def _check_existing(self, coin, exchange): """ - Check existence of coin/exchange pair in BD and if not exist add them. + Check existence of coin/exchange pair in db and if not exist add them. 
Args: - :param coin: coin name (in BD) + :param coin: coin name (in db) :param exchange: exchange name """ - db_coin = self.bd.get_coin(coin=coin) - if not db_coin: - self.bd.add_coin(coin=coin) - db_exchange = self.bd.get_exchange(coin=coin, exchange=exchange) - if not db_exchange: - self.bd.add_exchange(coin=coin, exchange=exchange, price=None) + if not self.db.get_coin(coin=coin): + self.db.add_coin(coin=coin) + if not self.db.get_exchange(coin=coin, exchange=exchange): + self.db.add_exchange(coin=coin, exchange=exchange, price=None) + + if self.history: + if not self.db.get_coin_h(coin=coin): + self.db.add_coin_h(coin=coin) + if not self.db.get_exchange_h(coin=coin, exchange=exchange): + self.db.add_exchange_h(coin=coin, exchange=exchange) async def load_and_update(self, coin, exchange): """ Check availability of coin/exchange pair. If available start infinite loop - that gets new coin price from exchange site and updates this price in BD + that gets new coin price from exchange site and updates this price in db Args: - :param coin: coin name (in BD) + :param coin: coin name (in db) :param exchange: exchange name """ + if exchange == 'bitfinex': + sleep_time = len(self.coin_map) + else: + sleep_time = self.timeout try: coin_name = self.coin_map[coin].get(exchange) if coin_name is not None: @@ -109,16 +130,33 @@ async def load_and_update(self, coin, exchange): stime = datetime.now() value = await self._load(coin=coin_name, exchange=exchange) self._update(coin=coin, exchange=exchange, price=value) - await asyncio.sleep(self.timeout - (datetime.now() - stime).total_seconds()) + await asyncio.sleep(sleep_time - (datetime.now() - stime).total_seconds()) except Exception as e: - self.logger.warning('Exception in thread\'s loop ({}{}): {}' + self.logger.warning('Exception in thread\'s loop ({} {}): {}' '{}'.format(exchange, coin, str(e), format_exc())) - await asyncio.sleep(self.timeout) + await asyncio.sleep(sleep_time) except Exception as e: - 
self.logger.critical('Exception in crawler\'s thread ({}{}): {}' + self.logger.critical('Exception in crawler\'s thread ({} {}): {}' '{}'.format(exchange, coin, str(e), format_exc())) self.loop.stop() + async def history_cleaner(self): + """Clean history from old prices""" + while True: + for coin in self.coin_map.keys(): + for exchange in self.coin_map[coin].keys(): + try: + new_history = [] + time = datetime.utcnow() - timedelta(days=self.h_threshold_time) + for timestamp in self.db.get_exchange_history(coin=coin, exchange=exchange): + if timestamp['time'] > time: + new_history.append(timestamp) + self.db.update_exchange_h(coin=coin, exchange=exchange, history=new_history) + except Exception as e: + self.logger.warning('Exception in history_cleaner ({} {}): {}' + '{}'.format(exchange, coin, str(e), format_exc())) + await asyncio.sleep(self.h_update_time) + def launch(self): """ Start function of Crawler (start asynchronous load_and_update() tasks for @@ -129,6 +167,8 @@ def launch(self): for exchange in self.coin_map[coin].keys(): asyncio.ensure_future(self.load_and_update(coin=coin, exchange=exchange)) + if self.history: + asyncio.ensure_future(self.history_cleaner()) self.loop.run_forever() except Exception as e: self.logger.critical('Exception in creating crawler\'s threads: {}' diff --git a/crawler_launch.py b/crawler_launch.py index 26adc0a..1f98f8f 100644 --- a/crawler_launch.py +++ b/crawler_launch.py @@ -28,5 +28,5 @@ crawler_logger = logging.getLogger(__name__) if __name__ == '__main__': - crawler = Crawler(coin_map=mq.coin_map, bd=mq, logger=crawler_logger) + crawler = Crawler(coin_map=mq.coin_map, db=mq, logger=crawler_logger) crawler.launch() \ No newline at end of file diff --git a/mongo_queries.py b/mongo_queries.py index dc6b15e..c1c8102 100644 --- a/mongo_queries.py +++ b/mongo_queries.py @@ -28,16 +28,31 @@ db = mongo_client[local_config.MONGO_DB] db.authenticate(local_config.MONGO_BOT_USER, local_config.MONGO_BOT_PASSWORD) + # Dictionary that 
compares coin name with coin name on specific exchange -coin_map = db.settings.find_one()['coin_map'] +def _coin_map(): + return db.settings.find_one()['coin_map'] +coin_map = _coin_map() + + # All available coins -coins = db.settings.find_one()['coin_map'].keys() +def _coins(): + return db.settings.find_one()['coin_map'].keys() +coins = _coins() + + # Dictionary that compares exchange name from DB with real exchange name -exchange_map = db.settings.find_one()['exchange_map'] +def _exchange_map(): + return db.settings.find_one()['exchange_map'] +exchange_map = _exchange_map() # All available exchanges exchanges = list(set([exchange for coin in coins for exchange in coin_map[coin].keys()])) + + # Default settings for users -default_settings = db.settings.find_one()['default_settings'] +def _default_settings(): + return db.settings.find_one()['default_settings'] +default_settings = _default_settings() # Users collection @@ -50,7 +65,7 @@ def get_users(): :return: of dict with user's data """ - return db['users'].find({}) + return list(db['users'].find({})) def add_user(msg, email): @@ -195,7 +210,7 @@ def add_exchange(coin, exchange, price): """ db['coins'].update({'name': coin}, {'$push': {'exchanges': {'name': exchange, - 'price': price}}}) + 'price': price}}}) def update_exchange(coin, exchange, price): @@ -209,7 +224,109 @@ def update_exchange(coin, exchange, price): """ db['coins'].find_and_modify(query={'name': coin, 'exchanges.name': exchange}, - update={"$set": {'exchanges.$.price': price}}) + update={'$set': {'exchanges.$.price': price}}) + + +# Coins history collection + +def get_coin_h(coin): + """ + Return coin's data + + Args: + :param coin: coin name + + Returns: + :return: with coin's data or None if coin doesn't exist + + """ + return db['coins_history'].find_one({'name': coin}) + + +def add_coin_h(coin): + """ + Add new coin with empty exchange list to collection + + Args: + :param coin: coin name + + """ + db['coins_history'].insert_one({'name': 
coin, 'exchanges': []}) + + +def get_exchange_h(coin, exchange): + """ + Return coin's data where coin contains specific exchange + + Args: + :param coin: coin name + :param exchange: exchange name + + Returns: + :return: with coin's data or None if coin with that exchange doesn't exist + + """ + return db['coins_history'].find_one({'name': coin, 'exchanges.name': exchange}) + + +def add_exchange_h(coin, exchange): + """ + Add new exchange with empty coin's price history to coin's list + + Args: + :param coin: coin name + :param exchange: exchange name + + """ + db['coins_history'].update({'name': coin}, + {'$push': {'exchanges': {'name': exchange, + 'history': []}}}) + + +def add_price_to_exchange_h(coin, exchange, time, price): + """ + Add coin's price on exchange, together with the time this price was received, to history + + Args: + :param coin: coin name + :param exchange: exchange name + :param time: UTC time + :param price: coin's price on exchange + + """ + db['coins_history'].find_and_modify(query={'name': coin, 'exchanges.name': exchange}, + update={'$push': {'exchanges.$.history': {'time': time, + 'price': price}}}) + + +def get_exchange_history(coin, exchange): + """ + Return coin's price history on exchange + + Args: + :param coin: coin name + :param exchange: exchange name + + Returns: + :return: of coin's price history + + """ + return db['coins_history'].find_one({'name': coin, 'exchanges.name': exchange}, + {'exchanges.$': 1})['exchanges'][0]['history'] + + +def update_exchange_h(coin, exchange, history): + """ + Update coin's history on exchange + + Args: + :param coin: coin name + :param exchange: exchange name + :param history: of dictionaries with next structure: {'time': , 'price': } + + """ + db['coins_history'].find_and_modify(query={'name': coin, 'exchanges.name': exchange}, + update={'$set': {'exchanges.$.history': history}}) # Subscribers collection @@ -225,8 +342,8 @@ def get_user_email(key): :return: user's e-mail or None if subscriber doesn't exist """ - user = 
db.subscribers.find_one({"key": str(key)}) + user = db.subscribers.find_one({'key': str(key)}) if user: - return user["email"] + return user['email'] else: - return None \ No newline at end of file + return None