In [5]:
import os
import re
import shlex
import urllib
import urllib.request

import numpy as np

# Download the dataset

In [6]:
# Get the webpage
url = 'https://landsat.usgs.gov/landsat-8-cloud-cover-assessment-validation-data'

# Match a complete archive link wherever it appears on a line, instead of
# relying on fixed character offsets into the raw HTML.
data_url_pattern = re.compile(
    r'https://landsat\.usgs\.gov/cloud-validation/cca[^\s"\'<>]*?\.tar(?:\.gz)?'
)

# Find the data urls in the source (at most one per line of HTML)
data_urls = []
with urllib.request.urlopen(url) as response:  # connection is closed when the block ends
    for line in response:
        # Lines arrive as bytes; decode before matching.
        match = data_url_pattern.search(line.decode('utf-8', errors='replace'))
        if match:
            data_urls.append(match.group(0))

# Every later cell indexes into data_urls, so fail here with a clear message.
if not data_urls:
    raise RuntimeError(f"No product archives found at {url}; the page layout may have changed.")

print("Products found:", len(data_urls))
print("Example url:   ", data_urls[0])

Products found: 96
Example url:    https://landsat.usgs.gov/cloud-validation/cca_l8/LC80420082013220LGN00.tar.gz


In [7]:
# Background categories; the download cells index into this list, so order matters.
backgrounds = [
    'Barren', 'Forest', 'GrassCrops', 'Shrubland',
    'SnowIce', 'Urban', 'Water', 'Wetlands',
]

In [8]:
from IPython.display import clear_output

# Percentage shown by the most recently printed progress line (shared by all calls to rep_hoo).
last_percent_decimal_step = 0.0

# only use for synchronous download: the module-level state above is shared,
# so concurrent downloads would overwrite each other's progress
def rep_hoo(current_block_count, block_size, total_size):
    """Report hook for ``urllib.request.urlretrieve`` that prints download progress.

    A line is printed whenever progress moved by at least one percentage point
    since the last printed line, and again when the download reaches 100%.

    Args:
        current_block_count: number of blocks transferred so far.
        block_size: size of one block in bytes.
        total_size: total size of the download in bytes. Values <= 0 mean the
            size is unknown; nothing is reported in that case.
    """
    global last_percent_decimal_step

    # Without a usable size there is no percentage to compute (and 0 would divide by zero).
    if total_size <= 0 or block_size <= 0:
        return

    # The final block usually overshoots the file size, so clamp to 100%.
    percentage = min(current_block_count * block_size / total_size * 100, 100.0)
    total_blocks = -(-total_size // block_size)  # ceiling division: a partial last block counts

    if abs(percentage - last_percent_decimal_step) >= 1.0 or percentage == 100.0:
        last_percent_decimal_step = percentage
        print(f"Download at {percentage:.1f}% - {current_block_count} blocks of {total_blocks} total blocks")
        # wait=True keeps the line visible until the next one replaces it
        clear_output(wait=True)

def async_report_hook(current_block_count, block_size, total_size):
    """Stateless report hook for ``urllib.request.urlretrieve``.

    Keeps no module-level state, so several downloads can use it at once.
    A line is printed on the first call and whenever the latest block moved
    the download into a new whole percent.

    Args:
        current_block_count: number of blocks transferred so far.
        block_size: size of one block in bytes.
        total_size: total size of the download in bytes. Values <= 0 mean the
            size is unknown; nothing is reported in that case.
    """
    if total_size <= 0 or block_size <= 0:
        return

    def whole_percent(block_count):
        # Integer arithmetic avoids comparing floats for equality.
        return min(block_count * block_size * 100 // total_size, 100)

    crossed_boundary = (
        current_block_count == 0
        or whole_percent(current_block_count) != whole_percent(current_block_count - 1)
    )
    if crossed_boundary:
        # The final block usually overshoots the file size, so clamp to 100%.
        percentage = min(current_block_count * block_size / total_size * 100, 100.0)
        total_blocks = -(-total_size // block_size)  # ceiling division
        print(f"Download at {percentage:.1f}% - {current_block_count} blocks of {total_blocks} total blocks")
        # wait=True keeps the line visible until the next one replaces it
        clear_output(wait=True)
    

Synchronous Download

In [37]:
import urllib.request

SCENES_PER_BACKGROUND = 12  # There are 12 scenes in every background category

for i, data_url in enumerate(data_urls):
    background = backgrounds[i // SCENES_PER_BACKGROUND]
    target_dir = os.path.join(os.getcwd(), "tar", background)
    # makedirs also creates ./tar itself on a fresh checkout
    os.makedirs(target_dir, exist_ok=True)

    filename = data_url.split("/")[-1]
    target_path = os.path.join(target_dir, filename)
    print("Downloading file " + filename)
    try:
        urllib.request.urlretrieve(data_url, target_path, reporthook=rep_hoo)
    except BaseException:
        # Interrupted or incomplete transfer: remove the truncated archive so
        # the unzip step never picks up a half-written file.
        if os.path.exists(target_path):
            os.remove(target_path)
        raise

KeyboardInterrupt: 

Asynchronous Download

In [23]:
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor

async def download_tar_by_url_in_thread(url, background):
    """Download one archive into ``<cwd>/tar/<background>/`` without blocking the event loop.

    The blocking ``urllib.request.urlretrieve`` call runs in the event loop's
    default executor, with ``async_report_hook`` as its progress hook.

    Args:
        url: address of the archive; its last path component is the file name.
        background: name of the category sub-folder below ``tar``.

    Raises:
        Whatever ``urlretrieve`` raises (e.g. ``ContentTooShortError``); a
        partially written file is removed before the error is re-raised.
    """
    # Passing None selects the loop's default thread pool; its size is chosen by the standard library.
    # https://stackoverflow.com/questions/75885213/how-to-increase-asyncio-thread-limits-in-an-existing-co-routine
    loop = asyncio.get_running_loop()
    path = os.path.join(os.getcwd(), "tar", background, url.split("/")[-1])
    try:
        await loop.run_in_executor(None, urllib.request.urlretrieve, url, path, async_report_hook)
    except Exception:
        # The worker has finished by now, so a truncated file can be removed safely.
        if os.path.exists(path):
            os.remove(path)
        raise

# One shared executor per worker limit. Creating a new pool on every call would
# give each download its own pool, so the limit would never apply.
_LIMITED_EXECUTORS = {}


def _get_limited_executor(limit_workers):
    """Return the shared ThreadPoolExecutor capped at ``limit_workers`` threads."""
    executor = _LIMITED_EXECUTORS.get(limit_workers)
    if executor is None:
        executor = ThreadPoolExecutor(max_workers=limit_workers)
        _LIMITED_EXECUTORS[limit_workers] = executor
    return executor


async def download_tar_by_url_in_thread_with_limit(url, background, limit_workers=40):
    """Download one archive into ``<cwd>/tar/<background>/`` using a size-limited thread pool.

    All calls with the same ``limit_workers`` share one executor, so at most
    that many downloads run at the same time.

    Args:
        url: address of the archive; its last path component is the file name.
        background: name of the category sub-folder below ``tar``.
        limit_workers: maximum number of concurrent download threads (default 40).

    Raises:
        Whatever ``urlretrieve`` raises (e.g. ``ContentTooShortError``); a
        partially written file is removed before the error is re-raised.
    """
    loop = asyncio.get_running_loop()
    path = os.path.join(os.getcwd(), "tar", background, url.split("/")[-1])
    executor = _get_limited_executor(limit_workers)
    try:
        await loop.run_in_executor(executor, urllib.request.urlretrieve, url, path, async_report_hook)
    except Exception:
        # The worker has finished by now, so a truncated file can be removed safely.
        if os.path.exists(path):
            os.remove(path)
        raise

async def download_tar_by_url(session, url, background):
    """Stream one archive to ``./tar/<background>/`` through an aiohttp session.

    Args:
        session: object whose ``get(url)`` returns an async context manager
            yielding a response with ``status`` and ``content`` (an
            ``aiohttp.ClientSession`` in this notebook).
        url: address of the archive; its last path component is the file name.
        background: name of the category sub-folder below ``./tar``.

    Raises:
        RuntimeError: if the server answers with a status other than 200.
    """
    # https://www.slingacademy.com/article/python-aiohttp-how-to-download-files-using-streams/
    # !! chunks often corrupted (observed by the original author; cause not established)
    filename = url.split("/")[-1]
    path = f"./tar/{background}/{filename}"
    async with session.get(url) as response:
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if response.status != 200:
            raise RuntimeError(f"Unexpected HTTP status {response.status} for {url}")
        try:
            with open(path, 'wb') as file:
                # iter_any yields data as it arrives and stops at end of stream
                async for chunk in response.content.iter_any():
                    file.write(chunk)
        except BaseException:
            # Failure or cancellation mid-stream: do not leave a truncated archive behind.
            if os.path.exists(path):
                os.remove(path)
            raise
        print(f"Downloaded {url} to {path}")

async def download_bulk_urllib(urls, backgrounds):
    """Download all (url, background) pairs concurrently with the thread-based downloader.

    Pairs are built with ``zip``, so surplus entries of the longer sequence
    are ignored.
    """
    pending = []
    for link, category in zip(urls, backgrounds):
        pending.append(download_tar_by_url_in_thread(link, category))
    await asyncio.gather(*pending)

async def download_bulk(urls, backgrounds):
    """Download all (url, background) pairs concurrently over one shared aiohttp session.

    Pairs are built with ``zip``, so surplus entries of the longer sequence
    are ignored. The session is closed once every download has finished.
    """
    async with aiohttp.ClientSession() as http_session:
        jobs = []
        for link, category in zip(urls, backgrounds):
            jobs.append(download_tar_by_url(http_session, link, category))
        await asyncio.gather(*jobs)

async def download_all(start=0, folder_size=12):
    """Download every product from index ``start`` onwards, one background folder at a time.

    Products are grouped into consecutive runs of ``folder_size``; run ``k``
    belongs to ``backgrounds[k]``. Each run is downloaded concurrently and
    awaited before the next one starts.

    Args:
        start: index into ``data_urls`` of the first product to download. If
            it falls inside a folder, the rest of that folder is downloaded
            first. Negative values are treated as 0.
        folder_size: number of products per background folder (default 12).
    """
    total = len(data_urls)
    batch_start = max(start, 0)
    while batch_start < total:
        folder_index = batch_start // folder_size
        # Stop at the folder boundary or at the end of the list, whichever comes first.
        batch_end = min((folder_index + 1) * folder_size, total)
        bg = backgrounds[folder_index]
        # makedirs also creates ./tar itself on a fresh checkout
        os.makedirs(os.path.join(os.getcwd(), "tar", bg), exist_ok=True)
        batch = [data_urls[j] for j in range(batch_start, batch_end)]
        await download_bulk_urllib(urls=batch, backgrounds=[bg] * len(batch))
        batch_start = batch_end

async def download_all_v2(start=0, end=0, folder_size=12):
    """Download products ``start`` .. ``end - 1`` in one concurrent bulk.

    Product ``i`` is stored under ``backgrounds[i // folder_size]``; the
    folder is created on first use.

    Args:
        start: index into ``data_urls`` of the first product. Negative values
            are treated as 0.
        end: index one past the last product. Pass ``len(data_urls)`` to
            download everything; the default 0 selects nothing. Values beyond
            the list are clamped to its length.
        folder_size: number of products per background folder (default 12).
    """
    first = max(start, 0)
    last = min(end, len(data_urls))

    bulk = []
    bgs = []
    prepared = set()
    for i in range(first, last):
        # Look the folder up for every index so an unaligned start still has one.
        bg = backgrounds[i // folder_size]
        if bg not in prepared:
            # makedirs also creates ./tar itself on a fresh checkout
            os.makedirs(os.path.join(os.getcwd(), "tar", bg), exist_ok=True)
            prepared.add(bg)
        bulk.append(data_urls[i])
        bgs.append(bg)
    await download_bulk_urllib(urls=bulk, backgrounds=bgs)


In [24]:
# IPython magic: lets this notebook run top-level `await` expressions on asyncio.
%autoawait asyncio

# Timing notes from earlier runs:
# v2 with download_tar_by_url_in_thread, threadcount 70, 24m 55.5s at ~40-70MBps
# v2 with download_tar_by_url_in_thread_with_limit(..., limit_workers=40), threadcount 166, <time> at ~70MBps
# `end` is set to the number of scraped URLs, so every product is requested.
await download_all_v2(end=np.size(data_urls))

ContentTooShortError: <urlopen error retrieval incomplete: got only 500935805 out of 932439747 bytes>

Other possible (not implemented) options

In [None]:
# Download all the products
# index = -1
# for i in range(0, np.size(data_urls)):
#     if i%12 == 0:  # There are 12 scenes in every background category       
#         index += 1     
#     !wget {data_urls[i]} -P {'./tar/' + backgrounds[index] + '/'}  # data_urls already end in .tar.gz

GNU Parallel could run many `wget` downloads concurrently; see this example in its manual: http://www.gnu.org/software/parallel/man.html#example__download_10_images_for_each_of_the_past_30_days

Running 10000 jobs with 100 in parallel:
> seq 10000 | parallel -j100 wget https://www.example.com/page{}.html

In [None]:
# Download all the products in background
#index = -1
#for i in range(0, np.size(data_urls)):
    # if i%12 == 0:  # There are 12 scenes in every background category       
    #     index += 1
    # !wget -b {data_urls[i]} -P {'./tar/' + backgrounds[index] + '/'}  # data_urls already end in .tar.gz

Unzipping

In [15]:
from shutil import unpack_archive
from pathlib import Path


# Only category directories: skips .gitignore/.gitkeep and any other stray file in ./tar.
folders = [i for i in os.listdir("./tar/") if Path("./tar/", i).is_dir()]

failed = []  # "<folder>/<file>" of every archive that could not be unpacked
for folder in folders:
    Path("./unzipped/" + folder).mkdir(parents=True, exist_ok=True)
    # endswith (not a substring test) so names like "x.tar.gz.part" are ignored
    files = sorted(f for f in os.listdir("./tar/" + folder) if f.endswith('.tar.gz'))
    for file in files:
        print("----")
        print("Unzipping product:", file)
        try:
            # NOTE(review): the archives come from the network. Consider an
            # extraction filter where the Python version supports one.
            unpack_archive(f"./tar/{folder}/{file}", f"./unzipped/{folder}")
        except Exception as error:
            # Continue with the remaining archives, but report the reason.
            # KeyboardInterrupt is no longer swallowed.
            print("----------------------------------------------------")
            print("UNZIP ERROR WITH PRODUCT:", file)
            print("Reason:", repr(error))
            failed.append(f"{folder}/{file}")

if failed:
    print(f"{len(failed)} archive(s) could not be unpacked:", failed)

----
Unzipping product: LC80420082013220LGN00.tar.gz
----
Unzipping product: LC80500092014231LGN00.tar.gz
----
Unzipping product: LC80530022014156LGN00.tar.gz
----
Unzipping product: LC81330312013202LGN00.tar.gz
----
Unzipping product: LC81360302014162LGN00.tar.gz
----
Unzipping product: LC81390292014135LGN00.tar.gz
----
Unzipping product: LC81550082014263LGN00.tar.gz
----
Unzipping product: LC81570452014213LGN00.tar.gz
----
Unzipping product: LC81640502013179LGN01.tar.gz
----
Unzipping product: LC81750432013144LGN00.tar.gz
----
Unzipping product: LC81930452013126LGN01.tar.gz
----
Unzipping product: LC81990402014267LGN00.tar.gz
----
Unzipping product: LC80310202013223LGN00.tar.gz
----
Unzipping product: LC80340192014167LGN00.tar.gz
----
Unzipping product: LC81010142014189LGN00.tar.gz
----
Unzipping product: LC81020152014036LGN00.tar.gz
----
Unzipping product: LC81030162014107LGN00.tar.gz
----
Unzipping product: LC81070152013260LGN00.tar.gz
----
Unzipping product: LC81080162013171LGN00.

In [46]:
# Convert the .img masks to .TIF masks (all systems) (must have gdal installed in py environment)
from osgeo.gdal import Translate, TranslateOptions
# NOTE(review): the [1:2] slices limit this cell to the second product of the
# second folder, and the output is named "_fixedmask123.TIF". This looks like
# a trial run; confirm before widening it to all products.
folders = sorted(os.listdir("./unzipped/"))
folders = [f for f in folders if '.' not in f]  # Filter out .gitignore
for folder in folders[1:2]:
    products = sorted(os.listdir("./unzipped/" + folder + "/BC/"))
    for product in products[1:2]:
        img_path  = "./unzipped/" + folder + "/BC/" + product + "/" + product + "_fixedmask.img"
        gtiff_path = "./unzipped/" + folder + "/BC/" + product + "/" + product + "_fixedmask123.TIF"
        # The driver name is passed directly, so no module-level `format`
        # variable shadows the builtin format() in later cells.
        # The returned dataset is not stored, so it is released (and flushed) right away.
        if Translate(gtiff_path, img_path, format="GTiff") is None:
            print("Conversion failed for:", img_path)

In [26]:
# Convert the .img masks to .TIF masks
folders = sorted(os.listdir("./unzipped/"))
folders = [f for f in folders if '.' not in f]  # Filter out .gitignore
for folder in folders:
    products = sorted(os.listdir("./unzipped/" + folder + "/BC/"))
    for product in products:
        img_path  = "./unzipped/" + folder + "/BC/" + product + "/" + product + "_fixedmask.img"
        gtiff_path = "./unzipped/" + folder + "/BC/" + product + "/" + product + "_fixedmask.TIF"
        !gdal_translate -of GTiff {img_path + " " + gtiff_path}

Input file size is 8911, 8941
0...10...20...30...40...50...60...70...80...90...100 - done.
Input file size is 8671, 8721
0...10...20...30...40...50...60...70...80...90...100 - done.
Input file size is 9101, 9071
0...10...20...30...40...50...60...70...80...90...100 - done.
Input file size is 7611, 7751
0...10...20...30...40...50...60...70...80...90...100 - done.
Input file size is 7871, 7991
0...10...20...30...40...50...60...70...80...90...100 - done.
Input file size is 7781, 7901
0...10...20...30...40...50...60...70...80...90...100 - done.
Input file size is 8931, 8961
0...10...20...30...40...50...60...70...80...90...100 - done.
Input file size is 7551, 7721
0...10...20...30...40...50...60...70...80...90...100 - done.
Input file size is 7551, 7741
0...10...20...30...40...50...60...70...80...90...100 - done.
Input file size is 7571, 7391
0...10...20...30...40...50...60...70...80...90...100 - done.
Input file size is 7511, 7321
0...10...20...30...40...50...60...70...80...90...100 - done.

In [30]:
# Move to raw data folder
# assuming this file is 1 folder lower in project folder
project_path = "/home/mxh/RS-Net/"
!cp -R ./unzipped/* {project_path + 'data/raw/landsat8ccv/'}