In [1]:
import os
import sys
import json
import shutil
import urllib.parse

import logging
from logging.handlers import RotatingFileHandler

from azure.storage.blob import BlobServiceClient, BlobClient


# Notebook-wide logger. The logger itself lets DEBUG through; the file handler
# below only records INFO and above.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Log destination and record layout.
# NOTE (author): defining log_file before creating the logger reportedly
# stopped logging from working, so the logger is created first.
log_file = 'blob_log.log'
log_format = '%(asctime)s - %(levelname)s - %(name)s - %(module)s - %(lineno)d - %(message)s'
formatter = logging.Formatter(log_format)

# Attach the file handler only once, so re-running this cell does not
# produce duplicated log lines.
if len(logger.handlers) == 0:
    file_handler = logging.FileHandler(log_file)
    file_handler.setFormatter(formatter)
    file_handler.setLevel(logging.INFO)
    logger.addHandler(file_handler)
    # Keep records out of the root logger's handlers.
    logger.propagate = False

### Upload Local files to Blob Storage

In [2]:
'''Script to work your way around blob '''

# The credentials (e.g. the storage connection string) are read from a JSON configuration file
# Only BlobServiceClient is used as the entry point for all container and blob interactions


class AzureStorageManipulation:
    """Base class holding the JSON configuration and the account-level blob client.

    Intended call order is ``load_config()`` followed by ``initialize_client()``.
    Both methods log (and print) failures instead of raising, and report the
    outcome through their boolean return value.
    """

    def __init__(self, json_path='var.json'):
        """Store the configuration path; nothing is read or connected yet.

        Args:
            json_path: Path of the JSON configuration file.
        """
        self.config_path = json_path
        self.config_file = None          # parsed JSON content, set by load_config()
        self.blob_service_client = None  # set by initialize_client()
        self.logger = logging.getLogger(__name__)

    def load_config(self):
        """Parse the JSON file at ``self.config_path`` into ``self.config_file``.

        Returns:
            bool: True when the file was read and parsed, False otherwise
            (the error is logged and printed, ``config_file`` is left untouched).
        """
        try:
            # Explicit encoding so the result does not depend on the platform default
            with open(self.config_path, 'r', encoding='utf-8') as file:
                self.config_file = json.load(file)
        except Exception as e:
            self.logger.error("Failed to load configuration: %s", e)
            print(e)
            return False
        return True

    def initialize_client(self):
        """Create ``self.blob_service_client`` from the configured connection string.

        The connection string is read from
        ``config_file["on_cloud"]["conn_string_BlobStorage"]``.

        Returns:
            bool: True when the client was created, False otherwise (the error
            is logged and printed, ``blob_service_client`` is left untouched).
        """
        # Without this guard a missing configuration surfaces as the unhelpful
        # "'NoneType' object is not subscriptable".
        if self.config_file is None:
            message = "Configuration is not loaded; call load_config() before initialize_client()"
            self.logger.error("Failed to initialize blob service client: %s", message)
            print(message)
            return False

        try:
            conn_string = self.config_file["on_cloud"]["conn_string_BlobStorage"]
        except (KeyError, TypeError) as e:
            message = f"missing configuration entry on_cloud/conn_string_BlobStorage ({e})"
            self.logger.error("Failed to initialize blob service client: %s", message)
            print(message)
            return False

        try:
            self.blob_service_client = BlobServiceClient.from_connection_string(conn_str=conn_string)
        except Exception as e:
            self.logger.error("Failed to initialize blob service client: %s", e)
            print(e)
            return False
        return True


### Child class for containers - the_container_wars

In [3]:
class TheContainerWars(AzureStorageManipulation):
    """Container-level operations: status check, create, list, copy, rename, delete.

    All methods log failures through ``self.logger`` instead of raising.
    Methods that perform an action return True/False so that callers (notably
    ``rename_containers``) can stop before doing anything destructive.
    """

    def __init__(self, json_path='var.json'):
        super().__init__(json_path)

    def check_status_container(self, container_name):
        """Report whether a container exists and whether it holds any blob.

        Args:
            container_name: Name of the container to inspect.

        Returns:
            str: ``"not_empty"``, ``"empty"``, ``"not_exists"``, or ``"error"``
            when the check itself failed (the error is logged).
        """
        try:
            container_client = self.blob_service_client.get_container_client(container_name)

            if not container_client.exists():
                self.logger.info(f"The container {container_name} does not exist.")
                return "not_exists"

            self.logger.info(f"The container {container_name} exists.")

            # Stop at the first blob: counting every blob is unnecessary work
            # just to learn that the container is not empty.
            if any(True for _ in container_client.list_blobs()):
                self.logger.info(f"The container {container_name} is not empty.")
                return "not_empty"

            self.logger.info(f"The container {container_name} is empty.")
            return "empty"

        except Exception as e:
            self.logger.error(f"Failed to check container {container_name} status due to error: {e}")
            return "error"

    def make_container(self, container_name):
        """Create a container if it does not exist yet.

        Args:
            container_name: Name of the container to create.

        Returns:
            bool: True when the container was created, False otherwise.
        """
        status = self.check_status_container(container_name)

        # A failed status check must not be reported as "already exists".
        if status == "error":
            self.logger.error(f"Failed to create container {container_name} as its status could not be determined")
            return False

        if status != "not_exists":
            self.logger.error(f"Failed to create container {container_name} as it already exists")
            return False

        try:
            container_client = self.blob_service_client.get_container_client(container_name)
            container_client.create_container()
            self.logger.info(f"Success: container {container_name} created")
            return True
        except Exception as e:
            self.logger.error(f"Failed to create container {container_name} due to error: {e}")
            return False

    def list_containers(self):
        """Return an iterator over the containers of the storage account.

        Returns:
            The iterable returned by ``BlobServiceClient.list_containers()``,
            or an empty iterator when the call failed (the error is logged),
            so callers can always loop over the result.
        """
        try:
            return self.blob_service_client.list_containers()
        except Exception as e:
            self.logger.error(f"Failed to list containers due to error: {e}")
            return iter(())

    def copy_container_content(self, container_old, container_new, container_new_exists_check = True):
        """Copy every blob of ``container_old`` into ``container_new``.

        Args:
            container_old: Source container; must exist and be non-empty.
            container_new: Destination container.
            container_new_exists_check: When True, verify that the destination
                exists before copying.

        Returns:
            bool: True only when every copy reported ``copy_status == "success"``.
            False when a precondition failed, an error occurred, or at least
            one copy was only started and is still pending on the service side.
        """
        source_status = self.check_status_container(container_old)

        if source_status == "not_exists":
            self.logger.error(f"Failed to copy {container_old} contents as the container {container_old} does not exist. Returning None")
            return False
        if source_status == "empty":
            self.logger.error(f"Failed to copy {container_old} contents as the container {container_old} is empty. Returning None")
            return False
        if source_status == "error":
            self.logger.error(f"Failed to copy {container_old} contents as the status of container {container_old} could not be determined")
            return False

        if container_new_exists_check:
            target_status = self.check_status_container(container_new)

            if target_status == "not_exists":
                self.logger.error(f"Failed to copy {container_old} contents as the container {container_new} does not exist. Returning None")
                return False
            if target_status == "error":
                self.logger.error(f"Failed to copy {container_old} contents as the status of container {container_new} could not be determined")
                return False

        unfinished = []
        try:
            old_container_client = self.blob_service_client.get_container_client(container_old)
            new_container_client = self.blob_service_client.get_container_client(container_new)

            for blob in old_container_client.list_blobs():
                old_blob_client = old_container_client.get_blob_client(blob.name)
                new_blob_client = new_container_client.get_blob_client(blob.name)
                copy_info = new_blob_client.start_copy_from_url(old_blob_client.url)

                # start_copy_from_url may only *start* the copy; the returned
                # copy_status tells whether it already completed.
                if (copy_info or {}).get("copy_status") != "success":
                    unfinished.append(blob.name)

        except Exception as e:
            self.logger.error(f"Failed to copy blobs from container {container_old} to container {container_new} : {e}")
            return False

        if unfinished:
            self.logger.warning(
                f"Copy from container {container_old} to container {container_new} started, "
                f"but {len(unfinished)} blob(s) are not confirmed as copied yet"
            )
            return False

        self.logger.info(f"Success: Blobs copied from container {container_old} to container {container_new}")
        return True

    def rename_containers(self, name_old, name_new, container_new_exists_check = False):
        """Rename a container by creating ``name_new``, copying, then deleting ``name_old``.

        The old container is deleted only after the new one was created and,
        for a non-empty source, every blob copy was confirmed as complete.

        Args:
            name_old: Existing container to rename.
            name_new: New name; must not exist yet.
            container_new_exists_check: Forwarded to ``copy_container_content``.

        Returns:
            bool: True when the rename fully succeeded, False otherwise.
        """
        try:
            new_status = self.check_status_container(name_new)
            if new_status == "error":
                self.logger.error(f"Failed to rename container {name_old} to {name_new}: status of {name_new} could not be determined")
                return False
            if new_status != "not_exists":
                self.logger.error(f"Failed: Container with name {name_new} already exists")
                return False

            # Check the source first so no stray container is created for it.
            old_status = self.check_status_container(name_old)
            if old_status not in ("empty", "not_empty"):
                self.logger.error(f"Failed to rename container {name_old} to {name_new}: container {name_old} does not exist or could not be checked")
                return False

            if not self.make_container(name_new):
                self.logger.error(f"Failed to rename container {name_old} to {name_new}: container {name_new} could not be created")
                return False

            # An empty source has nothing to copy; a non-empty one must be fully
            # copied before the original is removed, otherwise data is lost.
            if old_status == "not_empty" and not self.copy_container_content(name_old, name_new, container_new_exists_check):
                self.logger.error(f"Failed to rename container {name_old} to {name_new}: copy not confirmed, container {name_old} was kept")
                return False

            if not self.delete_container(name_old):
                return False

            self.logger.info(f"Success: container {name_old} renamed to {name_new}")
            return True

        except Exception as e:
            self.logger.error(f"Failed to rename container {name_old} to {name_new}: {e}")
            return False

    def delete_container(self, container_name):
        """Delete a container.

        Args:
            container_name: Name of the container to delete.

        Returns:
            bool: True when the container was deleted, False otherwise.
        """
        try:
            # Client creation is inside the try so an uninitialised service
            # client is logged like any other failure instead of raising.
            container_client = self.blob_service_client.get_container_client(container_name)
            container_client.delete_container()
            self.logger.info(f"Success: container {container_name} successfully deleted")
            return True
        except Exception as e:
            self.logger.error(f"Failed to delete container: {e}")
            return False

### Child class for blob - The blob

In [17]:
class TheBlob(AzureStorageManipulation):
    """Blob-level operations: bulk upload from a local folder and single-blob delete.

    Both methods log failures through ``self.logger`` instead of raising.
    """

    def __init__(self, json_path='var.json'):
        super().__init__(json_path)

    def upload_blobs(self):
        """Upload every regular file of the configured local folder to the cloud container.

        The local folder is ``config_file["on_prem"]["containerRawDataProcessed"]``
        and the target container is ``config_file["on_cloud"]["containerRawDataProcessed"]``.
        Each blob is named after its local file and overwrites an existing blob
        of the same name. A file that fails to upload is logged and skipped so
        the remaining files are still processed.

        Returns:
            int: Number of files uploaded (0 when nothing could be uploaded).
        """
        try:
            # get local folder name and files to upload
            folder_local = self.config_file["on_prem"]["containerRawDataProcessed"]

            # Sub-directories cannot be opened as files; only regular files are uploaded
            files = [
                name for name in os.listdir(folder_local)
                if os.path.isfile(os.path.join(folder_local, name))
            ]
            files_num = len(files)
            count = 0

            # get destination cloud folder
            folder_cloud = self.config_file["on_cloud"]["containerRawDataProcessed"]
            container_client = self.blob_service_client.get_container_client(folder_cloud)

            if not container_client.exists():
                self.logger.error(f"Failed: 'Upload Blob' failed as container {folder_cloud} does not exist\n")
                return 0

            # Upload local files, use local file name for the blob name
            for file in files:
                filepath_local = os.path.join(folder_local, file)
                try:
                    client_blob = self.blob_service_client.get_blob_client(folder_cloud, file)
                    with open(filepath_local, "rb") as data:
                        client_blob.upload_blob(data, overwrite=True)
                    count += 1
                except Exception as e:
                    # One bad file must not abort the whole batch
                    self.logger.error(f"Failed to upload file {file}: {e}")

            if count == files_num:
                self.logger.info(f"Success: 'Upload Blob' completed. Uploaded {count} of {files_num} files to container {folder_cloud}\n")
            else:
                self.logger.warning(f"'Upload Blob' completed with errors. Uploaded {count} of {files_num} files to container {folder_cloud}\n")
            return count

        except Exception as e:
            self.logger.error(f"Failed: 'Upload Blob' failed due to error {e}\n" )
            return 0

    def delete_blob(self, container_name, blob_name):
        """Delete a single blob from a container.

        Args:
            container_name: Container holding the blob.
            blob_name: Name of the blob to delete.

        Returns:
            bool: True when the blob was deleted, False otherwise.
        """
        try:
            # get_container_client takes the container only; the blob name
            # belongs to get_blob_client.
            container_client = self.blob_service_client.get_container_client(container_name)
            blob_client = container_client.get_blob_client(blob_name)

            blob_client.delete_blob()
            self.logger.info(f"Success: blob {blob_name} successfully deleted")
            return True

        except Exception as e:
            self.logger.error(f"Failed to delete blob {blob_name}: {e}")
            return False
    

### Main.py

In [None]:
def main():
    """Run the demo: exercise the container operations, then the blob operations.

    Each helper object loads the JSON configuration and creates its own blob
    service client. If a client cannot be created the run stops early, because
    every later call needs it. Progress is written to the module logger;
    container names are printed.
    """

    # initialize the container helper and load the configuration stored in json file
    logger.info('START OF CONTAINER WARS')
    config = TheContainerWars()
    config.load_config()
    config.initialize_client() # A client to interact with the Blob Service at the account level

    if config.blob_service_client is None:
        logger.error('Blob service client could not be initialized; aborting container operations')
        return

    logger.info('CREATE CONTAINER INSTANCE RUNNING')
    config.make_container('testcontainer')

    logger.info('LISTING CONTAINERS INSTANCE RUNNING')
    containers = config.list_containers()
    # list_containers may yield nothing usable when the listing failed
    if containers is not None:
        for container in containers:
            print(container.name)

    logger.info('COPYING CONTAINER CONTENT INSTANCE RUNNING')
    config.copy_container_content('data-ready', 'testcontainer')
    # NOTE(review): 'testnewcontainer' has not been created at this point, so this
    # call presumably demonstrates the missing-destination error path - confirm.
    config.copy_container_content('data-ready', 'testnewcontainer')

    logger.info('RENAMING CONTAINER INSTANCE RUNNING')
    config.rename_containers('testcontainer', 'testnewcontainer')
    # NOTE(review): the repeated call presumably demonstrates the failure path
    # after the first rename - confirm.
    config.rename_containers('testcontainer', 'testnewcontainer')

    logger.info('DELETE CONTAINER INSTANCE RUNNING')
    config.delete_container('testcontainer')

    logger.info('START OF HORRORS OF THE BLOB')
    horrorsof = TheBlob()
    horrorsof.load_config()
    horrorsof.initialize_client()

    if horrorsof.blob_service_client is None:
        logger.error('Blob service client could not be initialized; aborting blob operations')
        return

    logger.info('UPLOAD BLOB INSTANCE RUNNING')
    horrorsof.upload_blobs()

    logger.info('DELETING BLOB INSTANCE RUNNING')
    horrorsof.delete_blob('testcontainer', 'test.csv')
    
    

if __name__ == "__main__":
    # Run the demo when this code is executed directly.
    main()

## Preparation

### General Instructions

1. An Azure account with an active subscription. 
2. A Batch account with a linked Azure Storage account.
3. A Data Factory instance. 
4. Batch Explorer downloaded and installed.
5. Storage Explorer downloaded and installed.
    - Note: not strictly necessary.
6. Python 3.7 or above, with the azure-storage-blob package installed by using pip.
    - Note: Batch Explorer gives the option to set up a Batch pool for 'Data Science' projects, which comes with Python and data science packages preinstalled.
    - Note: I did not see that option when using the Azure portal alone.
7. Prepare requirements.txt file (most packages are not required with Data Science batch pool but I did it as an exercise)
    1. pip install azure-mgmt-storage
    2. pip install azure-storage-blob
8. Prepare a JSON file with the parameters used by the Python script (connection string, etc.).
    - Note: You can also use the ADF pipeline parameter list or the Batch process extended properties.

### Features of this pipeline
1. The pipeline is built around the csv file type. It will throw an error for any other file type unless the requisite changes are made in all ADF blocks.
2. Data (csv files), Python scripts and all dependent scripts are placed in one blob.
3. As data is copied to the Batch pool, it is quoted in the process, and file names containing white space suffer. Note: the method `rename_files` was an attempt to fix this. It does not work; it is kept in the code for the sake of capturing the entirety of my work.
4. 

### References
1. https://pypi.org/project/azure-storage-blob/
2. https://learn.microsoft.com/en-us/azure/data-factory/connector-file-system?tabs=data-factory#sample-linked-service-and-dataset-definitions
3. https://learn.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobserviceclient?view=azure-python ***
4. https://learn.microsoft.com/en-us/azure/batch/tutorial-run-python-batch-azure-data-factory ***
5. https://stackoverflow.com/questions/52014916/enumerate-blob-names-in-azure-data-factory-v2 *****
6. https://azure.microsoft.com/mediahandler/files/resourcefiles/azure-data-factory-passing-parameters/Azure%20data%20Factory-Whitepaper-PassingParameters.pdf **
7. https://learn.microsoft.com/en-us/azure/data-factory/ *****
8. https://www.cathrinewilhelmsen.net/series/beginners-guide-azure-data-factory/ *****

## README