In [1]:
#------------------------------------------------------------------------------
# This script uploads local files to a blob in the Azure cloud.
# It needs account information:
#   - Account name.
#   - Account key.
# It needs the blob container information
#   - Container name
#   - Container sub-directory
#------------------------------------------------------------------------------
import os, uuid, sys
import subprocess
import tqdm
from tqdm import tqdm_notebook
import astropy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
import multiprocessing

import pyarrow as pa
import pyarrow.parquet as pq

from io import BytesIO
from astropy.io import fits
from functools import partial
from azure.storage.blob import BlockBlobService, PublicAccess

In [2]:
# Storage credentials and container names are read from the local
# config_blob_keys module rather than being hard-coded in the notebook.
import config_blob_keys as cfg
#!pip install azure-storage-blob
# Create the BlockBlobService that is used to call the Blob service
# for the storage account
account_name = cfg.AccountName
account_key = cfg.AccountKey
block_blob_service = BlockBlobService(account_name=account_name, account_key=account_key)

# Container holding the uncompressed .fits files (read by TransformFits).
# NOTE(review): this call sets container-level public access on the
# container -- confirm that exposing it publicly is intended.
cont_name_unc = cfg.ContNameUnC
block_blob_service.set_container_acl(cont_name_unc, public_access=PublicAccess.Container)

# Container receiving the processed arrays (written by TransformFits).
# NOTE(review): same public-access setting as above -- confirm it is intended.
cont_name_proc = cfg.ContNameProc
block_blob_service.set_container_acl(cont_name_proc, public_access=PublicAccess.Container)

# Number of workers: size of the multiprocessing.Pool used by the driver loop
NumberWorkers=10

In [30]:
# Create a list "filelist" with the blob content
# inside the "Azure:container/folder" location 
def BlobList(container, folder, filelist, verbose=False):
    """Append the names of the blobs under ``folder`` to ``filelist``.

    Blobs are listed from ``container`` using ``folder`` as the name prefix.
    Each name is stored relative to ``folder``: only the *leading* prefix is
    stripped, so a name that happens to contain the folder string again
    further down the path is left intact.

    Parameters
    ----------
    container : str
        Name of the blob container to list.
    folder : str
        Prefix used to filter the listing (e.g. ``'bias_blue/'``).
    filelist : list
        List that is extended in place with the relative blob names.
    verbose : bool, optional
        When true, print the full name of every blob found.

    Returns
    -------
    list
        The same ``filelist`` object that was passed in.
    """
    for blob in block_blob_service.list_blobs(container, prefix=folder):
        name = str(blob.name)
        # Strip the prefix once, at the start only. str.replace() would also
        # remove later occurrences of the folder string inside the name.
        if folder and name.startswith(folder):
            name = name[len(folder):]
        filelist.append(name)
        if verbose:
            print("\t Blob name: " + blob.name)

    return filelist

# Download a file "blobfile" from "container" and save it 
# in the file "locfile"
def DownBlob(container, blobfile, locfile, verbose=False):
    """Fetch blob ``blobfile`` from ``container`` into the local path ``locfile``.

    The transfer is delegated to the module-level ``block_blob_service``.
    When ``verbose`` is True a one-line progress message is printed first.
    """
    if verbose == True:
        progress_msg = 'Downloading ' + blobfile + ' to ' + locfile
        print(progress_msg)

    block_blob_service.get_blob_to_path(container, blobfile, locfile)

# Uncompress data 
def UnCompress(file, verbose=False):
    """Run the external ``uncompress`` command on ``file``.

    Parameters
    ----------
    file : str
        Path of the file to uncompress.
    verbose : bool, optional
        When true, print the file name before starting.

    Returns
    -------
    int
        Exit status of the ``uncompress`` command (0 means success).
        Previously the status was discarded, so failures went unnoticed;
        callers that ignored the former ``None`` result are unaffected.
    """
    if verbose:
        print('Uncompressing ' + file)

    # The command is passed as an argument list (no shell), so the file name
    # is never interpreted by a shell.
    status = subprocess.call(['uncompress', file])
    if status != 0:
        # Report the failure instead of silently dropping it; do not raise,
        # so best-effort callers keep running.
        print('uncompress failed with exit status {} for {}'.format(status, file))
    return status

# Upload file "locfile" to the blob "blobfile" in container
def UpBlob(container, blobfile, locfile, verbose=False):
    """Send the local file ``locfile`` to blob ``blobfile`` in ``container``.

    The transfer is delegated to the module-level ``block_blob_service`` and
    is requested with ``validate_content=True``.  When ``verbose`` is True a
    one-line progress message is printed first.
    """
    if verbose == True:
        progress_msg = 'Uploading ' + locfile + ' to ' + blobfile
        print(progress_msg)

    block_blob_service.create_blob_from_path(
        container,
        blobfile,
        locfile,
        validate_content=True,
    )
    
# Save one image array as a local .npy file, upload it to the processed
# container and always delete the local copy afterwards.
def _UploadArrayAsNpy(data_array, loc_np_file, npy_blob_name, verbose=False):
    """Write ``data_array`` to ``loc_np_file``, upload it, then remove it.

    The local file is removed even if the upload raises, so failed uploads
    do not leave temporary .npy files behind. Exceptions are propagated to
    the caller.
    """
    if verbose:
        print('\nSaving: ' + loc_np_file)
    # np.save returns only after the file is written, so no polling for the
    # file's existence is needed before uploading.
    np.save(loc_np_file, data_array)
    try:
        if verbose:
            statinfo = os.stat(loc_np_file)
            print("File size {} MB".format(statinfo.st_size/1024**2))
        UpBlob(cont_name_proc, npy_blob_name, loc_np_file, False)
    finally:
        os.remove(loc_np_file)


# Decompose .fits files from the uncompressed container into numpy arrays
# containing the hdu.data of each extension, and upload them to processed/numpy
def TransformFits(path_loc, unc_blob_sub_dir, npy_blob_sub_dir, file, verbose=False, format='numpy'):
    """Download one .fits blob, export its 2-D extensions and upload them.

    The blob ``unc_blob_sub_dir/file`` is downloaded from the container
    ``cont_name_unc`` into ``path_loc``. With ``format='numpy'`` every HDU
    whose header has ``NAXIS == 2`` and whose data is a 2-D array is saved
    as a .npy file and uploaded to the container ``cont_name_proc`` under
    ``numpy/<npy_folder>/ext<N>/<file with .fits replaced by .npy>``.
    ``npy_folder`` is built from the entries of ``npy_blob_sub_dir`` that
    occur in the ``ORIGFILE`` header value of the primary HDU.

    HDUs with ``NAXIS == 2`` whose data is not 2-D are printed and appended
    to ``transform_fits_corrupted.lst``. Any other error raised while
    processing the HDUs is printed and the blob name is appended to
    ``transform_fits_check.lst``. The downloaded .fits file is always
    removed from ``path_loc``, also when an exception propagates.

    Parameters
    ----------
    path_loc : str
        Local directory used for the temporary .fits/.npy files.
    unc_blob_sub_dir : str
        Sub-directory (name prefix) of the blob in the uncompressed container.
    npy_blob_sub_dir : list of str
        Candidate folder names; those contained in ``ORIGFILE`` are used.
    file : str
        Blob file name relative to ``unc_blob_sub_dir``.
    verbose : bool, optional
        Print progress information.
    format : str, optional
        ``'numpy'`` (default) exports the arrays. ``'parquet'`` only walks
        the HDUs and prints their index; no parquet output is produced yet.
        Any other value just downloads and removes the file.

    Raises
    ------
    Errors from the download, from opening the file or from a missing
    ``ORIGFILE`` keyword are propagated, as before.
    """
    import posixpath  # blob names use '/' separators regardless of the local OS

    # Download the data from uncompressed
    unc_blob_name = posixpath.join(unc_blob_sub_dir, file)
    path_to_file_loc = os.path.join(path_loc, file)

    # DownBlob returns once get_blob_to_path has returned, so the former
    # "wait until the file exists" polling loop (which would spin forever if
    # the file never appeared) is not needed.
    DownBlob(cont_name_unc, unc_blob_name, path_to_file_loc, False)

    try:
        # Extract data from fits
        if format == 'numpy':
            format_dir = 'numpy'

            # The context manager closes the file on every exit path.
            with fits.open(path_to_file_loc) as hdu_list:
                orig_file_name = hdu_list[0].header['ORIGFILE']

                if verbose:
                    print('File name = ' + orig_file_name)

                npy_folder = ''.join(
                    sub_dir for sub_dir in npy_blob_sub_dir if sub_dir in orig_file_name
                )

                try:
                    for ext, hdu in enumerate(hdu_list):
                        if verbose:
                            print("\nProcessing index: ", ext)

                        # Damaged headers/data must not abort the remaining HDUs.
                        try:
                            header_naxis = hdu.header['NAXIS']
                        except Exception:
                            header_naxis = None

                        try:
                            data_array = np.array(hdu.data)
                        except Exception:
                            data_array = np.array(0)

                        # Logical "and" is required here: with "&" the test
                        # parsed as header_naxis == (2 & ndim) == 2, which
                        # also accepted e.g. 3-D data.
                        if header_naxis == 2 and data_array.ndim == 2:
                            loc_np_file = path_to_file_loc.replace('.fits', '')
                            loc_np_file = loc_np_file + '_ext' + str(ext) + '.npy'

                            np_file = file.replace('.fits', '.npy')
                            ext_folder = 'ext' + str(ext)
                            npy_blob_name = posixpath.join(
                                format_dir, npy_folder, ext_folder, np_file
                            )
                            _UploadArrayAsNpy(data_array, loc_np_file, npy_blob_name, verbose)
                        elif header_naxis == 2:
                            print("Corrupted file - blob_name: ", unc_blob_name)
                            with open('transform_fits_corrupted.lst', 'a') as fd:
                                fd.write(unc_blob_name + '\n')
                except Exception:
                    # Record the blob for a later check instead of aborting.
                    # "except Exception" lets KeyboardInterrupt/SystemExit through.
                    print("Check file - blob_name: ", unc_blob_name)
                    with open('transform_fits_check.lst', 'a') as fd:
                        fd.write(unc_blob_name + '\n')

        elif format == 'parquet':
            # Parquet export is not implemented: the HDUs are only enumerated.
            with fits.open(path_to_file_loc) as hdu_list:
                for ext, hdu in enumerate(hdu_list):
                    print("\nProcessing index: ", ext)
    finally:
        # Remove .fits, also when processing raised
        if os.path.exists(path_to_file_loc):
            os.remove(path_to_file_loc)

In [31]:
# Script to download the uncompressed data and transform the images
# into numpy arrays for each image extension

# Flags for checks
# check_from_file: also process the blobs listed in ./transform_fits_all_check.txt
check_from_file = False
# check_from_list: currently not used by this cell
check_from_list = False

uncBlobSubDirs = ['bias_blue', 'bias_red', 'blue_arc_flat','red_arc_flat']
#Test
#uncBlobSubDirs = ['red_arc_flat']
path_loc = '../Temp'
# The downloads are written into path_loc, so make sure it exists.
os.makedirs(path_loc, exist_ok=True)

# Subdirs to be created in /numpy for each uncompressed folder, based on the
# type of images. A lookup table fails loudly (KeyError) for an unknown
# folder instead of silently reusing the value of the previous iteration.
npyBlobSubDirs = {
    'bias_blue': ['UVES_BLUE_BIAS'],
    'bias_red': ['UVES_RED_BIAS'],
    'blue_arc_flat': ['UVES_BLUE_WAVE','UVES_DIC1B_FLAT','UVES_DIC1B_DFLAT'],
    'red_arc_flat': ['UVES_RED_WAVE','UVES_DIC1R_FLAT'],
}

start_time_out = time.time()

# loop over the uncompressed folders
for unc_blob_sub_dir in uncBlobSubDirs:

    npy_blob_sub_dir = npyBlobSubDirs[unc_blob_sub_dir]

    # List the uncompressed data
    unc_files_list = []
    unc_folder_rem = unc_blob_sub_dir + '/'

    BlobList(cont_name_unc, unc_folder_rem, unc_files_list)
    #Test
    #unc_files_list = unc_files_list[0:10]

    if check_from_file:
        # NOTE(review): these names are appended to the full listing above,
        # so a blob present in both is processed twice -- confirm intent.
        with open('./transform_fits_all_check.txt','r') as corr_file:
            corr_files = [line.rstrip('\n') for line in corr_file]
        for corr_name in corr_files:
            corr_list = corr_name.split('/')
            # Skip blank or malformed lines that have no "<folder>/<file>" form.
            if len(corr_list) > 1 and corr_list[0] == unc_blob_sub_dir:
                unc_files_list.append(corr_list[1])

    start_time_dir= time.time()

    if unc_files_list:
        print('Working on ' + unc_blob_sub_dir + '...')

        # Bind the per-folder arguments; the pool supplies the file name.
        tasks = partial(TransformFits, path_loc, unc_blob_sub_dir, npy_blob_sub_dir)
        with multiprocessing.Pool(NumberWorkers) as p:
            result = list(tqdm.tqdm_notebook(p.imap(tasks, unc_files_list), total=len(unc_files_list)))

    end_time_dir = time.time()
    total_time_dir = end_time_dir - start_time_dir
    print('Total dir time: ', total_time_dir)

end_time = time.time()
total_time_out = end_time - start_time_out
print('Total out time: ', total_time_out)

Working on bias_blue...


HBox(children=(IntProgress(value=0, max=16227), HTML(value='')))

Total dir time:  891.9518530368805
Working on bias_red...


HBox(children=(IntProgress(value=0, max=16591), HTML(value='')))

Total dir time:  2271.333835840225
Working on blue_arc_flat...


HBox(children=(IntProgress(value=0, max=9861), HTML(value='')))

Total dir time:  798.6886813640594
Working on red_arc_flat...


HBox(children=(IntProgress(value=0, max=6865), HTML(value='')))

Total dir time:  1081.1292214393616
Total out time:  5080.136281490326
