# CamHD Video File to Azure Blob with Dask

## Define your computation calling function

In [None]:
def copy_to_blob(blob_service, container_name, blob_name, url):
    """Copy the resource at ``url`` into a blob through ``blob_service``.

    Thin wrapper around ``blob_service.copy_blob`` so that a single copy
    can be handed around as one plain function call.

    Args:
        blob_service: Object exposing ``copy_blob(container, blob, source)``.
        container_name: Name of the destination container.
        blob_name: Name to give the destination blob.
        url: Source location passed straight to ``copy_blob``.

    Returns:
        Whatever ``blob_service.copy_blob`` returns.
    """
    copy_result = blob_service.copy_blob(container_name, blob_name, url)
    return copy_result

## Start dask gathering workers

In [None]:
# Start a Dask cluster on Kubernetes with 10 workers.
from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=10)
# Bare expression on the last line of the cell so the notebook displays the cluster.
cluster

In [None]:
from dask.distributed import Client
from dask import delayed, compute
# import dask.array as dsa
# Connect a distributed client to the `cluster` object created earlier.
client = Client(cluster)
# Bare expression on the last line of the cell so the notebook displays the client.
client

## Import modules

In [None]:
import azure.storage.blob as ASB
import azure.storage.common
import yaml
import dask
import pandas as pd

In [None]:
# Load Azure storage account credentials from a local YAML file.
# yaml.safe_load is used instead of yaml.load: it needs no explicit Loader
# argument (mandatory in newer PyYAML releases) and only builds plain
# Python objects, which is all a credentials mapping requires.
with open('/home/jovyan/.azure_credentials_ooitest.yaml', 'r') as f:
    credentials = yaml.safe_load(f)
azure_storage_account_name = credentials['azure_storage_account_name']
azure_storage_account_key = credentials['azure_storage_account_key']

## Define the set of input parameters to call the function

In [None]:
## inspect your database
# Show the full filename instead of truncating long column values.
# None means "no limit"; the older -1 spelling is deprecated since pandas 1.0
# and rejected by later releases.
pd.set_option('display.max_colwidth', None)
# The file holds one JSON record per line; rows are sorted by the 'timestamp' column.
dbcamhd = pd.read_json('/home/jovyan/floc_data/dbcamhd.json', orient="records", lines=True).sort_values(by=['timestamp'])
#dbcamhd.tail()

In [None]:
# Take a small slice of dbcamhd as a layer of protection against copying
# 14000 blobs by accident.
# NOTE(review): `short` is not referenced by any later cell visible in this
# notebook (the delayed loop slices dbcamhd directly) — confirm which is intended.
short = dbcamhd[14400:14410]
# Creating a container with an invalid name will often throw an error, so
# follow the storage service's container naming rules.
container_name ='sattested'
blob_service = ASB.BlockBlobService(azure_storage_account_name, azure_storage_account_key)
# Trailing semicolon: presumably there to suppress the cell's output in the notebook.
blob_service.create_container(container_name);

In [None]:
# Build one lazy (dask.delayed) copy task per selected row; nothing is copied yet.
# Each row's `filename` is the source URL, and its last path segment becomes
# the destination blob name.
lazy_attempt = [
    dask.delayed(copy_to_blob)(
        blob_service,
        container_name,
        record.filename.split('/')[-1],
        record.filename,
    )
    for _, record in dbcamhd[14400:14407].iterrows()
]

In [None]:
# Sanity check: print the list of delayed tasks built in the previous cell.
print(lazy_attempt)

In [None]:
## not the solution, this gives us the same problem
# %time
# results = dask.compute(*lazy_attempt)
# results

## Check size of containers & blobs to see if it worked

In [None]:
# Report the total size and blob count of every container in the account.
# Uses `blob_service`, the BlockBlobService created earlier in this notebook;
# the name `block_blob_service` used here before is never defined and raised
# a NameError.
containers = blob_service.list_containers()
for container in containers:
    total_bytes = 0
    blob_count = 0
    for blob in blob_service.list_blobs(container.name):
        total_bytes += blob.properties.content_length
        blob_count += 1
    # Sizes are reported in decimal megabytes (bytes / 1,000,000).
    print(container.name + " : " + str(total_bytes/1000000) + " MB" + " in " + str(blob_count) + " blobs")

## Clean up your space before you go home

### clean up blobs

In [None]:
# Print the name of every blob in `container_name`.
# Uses `blob_service`, the BlockBlobService created earlier in this notebook;
# the name `block_blob_service` used here before is never defined and raised
# a NameError.
blob_list = blob_service.list_blobs(container_name)
for blob in blob_list:
    blob_name = blob.name
    print(blob_name)

In [None]:
# # delete all blobs in container
# blob = blob_service.list_blobs(container_name)
# #for a in blob:
#        #blob_service.delete_blob(container_name = container_name, blob_name = a.name)

### clean up containers

In [None]:
# Set the name of the container to delete.
container_name = 'sattested'

In [None]:
# Clean up resources: delete the container named by `container_name`.
# Uses `blob_service`, the BlockBlobService created earlier in this notebook;
# the name `block_blob_service` used here before is never defined and raised
# a NameError.
blob_service.delete_container(container_name)
# Temp-file removal is left disabled:
# # os.remove(full_path_to_file)
# # os.remove(full_path_to_file2)