In [None]:
#@title Copyright 2024 Google LLC. { display-mode: "form" }
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Ingest CBERS data from GCS

This notebook lists the XML metadata files of CBERS4A assets stored in a Cloud Storage bucket (here, a bucket named `cbers4a-test`), then triggers image ingestion.

## Start an authorized session

To be able to make an Earth Engine asset in your user folder, you need to be able to authenticate as yourself when you make the request.  You can use credentials from the Earth Engine authenticator to start an [`AuthorizedSession`](https://google-auth.readthedocs.io/en/master/reference/google.auth.transport.requests.html#google.auth.transport.requests.AuthorizedSession).  You can then use the `AuthorizedSession` to send requests to Earth Engine.

In [None]:
import ee
from google.auth.transport.requests import AuthorizedSession

# Run the Earth Engine sign-in flow (alternative: !earthengine authenticate --auth_mode=gcloud).
ee.Authenticate()

# Cloud project that Earth Engine requests are associated with.
ee_project = 'ee-demos' # @param {"type":"string","placeholder":"mdewitt-earthengine"}

# Attach the project as the quota project of the stored credentials, then wrap
# them in the session that is used for the Earth Engine REST requests.
credentials = ee.data.get_persistent_credentials()
session = AuthorizedSession(credentials.with_quota_project(ee_project))

## Generate a list of all the images from the bucket.

This produces a list of all of the `BAND14.xml` files in the bucket; each image is assumed to correspond one-to-one with a `BAND14.xml` file.

In [None]:
# List every object under the WFI prefix and keep the BAND14.xml paths, one per
# line. -F makes grep match the name literally ('.' is not a regex wildcard).
! gsutil ls -r gs://cbers4a-test/WFI/** | grep -F 'BAND14.xml' > xmlfiles.txt
# Show how many XML files were found, followed by a sample of their paths.
! wc -l xmlfiles.txt
! head xmlfiles.txt

Given the path of an XML file, parse out the metadata.

In [None]:
%%capture
# %pip (rather than !pip) installs into the environment of the running kernel.
%pip install untangle

In [None]:
from google.cloud import storage
from requests.adapters import HTTPAdapter
from urllib.parse import urlparse
from urllib.parse import urlunparse
import pathlib
import requests
import untangle
import urllib3

import warnings

# Shared Cloud Storage client, used to read the XML metadata files.
storage_client = storage.Client()

# The number of parallel threads to use for reading data and registering images.
THREAD_COUNT = 40

# A single HTTP adapter, sized at twice the thread count, is mounted on every
# session below so that the worker threads do not exhaust the connection pool.
pool_size = THREAD_COUNT*2
adapter = HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)

for url_prefix in ("gs://", "https://"):
  storage_client._http.mount(url_prefix, adapter)
# NOTE(review): this reaches into private attributes of the storage client;
# presumably this is the session used for credential refresh - confirm.
storage_client._http._auth_request.session.mount("https://", adapter)
for url_prefix in ("gs://", "https://"):
  session.mount(url_prefix, adapter)

# Housekeeping - silence the noisy connection-pool-size warning.
warnings.filterwarnings('ignore', message=".*Connection pool size.*")

def getMetadata(xmlUrl):
  """Read the XML object at the given URL and return its metadata dict.

  The URL's host part is used as the bucket name and its path (with
  surrounding '/' removed) as the object name. The object is read as text
  through the module-level `storage_client` and handed to extractMetadata().
  """
  url_parts = urlparse(xmlUrl)
  object_name = url_parts.path.strip('/')
  xml_blob = storage_client.bucket(url_parts.netloc).blob(object_name)

  with xml_blob.open("r") as xml_file:
    contents = xml_file.read()
  return extractMetadata(contents)

def extractMetadata(fileContents):
  """Build the metadata dict from the contents of an XML file.

  The contents are parsed with untangle. The returned dict has three keys:
  'properties' (currently always empty, because every property assignment
  below is commented out), and 'startTime' / 'endTime', taken from the
  begin/end values of prdf.image.timeStamp with a "Z" suffix appended.
  """
  prdf = untangle.parse(fileContents).prdf
  image = prdf.image

  properties = {}
  # Optional properties read from the <prdf> element; currently disabled.
#  properties['ORIENTATION_ANGLE_DEGREE'] = prdf.orientationAngle.degree.cdata
#  properties['ORIENTATION_ANGLE_MINUTE'] = prdf.orientationAngle.minute.cdata
#  properties['ORIENTATION_ANGLE_SECOND'] = prdf.orientationAngle.second.cdata
#  properties['SUN_INCIDENCE_ANGLE_DEGREE'] = prdf.sunIncidenceAngle.degree.cdata
#  properties['SUN_INCIDENCE_ANGLE_MINUTE'] = prdf.sunIncidenceAngle.minute.cdata
#  properties['SUN_INCIDENCE_ANGLE_SECOND'] = prdf.sunIncidenceAngle.second.cdata
#  properties['VIEWING_TIME_CENTER'] = prdf.viewing.center.cdata + 'Z'

  # Optional properties read from the <image> element; currently disabled.
#  properties['RECEIVING_STATION'] = image.receivingStation.cdata
#  properties['PROCESSING_STATION'] = image.processingStation.cdata
#  properties['PROCESSING_TIME'] = image.processingTime.cdata
#  properties['LEVEL'] = image.level.cdata
#  properties['COLUMNS'] = image.columns.cdata
#  properties['LINES'] = image.lines.cdata
#  properties['VERTICAL_PIXEL_SIZE'] = image.verticalPixelSize.cdata
#  properties['HORIZONTAL_PIXEL_SIZE'] = image.horizontalPixelSize.cdata
#  properties['ORBIT_DIRECTION'] = image.orbitDirection.cdata
#  properties['PATH'] = image.path.cdata
#  properties['ROW'] = image.row.cdata
#  properties['SUN_POSITION_ELEVATION'] = image.sunPosition.elevation.cdata
#  properties['SUN_POSITION_AZIMUTH'] = image.sunPosition.sunAzimuth.cdata

  time_stamp = image.timeStamp
  start_time = time_stamp.begin.cdata + "Z"
  end_time = time_stamp.end.cdata + "Z"
  return {
      'properties': properties,
      'startTime': start_time,
      'endTime': end_time,
  }

def createIngestRequest(xml_url, band_nums=(5, 6, 7, 8)):
  """Given a URL to an XML file, build the request to register the image.

  Args:
    xml_url: URL of the image's `...BAND14.xml` file. Surrounding whitespace
      (such as the newline left on lines read from a listing file) is ignored.
    band_nums: Band numbers to register. For each number N the image gets a
      tileset backed by the sibling file whose name is the XML file's name
      with `BAND14.xml` replaced by `BAND<N>.tif`, and a band named `B<N>`.

  Returns:
    A dict with an "imageManifest" (asset name, URI prefix, tilesets, bands and
    the metadata returned by getMetadata()) and "overwrite" set to True. The
    asset is named after the directory that contains the XML file.
  """
  xml_url = xml_url.strip()
  parsed = urlparse(xml_url)
  # Object paths always use '/' separators, so parse them as POSIX paths
  # regardless of the operating system the notebook runs on.
  path = pathlib.PurePosixPath(parsed.path)

  asset_id = path.parent.name
  uri_prefix = urlunparse([parsed.scheme, parsed.netloc, str(path.parent) + '/', '', '', ''])

  # NOTE(review): the file that identifies an image is BAND14.xml, while the
  # default bands are 5-8. Confirm that BAND5..BAND8 .tif files really sit
  # next to the XML file; if not, pass the correct band_nums.
  tilesets = [
      {
          "id": str(band_num),
          "sources": {
              "uris": [path.name.replace('BAND14.xml', f'BAND{band_num}.tif')]
          }
      }
      for band_num in band_nums
  ]
  bands = [
      {
          "id": f"B{band_num}",
          "tilesetId": str(band_num)
      }
      for band_num in band_nums
  ]

  return {
      "imageManifest": {
          "name": f"projects/{ee_project}/assets/CBERS4A/WFI/{asset_id}",
          "uriPrefix": uri_prefix,
          "tilesets": tilesets,
          "bands": bands,
          **getMetadata(xml_url)
      },
      "overwrite": True
  }

In [None]:
from concurrent.futures import ThreadPoolExecutor
from pprint import pprint
from tqdm import tqdm

import json
import time
import random

# Collects one "<xml url> <error>" entry for every image whose import raised;
# importImage() appends to it.
failedImages = []

def importImage(xmlUrl):
  """Register the image described by the XML file at the given URL.

  Builds the ingest request, serializes it to JSON and posts it to the
  project's image:importExternal endpoint. Nothing is raised to the caller:
  any exception is recorded in `failedImages` as "<xmlUrl> <error>".
  """
  try:
    payload = json.dumps(createIngestRequest(xmlUrl))
    endpoint = f"https://earthengine.googleapis.com/v1alpha/projects/{ee_project}/image:importExternal"
    fetchWithBackoff(endpoint, payload)
  except Exception as error:
    failedImages.append(f"{xmlUrl} {error}")

def fetchWithBackoff(url, data, max_retries=5):
  """Post data to the given URL, retrying if necessary.

  Args:
    url: The URL to post to, using the module-level authorized `session`.
    data: The request body.
    max_retries: Maximum number of attempts before giving up.

  Returns:
    The decoded JSON body of the first successful response.

  Raises:
    Exception: If every attempt fails. The message contains the posted data
      and the reason of the last failure.
  """
  retry_delay = 1  # Initial delay in seconds
  last_failure = None
  for attempt in range(max_retries):
    try:
      response = session.post(
        url=url,
        data=data,
      )
      response.raise_for_status()
      return response.json()
    except requests.RequestException as ex:
      # Connection-level errors carry no response, so fall back to the
      # exception itself when there is no HTTP reason to report.
      failed_response = getattr(ex, 'response', None)
      last_failure = failed_response.reason if failed_response is not None else ex
      # Only wait when another attempt will follow.
      if attempt < max_retries - 1:
        time.sleep(retry_delay)
        # Exponential backoff with a little jitter.
        retry_delay *= 2
        retry_delay += random.uniform(0, 1)
  raise Exception(f"Maximum retries hit\n{data}\n Final response: {last_failure}")


# Actually register all of the images that correspond to the XML files in the
# list of URLs.
warnings.filterwarnings('ignore', message=".*Connection pool size.*")

# Read the listing up front so the file is not held open during the imports.
# Each line is stripped of its trailing newline, and blank lines are skipped,
# so that only clean URLs are handed to importImage().
with open('xmlfiles.txt') as f:
  lines = [line.strip() for line in f if line.strip()]

with ThreadPoolExecutor(max_workers=THREAD_COUNT) as pool:
  print("Running...")
  # list() drains the lazy map so that tqdm advances as each import finishes.
  list(tqdm(pool.map(importImage, lines), total=len(lines)))
print(f'Done. Failed images: {failedImages}')