In [1]:
import os
import urllib3
import copy
from dotenv import load_dotenv
from minio import Minio

In [2]:
# Path to the .env file, relative to the notebook's working directory
ENV_PATH = "../.env"

# Load the variables declared in the .env file into the process environment
load_dotenv(ENV_PATH)

# Silence urllib3 warnings for the whole kernel session
urllib3.disable_warnings()

# MinIO connection settings, read from the environment (None when a variable is unset)
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY_LZD')
MINIO_SECRET_ACCESS_KEY = os.getenv('MINIO_SECRET_ACCESS_KEY_LZD')
MINIO_URL = os.getenv('MINIO_URL_LZD')
MINIO_BUCKET_NAME = os.getenv('MINIO_BUCKET_NAME_LZD')

# os.getenv returns a string, and every non-empty string (even "False" or "0")
# is truthy, so the raw value cannot be used as a flag. Parse it into a real
# bool; an unset or unrecognised value means TLS is off.
TLS = (os.getenv('MINIO_TLS') or '').strip().lower() in ('1', 'true', 'yes', 'on')

In [3]:
# Download the raw dataset files from MinIO to the local data directory
def main():
    """
    Copy the first three objects of the configured MinIO bucket to ``../data/raw``.

    The objects are taken in the order returned by ``list_objects`` and written,
    by position, to ``dataset_items.csv``, ``dataset_reviews.csv`` and
    ``categories.txt``. Payloads are written as raw bytes, so the local files
    are exact copies of the stored objects regardless of the platform's default
    text encoding or newline convention.

    Connection settings come from the module-level constants ``MINIO_URL``,
    ``MINIO_ACCESS_KEY``, ``MINIO_SECRET_ACCESS_KEY``, ``MINIO_BUCKET_NAME``
    and ``TLS``.

    Returns:
        0 when the bucket does not exist; otherwise None. Any error raised
        while listing, downloading or writing is caught and its message
        printed instead of being propagated.
    """

    # Local destinations, matched by position with the bucket listing
    target_paths = (
        "../data/raw/dataset_items.csv",
        "../data/raw/dataset_reviews.csv",
        "../data/raw/categories.txt",
    )

    # Create minio instance.
    # NOTE(review): cert_check is the negation of TLS, so certificate
    # verification is switched off whenever TLS is on — presumably to accept
    # a self-signed certificate; confirm this is intended.
    client = Minio(
        endpoint=MINIO_URL,
        access_key=MINIO_ACCESS_KEY,
        secret_key=MINIO_SECRET_ACCESS_KEY,
        secure=TLS,
        cert_check=(not TLS),
    )

    if not client.bucket_exists(MINIO_BUCKET_NAME):
        print('Bucket not found!')
        return 0

    try:
        object_names = [
            obj.object_name
            for obj in client.list_objects(bucket_name=MINIO_BUCKET_NAME)
        ]

        # Fail early with a readable message instead of an IndexError after
        # some of the files have already been written.
        if len(object_names) < len(target_paths):
            raise ValueError(
                f"Expected at least {len(target_paths)} objects in bucket "
                f"'{MINIO_BUCKET_NAME}', found {len(object_names)}"
            )

        # Download everything that is needed before touching the disk, so a
        # failed download never leaves a half-updated set of local files.
        payloads = []
        for object_name in object_names[:len(target_paths)]:
            response = client.get_object(
                bucket_name=MINIO_BUCKET_NAME,
                object_name=object_name,
            )
            try:
                payloads.append(response.data)
            finally:
                # Always hand the connection back to the pool
                response.close()
                response.release_conn()

        for path, payload in zip(target_paths, payloads):
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, 'wb') as f:
                f.write(payload)

        print('Fetching data success!')
    except Exception as e:
        print(str(e))
        
# Run the download only when this code is executed directly (not imported)
if "__main__" == __name__:
    main()

Fetching data success!
