# Update datasets in Google Cloud (GC) that have newer versions available

In [None]:
import numpy as np
import pandas as pd
import os
import gcsfs
import xarray as xr
from functools import partial
from IPython.display import display
from glob import glob
import warnings
import datetime

### Local modules

In [None]:
from request import requests, set_request_id
from search import search_new, esgf_search_sites
from netcdf import get_ncfiles, concatenate
from identify import needed_newversion, get_version
from response import response, dict_to_dfcat, get_details
from utilities import getFolderSize

### Initialization

In [None]:
# Decide where NEW zarr stores are written, based on the host name.
mach = os.uname().nodename
local_storage = 'haden' in mach
# The non-'haden' location usually matches the one used in nb1-DataRequests.
zarr_local = '/h68/naomi/zarr-minimal' if local_storage else '/d1/naomi/cmip6-zarrs'

# Anonymous, read-only handle on Google Cloud Storage.
fs = gcsfs.GCSFileSystem(token='anon', access='read_only')

### Choose basic configuration parameters

In [None]:
# Search endpoints returned by the local `search` module, keyed by short name.
dtype = esgf_search_sites()

print('possible ESGF API search nodes: ',[*dtype.keys()])

# Activate exactly one node; the others are kept as ready-made alternatives.
ESGF_site = dtype['llnl']
local_node = True
#ESGF_site = dtype['dkrz'];local_node = False
#ESGF_site = dtype['ipsl'];local_node = False
#ESGF_site = dtype['ceda'];local_node = False

# Sites to skip when acquiring new netcdf files: broken or slow sites.
# Earlier candidates: 'esg.lasg.ac.cn','esgf-data2.diasjp.net','esgf-cnr.hpc.cineca.it', 'dist.nmlab.snu.ac.kr'
skip_sites = list()

### Get prior Google Sheet requests

In [None]:
# Requests recorded earlier; handed to requests() in a later cell.
prior_requests_csv = 'csv/requests.csv'
df_prior = pd.read_csv(prior_requests_csv)
#df_prior
#df_prior.keys()

### Get new Google Sheet requests
- by default, only the new rows from the sheet are considered
- specifying a list of rows or emails will add older entries 

In [None]:
# Defaults: no extra sheet rows, no extra requester emails.
rows, emails = [], []

# modify here:

rows = [199]

#emails = ['neil.swart@canada.ca']
df_request_new, dtrouble = requests(df_prior, rows=rows, emails=emails)

request_id = set_request_id()

# Report mal-formed requests (non-existent variables, etc)
if len(dtrouble) > 0:
    print(dtrouble)

df_request_new

In [None]:
# Narrow the new requests down to a single one: the last timestamp listed.
timestamps = df_request_new.Timestamp.unique()
print(timestamps)
chosen_timestamp = timestamps[-1]
df_request_new = df_request_new.loc[df_request_new['Timestamp'] == chosen_timestamp]
df_request_new

### Search ESGF for the availability of requested data

In [None]:
# Set NewSearch to False to reuse the results saved by a previous search.
NewSearch = True
esgf_results_csv = 'csv/ESGF_UV.csv'

if not NewSearch:
    df_ESGF = pd.read_csv(esgf_results_csv)
else:
    print(ESGF_site)
    df_ESGF = search_new(ESGF_site, df_request_new, local_node=local_node)
    # Save the results so the search can be skipped next time.
    df_ESGF.to_csv(esgf_results_csv, index=False)

### Get the master list of existing zarr stores
- df_master includes all curated stores
- df_avail includes all stores, even those with known ES-DOC issues 

In [None]:
# Catalog of all existing stores (the "noQC" list), read straight from GCS.
noqc_catalog_url = 'https://cmip6.storage.googleapis.com/cmip6-zarr-consolidated-stores-noQC.csv'
df_avail = pd.read_csv(noqc_catalog_url)
len(df_avail), len(df_ESGF)

### Check the new requests:
- exists in df_ESGF (what is available)? if yes, continue
- already exists in df_avail (what we have)? if yes, check version

In [None]:
# Work out which requested datasets have a newer version on ESGF than the
# one already held, and stop the run when there is nothing to do.
print(ESGF_site)

df_needed = needed_newversion(df_avail, df_request_new, df_ESGF)

if len(df_needed) > 0:
    num_stores = df_needed.zstore.nunique()
    print(f'needed: nfiles={len(df_needed)}, nstores={num_stores}')
    #print(df_needed.zstore.unique())
else:
    # Keep num_stores defined so later cells do not fail with a NameError
    # if they are run by hand after this point.
    num_stores = 0
    print('no new data available')
    # A bare `exit` is only a name lookup and halts nothing; raising
    # SystemExit actually stops the run (status 0: nothing to do is not an error).
    raise SystemExit(0)

In [None]:
#assert False 

### Start logging the progress and exceptions

In [None]:
# Per-request output files, both named after request_id.
cat_file = f'csv/cmip6_{request_id}.csv'
log_file = f'txt/request_{request_id}.log'
print(log_file)

In [None]:
# open and close for each write in case of kernel interrupt
def write_log(file,str,verbose=True):
    """Append one line of text to a log file, optionally echoing it.

    Parameters
    ----------
    file : path of the log file; it is opened in append mode on every call,
        so it is created if missing and never truncated.
    str : the text to record; a newline is appended when writing.
        (The name shadows the builtin but is kept for existing callers.)
    verbose : when true, the text is also printed to stdout.

    Returns
    -------
    None
    """
    # The context manager closes the handle even if print/write raises,
    # which the explicit open()/close() pair did not guarantee.
    with open(file,'a') as f:
        if verbose:
            print(str)
        f.write(str+'\n')
    return

In [None]:
# Numeric realization index from member_id: the text between the last 'r'
# and the 'i' that follows it (e.g. 'r12i1p1f1' -> 12).
df_needed['member'] = [int(s.split('r')[-1].split('i')[0]) for s in df_needed['member_id']]
# One multi-key sort: by member, ties broken by source_id.  Two consecutive
# single-key sorts do not give this, because the default sort algorithm is
# not stable and the second sort discards the ordering of the first.
df_needed = df_needed.sort_values(by=['member', 'source_id'])
df_needed.head()

### The real work is done in this next loop 

In [None]:
# Data nodes to skip when acquiring new netcdf files: broken or slow sites.
# Commented-out entries are currently allowed; uncomment to skip them too.
skip_sites = [
    'esg.lasg.ac.cn',
    #'esgf.nci.org.au',
    'esg-cccr.tropmet.res.in',
    'esgf.ichec.ie',  # incomplete downloads
    #'esgf-data3.ceda.ac.uk',
    'dpesgf03.nccs.nasa.gov',
    #'esgf3.dkrz.de',
]
skip_sites

In [None]:
# Some new versions are IDENTICAL to the old ones - same checksums,
# tracking_ids, etc.  WHAT TO DO????  Unless listed here, they would be
# downloaded again and again.
skip_stores = [
    'gs://cmip6/CMIP/E3SM-Project/E3SM-1-1-ECA/historical/r1i1p1f1/Amon/prc/gr/',  # new version same as old
    'gs://cmip6/CMIP/E3SM-Project/E3SM-1-1-ECA/historical/r1i1p1f1/Amon/pr/gr/',  # new version same as old
]


In [None]:
# Main update loop: for every store in df_needed, build a new zarr store
# from the newest netCDF files, replace the copy in GCS (and, when
# local_storage is set, the local copy), then record the new version in
# the local noQC catalog.

# reload the catalog
df_GCS = pd.read_csv('https://cmip6.storage.googleapis.com/cmip6-zarr-consolidated-stores-noQC.csv', dtype='unicode')

# keep a dated copy of the noQC catalog before this run modifies it
date = str(datetime.datetime.now().strftime("%Y%m%d"))
os.system('cp /home/naomi/cmip6-zarr/csv/pangeo-cmip6-noQC.csv /home/naomi/cmip6-zarr/csv/pangeo-cmip6-'+date+'-noQC.csv')

if local_storage:
    df_loc = pd.read_csv('shelf-new/local.csv', dtype='unicode')
    zbdirs = []
    for i in range(1,84):
        zbdirs += ['/h'+str(i)]

# refresh the gcsfs
fs.invalidate_cache()

new_zarrs = df_needed.zstore.unique()

verbose = True
for item,zarr in enumerate(new_zarrs):

    zbdir  = zarr_local  + zarr     # final local location
    zbtemp = 'zbtemp' + zarr        # scratch location the new store is written to
    gsurl = 'gs://cmip6' + zarr     # location in the cloud bucket

    if gsurl in skip_stores:
        print('Troublesome data - new version same as old',gsurl)
        continue

    write_log(log_file,f"\n>>{item+1}/{num_stores}:<< local file: {zbdir}",verbose=verbose)

    try:
        dataset_cloud, version_cloud = get_version(fs.get_mapper(gsurl),method='none')
        # caching trouble using fsspec!!, use the fs directly
    except Exception:
        # `except Exception` (not a bare except) so that a kernel
        # interrupt is not silently swallowed here.
        version_cloud = 'unknown'

    cstore = df_GCS[df_GCS.zstore == gsurl]
    if len(cstore) > 0:
        print(cstore.zstore.values,'version in GCS catalog:',cstore.version.values)
        print('store in cloud catalog')

    # drop the first character of the version string before comparing
    new_version = df_needed[df_needed.zstore==zarr].version.unique()[0][1:]

    print('available version:',new_version,'cloud version, using gcsfs:',version_cloud)
    try:
        already_uploaded = int(new_version) <= int(version_cloud)
    except (TypeError, ValueError):
        # e.g. version_cloud == 'unknown': the versions cannot be compared,
        # so leave the existing cloud store untouched rather than crash
        # the whole loop or delete data on a guess.
        write_log(log_file,f'cannot compare versions (available: {new_version}, cloud: {version_cloud}), skipping',verbose=verbose)
        continue
    if already_uploaded:
        print('new(er) version already uploaded to cloud',version_cloud)
        continue

    # CHECK IN shelf-new/local.csv
    if local_storage:
        df1 = df_loc[df_loc['zstore'].str.contains(zarr[:-1])]

    gfiles,troubles,codes,okay = get_ncfiles(zarr,df_needed,skip_sites)

    write_log(log_file,troubles,verbose=verbose)

    if okay == False:
        continue

    if len(gfiles) == 0:
        write_log(log_file,'no files available',verbose=verbose)
        continue

    variable_id = zarr.split('/')[-3]
    for gfile in gfiles:   # changes file sizes!!
        # '\\,' yields the same backslash-comma as before, without relying
        # on an invalid escape sequence in the string literal
        command = '/usr/bin/ncatted -h -O -a missing_value\\,'+variable_id+',d,, '+gfile
        os.system(command)

    # concatenate in time with mfdataset
    #print(gfiles)
    gfiles = sorted(gfiles)
    status, ds, dstr = concatenate(zarr,gfiles,codes)

    if status == 'failure':
        write_log(log_file,status+dstr,verbose=verbose)
        continue
    else:
        write_log(log_file,dstr)

    # 0. save new zstore
    try:
        del ds[variable_id].encoding['missing_value']
    except KeyError:
        # nothing to remove
        pass
    # written once, outside the try, so a write failure is not retried
    # or confused with a missing encoding key
    ds.to_zarr(zbtemp, consolidated=True, mode='w')

    if not os.path.isfile(zbtemp+'/.zmetadata'):
        write_log(log_file,'to_zarr failure: ',verbose=verbose)
        continue

    # 1. delete old version in GC
    command = '/usr/bin/gsutil -m rm -r '+ gsurl[:-1]
    print(command)
    os.system(command)

    # 2. delete entry in ncsv/GC_files_{activity_id}-{institution_id}.csv
    if len(cstore) > 0:
        activity_id = cstore.activity_id.values[0]
        institution_id = cstore.institution_id.values[0]

        file = f'/home/naomi/cmip6-zarr/ncsv/GC_files_{activity_id}-{institution_id}.csv'
        #print('modifying ',file)
        with open(file, "r") as f:
            lines = f.readlines()
        with open(file, "w") as f:
            for line in lines:
                if line.strip("\n") != gsurl + ".zmetadata":
                    f.write(line)
    else:
        # The old cloud store is already deleted at this point, so an
        # IndexError here would abort before the new store is uploaded.
        write_log(log_file,'store not found in GCS catalog, GC_files list not modified',verbose=verbose)

    if local_storage:
        # 3. delete old local copy(ies)
        for zloc in df1.zstore.values:
            if os.path.exists(zloc):
                command = '/bin/rm -rf '+ zloc
                print(command)
                os.system(command)
            else:
                print('not active: ',zloc)

        # 4. delete entry(ies) in shelf-new/h*.csv
        for zloc in df1.zstore.values:
            file = 'shelf-new/' + zloc.split('/')[1] + '.csv'
            print(file)
            # temporarily make the shelf file writeable if it is not
            writeable = os.access(file, os.W_OK)
            if not writeable:
                command = "chmod u+w " + file
                #print(command)
                os.system(command)

            dfff = pd.read_csv(file, dtype='unicode')
            dff = dfff[dfff.zstore != zloc]
            dff.to_csv(file, mode='w+', index=False)

            # restore the original read-only permission
            if not writeable:
                command = "chmod u-w " + file
                #print(command)
                os.system(command)

            # remove from concatenated catalog
            file = 'shelf-new/local.csv'
            dfff = pd.read_csv(file, dtype='unicode')
            dff = dfff[dfff.zstore != zloc]
            dff.to_csv(file, mode='w+', index=False)


        # 5. copy zbtemp to zbdir
        grid_label = zbdir[:-1].split('/')[-1]
        command = 'mkdir -p ' + zbdir.split(grid_label)[0]
        os.system(command)
        command = '/bin/cp -r '+ zbtemp[:-1] + ' ' + zbdir[:-1]
        write_log(log_file,command,verbose=verbose)
        os.system(command)

    # 6. upload to cloud
    command = '/usr/bin/gsutil -m cp -r '+ zbtemp[:-1] + ' ' + gsurl[:-1]
    write_log(log_file,command,verbose=verbose)
    os.system(command)

    # NOTE(review): zbdir is only written by step 5, i.e. when local_storage
    # is set; otherwise size_local may not describe the new store - confirm.
    # NOTE(review): the check is one-sided (a remote store much smaller than
    # the local one passes) - confirm whether abs() is wanted.
    size_remote = fs.du(gsurl)
    size_local = getFolderSize(zbdir)
    assert (size_remote - size_local) < 100

    try:
        ds = xr.open_zarr(fs.get_mapper(gsurl), consolidated=True)
        write_log(log_file,f'successfully saved as {gsurl}')
        # the downloaded netCDF files are no longer needed
        for gfile in gfiles:
            os.system('rm -f '+ gfile)
        #os.system('rm -f '+ zbtemp)
    except Exception:
        write_log(log_file,'store did not get saved to GCS properly')


    # 7. fix version in local noQC catalog  (only update cloud catalog when finished)

    df = pd.read_csv('csv/pangeo-cmip6-noQC.csv', dtype='unicode')
    # .loc with a boolean mask updates every matching row (and none when
    # there is no match); .at only accepts a single scalar row label
    df.loc[df['zstore'] == gsurl, 'version'] = new_version
    df.to_csv('csv/pangeo-cmip6-noQC.csv', mode='w+', index=False)

In [None]:
# Deliberate stop so that "Run All" does not continue into the next cell,
# which uploads the modified catalog to GCS.
assert False, 'stopped on purpose: run the next cell by hand to upload the catalog'

In [None]:
# Final step: replace the noQC catalog in GCS with the locally modified copy.
upload_cmd = '/usr/bin/gsutil -m cp csv/pangeo-cmip6-noQC.csv gs://cmip6/cmip6-zarr-consolidated-stores-noQC.csv'
ret = os.system(upload_cmd)
# a non-zero status from os.system is reported as a failed upload
if not ret == 0:
    print('noQC upload not working')