# CamHD Video File to Azure Blob with Dask

## Start by downloading an MOV from the raw data server

In [None]:
# we will use a recent mp4 for a smaller size from ooi's raw data server
import pandas 
# One sample video on the raw data server; the long URL is wrapped for readability.
link = (
    'https://rawdata.oceanobservatories.org/files/RS03ASHS/PN03B/'
    '06-CAMHDA301/2019/07/01/CAMHDA301-20190701T043000.mp4'
)
# filename = url.split('/')[-1]
# Directory that the download cells write into.
path = '/home/jovyan/floc_data/'

#Check your filename ;)
#print(filename)

In [None]:
## List of Inputs for Dask
# One entry per video to download, in the order the tasks will be created.
urls = [
    'https://rawdata.oceanobservatories.org/files/RS03ASHS/PN03B/06-CAMHDA301/2019/07/01/CAMHDA301-20190701T050000.mp4',
    'https://rawdata.oceanobservatories.org/files/RS03ASHS/PN03B/06-CAMHDA301/2019/07/01/CAMHDA301-20190701T051500.mp4',
    'https://rawdata.oceanobservatories.org/files/RS03ASHS/PN03B/06-CAMHDA301/2019/07/01/CAMHDA301-20190701T043000.mp4',
]


### Raw Server to Local Storage 

In [None]:
# # Here we create our download function without Dask
# # Stream is used to download media files with requests. Use chunks to download the file in "chunks" instead of loading the entire file at once.
# import requests
# import os
# import sys

# def download_file(url):
#     local_filename = url.split('/')[-1]
#     # NOTE the stream=True parameter
#     r = requests.get(url, stream=True)
#     with open(os.path.join(path, local_filename), 'wb') as f:
#         ##for chunk in progress.bar(r.iter_content(chunk_size=5024), expected_size=(total_length/5024) + 1): 
#         for chunk in r.iter_content(chunk_size=5024): 
#             if chunk: 
#                 f.write(chunk)
#     print("File" , filename , "created in", path)

# #download_file(url)

## It works! Now let us try something that requires more thinking
### Raw Server to Local Storage with delayed functions

In [None]:
# get yer dask going
from dask.distributed import Client, progress
# One worker running four threads; the bare name below makes the notebook
# display the client.
client = Client(
    n_workers=1,
    threads_per_worker=4,
)
client

In [None]:
# Here we create our download function for Dask
# Stream is used to download media files with requests. Use chunks to download the file in "chunks" instead of loading the entire file at once.
import requests
import os
import sys

@delayed
def delayed_get_file(url):
    """Lazily download ``url`` into the notebook-level ``path`` directory.

    The function is wrapped in ``delayed``, so calling it only builds a task;
    the transfer happens when that task is computed.

    Parameters
    ----------
    url : str
        Address of the file to fetch. The text after the last ``/`` becomes
        the local file name.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status, so an error page is never
        saved under the video's name.
    """
    # Derive the name from the URL rather than reading a global ``filename``:
    # the task runs after the submitting loop has finished, when such a global
    # holds only the last name and every download would land in one file.
    local_filename = url.split('/')[-1]
    # NOTE the stream=True parameter: the body is pulled chunk by chunk instead
    # of being loaded into memory in one piece. The ``with`` closes the
    # connection even when writing fails.
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(os.path.join(path, local_filename), 'wb') as f:
            for chunk in r.iter_content(chunk_size=5024):
                if chunk:
                    f.write(chunk)
    print("File", local_filename, "created in", path)


In [None]:
# Queue one download task per URL.
distributed_download = []
for url in urls:
    # Notebook-level name, refreshed on every pass of the loop.
    filename = url.rsplit('/', 1)[-1]
    data = delayed_get_file(url)
    distributed_download += [data]

In [None]:
# See: https://stackoverflow.com/questions/54412569/best-way-to-download-process-and-concat-into-tfrecords-using-dask
# See also: https://examples.dask.org/applications/embarrassingly-parallel.html

### The approach below did not work

## The second cell below was written to fix the error shown in the first cell. It runs, but the error persists.

In [None]:
# /opt/conda/bin/dask-worker in delayed_get_file()
#       9     # NOTE the stream=True parameter
#      10     r = requests.get(url, stream=True)
# ---> 11     with open(os.path.join(path, filename), 'wb') as f:
#      12         ##for chunk in progress.bar(r.iter_content(chunk_size=5024), expected_size=(total_length/5024) + 1):
#      13         for chunk in r.iter_content(chunk_size=5024):

# FileNotFoundError: [Errno 2] No such file or directory: '/home/jovyan/floc_data/CAMHDA301-20190701T043000.mp4'

In [None]:
# Here we create our directory for floc data
# ->>>>>>>> move this to a relative directory. 
import os
import sys

# Create the download directory itself, once. A directory named after each
# video (path + filename) would sit exactly where the download step opens a
# file for writing, and os.mkdir cannot create it while `path` is missing.
already_present = os.path.isdir(path)
os.makedirs(path, exist_ok=True)
if already_present:
    print("Directory", path, "already exists")
else:
    print("Path", path, "now paved")

In [None]:
#Here we will try to DL with Dask
from dask import delayed, compute
import requests
import os
import sys

@delayed
def delayed_get_file(url):
    """Lazily download ``url`` into the notebook-level ``path`` directory.

    The function is wrapped in ``delayed``, so calling it only builds a task;
    the transfer happens when that task is computed.

    Parameters
    ----------
    url : str
        Address of the file to fetch. The text after the last ``/`` becomes
        the local file name.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status, so an error page is never
        saved under the video's name.
    """
    # Derive the name from the URL rather than reading a global ``filename``:
    # the task runs after the submitting loop has finished, when such a global
    # holds only the last name and every download would land in one file.
    local_filename = url.split('/')[-1]
    # NOTE the stream=True parameter: the body is pulled chunk by chunk instead
    # of being loaded into memory in one piece. The ``with`` closes the
    # connection even when writing fails.
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(os.path.join(path, local_filename), 'wb') as f:
            for chunk in r.iter_content(chunk_size=5024):
                if chunk:
                    f.write(chunk)
    print("File", local_filename, "created in", path)

# @delayed
# def delayed_prores_to_blob(blob_service, container_name, blob_name, frame_data):
#     blob_service.create_blob_from_bytes(container_name, blob_name, frame_data)
#     return 0

#### Start a Dask cluster

In [None]:
from dask_kubernetes import KubeCluster
# Request a KubeCluster with ten workers; the bare name below makes the
# notebook display it.
cluster = KubeCluster(n_workers=10)
cluster

In [None]:
from dask.distributed import Client
# Attach a client to the `cluster` object and display it. This rebinds the
# notebook-level name `client`.
client = Client(cluster)
client

In [None]:
# Queue one download task per URL.
delayed_download = []
for url in urls:
    # Notebook-level name, refreshed on every pass of the loop.
    filename = url.rsplit('/', 1)[-1]
    data = delayed_get_file(url)
    delayed_download += [data]

In [None]:
# Show the first queued task object.
delayed_download[0]

In [None]:
%%time
# Run all queued download tasks in one call and keep whatever compute() returns.
ehh = compute(*delayed_download)

In [None]:
# # Run this to run the run function outlined above
# run()

In [None]:
# # This cell defines the function that will save our drive anon file in our data directory
# def run(): 
#     owd = os.getcwd()
#     #first change dir to path
#     os.chdir(path)
#     print("Now working in", os.getcwd(),)
#     #run download function to save video to data directory
#     download_file(url)
#     print("File" , filename , "created in", path)
#     #change dir back to original working directory (owd)
#     os.chdir(owd)
#     print("Now working back in", os.getcwd(),)

In [None]:
# Display the list of download URLs.
urls

# Blob It!!!!

In [None]:
import yaml
# Load the Azure storage account credentials from a local YAML file.
# safe_load builds only plain Python types; yaml.load without a Loader is
# deprecated since PyYAML 5.1 and raises TypeError in PyYAML 6.
with open('.azure_credentials_ooitest.yaml', 'r') as f:
    credentials = yaml.safe_load(f)
azure_storage_account_name = credentials['azure_storage_account_name']
azure_storage_account_key = credentials['azure_storage_account_key']

In [None]:
import azure.storage.blob  as ASB
import azure.storage.common

In [None]:
# Block-blob service object built from the storage account name and key.
blob_service = ASB.BlockBlobService(
    azure_storage_account_name,
    account_key=azure_storage_account_key,
)

#content_settings = ContentSettings(content_type = "video/mov")
#blob_service.create_blob_from_path("mycontainer","myblockblob","sunset.png",content_settings)

In [None]:
# Local file that the upload cell passes to create_blob_from_path.
source = '/home/jovyan/output_001.webm'

In [None]:
# Target container and blob name for the test upload.
container_name = 'movtest'
blob_name = 'blob_test'
# Trailing semicolon keeps the notebook from echoing the call's result.
blob_service.create_container(container_name);

In [None]:
# Upload the local `source` file as `blob_name` inside `container_name`.
blob_service.create_blob_from_path(
    container_name,
    blob_name,
    source,
)

In [None]:
# List the blobs in the container. Nothing is deleted here: the deletion loop
# below is commented out, and it iterates `blobs` while the listing is bound
# to `blob`.
blob = blob_service.list_blobs(container_name)
#for a in blobs:
 #    blob_service.delete_blob(container_name = container_name, blob_name = a.name)

In [None]:
# Show the attribute names available on the first listed blob.
dir(list(blob)[0])

In [None]:
# Keep the first listed blob for inspection.
ind_blob = list(blob)[0]

In [None]:
# Display that blob's name.
ind_blob.name

In [None]:
def append_block_from_url(container_name, blob_name, copy_source_url, timeout=20):
    """Placeholder: append data fetched from ``copy_source_url`` to a blob.

    Only the signature was ever written, and a ``def`` without a body is a
    SyntaxError. Until the upload logic exists, the function fails loudly
    when called instead of breaking the cell at parse time.

    Parameters
    ----------
    container_name : str
        Presumably the container that holds the target blob (TODO confirm).
    blob_name : str
        Presumably the blob to append to (TODO confirm).
    copy_source_url : str
        Presumably the URL whose content should be appended (TODO confirm).
    timeout : int, optional
        Defaults to 20; unit and meaning are not defined yet.

    Raises
    ------
    NotImplementedError
        Always.
    """
    raise NotImplementedError("append_block_from_url has no implementation yet")

## Scrap

In [None]:
# Create the 'movtest' container.
#from azure.storage.blob import BlockBlobService
# The direct import above is commented out, so the bare name BlockBlobService
# is undefined; go through the `ASB` alias imported for azure.storage.blob.
blob_service = ASB.BlockBlobService(azure_storage_account_name, azure_storage_account_key)
container_name = 'movtest'
blob_service.create_container(container_name);

In [None]:
# NOTE(review): this reads like an API signature pasted for reference rather
# than a working call; `file_path` is not assigned in any cell shown here.
# TODO confirm before running.
azure.append_blob_from_path(container_name, blob_name, file_path, validate_content=False, maxsize_condition=None, progress_callback=None, lease_id=None, timeout=None)

In [None]:
# NOTE(review): this reads like an API signature pasted for reference rather
# than a working call; `share_name`, `directory_name` and `file` are not
# assigned in any cell shown here. TODO confirm before running.
blob_service.create_file_from_bytes(share_name, directory_name, container_name, file, index=0, count=None, content_settings=None, metadata=None, validate_content=False, progress_callback=None, max_connections=2, timeout=None)