# Data Import From MINIO server

## A. Accessing MINIO Console

Step 1: Open http://10.1.32.31:9001 in your browser

Step 2: Log in using your username and password

## B. Create a test bucket in your name and upload a few files

<img style="width: 60%" src="im/cap1.PNG">

<img style="width: 60%"  src='im/cap2.PNG'>

## C. Download data from MINIO to workspace

### 1. Import libraries

In [1]:
from minio import Minio
from dotenv import load_dotenv # add this line
import os

In [2]:
os.getcwd()

'/workspace/best_practices_1'

### 2. Create .env file with credentials 

#### Run these in the terminal, replacing `USERID` and `PWD` with your MINIO username and password

`echo "MINIO_ACCESS"=USERID >> .env`

`echo "MINIO_KEY"=PWD >> .env`

### 3. Load credentials securely

In [3]:
# Load the variables from the .env file created in the previous step into the
# process environment, then read the two MINIO credentials from it.
load_dotenv()
user, key = (os.environ.get(name) for name in ('MINIO_ACCESS', 'MINIO_KEY'))

In [4]:
user

'elan'

### 4. Initialize s3 client

In [5]:
import boto3

# Connection settings for the MINIO server's S3-compatible endpoint; the
# credentials are the values read from the environment in the previous step.
s3_settings = {
    'endpoint_url': "http://minio-0:9000",
    'aws_access_key_id': user,
    'aws_secret_access_key': key,
}
s3 = boto3.resource('s3', **s3_settings)

### 5. Load data into workspace

In [6]:
# Print buckets in data server
# NOTE: the loop variable is deliberately not named `bucket`. That name is the
# module-level bucket handle read by download_file()/upload_file(), and reusing
# it here would silently point those functions at the last bucket listed
# whenever this cell is re-run.
buckets = s3.buckets.all()
for idx, listed_bucket in enumerate(buckets):
    print(idx, "Bucket: ", listed_bucket.name, listed_bucket.creation_date)

0 Bucket:  abdominal-segmentation 2022-05-02 21:11:24.482000+00:00
1 Bucket:  boneage 2022-07-14 18:01:53.034000+00:00
2 Bucket:  dvcremote 2022-04-17 00:20:53.034000+00:00
3 Bucket:  elan 2022-03-22 20:04:49.117000+00:00
4 Bucket:  heart-segmentation 2022-08-10 13:27:29.411000+00:00
5 Bucket:  ike 2022-03-14 15:12:48.441000+00:00
6 Bucket:  lda-object-detection 2022-08-03 20:24:19.629000+00:00
7 Bucket:  madi 2022-03-29 18:52:05.070000+00:00
8 Bucket:  monailabel 2022-03-22 20:34:59.637000+00:00
9 Bucket:  neeraja 2022-03-11 19:20:49.062000+00:00
10 Bucket:  neerajanew 2022-03-11 19:29:13.226000+00:00
11 Bucket:  orthanc-tcv 2022-08-06 20:00:28.533000+00:00
12 Bucket:  pathology 2022-03-31 15:40:42.341000+00:00
13 Bucket:  pathologytest 2022-03-24 19:28:11.765000+00:00
14 Bucket:  pulmonary-segmentation 2022-05-05 14:43:59.849000+00:00
15 Bucket:  utilities 2022-03-25 19:53:29.863000+00:00
16 Bucket:  zach 2022-04-26 20:22:40.847000+00:00


In [7]:
from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import cpu_count
from tqdm import tqdm

In [59]:
def download_file(objkey):
    """Download one object from the module-level ``bucket`` into ``download_dir``.

    The object's key is mirrored as a relative path below ``download_dir``,
    so the key ``a/b/c.nii`` is written to ``<download_dir>/a/b/c.nii``.

    Args:
        objkey: Key of the object inside ``bucket``.
    """
    dir_path = os.path.join(download_dir, os.path.dirname(objkey))
    # exist_ok=True already tolerates an existing directory, so no separate
    # os.path.exists() check is needed before creating it.
    os.makedirs(dir_path, exist_ok=True)
    filename = os.path.join(dir_path, os.path.basename(objkey))
    bucket.download_file(objkey, filename)  # save to same path

def _local_size(path):
    """Return the size of ``path`` in bytes, or None when the file is missing."""
    return os.path.getsize(path) if os.path.exists(path) else None


def downloadDirectoryFroms3(remoteDirectoryName, objectcount=None, suffix=None):
    """Download the objects under a prefix of the module-level ``bucket`` and verify them.

    Objects are fetched in parallel through ``download_file`` and then each
    local file size is compared with the object's ``content_length``; files
    that are missing or have the wrong size are re-downloaded one at a time.

    Relies on the module-level names ``bucket``, ``bucketname`` and
    ``download_dir``.

    Args:
        remoteDirectoryName: Key prefix (folder) inside the bucket.
        objectcount: Maximum number of objects to download, counted after the
            ``suffix`` filter has been applied. ``None`` (or 0) means all.
        suffix: Only download keys ending with this string. ``None`` means all.

    Raises:
        RuntimeError: If an object still does not match its expected size
            after 10 download attempts.
    """
    print(
        "Attempting to download bucket, ", bucketname, " folder, ", remoteDirectoryName, ' | counting objects: '
    )
    # Keys ending in '/' are folder placeholders, not files; they cannot be
    # written to a local file, so they are left out.
    objkeys = [
        obj.key
        for obj in tqdm(bucket.objects.filter(Prefix=remoteDirectoryName))
        if not obj.key.endswith('/')
    ]

    # Filter first, then truncate, so that objectcount really is the number of
    # matching objects that get downloaded.
    if suffix:
        objkeys = [o for o in objkeys if o.endswith(suffix)]

    if objectcount:
        objkeys = objkeys[:objectcount]

    print('Downloading ', len(objkeys),' Objects ...')

    # Parallel download. The result iterator has to be consumed: that is what
    # advances the progress bar on completion and surfaces worker exceptions.
    pool = None
    try:
        pool = Pool(processes=int(cpu_count()))
        for _ in tqdm(pool.imap_unordered(download_file, objkeys), total=len(objkeys)):
            pass
    except Exception as e:
        # Best effort: anything that failed here is retried serially below.
        print('Paralled download fails:', e, e.args)
    finally:
        if pool is not None:
            pool.close()
            pool.join()

    # Check object size and downloaded filesize are same
    max_attempts = 10
    print('Checking ', len(objkeys),' Objects ...')
    for objkey in tqdm(objkeys):
        objsize = bucket.Object(objkey).content_length

        dir_path = os.path.join(download_dir, os.path.dirname(objkey))
        filename = os.path.join(dir_path, os.path.basename(objkey))

        # None (file missing) never equals objsize, so a missing empty object
        # is still downloaded.
        filesize = _local_size(filename)
        attempt = 0

        while objsize != filesize and attempt < max_attempts:
            try:
                # Reuse the helper so the target directory is created if the
                # parallel stage never got as far as this object.
                download_file(objkey)
            except Exception as e:
                print('Error downloading file: ', filename)
                print('Single download fails:', e, e.args)

            filesize = _local_size(filename)
            attempt += 1

        # Decide on the sizes, not on the attempt counter: a download that
        # succeeds on the last attempt must not be reported as a failure.
        if objsize != filesize:
            print('objsize of ', objkey, ' is: ',objsize)
            print('filesize of ', objkey, ' is: ',filesize)
            raise RuntimeError('incomplete download after 10 attempts..')

### Download your bucket

In [60]:
# Local workspace directory that downloaded objects are written under;
# download_file() recreates each object's key path below it.
download_dir = '/workspace/abdominal-segmentation/datasets'

# Example object path noted by the author (presumably bucket\folder\file):
# abdominal-segmentation\CT CCHMC\masks\exam_0_CT_1.nii

# Name of the MINIO bucket to download from. It is hard-coded here; the
# interactive prompt is kept commented out at the end of the line.
bucketname = 'abdominal-segmentation' #input('Enter minio bucket name: ')
# Bucket handle read as a module-level global by download_file() and
# downloadDirectoryFroms3().
bucket = s3.Bucket(bucketname)

In [61]:
# Peek at the bucket: print the key of the first object found under `prefix`.
# (To list everything instead, loop over bucket.objects.all() and print each
# object's key and storage_class.)

# Low-level S3 client for the same endpoint and credentials as `s3`.
s3c = boto3.client(
    "s3",
    endpoint_url="http://minio-0:9000",
    aws_access_key_id=user,
    aws_secret_access_key=key,
)

# Folder (key prefix) inside the bucket to download from.
# NOTE(review): the original note here read
# "[ CT CCHMC-test, CT CCHMC-train]CT CCHMC," - presumably other prefixes; confirm.
prefix = 'CTCCHMC/test/scans'
first_match = next(iter(bucket.objects.filter(Prefix=prefix)), None)
if first_match is not None:
    print(first_match.key)


CTCCHMC/test/scans/exam_109_CT_209.nii.gz


In [63]:
%%time
## Parameters of downloadDirectoryFroms3
# remoteDirectoryName - Folder (key prefix) inside the bucket to download;
#                       leave this blank ('') if you want all folders.
# objectcount         - Number of objects to download. For large data, test with
#                       a smaller number of objects first.
#                       Set this to None for all objects.
# suffix              - Only download objects with a certain extension,
#                       like '.dcm' or '.svs'. Set this to None for all objects.

# Make sure the local target directory exists before downloading into it.
os.makedirs(download_dir,exist_ok=True)
downloadDirectoryFroms3(remoteDirectoryName=prefix,objectcount=None,suffix=None)
# Report how many entries now sit directly under <download_dir>/<prefix>.
print(download_dir ,' now has ', len(os.listdir(os.path.join(download_dir,prefix))), ' files')
#downloadDirectoryFroms3_single(s3,bucketname,'')

55it [00:00, 1044.63it/s]
  0%|          | 0/55 [00:00<?, ?it/s]

Attempting to download bucket,  abdominal-segmentation  folder,  CTCCHMC/test/scans  | counting objects: 
Downloading  55  Objects ...
Checking  55  Objects ...
Processing:  CTCCHMC/test/scans/exam_109_CT_209.nii.gz
CTCCHMC/test/scans/exam_109_CT_209.nii.gz  Object Size:  74020814  Filesize:  74020814
Processing:  CTCCHMC/test/scans/exam_10_CT_108.nii.gz
CTCCHMC/test/scans/exam_10_CT_108.nii.gz  Object Size:  14250574  Filesize:  14250574
Processing:  CTCCHMC/test/scans/exam_118_CT_219.nii.gz
CTCCHMC/test/scans/exam_118_CT_219.nii.gz  Object Size:  18821777  Filesize:  18821777
Processing:  CTCCHMC/test/scans/exam_11_CT_109.nii.gz
CTCCHMC/test/scans/exam_11_CT_109.nii.gz  Object Size:  22863376  Filesize:  22863376
Processing:  CTCCHMC/test/scans/exam_124_CT_226.nii.gz
CTCCHMC/test/scans/exam_124_CT_226.nii.gz  Object Size:  24187358  Filesize:  0


 25%|██▌       | 14/55 [00:00<00:01, 38.25it/s]

Processing:  CTCCHMC/test/scans/exam_130_CT_233.nii.gz
CTCCHMC/test/scans/exam_130_CT_233.nii.gz  Object Size:  26923271  Filesize:  26923271
Processing:  CTCCHMC/test/scans/exam_134_CT_237.nii.gz
CTCCHMC/test/scans/exam_134_CT_237.nii.gz  Object Size:  19249372  Filesize:  19249372
Processing:  CTCCHMC/test/scans/exam_135_CT_238.nii.gz
CTCCHMC/test/scans/exam_135_CT_238.nii.gz  Object Size:  20556722  Filesize:  20556722
Processing:  CTCCHMC/test/scans/exam_145_CT_28.nii.gz
CTCCHMC/test/scans/exam_145_CT_28.nii.gz  Object Size:  17605641  Filesize:  17605641
Processing:  CTCCHMC/test/scans/exam_147_CT_3.nii.gz
CTCCHMC/test/scans/exam_147_CT_3.nii.gz  Object Size:  25622654  Filesize:  25622654
Processing:  CTCCHMC/test/scans/exam_152_CT_34.nii.gz
CTCCHMC/test/scans/exam_152_CT_34.nii.gz  Object Size:  17286100  Filesize:  17286100
Processing:  CTCCHMC/test/scans/exam_155_CT_38.nii.gz
CTCCHMC/test/scans/exam_155_CT_38.nii.gz  Object Size:  23979802  Filesize:  23979802
Processing:  CTC

100%|██████████| 55/55 [00:00<00:00, 76.80it/s]

CTCCHMC/test/scans/exam_260_CT_261.nii.gz  Object Size:  21419260  Filesize:  21419260
Processing:  CTCCHMC/test/scans/exam_269_CT_270.nii.gz
CTCCHMC/test/scans/exam_269_CT_270.nii.gz  Object Size:  21719622  Filesize:  21719622
Processing:  CTCCHMC/test/scans/exam_284_CT_285.nii.gz
CTCCHMC/test/scans/exam_284_CT_285.nii.gz  Object Size:  25321988  Filesize:  25321988
Processing:  CTCCHMC/test/scans/exam_285_CT_286.nii.gz
CTCCHMC/test/scans/exam_285_CT_286.nii.gz  Object Size:  23888036  Filesize:  23888036
Processing:  CTCCHMC/test/scans/exam_287_CT_288.nii.gz
CTCCHMC/test/scans/exam_287_CT_288.nii.gz  Object Size:  25556192  Filesize:  25556192
Processing:  CTCCHMC/test/scans/exam_292_CT_294.nii.gz
CTCCHMC/test/scans/exam_292_CT_294.nii.gz  Object Size:  24494362  Filesize:  24494362
Processing:  CTCCHMC/test/scans/exam_29_CT_125.nii.gz
CTCCHMC/test/scans/exam_29_CT_125.nii.gz  Object Size:  26392423  Filesize:  26392423
Processing:  CTCCHMC/test/scans/exam_303_CT_305.nii.gz
CTCCHMC/




## D. Upload data from workspace to bucket in MINIO using s3

### 1. Define function to delete bucket if it already exists

In [16]:
from botocore.client import ClientError

In [75]:
def delete_bucket(s3, bucketname):
    """Delete a bucket and every object in it, if the bucket exists.

    Args:
        s3: boto3 S3 service resource used to reach the server.
        bucketname: Name of the bucket to delete.

    Raises:
        ClientError: If the existence check fails for any reason other than
            the bucket being absent, or if deleting the objects or the bucket
            fails. These cases are no longer reported as "Bucket does not exist".
    """
    try:
        s3.meta.client.head_bucket(Bucket=bucketname)
    except ClientError as err:
        error_code = err.response.get('Error', {}).get('Code')
        if error_code in ('404', 'NoSuchBucket'):
            print('Bucket does not exist')
            return
        # Access problems, network errors etc. are real failures.
        raise

    target = s3.Bucket(bucketname)
    # A bucket has to be empty before it can be deleted.
    target.objects.all().delete()
    # No ExpectedBucketOwner argument: the literal 'string' previously passed
    # was a documentation placeholder, not a real account id.
    target.delete()
    print('Deleted bucket: \n',bucketname)

### 2. Set new bucket and upload folder - bucket names should use only lowercase letters, with no symbols other than hyphens

In [113]:
new_bucket = "monailabel"
upload_folder = "monailabel"

### 3. Delete the bucket if it exists (the upload step re-creates it empty) - for test purposes only - don't delete project data buckets!

In [114]:
delete_bucket(s3,new_bucket)

Deleted bucket: 
 monailabel


### 4. Upload Data to bucket

In [115]:
from multiprocessing.pool import ThreadPool 
bucket = s3.Bucket(new_bucket)

In [116]:
folderpath = upload_folder
def upload_file(full_path):
    """Upload one local file to the module-level ``bucket``.

    The object key is the file's path relative to the module-level
    ``folderpath``, always written with forward slashes.

    Args:
        full_path: Path of the local file; expected to lie under ``folderpath``.
    """
    # relpath copes with a trailing separator on folderpath, where a
    # fixed-offset slice would drop the first character of the key. The
    # replace() keeps OS-specific separators out of the object key.
    key = os.path.relpath(full_path, folderpath).replace(os.sep, '/')
    with open(full_path, 'rb') as data:
        bucket.put_object(Key=key, Body=data)

In [117]:
def upload_files(s3, bucketname, folderpath, max_workers=32):
    """Upload every file below ``folderpath`` using a pool of threads.

    The bucket ``bucketname`` is created first if it does not exist yet.

    NOTE: each file is sent through ``upload_file``, which reads the
    module-level ``bucket`` and ``folderpath``. Those globals must refer to
    the same bucket and folder as the arguments passed here.

    Args:
        s3: boto3 S3 service resource.
        bucketname: Name of the destination bucket.
        folderpath: Local folder whose files are uploaded recursively.
        max_workers: Upper bound on the number of upload threads.
    """
    # Check the bucket that would be created, not whatever the global
    # `bucket` currently points at.
    if not s3.Bucket(bucketname).creation_date:
        s3.create_bucket(Bucket=bucketname)

    all_paths = []
    for subdir, _dirs, files in os.walk(folderpath):
        all_paths.extend(os.path.join(subdir, name) for name in files)

    # Nothing to upload; ThreadPool also rejects a worker count of 0.
    if not all_paths:
        return

    # Cap the thread count instead of starting two threads per file.
    workers = max(1, min(max_workers, len(all_paths)))
    with ThreadPool(processes=workers) as pool:
        # map() blocks until every upload has finished and re-raises the
        # first worker exception, if any.
        pool.map(upload_file, all_paths)

In [118]:
%%time

upload_files(s3,new_bucket,upload_folder)

print("Upload Done")

Upload Done
CPU times: user 17.3 s, sys: 20.4 s, total: 37.7 s
Wall time: 16 s


### 5. Go to MINIO Console and check if data is uploaded

<img style="width: 40%" src="im/cap3.PNG">