# HLS Version 2 Cloud Mask query, download and chipping pipeline

### More information on how functions work can be found in the hls_v2_pipeline.ipynb

## Importing Packages

In [None]:
import geopandas
import json
import shapely
import shapely.geometry
import xarray
import rasterio
import rioxarray
import os
import fiona
import urllib.request as urlreq
import pandas as pd
import numpy as np
import requests
import xmltodict
import shutil
import datetime
import boto3
import pyproj

from shapely.ops import transform
from shapely.geometry import Point
from shapely.geometry import Polygon
from pystac_client import Client 
from collections import defaultdict
from glob import glob
from rasterio.enums import Resampling
from rasterio import Affine
from rasterio.crs import CRS
import matplotlib.pyplot as plt
from subprocess import Popen, PIPE
from tqdm import tqdm
from netrc import netrc
from subprocess import Popen
from platform import system
from getpass import getpass
from rasterio.session import AWSSession
from pathlib import Path

pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

%matplotlib inline

## Setting folder pathes and file paths

In [None]:
##### START OPTIONS #####

## Threshold of cloud cover percentage. 
## 30 pct to 80 pct is used to get various could cover pct at chip level
cloud_lower = 30
cloud_upper = 80

# Root paths
root_path = "/data/cloudMask/"
req_path = "/home/data/fmask/"
extra_files = "/data/requirements/"
json_path = "/home/data/"

# Paths to necessary files
chip_file =  json_path + "chip_bbox_task_3_5070.geojson"
query_file = json_path + "chip_bbox_task_3.geojson"
chip_csv = req_path + "chip_tracker.csv"
kml_file = extra_files + 'sentinel_tile_grid.kml'

## Saving paths
## Manually creating these folders is recommended before running the pipeline
chip_dir = root_path + 'cloud_mask/'
tile_dir = root_path + 'tiles_fmask/'
chip_fmask_dir = root_path + "cloud_mask/"

#####  END OPTIONS  #####

## Read in csvs and jsons from saved file (Only run when need to load from previously query results)

In [None]:
# Getting all saved dataframes and json
chip_df = pd.read_csv(req_path + "chip_df.csv")
with open(json_path + "chip_ids.json", 'r') as f:
    chip_ids = json.load(f)
track_df = pd.read_csv(req_path + "track_df.csv")
with open(query_file, "r") as file:
    chips = json.load(file)
selected_tiles = pd.read_csv(req_path + "selected_tiles.csv")

## Data Processing

In [None]:
# Loading chips bounding boxes from geojson
with open(query_file, "r") as file:
    chips = json.load(file)

# Create lists about chip information to find tiles corresponding to it later
chip_ids = []
chip_x = []
chip_y = []
for item in chips['features']:
    chip_ids.append(item['properties']['id'])
    chip_x.append(item['properties']['center'][0])
    chip_y.append(item['properties']['center'][1])

In [None]:
# Save the chip_ids for chipping uses
with open("/home/data/chip_ids.json", "w") as f:
    json.dump(chip_ids, f, indent=2)

In [None]:
# Read in sentinel kml file
fiona.drvsupport.supported_drivers['KML'] = 'rw'
tile_src = geopandas.read_file(kml_file, driver='KML')

# Create table containing information about sentinel tiles
tile_name = []
tile_x = []
tile_y = []
for tile_ind in range(tile_src.shape[0]):
    tile_name.append(tile_src.iloc[tile_ind].Name)
    tile_x.append(tile_src.iloc[tile_ind].geometry.centroid.x)
    tile_y.append(tile_src.iloc[tile_ind].geometry.centroid.y)
tile_name = np.array(tile_name)
tile_x = np.array(tile_x)
tile_y = np.array(tile_y)
tile_src = pd.concat([tile_src, tile_src.bounds], axis = 1)
tile_src.head(5)

In [None]:
def find_tile(x,y):
# Identify closest tile
    s = (tile_x - x)**2+(tile_y - y)**2
    tname = tile_name[np.argmin(s)]
    return(tname)

In [None]:
# Assign each chip a tile by using the find_tile function
chip_df = pd.DataFrame({"chip_id" : chip_ids, "chip_x" : chip_x, "chip_y" : chip_y})
chip_df['tile'] = chip_df.apply(lambda row : find_tile(row['chip_x'], row['chip_y']), axis = 1)
chip_df.tail(5)

In [None]:
# Save dataframe to csv for later uses
chip_df.to_csv(req_path + "chip_df.csv", index=False)

In [None]:
tiles = chip_df.tile.unique().tolist()
tiles[0:5]

## Querying tile links based on geometry of chips

In [None]:
STAC_URL = 'https://cmr.earthdata.nasa.gov/stac'
catalog = Client.open(f'{STAC_URL}/LPCLOUD/')

In [None]:
# Query based on the cloud pct bounds
tile_list = []
print(f"There are a total of {len(tiles)} tiles")
tile_iter = 0
for current_tile in tiles:

    chip_df_filt = chip_df.loc[chip_df.tile == current_tile]#.reset_index()
    first_chip_id = chip_df_filt.chip_id.iloc[0]
    first_chip_index_in_json = chip_ids.index(first_chip_id)
    roi = chips['features'][first_chip_index_in_json]['geometry']

    search = catalog.search(
        collections = ['HLSS30.v2.0'],
        intersects = roi,
        datetime = '2022-03-01/2022-09-30',
    ) 
    
    num_results = search.matched()
    item_collection = search.get_all_items()
    
    tile_name = "T" + current_tile
    iter_items = 0
    for i in tqdm(item_collection ,desc=f"({tile_iter}/{len(tiles)})"):
        if i.id.split('.')[2] == tile_name:
            if i.properties['eo:cloud_cover'] >= cloud_lower and i.properties['eo:cloud_cover'] <= cloud_upper:
                response = requests.get(i.assets['metadata'].href)
                if response.status_code == 200:
                    temp_xml = response.text
                    temp_xml = xmltodict.parse(temp_xml)
                    temp_dict = {"tile_id": tile_name, "cloud_cover": i.properties['eo:cloud_cover'], 
                                 "spatial_cover": int(temp_xml['Granule']['AdditionalAttributes']['AdditionalAttribute'][3]['Values']['Value']),
                                 "http_links": {"Fmask": i.assets['Fmask']},
                                 "s3_links": {"Fmask": i.assets['Fmask'].href.replace('https://data.lpdaac.earthdatacloud.nasa.gov/lp-prod-protected/', 's3:/')}}
                    tile_list.append(temp_dict)
                    iter_items += 1
                else: 
                    assert False, f"Failed to fetch XML from {i.assets['metadata'].href}. Error code: {response.status_code}"
            
    tile_iter += 1
    
tile_df = pd.DataFrame(tile_list)

In [None]:
# Save to csv for later uses
tile_df.to_csv(req_path + "tile_df_fmask.csv", index=False)

In [None]:
tile_df.head()

## Filtering based on could and spatial coverage of the tiles we gathered earlier

In [None]:
def spatial_filtering (dataframe):
    """
        Using spatial coverage percentage to filter chips

        Args:
            dataframe: A pandas dataframe that generated previously
    """
    cover_list = [100, 90, 80, 70, 60, 50]
    tile_list_ft = []
    tile_list = dataframe.tile_id.unique().tolist()
    
    for tile in tqdm(tile_list):
        temp_df = dataframe[dataframe.tile_id == tile]
        for cover_pct in cover_list:
            
            temp_df_filtered = temp_df[temp_df.spatial_cover >= cover_pct]
            if len(temp_df_filtered) >= 6: # Number of "timestep" wish to get for each tile
                for i in range(len(temp_df_filtered)):
                    tile_list_ft.append(temp_df_filtered.iloc[i])
                break
    
    tile_df_filtered = pd.DataFrame(tile_list_ft)
    return tile_df_filtered

In [None]:
def select_scenes(dataframe):
    """
        Selecting best spatial covered scenes based on timesteps

        Args:
            dataframe: A pandas dataframe that generated previously
    """
    select_tiles = []
    tile_list = dataframe.tile_id.unique().tolist()

    for tile in tqdm(tile_list):
        temp_df = dataframe[dataframe.tile_id == tile].reset_index(drop=True)
        for i in range(6): # Number of "timestep" wish to get for each tile
            select_tiles.append(temp_df.loc[i])

    return pd.DataFrame(select_tiles).reset_index(drop=True)

In [None]:
def tile_filter_process(dataframe):
    sptial_filtered_df =  spatial_filtering(dataframe)
    time_selected_df = select_scenes(sptial_filtered_df)
    return time_selected_df

In [None]:
selected_tiles = tile_filter_process(tile_df)

In [None]:
# Save to csv for later uses
selected_tiles.to_csv(req_path + "selected_tiles.csv", index=False)

In [None]:
selected_tiles.head()

## Data downloading

### Creating netrc file on root for credentials (Run Once each docker session)

In [None]:
urs = 'urs.earthdata.nasa.gov'    # Earthdata URL endpoint for authentication
prompts = ['Enter NASA Earthdata Login Username: ',
           'Enter NASA Earthdata Login Password: ']

# Determine the OS (Windows machines usually use an '_netrc' file)
netrc_name = "_netrc" if system()=="Windows" else ".netrc"

# Determine if netrc file exists, and if so, if it includes NASA Earthdata Login Credentials
try:
    netrcDir = os.path.expanduser(f"~/{netrc_name}")
    netrc(netrcDir).authenticators(urs)[0]

# Below, create a netrc file and prompt user for NASA Earthdata Login Username and Password
except FileNotFoundError:
    homeDir = os.path.expanduser("~")
    Popen('touch {0}{2} | echo machine {1} >> {0}{2}'.format(homeDir + os.sep, urs, netrc_name), shell=True)
    Popen('echo login {} >> {}{}'.format(getpass(prompt=prompts[0]), homeDir + os.sep, netrc_name), shell=True)
    Popen('echo \'password {} \'>> {}{}'.format(getpass(prompt=prompts[1]), homeDir + os.sep, netrc_name), shell=True)
    # Set restrictive permissions
    Popen('chmod 0600 {0}{1}'.format(homeDir + os.sep, netrc_name), shell=True)

    # Determine OS and edit netrc file if it exists but is not set up for NASA Earthdata Login
except TypeError:
    homeDir = os.path.expanduser("~")
    Popen('echo machine {1} >> {0}{2}'.format(homeDir + os.sep, urs, netrc_name), shell=True)
    Popen('echo login {} >> {}{}'.format(getpass(prompt=prompts[0]), homeDir + os.sep, netrc_name), shell=True)
    Popen('echo \'password {} \'>> {}{}'.format(getpass(prompt=prompts[1]), homeDir + os.sep, netrc_name), shell=True)

### Getting temporary credentials from NASA's S3 Bucket(Run once each docker session to make sure it works)

In [None]:
s3_cred_endpoint = 'https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials'

In [None]:
def get_temp_creds():
    temp_creds_url = s3_cred_endpoint
    return requests.get(temp_creds_url).json()

In [None]:
temp_creds_req = get_temp_creds()

In [None]:
session = boto3.Session(aws_access_key_id=temp_creds_req['accessKeyId'], 
                        aws_secret_access_key=temp_creds_req['secretAccessKey'],
                        aws_session_token=temp_creds_req['sessionToken'],
                        region_name='us-west-2')

In [None]:
rio_env = rasterio.Env(AWSSession(session),
                  GDAL_DISABLE_READDIR_ON_OPEN='EMPTY_DIR',
                  GDAL_HTTP_COOKIEFILE=os.path.expanduser('~/cookies.txt'),
                  GDAL_HTTP_COOKIEJAR=os.path.expanduser('~/cookies.txt'))
rio_env.__enter__()

### Tile downloading

In [None]:
# The download process would generate a temp credential for each tile download. 
# A multi-processed version is WIP.
def tile_download(table, from_csv = True):
    """
        Downloading tiles by reading from the metadata information gathered earlier

        Args:
            table: A pandas dataframe that generated previously
            boto3_session: The session that set earlier when getting credentials
            from_csv: If the tile information is from a csv, then True
    """
    info_list = []
    bands = ["Fmask"]
    accept_tiles = np.unique(table.tile_id)
    for tile in tqdm(accept_tiles):

        temp_creds_req = get_temp_creds()
        session = boto3.Session(aws_access_key_id=temp_creds_req['accessKeyId'], 
                        aws_secret_access_key=temp_creds_req['secretAccessKey'],
                        aws_session_token=temp_creds_req['sessionToken'],
                        region_name='us-west-2')
        
        temp_tb = table[table.tile_id == tile]
        for i in range(6): # Number of "timestep" wish to get for each tile
            if from_csv:
                bands_dict = json.loads(temp_tb.iloc[i].s3_links.replace("'", '"'))
            else:
                bands_dict = temp_tb.iloc[i].s3_links
            for band in bands:
                temp_key = bands_dict[band].replace("s3:/", "")
                temp_sav_path = f"/data/cloudMask/tiles_fmask/{bands_dict[band].split('/')[2]}/{bands_dict[band].split('/')[3]}"
                os.makedirs(f"/data/cloudMask/tiles_fmask/{bands_dict[band].split('/')[2]}", exist_ok=True)
                if not Path(temp_sav_path).is_file():
                    session.resource('s3').Bucket('lp-prod-protected').download_file(Key = temp_key, Filename = temp_sav_path)
            temp_dict = {"tile":tile, "count":i, "save_path":f"/data/cloudMask/tiles_fmask/{bands_dict[band].split('/')[2]}/", "filename":bands_dict["Fmask"].split('/')[3].replace(".Fmask.tif","")}
            info_list.append(temp_dict)
    return pd.DataFrame(info_list)

In [None]:
track_df = tile_download(selected_tiles, from_csv = False)

In [None]:
track_df.to_csv(req_path + "track_df.csv", index=False)

## Chipping (Run hls_reproject.ipynb before going into this chunk)

In [None]:
# Getting all saved dataframes and json
chip_df = pd.read_csv(req_path + "chip_df.csv")
with open("/home/data/chip_ids.json", 'r') as f:
    chip_ids = json.load(f)
track_df = pd.read_csv(req_path + "track_df.csv")
with open(chip_file, "r") as file:
    chips = json.load(file)

In [None]:
tiles_to_chip = track_df.tile.unique().tolist()
with open(chip_file, "r") as file_chip:
    chipping_js = json.load(file_chip)

### Chipping functions

In [None]:
## The qa_lut lookup table is from the nasa_hls repo.
## For more information check "https://benmack.github.io/nasa_hls/build/html/tutorials/Working_with_HLS_datasets_and_nasa_hls.html"
qa_lut = pd.read_csv("/home/data/qa_lut.csv")

In [None]:
cloud_classes = qa_lut[qa_lut.cloud == True].qa_value.tolist()

In [None]:
def cloud_mask_reclass(x):
    ## binary reclass of the cloud mask
    return(cloud_classes.count(x))

c_rcl = np.vectorize(cloud_mask_reclass)

In [None]:
def process_chip(chip_id, 
                 chip_tile,
                 shape,
                 track_csv):
    
    """
    This function receives a chip id, HLS tile, chip geometry, and a list of bands to process. 
    
    Assumptions:
    
    Inputs:
    - chip_id: string of chip id, e.g. '000_001'
    - chip_tile: string of HLS tile , e.g. '15ABC'
    - shape: 'geometry' property of single polygon feature read by fiona
    
    The function writes out a multi-date TIF containing the bands for each of the three image dates for an HLS tile. 
    The function writes out a multi-date TIF containing the QA bands of each date.
    The function writes out a chipped version of CDL. 
    The function calls check_qa(), which makes assumptions about what QA pixels are valid.
    The function returns the number of valid QA pixels at each date, as a tuple.
    
    """
    ## get reprojected image paths
    tile_info_df = track_csv[track_csv.tile == chip_tile]
    
    selected_image_folders = tile_info_df.save_path.to_list()

    all_qa = []
                     
    for i in range(len(selected_image_folders)):
        all_qa.append(tile_info_df.iloc[i].save_path + f"{tile_info_df.iloc[i].filename}.Fmask.reproject.tif")

    cloud_pct = [] 
    # bad_pct_val = []
    qa_bands = []
                     
    for i in range(len(selected_image_folders)):
        with rasterio.open(all_qa[i]) as src:
            out_image, out_transform = rasterio.mask.mask(src, shape, crop=True)
        out_image_binary = c_rcl(out_image).astype(np.uint8)
        qa_bands.append(out_image_binary[0])
        cloud_pct.append(np.count_nonzero(out_image_binary[0] == 1)/(out_image_binary.shape[1] * out_image_binary.shape[2]))
        
    qa_bands = np.array(qa_bands).astype(np.uint8)
        
    with rasterio.open(all_qa[0]) as src:
        out_meta = src.meta
    
    out_meta.update({"driver": "GTiff",
                     "height": qa_bands.shape[1],
                     "width": qa_bands.shape[2],
                     "count": 1,
                     "transform": out_transform})
    for i in range(len(selected_image_folders)):
        current_cmask = np.expand_dims(qa_bands[i], axis=0)
        with rasterio.open(f"{chip_fmask_dir}{str(chip_id)}_{i}_cmask.tif", "w", **out_meta) as dest:
            dest.write(current_cmask)  
    
    return(cloud_pct)
    

### Chipping process

In [None]:
## process chips
fmask_chips_info = []
for tile in tiles_to_chip:
    chips_to_process = chip_df[chip_df.tile == tile.replace("T", "")].reset_index(drop = True)
    for k in range(len(chips_to_process)):
        current_id = chips_to_process.chip_id[k]
        chip_tile = chips_to_process.tile[k]
        # print(current_id)
        chip_index = chip_ids.index(current_id)

        chip_feature = chipping_js['features'][chip_index]

        shape = [chip_feature['geometry']]

        ## do we want to scale/clip reflectances?
        full_tile_name = "T" + chip_tile
        cloud_pct = process_chip(current_id, full_tile_name, shape, track_df)

        chip_df_index = chip_df.index[chip_df['chip_id'] == current_id].tolist()[0]

        for i in range(len(cloud_pct)):
            temp_dict = {"fmask_name": f"{current_id}_{i}_cmask.tif",
                         "cloud_pct": cloud_pct[i]}
            fmask_chips_info.append(temp_dict)
fmask_tracker = pd.DataFrame(fmask_chips_info)

In [None]:
fmask_tracker.to_csv("/home/data/fmask_tracker.csv", index = False)