1+ import datetime
12import json
23import os
3- import time
44
55import pika
66
7- from utils .db_service import insert_stock_data
7+ from utils .db_service import insert_stock_data , stock_get_id
8+ from utils .logger import workers_logger as logger
89from utils .yahoo_finance import data_for_stocks_data_update
910
1011
1112def worker_function (ch , method , properties , body ): # pylint: disable=C0103, W0613
1213 body = json .loads (body )
13- data = data_for_stocks_data_update (body ["stock_name" ], body ["from" ], body ["to" ])
14+ logger .info (f'Worker Task { body } was received' )
15+ stock_id = stock_get_id (body ['stock_name' ])
16+ start = datetime .datetime .fromisoformat (body ['from' ])
17+ finish = datetime .datetime .fromisoformat (body ['to' ])
18+ data = data_for_stocks_data_update (body ["stock_name" ], start , finish )
1419 for stock in data :
15- insert_stock_data (stock ["stock_id" ], stock ["price" ], stock ["created_at" ])
20+ insert_stock_data (stock_id , stock ["price" ], stock ["created_at" ])
21+ logger .info (f'Worker Task { body } was succeseful done' )
1622 ch .basic_ack (delivery_tag = method .delivery_tag )
1723
1824
@@ -23,4 +29,5 @@ def worker_function(ch, method, properties, body): # pylint: disable=C0103, W0
2329 channel = connection .channel ()
2430 channel .queue_declare (queue = 'worker_queue' , durable = True )
2531 channel .basic_consume (queue = 'worker_queue' , on_message_callback = worker_function )
32+ logger .info ('Worker connection was created' )
2633 channel .start_consuming ()
0 commit comments