In [2]:
import subprocess
import boto3
import os
from google.cloud import storage
import shutil
import time
from botocore import UNSIGNED
from botocore.client import Config
from collections import namedtuple
from operator import attrgetter
from pprint import pprint
import multiprocessing as mp
from google.auth.transport.requests import AuthorizedSession
import ee
os.environ["GCLOUD_PROJECT"] = "opera-one"

In [2]:
s3_prefix = 'products/int_fwd_r2/2023-05-04_globalrun_2021-04-11_to_2021-04-22/RTC_S1/'
gcs_prefix = 'products/2023-05-04_globalrun_2021-04-11_to_2021-04-22/'
#gcs_prefix = 'products/cog-test/'

In [3]:
def upload_blob(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name,timeout=None)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name,timeout=None)

def processRTC(s3key,gcskey):
    filename = s3key.split('/')[-1]
    if os.path.exists(f'./temp_{filename}/'):
        shutil.rmtree(f'./temp_{filename}/')
    os.mkdir(f'./temp/_{filename}')

    #Download RTC file
    print('Downloading RTC File')
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
    bucket = 'opera-pst-rs-pop1'
    filename = s3key.split('/')[-1]
    filepath = f'./temp_{filename}/'+filename
    s3.download_file(bucket, s3key, filepath)

    #Change compression to DEFLATE
    print('Translating compression')
    tempfile = f'./temp_{filename}/gtiff32.tif'
    outfile = f'./temp_{filename}/cog32deflate.tif'
    gdal_cmd1 = f'gdal_translate -of GTiff -co NBITS=32 {filepath} {tempfile}'
    gdal_cmd2 = f'gdal_translate -of COG -co COMPRESS=DEFLATE -co RESAMPLING=AVERAGE {tempfile} {outfile}'
    subprocess.run(gdal_cmd1,shell=True,stdout=subprocess.DEVNULL)
    subprocess.run(gdal_cmd2,shell=True,stdout=subprocess.DEVNULL)
    metadata = os.popen(f'gdalinfo {filepath}').read()
    if metadata.split('ORBIT_PASS_DIRECTION=')[1].split('\n')[0] == 'ASCENDING':
        pass_d = 'A'
    elif metadata.split('ORBIT_PASS_DIRECTION=')[1].split('\n')[0] == 'DESCENDING':
        pass_d = 'D'
    else:
        pass_d = 'N'

    #Upload to GCS bucket
    print('Uploading to GCS Bucket')
    gcskey = gcskey.split('.tif')[0] + f'_{pass_d}.tif'
    gcsbucket = "opera-bucket-rtc"
    #upload_blob(gcsbucket,outfile,gcskey)

    shutil.rmtree(f'./temp_{filename}/')


In [88]:
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
paginator = s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket='opera-pst-rs-pop1', Prefix=s3_prefix, Delimiter='/')
keyList = []
for page in pages:
    #pprint(page)
    for obj in page['CommonPrefixes']:
        path = obj.get('Prefix')
        if path[-2] != 's':  #checks if end is not "static_layers"
            fname = path.split('/')[-2]+'_VH.tif'
            keyList.append(path+fname)

print(f'{len(keyList)} s3 keys found')

296476 s3 keys found


In [4]:
storage_client = storage.Client()
blobs = storage_client.list_blobs("opera-bucket-rtc", prefix=gcs_prefix)
gcsKeys = []
for blob in blobs:
    gcsKeys.append(blob.name.split('.tif')[0][:-2]+'.tif')
print(f'{len(gcsKeys)} existing gcs keys found')

274723 existing gcs keys found


In [105]:
keyPairs = []
for key in keyList:
    fname = key.split('/')[-1]
    gcsKey = gcs_prefix+fname
    if gcsKey not in gcsKeys:
        keydict = {'s3key':key,'gcsKey':gcsKey}
        keyPairs.append(keydict)
print(len(keyPairs))

295975


In [101]:
def run_rtc_transfer(keydict):
    s3key = keydict['s3key']
    gcskey = keydict['gcsKey']
    processRTC(s3key,gcskey)


In [102]:
pool = mp.Pool(4)

In [106]:
pool.map(run_rtc_transfer,[keydict for keydict in keyPairs[0:8]])

Process SpawnPoolWorker-9:
Process SpawnPoolWorker-10:
Process SpawnPoolWorker-11:
Process SpawnPoolWorker-12:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/mbonnema/miniconda3/envs/opera-gee/lib/python3.10/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/mbonnema/miniconda3/envs/opera-gee/lib/python3.10/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/mbonnema/miniconda3/envs/opera-gee/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/mbonnema/miniconda3/envs/opera-gee/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/mbonnema/miniconda3/envs/opera-gee/lib/python3.10/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/Users/mbonnema/miniconda3/envs/opera-gee/lib/python3.10/multiprocessing/pool.py", line 114, in worker


KeyboardInterrupt: 

In [48]:
i = 0
start_time = time.time()
for key in keyList[0:5]:
    fname = key.split('/')[-1]
    gcsKey = gcs_prefix+fname
    if gcsKey not in gcsKeys:
        processRTC(key,gcsKey)
        print(f'[{i} - {time.time() - start_time}] Uploaded to: {gcsKey}')
    else:
        print(f'[{i} - {time.time() - start_time}] Skipped: {gcsKey}')
    i = i+1

Downloading RTC File
Translating compression
Input file size is 3310, 1664
0...10...20...30...40...50...60...70...80...90...100 - done.
Input file size is 3310, 1664
0...10...20...30...40...50...60...70...80...90...100 - done.
Uploading to GCS Bucket
[0 - 7.791720867156982] Uploaded to: products/2023-05-04_globalrun_2021-04-11_to_2021-04-22/OPERA_L2_RTC-S1_T001-000010-IW1_20210419T180101Z_20230505T150330Z_S1A_30_v0.0_VH.tif
Downloading RTC File
Translating compression
Input file size is 3437, 1652
0...10...20...30...40...50...60...70...80...90...100 - done.
Input file size is 3437, 1652
0...10...20...30...40...50...60...70...80...90...100 - done.
Uploading to GCS Bucket
[1 - 16.34800410270691] Uploaded to: products/2023-05-04_globalrun_2021-04-11_to_2021-04-22/OPERA_L2_RTC-S1_T001-000010-IW2_20210419T180102Z_20230505T150330Z_S1A_30_v0.0_VH.tif
Downloading RTC File
Translating compression
Input file size is 3101, 1622
0...10...20...30...40...50...60...70...80...90...100 - done.
Input fi

In [81]:
#bucket = 'opera-bucket-1'
#prefix = 'Validation/DSWx-HLS/dswx_global_summer/'

#storage_client = storage.Client()
#blobs = storage_client.list_blobs(bucket, prefix=prefix)
#gcsKeys = []
#for blob in blobs:
#    gcsKeys.append(blob.name)
#page_token = blobs.next_page_token
#i = 0
#while page_token != None:
#    blobs = storage_client.list_blobs(bucket, prefix=prefix,next_page_token=page_token)
#    for blob in blobs:
#        gcsKeys.append(blob.name)
#    page_token = blobs.next_page_token
#    i = i+1
#    print(f'page number {i}')


In [86]:
print("Number of processors: ", mp.cpu_count())

Number of processors:  10


In [3]:
session = AuthorizedSession(ee.data.get_persistent_credentials())