In [1]:
import requests
import psycopg2
import json
from datetime import datetime, timedelta
import logging
import traceback
import os
import glob
import gspread

In [2]:
# Задаю параметры подключения к БД
DB_PARAMS = {
    'host': 'localhost',
    'port': 5430,
    'database': "skillfactory",
    'user': "parser",
    'password': "grader_parser_531"
}

In [3]:
# Задаю параметры для подключения через API
api_url = "https://b2b.itresume.ru/api/statistics"
paydate = {
 'client':  'Skillfactory',
 'client_key': 'M2MGWS',
 'start': '2023-04-01 01:01:01.000000',
 'end': '2023-04-01 23:59:59.999999'
}

In [4]:
# Создаю класс-коннектор к БД в шаблоне SingleTon, который обеспечивает единственное подключение
class DatabaseConnection:
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, 'instance'):
            cls.instance = super().__new__(cls)
        return cls.instance
    
    def __init__(self, params, autocommit = True):
        self.conn = psycopg2.connect(
            host = params['host'],
            port = params['port'],
            database = params['database'],
            user = params['user'],
            password = params['password']
        )
        self.conn.autocommit = autocommit
        
    def get_cursor(self):
        return self.conn.cursor()
    
    def close_connect(self):
        self.conn.close()
        
    def rollback(self):
        self.conn.rollback()
        

In [5]:
# Указываю путь, куда будут записываться логи, и количество дней, которые будут логироваться
PATH = 'E:\directoriya_Postgre\logs'
days_recorded = 3

In [6]:
# Пишу запрос к БД, который будет заполнять таблицу grader
query = """
insert into grader (user_id, oauth_consumer_key, lis_result_sourcedid, 
lis_outcome_service_url, is_correct, attempt_type, created_at) 
values (%s, %s, %s, %s, %s, %s, pg_catalog.to_timestamp(%s, 'YYYY-MM-DD HH24:MI:SS.US'))
"""

In [7]:
# Здесь указываю необходимые значения, чтобы гарантированно записывать эти данные (даже если они None) в БД
needing_values = {
    'lti_user_id':None,
    'is_correct':None,
    'attempt_type': None,
    'created_at': None,
    'oauth_consumer_key': None,
    'lis_result_sourcedid': None,
    'lis_outcome_service_url': None
}

In [12]:
class DataCarrier:
    def __init__(self, **kwargs):
        self.logger_path = kwargs.get('path', None)
        self.logs_cnt = kwargs.get('logs_cnt', 3)
        self.api_url = kwargs.get('api_url', None)
        self.paydate = kwargs.get('paydate', None)
        self.database_params = kwargs.get('params', None)
        self.autocommit = kwargs.get('autocommit', True)
        self.query = kwargs.get('query', None)
        self.needing_values = kwargs.get('needing_values', None)
        
    def set_logger_settings(self, logger_path, logs_cnt = 3):
        self.logger_path = logger_path
        self.logs_cnt = logs_cnt
        
    def set_API_settings(self, api_url, paydate):
        self.api_url = api_url
        self.paydate = paydate
    
    def set_DB_settings(self, database_params):
        self.database_params = database_params
        
    def set_query_setting(self, query, needing_values = None):
        self.query = query
        self.needing_values = needing_values
        
    def check_required_params(self):
        return all([self.logger_path, 
                   self.logs_cnt, 
                   self.api_url, 
                   self.paydate, 
                   self.database_params,
                   self.query, 
                   self.needing_values])
    
    def create_logger(self):
        try:
            logging.basicConfig(
                format='%(asctime)s %(name)s %(levelname)s: %(message)s',
                filename=f"{self.logger_path}\\{datetime.now().date()}.txt",filemode="w",
                level=logging.INFO)
            return True
        except Exception as err:
            print('Не удалось создать файл для логирования')
            return False
        
    def delete_old_logs(self):
        logging.info(f'Удаление старых логов из файла {self.logger_path}')
        last_filedates = [str(datetime.now().date() - timedelta(days = x))+'.txt' for x in range(0, self.logs_cnt)]
        fileList = glob.glob(self.logger_path+f'\*.txt')
        fileList = [path for path in fileList if path.split('\\')[-1] not in last_filedates]
        for filePath in fileList:
            try:
                logging.info('Файл {} успешно удалён'.format(filePath.split("\\")[-1]))
                os.remove(filePath)
            except Exception as err:
                logging.error('Ошибка {} при попытке удалить файл {}'.format(err, filePath.split("\\")[-1]))
                
    def close_logger(self):
        loggers = [logging.getLogger()]
        for logger in loggers:
            handlers = logger.handlers[:]  # Копируем список обработчиков
            for handler in handlers:
                handler.close()  # Закрываем обработчик
                logger.removeHandler(handler)
    
    def get_data_from_API(self):
        logging.info('Начато получение данных с API')
        r = requests.get(self.api_url, params = self.paydate)
        try:
            logging.info(f'Скачивание данных с API прошло успешно, status_code: {r.status_code}')
            self.received_API_data = r.json()
            return True
        except Exception as err:
            logging.error(f'Произошла ошибка {err}, status_code: {r.status_code}')
            return False
            
    def connect_to_DB(self):
        logging.info('Попытка соединения с базой данных')
        try:
            self.DB_connect = DatabaseConnection(self.database_params, self.autocommit)
            logging.info('Соединение прошло успешно')
            return True
        except Exception as err:
            logging.error(f'Соединение не выполнено. Ошибка {err}')
            return False
    
    def DB_connect_getter(self):
        if hasattr(self, 'DB_connect'):
            return self.DB_connect
        else:
            raise Exception('Атрибут DB_connect еще не задан')
            
    def put_data_to_DB(self):
        logging.info('Начало загрузки данных с API в Базу')
        with self.DB_connect.get_cursor() as cur:
            try:
                for row in self.received_API_data:
                    try:
                        if self.needing_values:
                            values = self.needing_values
                            values.update(row)
                        else:
                            values = row.copy()
                        passback_params = values.pop('passback_params')
                        values.update(json.loads(passback_params.replace("\'", "\"")))
                        cur.execute(self.query, (values['lti_user_id'], values['oauth_consumer_key'], 
                                        values['lis_result_sourcedid'], values['lis_outcome_service_url'],
                                        values['is_correct'], values['attempt_type'], values['created_at']))
                    except Exception as err:
                        logging.error(f'Ошибка {err} при попытке занести в базу строку {row}')
                        continue
                logging.info('Все данные успешно занесены в таблицу')
                return True
            except Exception as err:
                logging.error(f'Произошла ошибка {err}')
                return False
                
    def process(self, close_DB_connect = True):
        if not self.check_required_params():
            raise Exception('Необходимые параметры не заданы')
            
        if not self.create_logger():
            raise Exception('Ошибка при создании файла для логирования')
            
        self.delete_old_logs()
        
        if not self.get_data_from_API():
            raise Exception('Ошибка при взаимодействии с API')
            
        if not self.connect_to_DB():
            raise Exception('Ошибка при подключении к Базе Данных')
            
        if not self.put_data_to_DB():
            raise Exception('Ошибка при взаимодействии с Базой Данных')
        
        if close_DB_connect:
            self.DB_connect_getter().close_connect()
            logging.info('Автоматическое закрытие соединения с БД')
        self.close_logger()
        
        

In [13]:
connector = DataCarrier(path = PATH, 
                        api_url = api_url, 
                        paydate = paydate, 
                        params = DB_PARAMS, 
                        query = query, 
                        needing_values = needing_values)

In [14]:
connector.set_logger_settings(PATH)

In [15]:
connector.process()

# Часть вторая - агрегация данных в Google Sheets

In [43]:
CREDENTIAL_PATH = 'E:\directoriya_Postgre\creds_it-resume-project.json'
TABLE_NAME = "IT_Resume_aggregate_datas"

In [44]:
template = {
    'Дата': '''select now()::date''',
    'Совершенных попыток': '''
        select count(*)
        from grader
        where attempt_type = 'submit'
        ''',
    'Успешных попыток': '''
        select count(*)
        from grader
        where attempt_type = 'submit'
        and is_correct = 1
        ''',
    'Уникальных пользователей': '''
        select count(distinct user_id)
        from grader
        '''
}

In [45]:
class GoogleSheetsAPI:
    def __init__(self,  credential_path):
        self.account = gspread.service_account(filename = credential_path)
    
    def open_table(self, table_name):
        self.cur_table = self.account.open(table_name)
    
    def open_sheet(self, sheet = 0):
        self.wks = self.cur_table.get_worksheet(sheet)
        
    def get_sheet(self):
        return self.wks
    
    def fast_create_sheet(self, table_name, sheet = 0):
        self.open_table(table_name)
        self.open_sheet(sheet)
        return self.get_sheet()
    
    def connect_to_DataBase(self, database_params):
        self.connector = DatabaseConnection(database_params) 
    
    def set_template(self, template):
        self.template = template
    
    def write_to_sheet(self):
        self.wks.format('A:A', {'textFormat': {'bold': True}})
        with self.connector.get_cursor() as cur:
            for key in self.template.keys():
                query = template[key]
                cur.execute(query)
                self.wks.append_row([key, str(cur.fetchone()[0])])
        self.wks.columns_auto_resize(0, len(self.wks.col_values(1))+1)
    
    

In [46]:
my_sheet = GoogleSheetsAPI(CREDENTIAL_PATH)
my_sheet.fast_create_sheet(TABLE_NAME)

<Worksheet 'Лист1' id:0>

In [47]:
my_sheet.connect_to_DataBase(DB_PARAMS)
my_sheet.set_template(template)

In [48]:
my_sheet.write_to_sheet()

# Часть третья - отправка данных по почте

In [16]:
import smtplib
import ssl
from email.message import EmailMessage

In [49]:
context = ssl.create_default_context()

In [51]:
subject = "Отчет по skillfactory"
message = f'''Новый отчет по данным от Skillfactory загружен в GoogleSheets 
https://docs.google.com/spreadsheets/d/1l5eeU-IxGX0bUkCCZIswszLqTO9KbdZVNuLWUFxt_yc/edit?gid=0#gid=0'''
EMAIL_ADDRESS = 'egorhohlov999@mail.ru'
EMAIL_PASSWORD = 'r7gX8piDUzngdVxNijh5'

In [52]:
msg = EmailMessage()
msg.set_content(message)
msg['Subject'] = subject
msg['From'] = EMAIL_ADDRESS
msg['To'] = EMAIL_ADDRESS

In [53]:
server = smtplib.SMTP_SSL('smtp.mail.ru', 465)

In [54]:
server.login(EMAIL_ADDRESS, EMAIL_PASSWORD)

(235, b'Authentication succeeded')

In [55]:
server.send_message(msg=msg)

{}