In [1]:
#Import libraries
import os
import geopandas as gp
import matplotlib.pyplot as plt
import json
from contextlib import redirect_stdout
import csv
import earthaccess
import rasterio
import numpy as np
import shutil 

In [2]:
#Login
earthaccess.login(persist=True)

<earthaccess.auth.Auth at 0x2c55992cd60>

In [13]:
#Set time and location
geojson_path = r'C:\Users\attic\HLS Kelp Detection\maps\Isla_Vista_Kelp.geojson'
field = gp.read_file(geojson_path)
bbox = tuple(list(field.total_bounds))
bbox #Display coordinate bounds
with open(geojson_path, 'r') as f:
    data = json.load(f)
# Extract the name
location = (data['name']).replace('.kmz', '').replace(' ', '_')
temporal = ("2024-07-01T00:00:00", "2025-01-01T00:00:00") #

#DEM map doesn't work great as is. For processing, I recommend downloading one from: https://www.ncei.noaa.gov/maps/bathymetry/ and inserting it into the /imagery folder
create_dem = False # set true if u want a digital elevation map 
dem_name = 'dem.tif'

num_to_download = 2 #set this value to the number of frames you want
load_num = 2 #sets number of granules to load, this should generally be >> than num_download; -1 loads all 
specific_tile = True #set true if you only want specific tile 
download_to_path = r'C:\Users\attic\HLS Kelp Detection\imagery'
tile = '11SKU' # 

threads = os.cpu_count() / 2 

In [14]:
#Search for satellite data from  Landsat 30m and Sentinel 30m
results = earthaccess.search_data(
    short_name=['HLSL30','HLSS30'],
    bounding_box=bbox,
    temporal=temporal,
     cloud_cover=0, #Determine cloud cover
    count=load_num
)
if create_dem:
    dem_results = earthaccess.search_data(
        short_name="ASTGTM",
        bounding_box=bbox)
    print(dem_results[0])

print(results[0])


Granules found: 50
Collection: {'EntryTitle': 'HLS Sentinel-2 Multi-spectral Instrument Surface Reflectance Daily Global 30m v2.0'}
Spatial coverage: {'HorizontalSpatialDomain': {'Geometry': {'GPolygons': [{'Boundary': {'Points': [{'Longitude': -120.25617766, 'Latitude': 34.20966703}, {'Longitude': -119.06537353, 'Latitude': 34.23552162}, {'Longitude': -119.09014717, 'Latitude': 35.2250277}, {'Longitude': -120.29518354, 'Latitude': 35.19820772}, {'Longitude': -120.25617766, 'Latitude': 34.20966703}]}}]}}}
Temporal coverage: {'RangeDateTime': {'BeginningDateTime': '2024-07-04T18:54:48.466Z', 'EndingDateTime': '2024-07-04T18:54:55.166Z'}}
Size(MB): 219.44079113006592
Data: ['https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/HLSS30.020/HLS.S30.T11SKU.2024186T183919.v2.0/HLS.S30.T11SKU.2024186T183919.v2.0.VZA.tif', 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/HLSS30.020/HLS.S30.T11SKU.2024186T183919.v2.0/HLS.S30.T11SKU.2024186T183919.v2.0.B07.tif', 'https://da

In [28]:
data = results[0]
print(type(data))
links = data.data_links()
print(links)
print(data)



<class 'earthaccess.results.DataGranule'>
['https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/HLSS30.020/HLS.S30.T11SKU.2024186T183919.v2.0/HLS.S30.T11SKU.2024186T183919.v2.0.VZA.tif', 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/HLSS30.020/HLS.S30.T11SKU.2024186T183919.v2.0/HLS.S30.T11SKU.2024186T183919.v2.0.B07.tif', 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/HLSS30.020/HLS.S30.T11SKU.2024186T183919.v2.0/HLS.S30.T11SKU.2024186T183919.v2.0.B11.tif', 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/HLSS30.020/HLS.S30.T11SKU.2024186T183919.v2.0/HLS.S30.T11SKU.2024186T183919.v2.0.B05.tif', 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/HLSS30.020/HLS.S30.T11SKU.2024186T183919.v2.0/HLS.S30.T11SKU.2024186T183919.v2.0.B03.tif', 'https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/HLSS30.020/HLS.S30.T11SKU.2024186T183919.v2.0/HLS.S30.T11SKU.2024186T183919.v2.0.Fmask.tif', 'https://data.lpdaac.earthdatac

AttributeError: 'DataGranule' object has no attribute 'help'

In [17]:
folder_path = os.path.join(os.path.join(download_to_path,location))

In [18]:
earthaccess.download(links[0], local_path=folder_path)

QUEUEING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

PROCESSING TASKS | :   0%|          | 0/1 [00:00<?, ?it/s]

COLLECTING RESULTS | :   0%|          | 0/1 [00:00<?, ?it/s]

['C:\\Users\\attic\\HLS Kelp Detection\\imagery\\Isla_Vista_Kelp\\HLS.S30.T11SKU.2024186T183919.v2.0.VZA.tif']

In [5]:
def extract_metadata(granule):
    ## ======= Parse metadata ======= ##
    json_str = json.dumps(granule.__dict__)
    metadata = json.loads(json_str) 
    meta = metadata['render_dict']['meta']
    #For some reason, attributes are parsed into a list in the HLS metadata. This reformats it into a dictionary.
    attributes_list = metadata['render_dict']['umm']['AdditionalAttributes']
    attributes = {attr['Name']: attr['Values'][0] for attr in attributes_list}
    vis_urls = granule.dataviz_links()
    meta['data_vis_url'] = vis_urls[0]
    return {**attributes, **meta}
def save_metadata_csv(metadata, file_path,folder_name):
    ## ======= write metadata csv ======= ##
    csv_file = os.path.join(file_path, (f'{folder_name}_metadata.csv'))
    with open(csv_file, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(metadata.keys())
        writer.writerow(metadata.values())
def get_sensor_bands(granule_name):
    file_data = granule_name.split('.')
    sensor = file_data[1]
    if sensor == 'L30':
        bands =  ['B02', 'B03', 'B04', 'B05', 'B06', 'B07','Fmask']
    else:
        bands =  ['B02', 'B03', 'B04', 'B8A', 'B11', 'B12','Fmask']
def get_band_links(data_links, granule_name):
    bands = get_sensor_bands(granule_name=granule_name)
    filtered_links = [f for f in data_links if any(band in f for band in bands)]
    return filtered_links

def get_sorted_files(folder_path, granule_name):

    sensor_bands = get_sensor_bands(granule_name)
    img_files = os.listdir(folder_path)
    filtered_files = [f for f in img_files if any(band in f for band in sensor_bands)]
    
    final_files = []
    for band in sensor_bands:
        for file in filtered_files:
            if band in file:
                final_files.append(file)
    
    # Sort files based on the order in sensor_bands
    def sort_key(filename):
        for band in sensor_bands:
            if band in filename:
                return sensor_bands.index(band)
        return len(sensor_bands)  # Put files with no band information at the end
    
    sorted_files = sorted(final_files, key=sort_key)
    
    return sorted_files

In [31]:
meta = extract_metadata(data)
print(meta['native-id'])

HLS.S30.T11SKU.2024186T183919.v2.0


In [None]:

folder_path = os.path.join(os.path.join(download_to_path,location))
## ======= create location folder path ======= ##
if not os.path.isdir(folder_path):
    os.mkdir(folder_path)

## ======= Create/write DEM, if requested ======= ##
if create_dem:
    if not os.path.isfile(os.path.join(folder_path, dem_name)):
            dem_path = earthaccess.download(dem_results[0], local_path=folder_path)
            os.rename(dem_path[0], os.path.join(folder_path,'dem.tif'))
            os.rename(dem_path[1], os.path.join(folder_path, 'num.tif'))

downloaded = 0

for i, granule in enumerate(results):
    metadata = extract_metadata(granule)
    ## ======= Parse metadata ======= ##

    name = metadata['native-id']

    #For some reason, attributes are parsed into a list in the HLS metadata. This reformats it into a dictionary.
    #print(attributes['MGRS_TILE_ID'])

    if(int(metadata['CLOUD_COVERAGE']) > 50): #Reject granules with large cloud cover, for now
        continue
    time = metadata['SENSING_TIME']
    tile_folder = metadata['MGRS_TILE_ID']
    if specific_tile and not tile_folder == tile:
        continue
    ## ======= Create file directory, if needed  ======= ##
    tile_path = os.path.join(folder_path,tile_folder)
    if not os.path.isdir(tile_path):
         os.mkdir(tile_path)
    folder_name = (f'{name}')
    file_path = os.path.join(tile_path,folder_name)
    if not os.path.isdir(file_path):
        os.mkdir(file_path) #Make folder for granule 

    # sorted data links 
    all_links = granule.data_links()
    links = get_band_links(all_links, name)

    ## ======= download granule ======= ##
    with open(os.devnull, 'w') as f, redirect_stdout(f): #The print out of this is kind of annoying, this redirects *most* of it 
        downloadPath = earthaccess.download(links, local_path=file_path, threads=2)
    downloaded = downloaded + 1
    print(f'{name}')
    if downloaded > num_to_download:
         break

    ## ======= write metadata csv ======= ##
    save_metadata_csv(metadata,file_path,folder_name)


In [15]:
temp_path = r'C:\Users\attic\HLS Kelp Detection\temp'
folder_path = os.path.join(os.path.join(download_to_path,location))
## ======= create location folder path ======= ##

downloaded = 0

for i, granule in enumerate(results):
    if os.path.isdir(temp_path):
       shutil.rmtree(temp_path)
    os.mkdir(temp_path)
    metadata = extract_metadata(granule)
    ## ======= Parse metadata ======= ##

    name = metadata['native-id']

    #For some reason, attributes are parsed into a list in the HLS metadata. This reformats it into a dictionary.
    #print(attributes['MGRS_TILE_ID'])

    if(int(metadata['CLOUD_COVERAGE']) > 50): #Reject granules with large cloud cover, for now
        continue
    time = metadata['SENSING_TIME']
    tile_folder = metadata['MGRS_TILE_ID']
    if specific_tile and not tile_folder == tile:
        continue
    ## ======= Create file directory, if needed  ======= ##
    tile_path = os.path.join(folder_path,tile_folder)
    if not os.path.isdir(tile_path):
         os.mkdir(tile_path)
    file_path = os.path.join(tile_path,f'{name}.tif')
    if os.path.isfile(file_path):
        continue
    # if not os.path.isdir(file_path):
    #     os.mkdir(file_path) #Make folder for granule 

    # sorted data links 
    all_links = granule.data_links()
    links = get_band_links(all_links, name)

    ## ======= download granule ======= ##
    #with open(os.devnull, 'w') as f, redirect_stdout(f): #The print out of this is kind of annoying, this redirects *most* of it 
    downloadPath = earthaccess.download(links, local_path=temp_path)
    bands = []
    transform = None

    for file in get_sorted_files(folder_path=temp_path,granule_name=name):
        with rasterio.open(os.path.join(temp_path,file), 'r') as src:
            bands.append(src.read())
            if transform is None:
                transform = src.transform
                crs = src.crs

    bands = np.vstack(bands)
    print(bands.shape)
    num_bands, height, width = bands.shape
    data_type = rasterio.int16
    profile = {
        'driver': 'GTiff',
        'width': width,
        'height': height,
        'count': num_bands,  # one band  B02, B03, B04, and B05, classified, mesma (Blue, Green, Red, and NIR).
        'dtype': data_type,  # assuming binary mask, adjust dtype if needed
        'crs': crs,
        'transform': transform,
        'nodata': 0,  # assuming no data is 0
        'tags': metadata
    }

    with rasterio.open(file_path, 'w', **profile) as dst: #output file is a stack of 6 HLS bands (in chronological order) + Fmask as image 7
        for i,band in enumerate(bands):
            dst.write(band.astype(data_type), i + 1)

    
    downloaded = downloaded + 1
    shutil.rmtree(temp_path)
    print(f'{name}')
    if downloaded > num_to_download:
         break

    ## ======= write metadata csv ======= ##
    #save_metadata_csv(metadata,file_path,folder_name)
