# Summary

This file has been taken from various versions of the GEE_pull_functions files that have been used to pull surface reflectance from Landsat Collection 1 over rivers, lakes, wqp matchups, etc. It has been modified to run with Landsat Collcetion 2, based on changes with collection 2 data such as: 

* Bit mask information (i.e. bit numbers) 
* Scaling parameters for optical bands 
* Band names 
* Thresholds (inputs, rather) for the DSWE function

At this current stage, this code is meant to test pulling reflectance from various waterbodies within the Ohio River Basin. Waterbodies have been sampled from the Ohio River Basin using the get_waterbodies call in nhdPlusTools in R, and imported into GEE as an asset. 

In [5]:
import geemap
import time
import os
import time
import ee
import os
import numpy
import pandas

In [None]:
ee.Authenticate()

In [6]:
ee.Initialize()

# Image Masking 

Masks are binary images used to remove unwanted pixels from the final layer. 

Below are the updated bit numbers within the 'QA_PIXEL' band for LS Collection 2 data. Note that this is one of the main differences between collection 1 and 2. 

*    Bit 0: Fill
*    Bit 1: Dilated Cloud
*    Bit 2: Cirrus (high confidence)
*    Bit 3: Cloud
*    Bit 4: Cloud Shadow
*    Bit 5: Snow
*    Bit 6: Clear
*        0: Cloud or Dilated Cloud bits are set
*        1: Cloud and Dilated Cloud bits are not set
*    Bit 7: Water
*    Bits 8-9: Cloud Confidence
*        0: None
*        1: Low
*        2: Medium
*        3: High
*    Bits 10-11: Cloud Shadow Confidence
*        0: None
*        1: Low
*        2: Medium
*        3: High
*    Bits 12-13: Snow/Ice Confidence
*        0: None
*        1: Low
*        2: Medium
*        3: High
*    Bits 14-15: Cirrus Confidence
*        0: None
*        1: Low
*        2: Medium
*        3: High

# Unpack

The following functions 'unpack' the information stored within the QA band. 

In [78]:

def Unpack(bitBand, startingBit, bitWidth): #For reference, the Water bit, bit 7, would have a starting bit of 7 and a bit width of 1
  return (ee.Image(bitBand)\
  .rightShift(startingBit)\
  .bitwiseAnd(ee.Number(2).pow(ee.Number(bitWidth)).subtract(ee.Number(1)).int()))


def UnpackAll(bitBand, bitInfo):
  unpackedImage = ee.Image.cat([Unpack(bitBand, bitInfo[key][0], bitInfo[key][1]).rename([key]) for key in bitInfo])
  return unpackedImage



# Function of Mask (FMask)

Fmask is used for cloud, cloudshadow, snow/ice, and water masking 

In [79]:
def AddFmask(image):
    bitInfo = {
    'Cloud': [3, 1],
    'CloudShadow': [4, 1], 
    'SnowIce': [5, 1],
    'Water': [7, 1]
    }
    
    temp = UnpackAll(image.select(['qa']), bitInfo)
    
    fmask = (temp.select(['Water']).rename(['fmask'])
    .where(temp.select(['SnowIce']), ee.Image(4)) #4 because we're taking SnowIce bit number (5) and subtracting 1
    .where(temp.select(['CloudShadow']), ee.Image(3))
    .where(temp.select(['Cloud']), ee.Image(2))
    .mask(temp.select(['Cloud']).gte(0))) 
    #mask the fmask so that it has the same footprint as the quality (BQA) band
    return(image.addBands(fmask))

# Dynamic Surface Water Extent (DSWE)

These functions are for calculating DSWE. The thresholds for various tests (t1-t5), haven't necessarily changed. What's changed are the inputs. Previously, for LS Collection 1 the inputs for these functions were unscaled surface reflectance. Now, the inputs are scaled surface reflectance. 


In [80]:
#Modified Normalized Difference Wetness Index
def Mndwi(image): 
  return(image.normalizedDifference(['Green', 'Swir1']).rename('mndwi'))

#Multi-band Spectral Relationship Visible
def Mbsrv(image):
  return(image.select(['Green']).add(image.select(['Red'])).rename('mbsrv'))

#Multi-band Spectral Relationship Near infrared
def Mbsrn(image):
  return(image.select(['Nir']).add(image.select(['Swir1'])).rename('mbsrn'))

#Normalized Difference Vegetation Index
def Ndvi(image):
  return(image.normalizedDifference(['Nir', 'Red']).rename('ndvi'))

#Automated Water Extent Shadow
def Awesh(image):
  return(image
  .expression('Blue + 2.5 * Green + (-1.5) * mbsrn + (-0.25) * Swir2', {
    'Blue': image.select(['Blue']),
    'Green': image.select(['Green']),
    'mbsrn': Mbsrn(image).select(['mbsrn']),
    'Swir2': image.select(['Swir2'])
  }))

def Dswe(i):
   mndwi = Mndwi(i)
   mbsrv = Mbsrv(i)
   mbsrn = Mbsrn(i)
   awesh = Awesh(i)
   swir1 = i.select(['Swir1'])
   nir = i.select(['Nir'])
   ndvi = Ndvi(i)
   blue = i.select(['Blue'])
   swir2 = i.select(['Swir2'])
  
  # These thresholds are taken from the LS Collection 2 DSWE Data Format Control Book:
  # (https://d9-wret.s3.us-west-2.amazonaws.com/assets/palladium/production/s3fs-public/media/files/LSDS-2042_LandsatC2_L3_DSWE_DFCB-v2.pdf)
  # Inputs are meant to be scaled reflectance values 

   t1 = mndwi.gt(0.124) # MNDWI greater than Wetness Index Threshold
   t2 = mbsrv.gt(mbsrn) # MBSRV greater than MBSRN
   t3 = awesh.gt(0) #AWESH greater than 0
   t4 = (mndwi.gt(-0.44)  #Partial Surface Water 1 thresholds
   .And(swir1.lt(0.09)) #900 for no scaling (LS Collection 1)
   .And(nir.lt(0.15)) #1500 for no scaling (LS Collection 1)
   .And(ndvi.lt(0.7))) 
   t5 = (mndwi.gt(-0.5) #Partial Surface Water 2 thresholds
   .And(blue.lt(0.1)) #1000 for no scaling (LS Collection 1)
   .And(swir1.lt(0.3)) #3000 for no scaling (LS Collection 1)
   .And(swir2.lt(0.1)) #1000 for no scaling (LS Collection 1)
   .And(nir.lt(0.25))) #2500 for no scaling (LS Collection 1)
  
   t = t1.add(t2.multiply(10)).add(t3.multiply(100)).add(t4.multiply(1000)).add(t5.multiply(10000));
  
   noWater = (t.eq(0)
   .Or(t.eq(1))
   .Or(t.eq(10))
   .Or(t.eq(100))
   .Or(t.eq(1000)))
   hWater = (t.eq(1111)
   .Or(t.eq(10111))
   .Or(t.eq(11011))
   .Or(t.eq(11101))
   .Or(t.eq(11110))
   .Or(t.eq(11111)))
   mWater = (t.eq(111)
   .Or(t.eq(1011))
   .Or(t.eq(1101))
   .Or(t.eq(1110))
   .Or(t.eq(10011))
   .Or(t.eq(10101))
   .Or(t.eq(10110))
   .Or(t.eq(11001))
   .Or(t.eq(11010))
   .Or(t.eq(11100)))
   pWetland = t.eq(11000)
   lWater = (t.eq(11)
   .Or(t.eq(101))
   .Or(t.eq(110))
   .Or(t.eq(1001))
   .Or(t.eq(1010))
   .Or(t.eq(1100))
   .Or(t.eq(10000))
   .Or(t.eq(10001))
   .Or(t.eq(10010))
   .Or(t.eq(10100)))
  
   iDswe = (noWater.multiply(0)
   .add(hWater.multiply(1))
   .add(mWater.multiply(2))
   .add(pWetland.multiply(3))
   .add(lWater.multiply(4)))
  
   return iDswe.rename('dswe')

# Hillshades and hillshadows

These functions calculate hillshade and hillhshadow based on information on azimuth and zenith angles stored within a Landsat image. 

The names for the azimuth angle was changed from 'SOLAR_AZIMUTH' to 'SUN_AZIMUTH'. There also is no 'ZENITH' field, but a 'SUN_ELEVATION' field that you can do 90 - 'SUN_ELEVATION' to get 'SUN_ZENITH'. 

In [13]:

def CalcHillShades(image, geo):

  MergedDEM = ee.Image("users/eeProject/MERIT").clip(geo.buffer(30))
  
  SOLAR_AZIMUTH_ANGLE = ee.Number(image.get('SUN_AZIMUTH'))
  SOLAR_ZENITH_ANGLE =ee.Number(90).subtract(image.get('SUN_ELEVATION'))
  
  hillShade = ee.Terrain.hillshade(MergedDEM, SOLAR_AZIMUTH_ANGLE,
  SOLAR_ZENITH_ANGLE).rename(['hillShade'])
               
  return hillShade
  
def CalcHillShadows(image, geo):
  MergedDEM = ee.Image("users/eeProject/MERIT").clip(geo.buffer(30)) # potential choice, area buffered that hillshadow is calculated within 
  SOLAR_AZIMUTH_ANGLE = ee.Number(image.get('SUN_AZIMUTH'))
  SOLAR_ZENITH_ANGLE =ee.Number(90).subtract(image.get('SUN_ELEVATION'))

  hillShadow = ee.Terrain.hillShadow(MergedDEM, SOLAR_ZENITH_ANGLE,SOLAR_ZENITH_ANGLE, 30).rename(['hillShadow'])
    
  return hillShadow


# Need to add comments here describing function

In [82]:
def ExtractChannel(image, centerline, bound, maxDistance):
  cost = image.Not().cumulativeCost(ee.Image().toByte().paint(centerline, 1).And(image), maxDistance, False)
  channelMask = cost.eq(0).unmask(0).clip(bound).rename(['channelMask'])
  channel = image.mask(channelMask).unmask(0)
  return channel

# Need to add comments which explain why this is necessary

In [None]:
def removeGeo(i):
    return i.setGeometry(None)

# Surface reflectance pull function

* The pull function first defines variables that will be used to mask each image (i.e. clouds, hillshadows, dswe). 

* Then a waterOut variable is defined as the image with all of the masks applied and additional variables to be exported within our table (i.e. the DSWE value, the cloud value, etc.)

* The extract channel function is then called to apply our pull function over the channel mask

* Then the combined reducer which I don't understand , yet ! 

* Then we reduce regions and map the function with the removed geometry function which I alsot don't understand it's importance, yet!

In [2]:
def pull(image):

    d = Dswe(image).select('dswe')
    dswe3 = d.eq(3).rename('dswe3').selfMask().updateMask(clouds.eq(0)).updateMask(hs.eq(1)) 
    f = AddFmask(image).select('fmask')
    hs = CalcHillShadows(image, reach_poly.geometry()).select('hillShadow')
    clouds = f.gte(2).rename('clouds')
    d = Dswe(image).select('dswe')
    dummy = (image.select(['Blue'],['dswe1']).updateMask(clouds.eq(0)).updateMask(d.eq(1)).updateMask(hs.eq(1)))
    hs0 = hs.eq(0).rename('shadow').selfMask().updateMask(clouds.eq(0)).updateMask(d.eq(1))
    cloud_cover = image.metadata('CLOUD_COVER')
    sun_elevation = image.metadata('SUN_ELEVATION')
    sun_azimuth = image.metadata('SUN_AZIMUTH')
   
    waterOut = (image.addBands(d)
              #.addBands(image.select(['Nir'],['NirSD'])) # no idea why this is needed at the moment so I'm commenting it out
              .updateMask(d.eq(1))
              .updateMask(clouds.eq(0))
              .updateMask(hs.eq(1))
              .addBands(dswe3)
              .addBands(dummy)
              .addBands(hs0)
              .addBands(hs)
              .addBands(clouds)
              .addBands(cloud_cover)
              .addBands(sun_elevation)
              .addBands(sun_azimuth))
   
    waterChannel = ExtractChannel(waterOut.select('dswe'.eq(1), reach, polygon.geometry(), 750))
  
    waterOut2 = waterOut.updateMask(waterChannel)
    
    combinedReducer = (ee.Reducer.median().unweighted()
    .forEachBand(waterOut2.select(['Aerosol','Blue', 'Green', 'Red', 'Nir', 'Swir1', 'Swir2', 'TIR1','pixel_qa', 'dswe']))
    .combine(ee.Reducer.stdDev().unweighted().forEachBand(waterOut2.select(['NirSD'])), 'sd_', False)
    .combine(ee.Reducer.count().unweighted().forEachBand(waterOut2.select(['dswe3', 'dswe1','shadow' ])), 'pCount_', False)
    .combine(ee.Reducer.mean().unweighted().forEachBand(waterOut2.select(['hillShadow', 'clouds'])), '', False)
    .combine(ee.Reducer.firstNonNull().unweighted().forEachBand(waterOut2.select(['CLOUD_COVER', 'SUN_ELEVATION', 'SUN_AZIMUTH']))))
         
    ## Collect median reflectance and occurance values
    ## Make a cloud score, and get the water pixel count
    lsout = waterOut2.reduceRegions(reach_poly, combinedReducer, 30)
    
    out = lsout.map(removeGeo)
    
    return out


Function for limiting number of tasks in Task Manager

In [8]:
def maximum_no_of_tasks(MaxNActive, waitingPeriod):
  ##maintain a maximum number of active tasks
  time.sleep(10)
  ## initialize submitting jobs
  ts = list(ee.batch.Task.list())

  NActive = 0
  for task in ts:
       if ('RUNNING' in str(task) or 'READY' in str(task)):
           NActive += 1
  ## wait if the number of current active tasks reach the maximum number
  ## defined in MaxNActive
  while (NActive >= MaxNActive):
      time.sleep(waitingPeriod) # if reach or over maximum no. of active tasks, wait for 2min and check again
      ts = list(ee.batch.Task.list())
      NActive = 0
      for task in ts:
        if ('RUNNING' in str(task) or 'READY' in str(task)):
          NActive += 1
  return()

Merging landsat collections and scaling band values

In [None]:
#Clip image function
def clipImage(image):
  return image.clip(polygon.geometry())

#LS Collection 2 has a different scaling parameter that needs to be applied to the optical bands. Also, thermal bands need scale factors and these are different for LS8 and LS5/LS7

def scale_ls5_ls7(image):

  opticalBands = image.select('SR_B.').multiply(0.0000275).add(-0.2)
  thermalBand = image.select('ST_B6').multiply(0.00341802).add(149.0)
  return image.addBands(srcImg = opticalBands, overwrite = True)\
              .addBands(srcImg = thermalBand, overwrite = True)

def scale_ls8_ls9(image):
  opticalBands = image.select('SR_B.').multiply(0.0000275).add(-0.2)
  thermalBand = image.select('ST_B10').multiply(0.00341802).add(149.0)
  return image.addBands(srcImg = opticalBands, overwrite = True)\
              .addBands(srcImg = thermalBand, overwrite = True)

l9 = ee.ImageCollection('LANDSAT/LC09/C02/T1_L2')
l8 = ee.ImageCollection("LANDSAT/LC08/C02/T1_L2")
l7 = ee.ImageCollection('LANDSAT/LE07/C02/T1_L2')
l5 = ee.ImageCollection('LANDSAT/LT05/C02/T1_L2')

bn98 = ['SR_B1','SR_B2','SR_B3', 'SR_B4', 'SR_B5','SR_B6','SR_B7', 'ST_B10','QA_PIXEL']
bn75 = ['Null_CS','SR_B1','SR_B2','SR_B3', 'SR_B4', 'SR_B5','SR_B6','SR_B7','QA_PIXEL']
bands = ['Aerosol','Blue', 'Green', 'Red', 'Nir', 'Swir1', 'Swir2', 'TIR1','pixel_qa']

ls9 = l9.map(scale_ls8_ls9).select(bn98, bands)
ls8 = l8.map(scale_ls8_ls9).select(bn98, bands)
ls7 = l8.map(scale_ls5_ls7).select(bn75, bands)
ls5 = l8.map(scale_ls5_ls7).select(bn75, bands)

#filtering for cloud cover less than 50 reduces processing time (don't know if this should be higher or lower)
ls_collection2 = l9.merge(ls8).merge(ls7).merge(ls5).filter(ee.Filter.lt('CLOUD_COVER', 50)).filterDate('1984-03-16', '2022-11-01')

Preparing to loop over site identifiers

In [12]:
#Testing doing this over Allegheny River 
polygons = ee.FeatureCollection("projects/ee-samsillen0/assets/nhd_Allegheny_sample_polygon");
#load nhd centerlines
centerline = ee.FeatureCollection("projects/ee-samsillen0/assets/nhd_Allegheny_sample_centerline");

polygonssort = polygons.sort('ID')

polygonsID = polygonssort.aggregate_array('ID').getInfo() 

dlDir = 'G:/My Drive/UpperAllegheny' 
filesDown = os.listdir(dlDir)  

filesDown = [int(i.replace(".csv", "")) for i in filesDown]

polygonsID  = [i for i in polygonsID if i not in filesDown]


In [96]:
for x in range(0,len([polygonsID])):

  polygon = ee.Feature(polygons.filter(ee.Filter.eq('ID', polygonsID[x])).first())
  
  reach = ee.Feature(centerline.filter(ee.Filter.eq('ID', polygonsID[x])).first())

  shell = ee.Feature(None)

  lsover = ls2.filterBounds(reach.geometry())\
  .map(clipImage)

  data = lsover.map(pull)
    
  dataOut = ee.batch.Export.table.toDrive(collection = data, \
                                            description = str(polygonsID[x]),\
                                            folder = 'UpperAllegheny',\
                                            fileFormat = 'csv')

  print(polygonsID[x])

  maximum_no_of_tasks(15, 60)
  #Send next task.
  
  dataOut.start()
  print('done')


8588
done
8589
done
8590
done
8591
done
8592
done
8593
done
8594
done
8595
done
8596
done
8597
done
8598
done
8599
done
8600
done
8601
done
8602
done
8603
done
8604
done
8605
done
8606
done
8607
done
8608
done
8609
done
8610
done
8611
done
8612
done
8613
done
8614
done
8615
done
8616
done
8617
done
8619
done
8620
done
8621
done
8622
done
8623
done
8624
done
8625
done
8626
done
8627
done
8628
done
8629
done
8630
done
8631
done
8632
done
8633
done
8634
done
8635
done
8636
done
8637
done
8638
done
8639
done
8640
done
8641
done
8642
done
8643
done
