#1- Initialise api




In [1]:
import ee

# Trigger the authentication flow.
ee.Authenticate()

# Initialize the library.
ee.Initialize()

To authorize access needed by Earth Engine, open the following URL in a web browser and follow the instructions. If the web browser does not start automatically, please manually browse the URL below.

    https://accounts.google.com/o/oauth2/auth?client_id=517222506229-vsmmajv00ul0bs7p89v5m89qs8eb9359.apps.googleusercontent.com&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fearthengine+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdevstorage.full_control&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&response_type=code&code_challenge=fvy10hdf0xZ5LibDqeRptTdvF1vgJyeTF_MODz63cdY&code_challenge_method=S256

The authorization workflow will generate a code, which you should paste in the box below. 
Enter verification code: 4/1AX4XfWiOdAsF5FyJESwiKjgK0ks4LI6Po5i2QgLePzMLOGaDIjN3QqEJIiQ

Successfully saved authorization token.


#2- Useful functions

In [2]:
class generate_image(object):
  def  __init__(self, api, boundaries_path, start_date,end_date, cloud_percentage, folder = 'earthengine'):
    self.api = api
    self.boundaries_path=boundaries_path 
    self.start_date = start_date 
    self.end_date = end_date 
    self.cloud_percentage = cloud_percentage 
    #self.filename = filename
    self.folder = folder
    

  def getGeometry(self):
      area = self.api.FeatureCollection(self.boundaries_path);
      return  area.geometry()

  def getImage(self):
    s2 = self.api.ImageCollection("COPERNICUS/S2_SR")
    geometry = self.getGeometry()
    filtered = s2.filter(self.api.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', self.cloud_percentage)).filter(self.api.Filter.date(self.start_date , self.end_date)).filter(self.api.Filter.bounds(geometry))
    image = filtered.median()
    return image


  def getImages(self):
    s2 = self.api.ImageCollection("COPERNICUS/S2_SR")
    geometry = self.getGeometry()
    filtered = s2.filter(self.api.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', self.cloud_percentage)).filter(self.api.Filter.date(self.start_date , self.end_date)).filter(self.api.Filter.bounds(geometry))
    return filtered
  
# Create a composite and apply cloud mask
  def getMask_images(self):
    # Cloud masking
    def maskCloudAndShadows(image):
      cloudProb = image.select('MSK_CLDPRB')
      snowProb = image.select('MSK_SNWPRB')
      cloud = cloudProb.lt(40)
      snow = snowProb.lt(5)
      scl = image.select('SCL')
      # shadow = scl.eq(3); # 3 = cloud shadow
      # cirrus = scl.eq(30); # 10 = cirrus
      # Cloud probability less than 5% or cloud shadow classification
      mask = cloud.And(snow) #.And(cirrus.neq(1)).And(shadow.neq(1))
      return image.updateMask(mask)

    s2 = self.api.ImageCollection("COPERNICUS/S2_SR")
    geometry = self.getGeometry()
    filtered = s2.filter(self.api.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', self.cloud_percentage)).filter(self.api.Filter.date(self.start_date , self.end_date)).map(maskCloudAndShadows).filter(self.api.Filter.bounds(geometry))
    return filtered
#--------------------------------------------

  def gets2cloudless(self):
      s2_cloudless_col = (self.api.ImageCollection('COPERNICUS/S2_CLOUD_PROBABILITY')
          .filterBounds(self.getGeometry())
          .filterDate(self.start_date, self.end_date))
      return s2_cloudless_col

  def mosaicByDate(self, imgCol):
        '''
        mosaicByDate is a function that merges images together that have the same date.

        @ imgCol: [ee.ImageCollection] mandatory value that specifies the image collection to merge by dates with.

        Returns ee.ImageCollection
        '''
        #Convert the image collection to a list.
        imgList = imgCol.toList(imgCol.size())
        
        # Driver function for mapping the unique dates
        def uniqueDriver(image):
            return self.api.Image(image).date().format("YYYY-MM-dd")
        
        uniqueDates = imgList.map(uniqueDriver).distinct()

        # Driver function for mapping the moasiacs
        def mosaicDriver(date):
            date = self.api.Date(date)
            
            image = (imgCol
                  .filterDate(date, date.advance(1, "day"))
                  .mosaic())
            
            return image.set(
                            "system:time_start", date.millis(), 
                            "system:id", date.format("YYYY-MM-dd"))
        
        mosaicImgList = uniqueDates.map(mosaicDriver)
        
        return self.api.ImageCollection(mosaicImgList)
    
  def getTask(self, image, filename):
    aoi = self.getGeometry()
    task = self.api.batch.Export.image.toDrive(
       **{
                 'image': image,
                 'description':filename,
                 'folder': self.folder,
                 'fileNamePrefix':filename,
                  'region': aoi,
                  'scale': 10,
                  'maxPixels': 1e12,
                  'fileFormat': "GeoTIFF",
         }
       )
    return task
    
  def getNDVI_task(self, ndvi_name,image =False):
    #print(ndvi_name)
    ndvi_name = ndvi_name + 'ndvi_'+ self.start_date+'to'+ self.end_date +'_cloudval-'+ str(self.cloud_percentage)
    print(ndvi_name)
    geometry = self.getGeometry()
    #print('geometry')
    #print(geometry)
    if image==False:
      image = self.getImage() 
    else:
      image = image
    #print('image')
    #print(image)
    ndvi = image.normalizedDifference(['B8', 'B4']).rename(['ndvi']);
    #print(ndvi)
    visualized_ndvi = ndvi.clip(geometry)
    #print(visualized_ndvi)
   
    # task = self.api.batch.Export.image.toDrive(
    #   **{
    #             'image': visualized_ndvi,
    #             'description':ndvi_name,
    #             'folder': self.folder,
    #             'fileNamePrefix':ndvi_name,
    #              'region': geometry,
    #              'scale': 10,
    #              'maxPixels': 1e12,
    #              'fileFormat': "GeoTIFF",
    #     }
    #   )
    task = self.getTask(visualized_ndvi, ndvi_name  )
    return  task

  def getMNDWI_task(self, mndwi_name, image =False):
    mndwi_name = mndwi_name #+'_mndwi_' #+ self.start_date+'to'+ self.end_date +'_cloudval-'+ str(self.cloud_percentage)
    geometry = self.getGeometry()
    if image==False:
      image = self.getImage() 
    else:
      image = image
    mndwiVis = {'min':0, 'max':0.5, 'palette': ['white', 'blue']}
    mndwi = image.normalizedDifference(['B3', 'B11']).rename(['mndwi']);
    visualized_mndwi = mndwi.clip(geometry)
    # task = self.api.batch.Export.image.toDrive(
    #   **{
    #             'image': visualized_mndwi,
    #             'description':mndwi_name,
    #             'folder': self.folder,
    #             'fileNamePrefix': mndwi_name,
    #              'region': geometry,
    #              'scale': 10,
    #              'maxPixels': 1e12,
    #              'fileFormat': "GeoTIFF",
    #     }
    #   )
    # color_mndwi = mndwi.clip(geometry).visualize(**mndwiVis);

    # task = self.getTask(color_mndwi ,mndwi_name)
    task = self.getTask(visualized_mndwi,mndwi_name)
    return task



  def getNDWI_task(self, ndwi_name, image=False):
    ndwi_name = ndwi_name + 'ndwi_' +self.start_date+'to'+ self.end_date +'_cloudval-'+ str(self.cloud_percentage)
    geometry = self.getGeometry()
    if image==False:
      image = self.getImage() 
    else:
      image = image
    ndwi = image.normalizedDifference(['B8', 'B11']).rename(['ndwi']); 
    visualized_ndwi = ndwi.clip(geometry)
    # task = self.api.batch.Export.image.toDrive(
    #   **{
    #             'image': visualized_ndwi,
    #             'description':ndwi_name,
    #             'folder': self.folder,
    #             'fileNamePrefix': ndwi_name,
    #              'region': geometry,
    #              'scale': 10,
    #              'maxPixels': 1e12,
    #              'fileFormat': "GeoTIFF",
    #     }
    #   )
    task = self.getTask(visualized_ndwi, ndwi_name )
    return task

  def getrgb_img_task(self, rgb_name, image=False):
    rgb_name = rgb_name# + 'rgb_' +self.start_date+'to'+ self.end_date +'_cloudval-'+ str(self.cloud_percentage)
    geometry = self.getGeometry()
    if image==False:
      image = self.getImage() #.select(['B4', 'B3', 'B2'])
    else:
      image = image
    rgbVis = {
      'min': 0.0,
      'max': 3000,
      'bands': ['B4', 'B3', 'B2']
      } 
    #Map.addLayer(image, rgbVis, 'Filtered Collection');
    #Map.addLayer(image.clip(geometry), rgbVis, 'Image')
    visualized_rgb = image.clip(geometry).visualize(**rgbVis); #.visualize(image)
    # task = self.api.batch.Export.image.toDrive(
    #   **{
    #   'image': visualized_rgb,
    #   'description':rgb_name,
    #   'folder': 'earthengine',
    #   'fileNamePrefix':rgb_name,
    #   'region': geometry,
    #   'scale': 10,
    #   'maxPixels': 1e12,
    #   'fileFormat': "GeoTIFF",
    #     }
    #   )
    task1 = self.getTask(visualized_rgb, rgb_name )
    
    return task1


  #==================================================================================
  #==================================================================================


  def add_cloud_bands(self,img):
    CLD_PRB_THRESH = 60
    # Get s2cloudless image, subset the probability band.
    cld_prb = self.api.Image(img.get('s2cloudless')).select('probability')

    # Condition s2cloudless by the probability threshold value.
    is_cloud = cld_prb.gt(CLD_PRB_THRESH).rename('clouds')

    # Add the cloud probability layer and cloud mask as image bands.
    return img.addBands(self.api.Image([cld_prb, is_cloud]))

  def getS2cloudless_task(self,cloud_name, image=False):
    geometry = self.getGeometry()
    if image==False:
      image = self.getImage() #.select(['B4', 'B3', 'B2'])
    else:
      image = image
    # tmp =cloud_name.split('_')[1].split('-')
    # START_DATE= tmp[0] +  tmp[1] +  tmp[2]+str(1)
    # END_DATE = tmp[0] +  tmp[1] + tmp[2]+str(2)

    # img = self.getcloud_images(START_DATE, END_DATE)
    
    # img = img.mosaic()

    #cloud_img = image.clip(geometry).visualize(  **{ 'min': 0.0,'max': 100, 'palette':['brown', 'gray' ,'white']} )
    cloud_img = image.clip(geometry)

    task = self.getTask(cloud_img, cloud_name)
    return task


  def getAll_mosaic(self, types=['mndwi','rgb', 'cloud'], mask=False):
    if mask ==False:
      collection  = self.mosaicByDate(self.getImages())
      # get s2cloudless images
      error = False
      try:
        collection_s2cloudless =self.mosaicByDate(self.gets2cloudless())
        image_lists2cloudless = collection_s2cloudless.toList(collection_s2cloudless.size())
      except:
        error = True
        print(error)
      print('error', error)
      image_list = collection.toList(collection.size())
      img_size =  image_list .size().getInfo()
      print('size collection',image_list.size().getInfo())
      print('error', error)
      print('size collection  s2cloudless',image_lists2cloudless .size().getInfo())

      #unique_dates =  ee.Image(image_list .get(2)).date().format("YYYY-MM-dd")
      #print('datename',unique_dates.getInfo())
      tasks =[]
      for i in range(img_size):
        single_img = self.api.ImageCollection([image_list .get(i), image_list .get(i)]).mosaic();

        date =  self.api.Image(image_list.get(i)).date().format("YYYY-MM-dd").getInfo()
        if 'rgb' in types:
          tasks.append(self.getrgb_img_task('rgb_'+str(date),single_img))
        if 'mndwi' in types:
          tasks.append(self.getMNDWI_task('mndwi_'+str(date),single_img))
        if 'cloud' in types:
            img = self.api.ImageCollection([image_lists2cloudless.get(i), image_lists2cloudless.get(i)]).mosaic();
            tasks.append(self.getS2cloudless_task('cloud_'+str(date),img))
        if 'swi' in types:
          tasks.append(self.getSWI_task('swi_'+str(date),single_img))


      return tasks
    #=================================================================================

    else:  # GET IMAGES WITH MASK  PIXELS
      collection  = self.mosaicByDate(self.getMask_images())
      # get s2cloudless images
      error = False
      collection_s2cloudless =self.mosaicByDate(self.gets2cloudless())

      image_list = collection.toList(collection.size())
      img_size =  image_list .size().getInfo()
      print('size collection',image_list.size().getInfo())
    
      tasks =[]
      for i in range(img_size):
        single_img = self.api.ImageCollection([image_list .get(i), image_list .get(i)]).mosaic();
        date =  self.api.Image(image_list.get(i)).date().format("YYYY-MM-dd").getInfo()
        if 'rgb' in types:
          tasks.append(self.getrgb_img_task('rgb_'+str(date),single_img))
        if 'mndwi' in types:
          tasks.append(self.getMNDWI_task('mndwi_'+str(date),single_img))
      return tasks



# Get all images mosaic by dates

dagana :  from 01

In [None]:
api = ee
boundaries_path = "users/gloriewowo/dagana"
start_date = '2022-01-01'
end_date = '2022-03-16'
cloud_percentage =100
folder = 'earthengine'
generate_im1 = generate_image(api, boundaries_path, start_date,end_date, cloud_percentage, folder )

MNDWI

In [None]:
# Get the mndwi images
tasks = generate_im1.getAll_mosaic(['mndwi'])
for tsk in tasks:
  tsk.start()

CLOUD

In [None]:
# Download the S2CLOUDLESS
tasks = generate_im1.getAll_mosaic(['cloud'])
for tsk in tasks:
  tsk.start()

error False
size collection 59
error False
size collection  s2cloudless 88


# Rgb image

In [3]:
api = ee
boundaries_path = "users/gloriewowo/dagana"
start_date = '2022-01-01'
end_date = '2022-01-03'
cloud_percentage =100
folder = 'earthengine'
generate_im1 = generate_image(api, boundaries_path, start_date,end_date, cloud_percentage, folder )

# Download an rgb image
tasks = generate_im1.getAll_mosaic(['rgb'])
for tsk in tasks:
  tsk.start()

error False
size collection 1
error False
size collection  s2cloudless 1
