Глобально, ваша задача на итоговый проект - написать скрипт, в котором:

Будет происходить обращение к нашему API для получения данных

Данные будут обрабатываться и готовиться к загрузке в базу данных

Обработанные данные будут загружаться в локальную базу PostgreSQL, которую вы развернули на своем компьютере или в облаке

Во время обработки будет сохраняться лог работы скрипта с отлавливанием всех ошибок и выводом промежуточных стадий (например, скачивание началось / скачивание завершилось / заполнение базы началось и т.д., с трекингом времени). Лог нужно сохранять в текстовый файл. Файл нужно именовать в соответствии с текущей датой. Если в папке с логами уже есть другие логи - их необходимо удалять, оставляем только логи за последние 3 дня.

### Init Block

In [16]:
# imports
from datetime import date, timedelta, datetime
from pathlib import Path
import re
import logging
from dotenv import dotenv_values
import requests
import ast

import psycopg2
from psycopg2 import sql
from psycopg2.extras import execute_values, DictCursor
from dotenv import dotenv_values

import gspread
from google.oauth2.service_account import Credentials

import smtplib
import ssl
from email.message import EmailMessage

ENV_FILE=dotenv_values('.env')

# -------------Bamboo PostgreSQL DB config-------------
BAMBOO_DB_DSN = ENV_FILE.get('DB_DSN_bamboo')

# -------------Google Sheets config-------------
GS_CREDS_FILE = Path('bamboo_service_account.json')  # path to json google key

DASHBOARD_SPREADSHEET_ID = "1U5ApJgj2_4CWYd_7nV1_LUvUL1aKmU8dB96OlLSd170"      # id from URL
DASHBOARD_WORKSHEET_NAME = "Лист1"                         # list name
DASHBOARD_SCOPES = ["https://www.googleapis.com/auth/spreadsheets"] # rights request

# -------------API and download period config-------------

# today time period
download_time_start =f"{date.today()} 00:00:00.000000"
download_time_end=f"{date.today()} 23:59:59.999999"

#fixed time period 
# download_time_start ='2025-01-06 12:46:47.860798'
# download_time_end='2026-01-06 13:50:47.860798'

ITRESUME_URL = "https://b2b.itresume.ru/api/statistics"
ITRESUME_SKILLFACTORY_PARAMS={
'client': 'Skillfactory',
'client_key':ENV_FILE.get('CLIENT_KEY_Skillfactory'),
'start':download_time_start,
'end':download_time_end
}

# -------------GMAIL config-------------
BARROCO_SMTP = 'smtp.gmail.com'
BARROCO_EMAIL_PORT= 465
BARROCO_ACCOUNT_PASSWORD = ENV_FILE.get('GOOGLE_APP_PASS')
BARROCO_EMAIL = 'unpocobarroco@gmail.com'
BARROCO_EMAIL_NAME = 'Bamboo Project'

### log files and log directory manage

In [17]:
today_date=date.today()
keep_limit_days=3
keep_limit_date=today_date-timedelta(days=keep_limit_days)

log_dir = Path("logs")
log_dir.mkdir(parents=True, exist_ok=True)

for p in log_dir.iterdir():
    
    if not p.is_file():
        continue
    
    filename=p.name
    match_date_in_name=re.search(r"\d{4}-\d{2}-\d{2}",filename)
    
    if not match_date_in_name:
        continue
    iso_date_in_name=date.fromisoformat(match_date_in_name.group())
    
    if iso_date_in_name > keep_limit_date:
        continue
    
    p.unlink()

    
print("OK")

OK


### Logging settings
- force=True - гарантия создания нового файла при смене даты

In [18]:
log_file = log_dir / f"logs ({today_date}).txt"
logging.basicConfig(filename=log_file,level=logging.INFO, filemode='a', format="%(asctime)s %(name)s %(levelname)s: %(message)s", force=True)


print("OK")

OK


### Downloading data by API

In [19]:
try:
    logging.info("----------- New ETL started -----------")
    logging.info("Start downloading data by API")
    
    r=requests.get(ITRESUME_URL, params=ITRESUME_SKILLFACTORY_PARAMS)
    rows_income_cnt=len(r.json())
    
    logging.info(f"Downloading data by API completed successfully. {rows_income_cnt} rows total")
    
except:
    logging.exception("Downloading data by API failed")
    

r

<Response [200]>

### Visual control check of the raw data

In [20]:
r.json()

[{'lti_user_id': 'd17982fdf1a034ef99c2e397f0666066',
  'passback_params': "{'oauth_consumer_key': '', 'lis_result_sourcedid': 'course-v1:SkillFactory+SQL2.0+31AUG2020:lms.skillfactory.ru-1e20086e056d48cba2c5b081833fb825:d17982fdf1a034ef99c2e397f0666066', 'lis_outcome_service_url': 'https://lms.skillfactory.ru/courses/course-v1:SkillFactory+SQL2.0+31AUG2020/xblock/block-v1:SkillFactory+SQL2.0+31AUG2020+type@lti+block@1e20086e056d48cba2c5b081833fb825/handler_noauth/grade_handler'}",
  'is_correct': None,
  'attempt_type': 'run',
  'created_at': '2026-01-08 00:09:21.034528'},
 {'lti_user_id': 'be6529349275ed140519da244249f30f',
  'passback_params': "{'oauth_consumer_key': '', 'lis_result_sourcedid': 'course-v1:SkillFactory+DSPRMGU+2023_FEB:lms.skillfactory.ru-025a4a2b14014fe08e4201ace5008340:be6529349275ed140519da244249f30f'}",
  'is_correct': None,
  'attempt_type': 'run',
  'created_at': '2026-01-08 00:09:51.113642'},
 {'lti_user_id': 'be6529349275ed140519da244249f30f',
  'passback_para

### Validate and transform of downloaded data

In [21]:
def string_to_dict(string):
    try:
        return ast.literal_eval(string)
        
    except:
        logging.warning(f"Not able to convert {string} to dict format")
        raise ValueError("String-to-dict Convertation Error")
        

def convert_to_date(string):
    try:
        return datetime.fromisoformat(string)
    
    except:
        logging.warning(f"Not able to convert {string} to ISO-format.")
        raise ValueError("Date Convertation Error")

def validate_return_value(value, accepted_vals: tuple | None = None, class_or_tuple = None):
    if accepted_vals is not None and (value not in accepted_vals):
        logging.warning(f"Value Validation Error: {value} is not in {accepted_vals}")
        raise ValueError ("Value Validation Error")
    
    if class_or_tuple is not None and not isinstance(value, class_or_tuple):
        logging.warning(f"Type Validation Error: {value} doesn`t fit {class_or_tuple}")
        raise ValueError("Type Validation Error")
    
    return value

def hook(obj):
    
    try:
        # extract the vals according the correct structure as it is
        user_id = obj['lti_user_id']
        oauth_consumer_key = string_to_dict(obj['passback_params']).get('oauth_consumer_key', None)
        lis_result_sourcedid = string_to_dict(obj['passback_params']).get('lis_result_sourcedid', None)
        lis_outcome_service_url = string_to_dict(obj['passback_params']).get('lis_result_sourcedid', None)
        is_correct = obj['is_correct']
        attempt_type = obj['attempt_type']
        created_at = obj['created_at']
        
        # validate and put to the structure
        plain_dict = {
            'user_id':validate_return_value(user_id, class_or_tuple=str), # строковый айди пользователя
            'oauth_consumer_key': validate_return_value(oauth_consumer_key, class_or_tuple=(str, type(None))), # уникальный токен клиента
            'lis_result_sourcedid': validate_return_value(lis_result_sourcedid, class_or_tuple=(str, type(None))), # ссылка на блок, в котором находится задача в ЛМС
            'lis_outcome_service_url': validate_return_value(lis_outcome_service_url, class_or_tuple=(str, type(None))), # URL адрес в ЛМС, куда мы шлем оценку
            'is_correct': validate_return_value(is_correct, accepted_vals=(0, 1, None), class_or_tuple=(int, type(None))), # была ли попытка верной (null, если это run)
            'attempt_type': validate_return_value(attempt_type, accepted_vals=('run', 'submit'), class_or_tuple=str), # ран или сабмит
            'created_at': convert_to_date(created_at)  # дата и время попытки
            }
        
        return plain_dict
    
    except:
        logging.warning(f"Above fail araised in: created_at {created_at}, user_id {user_id}")
        
        return

try:    
    logging.info("Rows transformation and validation (row data stage 1) started")    
    user_solutions=r.json(object_hook=hook)
    
except:
    logging.exception("Rows transformation and validation (row data stage 1) failed") 

user_solutions

[{'user_id': 'd17982fdf1a034ef99c2e397f0666066',
  'oauth_consumer_key': '',
  'lis_result_sourcedid': 'course-v1:SkillFactory+SQL2.0+31AUG2020:lms.skillfactory.ru-1e20086e056d48cba2c5b081833fb825:d17982fdf1a034ef99c2e397f0666066',
  'lis_outcome_service_url': 'course-v1:SkillFactory+SQL2.0+31AUG2020:lms.skillfactory.ru-1e20086e056d48cba2c5b081833fb825:d17982fdf1a034ef99c2e397f0666066',
  'is_correct': None,
  'attempt_type': 'run',
  'created_at': datetime.datetime(2026, 1, 8, 0, 9, 21, 34528)},
 {'user_id': 'be6529349275ed140519da244249f30f',
  'oauth_consumer_key': '',
  'lis_result_sourcedid': 'course-v1:SkillFactory+DSPRMGU+2023_FEB:lms.skillfactory.ru-025a4a2b14014fe08e4201ace5008340:be6529349275ed140519da244249f30f',
  'lis_outcome_service_url': 'course-v1:SkillFactory+DSPRMGU+2023_FEB:lms.skillfactory.ru-025a4a2b14014fe08e4201ace5008340:be6529349275ed140519da244249f30f',
  'is_correct': None,
  'attempt_type': 'run',
  'created_at': datetime.datetime(2026, 1, 8, 0, 9, 51, 11364

### Get rid of the skipped rows ("None`s") in the list of user solutions 
- clearing nearly in-place to safe RAM

In [22]:
try:
    logging.info("Non-working rows clearing (row data stage 2) started")
    user_solutions[:]=[x for x in user_solutions if x is not None]

    rows_transformed_cnt=len(user_solutions)
    logging.info(f"Data transformation stage 1 and 2 completed ({rows_transformed_cnt} rows of {rows_income_cnt} done)")

except:
    logging.exception("Non-working rows clearing (row data stage 2) failed")


user_solutions

[{'user_id': 'd17982fdf1a034ef99c2e397f0666066',
  'oauth_consumer_key': '',
  'lis_result_sourcedid': 'course-v1:SkillFactory+SQL2.0+31AUG2020:lms.skillfactory.ru-1e20086e056d48cba2c5b081833fb825:d17982fdf1a034ef99c2e397f0666066',
  'lis_outcome_service_url': 'course-v1:SkillFactory+SQL2.0+31AUG2020:lms.skillfactory.ru-1e20086e056d48cba2c5b081833fb825:d17982fdf1a034ef99c2e397f0666066',
  'is_correct': None,
  'attempt_type': 'run',
  'created_at': datetime.datetime(2026, 1, 8, 0, 9, 21, 34528)},
 {'user_id': 'be6529349275ed140519da244249f30f',
  'oauth_consumer_key': '',
  'lis_result_sourcedid': 'course-v1:SkillFactory+DSPRMGU+2023_FEB:lms.skillfactory.ru-025a4a2b14014fe08e4201ace5008340:be6529349275ed140519da244249f30f',
  'lis_outcome_service_url': 'course-v1:SkillFactory+DSPRMGU+2023_FEB:lms.skillfactory.ru-025a4a2b14014fe08e4201ace5008340:be6529349275ed140519da244249f30f',
  'is_correct': None,
  'attempt_type': 'run',
  'created_at': datetime.datetime(2026, 1, 8, 0, 9, 51, 11364

### Make class for working with DB

In [23]:
try:   
    class DB_Ops:
        def __init__(self, db_dsn:str ):
            self.dsn=db_dsn
            
        def make_table(self, schema, table): 
            with psycopg2.connect(self.dsn) as conn:
                with conn.cursor() as cur:
                    
                    query=sql.SQL(
                    """
                    CREATE TABLE IF NOT EXISTS {} (
                        id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY
                        , user_id TEXT NOT NULL
                        , oauth_consumer_key TEXT NOT NULL
                        , lis_result_sourcedid TEXT NOT NULL
                        , lis_outcome_service_url TEXT NOT NULL
                        , is_correct INT CHECK (is_correct in (0, 1) OR is_correct is NULL)
                        , attempt_type TEXT CHECK (attempt_type in ('run', 'submit')) NOT NULL
                        , created_at TIMESTAMP NOT NULL
                        , CONSTRAINT unique_check UNIQUE (user_id, created_at)
                    )
                    """).format(sql.Identifier(schema, table))
                    
                    cur.execute(query)
        
        def insert_data_to_table(self, schema, table, data):
            with psycopg2.connect(self.dsn) as conn:
                with conn.cursor() as cur:
            
                    query=sql.SQL(
                    """
                    INSERT INTO {} (user_id, oauth_consumer_key, lis_result_sourcedid, lis_outcome_service_url, is_correct, attempt_type, created_at )
                    VALUES %s
                    ON CONFLICT ON CONSTRAINT unique_check DO NOTHING
                    """).format(sql.Identifier(schema, table))
                    
                    execute_values(cur, query, data, page_size=1000)
                
        def get_headers(self, schema, table):
            with psycopg2.connect(self.dsn) as conn:
                with conn.cursor() as cur:
                    
                    query=sql.SQL(
                    """
                    SELECT * FROM {} LIMIT 0
                    """).format(sql.Identifier(schema, table))
                    
                    cur.execute(query)
                    
                    return [col.name for col in cur.description]
                
        def query_fetchall_dict(self, query):
            with psycopg2.connect(self.dsn) as conn:
                with conn.cursor(cursor_factory=DictCursor) as cur:
                    
                    cur.execute(query)
                    return cur.fetchall()
except:
    logging.exception("Construction of DB operation class failed")
    
    
print("OK")

OK


### Creating a table in DB if doesn`t exist + put the info into table

In [24]:
try:
    logging.info("Start communication with Database")

    bamboo=DB_Ops(BAMBOO_DB_DSN)

    logging.info("Start checking/creating table")
    bamboo.make_table('public', 'solutions')
    
    logging.info("Start insert-tuples list preparing")  
    
    tuples_to_insert=[(
        row["user_id"],
        row["oauth_consumer_key"],
        row["lis_result_sourcedid"],
        row["lis_outcome_service_url"],
        row["is_correct"],
        row["attempt_type"],
        row["created_at"]
    ) for row in user_solutions]
    
    logging.info("Start inserting data to table")
    bamboo.insert_data_to_table('public', 'solutions', tuples_to_insert)
    
    logging.info("Communication with Database completed successfully") 

except:
    logging.exception("DB communications failed")
    

print("OK")

OK


### Prepare aggregated data dict, using DB

In [25]:
try:    
	logging.info("SQL query construction starts")

	db_dic_list= bamboo.query_fetchall_dict(
	""" 
	WITH 
		first_seen AS (
			SELECT 
				min(date(created_at)) AS study_date
				, user_id
			FROM solutions
			GROUP BY user_id
		),
		
		new_users AS (
			SELECT
				study_date
				, count(user_id) AS new_users_cnt
			FROM first_seen
			GROUP BY study_date
		),

		solutions_metrics AS (
			SELECT 
				date(created_at) AS study_date
				, count(*) AS attempts_per_day_total
				, count(*) FILTER(WHERE is_correct=1) AS correct_attempts_per_day
				, count(DISTINCT user_id) AS unique_users_per_day	
			FROM solutions
			GROUP BY study_date
			ORDER BY study_date
		)
			
	SELECT 
		to_char(sm.study_date, 'yyyy-mm-dd') AS study_date
		, attempts_per_day_total
		, correct_attempts_per_day
		, unique_users_per_day	
		, COALESCE(nu.new_users_cnt, 0) AS new_users_per_day
	FROM solutions_metrics sm
	LEFT JOIN new_users nu USING(study_date)
	ORDER BY study_date
	"""
	)
	
	logging.info("SQL query was built successfully")

except:
    logging.exception("SQL query construction failed")


db_dic_list

[['2025-12-25', 119, 54, 10, 10],
 ['2026-01-05', 546, 169, 36, 36],
 ['2026-01-06', 987, 264, 57, 41],
 ['2026-01-07', 911, 199, 44, 22],
 ['2026-01-08', 61, 6, 3, 2]]

### Google Sheets credentials

In [26]:
try:
    # use credentials
    creds = Credentials.from_service_account_file(
        GS_CREDS_FILE,
        scopes=DASHBOARD_SCOPES
    )

    # client authorization
    gc = gspread.authorize(creds)

    # table and list open
    sh = gc.open_by_key(DASHBOARD_SPREADSHEET_ID)
    ws = sh.worksheet(DASHBOARD_WORKSHEET_NAME)


    # check requests are ok
    res=ws.get_values("A1:A1")

except:
    logging.exception("Using of Google Sheets credentials failed")
    
    
print("Connection OK! Test get_values A1:", res)

Connection OK! Test get_values A1: [['study_date']]


### Sheet headers prepare directly in WS
- get existing Google ws headers, 
- check if headers are the same as in DB, add new headers from DB if needed (nothing deletes)

In [27]:
try:
    ws_headers=ws.row_values(1)
    query_headers=db_dic_list[0].keys()

    add_headers=[header for header in query_headers if header not in ws_headers]
        
    if add_headers:
        ws_headers.extend(add_headers)
        ws.update(range_name='1:1', values=[ws_headers])

except:
    logging.exception("Headers preparing in WS failed")
 
        
ws_headers

['study_date',
 'attempts_per_day_total',
 'correct_attempts_per_day',
 'unique_users_per_day',
 'new_users_per_day']

### Add aggregated data to WS in Google Sheets
- If date exists- row replace, if not - row append. 
- Can find the date-column dinamicaly. 
- The row structure automatically fits the Google ws neww structure.
- Lines qty doesn`t validate
- Columns width doesn`t change
- Columns with same name doesn`t validate - just change both

In [28]:
try:
    logging.info("Data appending to Google WS started")
    
    dates_in_ws=ws.col_values(ws_headers.index('study_date')+1)

    for dic in db_dic_list:
        row_to_add=[dic.get(key) for key in ws_headers]
        row_date=dic['study_date']
        
        if row_date in dates_in_ws:
            ws.update(range_name=f"A{dates_in_ws.index(row_date)+1}", values=[row_to_add], value_input_option="USER_ENTERED")
            
        else:
            ws.append_row(row_to_add, value_input_option="USER_ENTERED")
            
    logging.info("Data appending to Google WS completed successfully")

except:
    logging.exception("Data appending to Google WS failed")
    

print("OK")

OK


### Creating email server connection class
- SSL connection with SMTP
- Use `with...as` to ensure SMTP-session is closed

In [29]:
try:
    class Mail_Server_SSL:
        
        context = ssl.create_default_context()
        
        def __init__(self, smtp_server:str, ssl_port:int, account_password:str, sender_email:str, sender_name:str):
            self.account_password=account_password
            self.smtp_server = smtp_server
            self.ssl_port = ssl_port
            self.sender_email = sender_email
            self.sender_name=sender_name
            
        def send_message(self, recipient_email, subject, message):
            msg = EmailMessage()
            
            msg['Subject'] = subject
            msg['From'] = f"{self.sender_name} <{self.sender_email}>"
            msg['To'] = recipient_email
            msg.set_content(message)
            
            
            with smtplib.SMTP_SSL(self.smtp_server, self.ssl_port, context=Mail_Server_SSL.context) as server:
                server.login(self.sender_email, self.account_password)
                server.send_message(msg)
                
            return msg
            
except:
    logging.exception("Construction of mail server class failed")
    
    
print("OK")

OK


### Send message via email

In [30]:
try:
    logging.info("Email message preparing started")
    
    gmail = Mail_Server_SSL(BARROCO_SMTP, BARROCO_EMAIL_PORT, BARROCO_ACCOUNT_PASSWORD, BARROCO_EMAIL, BARROCO_EMAIL_NAME)

    message=f"""
    Dear all,
    
    Thanks for following our team digest!
    
    Todays ETL script was runned successfully:
    {rows_transformed_cnt} rows were extracted by API,
    {rows_income_cnt} rows were transformed and added to DB.
    
    The dashboard Google Sheet was also updated successfully.
    
    BR,
    Bamboo team
    """

    msg = gmail.send_message('fluxor@atomicmail.io', "Work done", message)
    
    logging.info("Email message was sent successfully")

except:
    logging.exception("Email message sending was failed")
    
    
print(msg)

Subject: Work done
From: Bamboo Project <unpocobarroco@gmail.com>
To: fluxor@atomicmail.io
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: 7bit
MIME-Version: 1.0


    Dear all,

    Thanks for following our team digest!

    Todays ETL script was runned successfully:
    61 rows were extracted by API,
    61 rows were transformed and added to DB.

    The dashboard Google Sheet was also updated successfully.

    BR,
    Bamboo team
    

