In [9]:
##### connect to the elasticsearch server
import json
from pprint import pprint
import os
import time
import pandas as pd 
from datetime import datetime
import pathlib

from dotenv import load_dotenv
from elasticsearch import Elasticsearch

import minio
from minio.error import S3Error
from minio.commonconfig import ENABLED
from minio.versioningconfig import VersioningConfig

from minio_utils import *

##### input args
path_to_main_input = "./examples/dummy_data"    
minio_credential = "credentials.macstudio.json"
es_credential = "es_credential.json"

file_type = "bam"


def build_input_metadata(input_files,
                         path_to_main_input,
                         file_type,
                         project = "ECD",
                         sub_project = "ECD_read_based",
                         pipeline = "bismark_wgbs"):
    """Build the per-file metadata table for a list of input files.

    Parameters
    ----------
    input_files : iterable of path-like objects exposing ``.name``
        Files found under ``<path_to_main_input>/<file_type>/``.
    path_to_main_input : str
        Root input directory; used to build the ``path`` index.
    file_type : str
        File extension without the leading dot (e.g. ``"bam"``).
    project, sub_project, pipeline : str
        Constant values written to the columns of the same name.

    Returns
    -------
    pandas.DataFrame
        One row per file, indexed by ``"<path_to_main_input>/<file_type>/<FileName>"``,
        with columns FileName, FileType, Labcode, project, sub_project, Date, pipeline
        (in that order).
    """
    file_names = [item.name for item in input_files]
    suffix = ".{}".format(file_type)

    table = pd.DataFrame(data = file_names, columns = ["FileName"])
    table["FileType"] = file_type
    # Strip only the trailing extension: str.replace would also remove an
    # ".<file_type>" occurring in the middle of the name.
    table["Labcode"] = [name[:-len(suffix)] if name.endswith(suffix) else name
                        for name in file_names]
    # Build the path from the real file name and the configured input root,
    # so it always points at the file that was actually found.
    table["path"] = ["{}/{}/{}".format(path_to_main_input, file_type, name)
                     for name in file_names]
    table["project"] = project
    table["sub_project"] = sub_project
    table["Date"] = datetime.now()
    table["pipeline"] = pipeline
    return table.set_index("path")


# sorted() makes the row order independent of the file-system listing order
input_files = sorted(pathlib.Path(path_to_main_input, file_type).glob("*.{}".format(file_type)))
input_metadata = build_input_metadata(input_files, path_to_main_input, file_type)
input_metadata_dict = input_metadata.to_dict(orient = "index") # the input metadata is ready to be added to the database elasticsearch

class RDSBucket:
    """A MinIO bucket whose object metadata is mirrored into an Elasticsearch index.

    The index has the same name as the bucket and is created on first upload
    using ``dataProfile`` as its mapping.

    Parameters
    ----------
    minio_credential : str
        Path to a JSON file with the keys ``"accessKey"`` and ``"secretKey"``.
    bucketName : str
        Name of the MinIO bucket; also used as the Elasticsearch index name.
    dataProfile : dict
        Mapping of the form ``{"properties": {field: {...}, ...}}``. Uploaded
        metadata must contain exactly these fields.
    es_credential : str
        Path to a JSON file with the keys ``"username"`` and ``"password"``.
    versioning : bool
        Enable bucket versioning when ``initBucket`` creates the bucket.
    verbose : bool
        Print connection and upload messages.
    es_url : str
        URL of the Elasticsearch server (defaults to the local deployment).
    minio_endpoint : str
        ``host:port`` of the MinIO server (defaults to the local deployment).
    minio_secure : bool
        Passed to the MinIO client as ``secure``.
    """

    def __init__(self, 
                 minio_credential, 
                 bucketName, 
                 dataProfile,
                 es_credential, 
                 versioning = True, 
                 verbose = False,
                 es_url = "http://localhost:9200",
                 minio_endpoint = "localhost:9000",
                 minio_secure = False):
        self.minio_credential = minio_credential
        self.es_credential = es_credential
        self.bucketName = bucketName
        self.dataProfile = dataProfile
        self.versioning = versioning
        self.verbose = verbose
        
        ##### elasticsearch client
        with open(self.es_credential, 'r') as file:
            es_keys = json.load(file)
            
        self.es = Elasticsearch(
            es_url,
            basic_auth=(es_keys["username"], es_keys["password"])) 
        client_info = self.es.info()
        index_rows = self.es.cat.indices(index='*', h='index', s='index:asc', format='json')
        # names starting with "." are hidden indices and are not tracked
        self.all_ES_indices = [row['index'] for row in index_rows
                               if not row['index'].startswith(".")]
        
        if self.verbose:
            print('Connected to Elasticsearch!')
            pprint(client_info.body)
            
        ##### minio client
        with open(self.minio_credential, 'r') as file:
            minio_keys = json.load(file)
        
        self.minio_client = minio.Minio(
            endpoint=minio_endpoint,
            access_key=minio_keys["accessKey"],
            secret_key=minio_keys["secretKey"],
            secure=minio_secure
        )
        
    def initBucket(self):
        """Create the bucket if it does not exist yet.

        Versioning is enabled on a newly created bucket when ``self.versioning``
        is true. An existing bucket is left untouched.

        Returns
        -------
        bool
            ``True`` if the bucket was created or already exists, ``False`` if
            MinIO raised an ``S3Error``.
        """
        try:
            if not self.minio_client.bucket_exists(self.bucketName):
                self.minio_client.make_bucket(self.bucketName)
                print(f"Bucket '{self.bucketName}' created successfully.")
                if self.versioning:
                    self.minio_client.set_bucket_versioning(self.bucketName, VersioningConfig(ENABLED))
            else:
                print(f"Bucket '{self.bucketName}' already exists. Cannot create bucket with the same name. Please choose another name")
            return True
        except S3Error as e:
            print(f"Error creating bucket: {e}")
            return False
        
    def upload_file_to_bucket(self, path_to_file, object_name, file_metadata):
        """Upload one file to the bucket and index its metadata in Elasticsearch.

        Parameters
        ----------
        path_to_file : str
            Local path of the file to upload.
        object_name : str
            Name of the object inside the bucket.
        file_metadata : dict
            Metadata of the file. A ``"bucket"`` entry holding the bucket name
            is added before validation, so callers must not need to supply it.

        Returns
        -------
        bool
            ``True`` after a successful upload and indexing, ``False`` if MinIO
            raised an ``S3Error``.

        Raises
        ------
        ValueError
            If the metadata fields are not exactly the fields declared under
            ``dataProfile["properties"]``.
        """
        ##### add bucket name to the file metadata
        file_metadata = {**file_metadata, "bucket": self.bucketName}

        ##### check if the file_metadata match the bucket's dataProfile
        # Compare as sets: the field order of a dict carries no meaning here.
        expected_fields = set(self.dataProfile.get("properties", {}))
        actual_fields = set(file_metadata)
        if actual_fields != expected_fields:
            missing = sorted(expected_fields - actual_fields, key = str)
            unexpected = sorted(actual_fields - expected_fields, key = str)
            raise ValueError(
                "Cannot upload file. The file metadata does not match the bucket's data profile"
                " (missing fields: {}; unexpected fields: {})".format(missing, unexpected))

        try:
            with open(path_to_file, 'rb') as file_data:
                self.minio_client.put_object(
                    bucket_name=self.bucketName,
                    object_name=object_name,
                    data=file_data,
                    length=os.stat(path_to_file).st_size,
                    metadata=file_metadata
                )
            if self.verbose:
                print(f"File '{object_name}' uploaded successfully with metadata.")
            
            ##### if file upload successfully, add the metadata to the elasticsearch database
            # `x in xs == False` is a chained comparison that is always False;
            # `not in` is the intended test.
            if self.bucketName not in self.all_ES_indices:
                self.es.indices.create(index = self.bucketName, mappings = self.dataProfile)
                # remember the new index so the next upload does not try to re-create it
                self.all_ES_indices.append(self.bucketName)
            self.es.index(index = self.bucketName, document = file_metadata)
            return True
        except S3Error as e:
            print(f"Error uploading file: {e}")
            return False

In [10]:
# Data profile for BAM files: one entry per metadata field, in declaration order.
# Every field is typed "text" except "Date".
bam_profile = {
    "properties": {
        field_name: {"type": field_type}
        for field_name, field_type in (
            ("Labcode", "text"),
            ("SequencingID", "text"),
            ("FileName", "text"),
            ("FileType", "text"),
            ("Date", "date"),
            ("pipeline", "text"),
            ("project", "text"),
            ("sub_project", "text"),
            ("ref_genome", "text"),
            ("bucket", "text"),
        )
    }
}
# Bucket handle for BAM files: connects to Elasticsearch and MinIO on construction.
bamBucket = RDSBucket(
    minio_credential = minio_credential,
    es_credential = es_credential,
    bucketName = "bam",
    dataProfile = bam_profile,
    versioning = True,
    verbose = False,
)

In [11]:
# Create the bucket if it does not exist yet; the returned flag is shown as the cell output.
bamBucket.initBucket()

Bucket 'bam' created successfully.


True

In [16]:
path = './examples/dummy_data/bam/Yas662.bam' 
file_metadata = input_metadata_dict[path]
# Store the object under its file name (the original passed the literal list ["FileName"]).
# NOTE(review): bam_profile also declares "SequencingID" and "ref_genome", which the
# metadata table built above does not contain; add them to file_metadata before
# uploading, otherwise the profile check raises ValueError.
bamBucket.upload_file_to_bucket(path_to_file = path, 
                                object_name = file_metadata["FileName"], 
                                file_metadata = file_metadata)


ValueError: The file metadata does not match the bucket's data profile