In [79]:
import os
import yaml
import json
import gridfs
import joblib
import io
from typing import Text, Dict
from pymongo import MongoClient

In [21]:
def read_config():
    """
    Чтение конфигурационного файла проекта
    :return: словарь настроек
    """

    config_path = '../train/config/params.yml'
    config = yaml.load(open(config_path), Loader=yaml.FullLoader)

    return config

In [27]:
def db_connection():
    """
    Подключение к базе данных MongoDb
    :return: экземпляр базы данных IceCube
    """
    config = read_config()
    endpoints = config['host_aliases']
    db_config = config['database']

    db_host = os.getenv(endpoints['train']['host']['alias'], endpoints['train']['host']['default'])
    db_port = os.getenv(endpoints['train']['port']['alias'], endpoints['train']['port']['default'])
    db_user = db_config['username']
    db_password = db_config['password']

    db_url = f'mongodb://{db_user}:{db_password}@{db_host}:{db_port}/'
    client = MongoClient(db_url)
    db_name = db_config['name']
    db = client[db_name]

    return db

In [28]:
def insert_file(file_name: Text, file_path: Text, replace: bool = False):
    """
    Вставка файла в БД Mongo с возможностью замены
    :param file_path: относительный путь к файлу
    :param replace: флаг необходимости замены файла
    :param file_name: имя файла, под которым он будет сохранен в БД
    """
    db = db_connection()
    fs = gridfs.GridFS(db)
    files = fs.find({'filename': file_name})
    # Check if the file already exists
    if replace:
        for file in files:
            fs.delete(file._id)

    # Open the file in read mode and insert into GridFS
    with open(file_path, 'rb') as file_data:
        fs.put(file_data, filename=file_name)

In [66]:
def insert_data(collection_name: Text, object_name: Text, contents: dict, replace: bool = False):
    """
    Вставка текстовых данных в БД Mongo с возможностью замены
    :param collection_name: имя коллекции в БД
    :param object_name: наименование объекта
    :param contents: содержимое объекта для записи
    :param replace: флаг необходимости замены файла
    """
    db = db_connection()
    filter_criteria = {'type': object_name}
    collection = db[collection_name]
    if replace:
        collection.delete_many(filter_criteria)
    result = collection.insert_one({'type': object_name, 'contents': contents})

    return result

In [74]:
def request_data(collection_name: Text, object_name: Text):
    """
    Чтение текстовых данных из БД Mongo
    :param collection_name: имя коллекции в БД
    :param object_name: наименование объекта
    """
    db = db_connection()
    filter_criteria = {'type': object_name}
    collection = db[collection_name]
    result = collection.find_one(filter_criteria)
    if result:
        return json.loads(result['contents'])
    else:
        return None

In [None]:
def request_joblib(filename):
    db = db_connection()
    fs = gridfs.GridFS(db)
    file_data = fs.find_one({'filename': filename})

    if file_data:
        file_buffer = io.BytesIO(file_data.read())
        loaded_object = joblib.load(file_buffer)
        return loaded_object
    else:
        return None

In [48]:
insert_file('compose.yml', '../docker-compose.yaml', True)

In [76]:
insert_data('train', 'metrics', json.dumps({'a': 1, 'b': 2}), True)

InsertOneResult(ObjectId('667ed3031e5cd0b1c9c2c903'), acknowledged=True)

In [77]:
data = get_data('train', 'metrics')
data

{'a': 1, 'b': 2}