In [18]:
import numpy as np
import netCDF4 as nc
import datetime as dtm
import pandas as pd
import matplotlib.pyplot as plt
import glob
import os
import pyreadr
from multiprocessing import Pool
from functools import partial
import logging

from NastyaFlux import function

# Some globals are set here.

In [26]:
folder = '/projects/0/ctdas/PARIS/CTE-HR/PARIS_OUTPUT/'
scenario = 'BASE'
path = f'{folder}/{scenario}/paris_ctehr_yr1_{scenario}.nc'

MAX_LEN_FP = 240 
NLATS = 390
NLONS = 250
lats = np.linspace(33.05, 71.95, NLATS, dtype='float32')
lons = np.linspace(-14.9, 34.9, NLONS, dtype='float32')
# Define GLOBAL_LONS and GLOBAL_LATS
GLOBAL_LONS = np.arange(-179.5, 180.5, 1)
GLOBAL_LATS = np.arange(-89.5, 90.5, 1)

variables = ['flux_ff_exchange_prior', 
             'flux_ocean_exchange_prior', 
             'flux_fire_exchange_prior', 
             'flux_bio_exchange_prior']

fpdir = '/projects/0/ctdas/PARIS/DATA/footprints/wur/PARIS_recompile/'
# stationlist = ['JFJ', 'MHD', 'LUT']
stationlist = ['PAL']

bgpath = '/projects/0/ctdas/PARIS/DATA/background/STILT/'


In [27]:
stationlist = pd.read_csv('/projects/0/ctdas/PARIS/DATA/obs/stations/NAME_CO2_stations_UoB_Ganesan_Manning.txt', sep=',', header=None, index_col=0).index


with nc.Dataset(path) as ds:
    times = ds['time'][:]
    times = nc.num2date(times, units=ds['time'].units, 
                        only_use_cftime_datetimes=False, only_use_python_datetimes=True)
    times = pd.to_datetime(times)
    indices = np.arange(len(times))

# Function defenitions are done here

In [None]:
def get_times_needed(obsdate):
    """Get all the flux time stamps needed for the current observation"""
    firsttime = obsdate - dtm.timedelta(hours=MAX_LEN_FP)
    lasttime = obsdate
    return pd.date_range(firsttime, lasttime, freq='H', inclusive='right')

def get_fluxes_hour(time_toload):
    """Get the fluxes for one hour that requires loading"""
    index = indices[times==time_toload]
    
    fluxdict = {}
    with nc.Dataset(path) as ds:
        for v in variables:
            fluxdict[v] = ds[v][index, :, :]
    return fluxdict

def get_times_toload(obsdate, times_loaded):
    """Get the times needed to load for the current obsdate
    If some times are already loaded, we don't need them anymore!"""
    times_needed = get_times_needed(obsdate)
    times_to_load = times_needed.difference(times_loaded)
    return times_to_load

def load_fluxes(fluxdict_all, obsdate, times_loaded):
    times_to_load = get_times_toload(obsdate, times_loaded)
    print(f'Loading fluxes for {len(times_to_load)} hours')
    for t in times_to_load:
        newflux = get_fluxes_hour(t)
        for v in variables:
            # Overwrite the current array in fluxdict with a new hour
            # Cut-off the first value, as that is the oldest hour and we don't need it anymore
            fluxdict_all[v] = (np.append(fluxdict_all[v], newflux[v], axis=0))[1:, :, :]
        times_loaded = times_loaded.append(pd.DatetimeIndex([t]))
        # This way, times_loaded only gets longer. 
        # That should be no problem, as we only append NEWER dates.
        # Older fluxes (that are already handled), are thus never seen again.
        print(f'Loaded fluxes for {t}')
    return fluxdict_all, times_loaded

def get_fp(fpfile):
    """Get the footprint from the footrpint file
    Some checks are needed, as some of the footprint files are empty
    (and don't even contain metadata). This means all particles left the domain
    before touching the surface. We're handeling those cases in this function."""
    if fpfile is None:
        # Just means no footprint is made; probably we're not interested in this station/time combo
        return
    elif os.stat(fpfile).st_size == 0:
        # Means the sensitivity is 0, as all particles left the domain before touching the surface
        # We're returning the same format as if there was sensitivity
        # But just without sensitivity...
        return [np.array([], dtype=int)] * 4
    with nc.Dataset(fpfile) as ds:
        # Standard case: sensitivity to surface fluxes! Load in the indices
        influence = ds['Influence'][:]
        time = ds['Time'][:]
        lat = ds['Latitude'][:]
        lon = ds['Longitude'][:]
    lat_indices = np.searchsorted(lats, lat)
    lon_indices = np.searchsorted(lons, lon)
    time_indices = np.abs(time - 240, dtype=int, casting='unsafe')
    return influence, time_indices, lat_indices, lon_indices

def convolute(fp, fluxes):
    """Convolve the footprint and the fluxes"""
    # Initialise an empty dictionary to return
    returndict = {v: np.nan for v in variables}

    influence, time_indices, lat_indices, lon_indices = fp
    for v in variables:
        # Select the (sparse) indices from the (dense, 3d) fluxes
        # This speeds up computation
        fluxes_sel = fluxes[v][time_indices, lat_indices, lon_indices]
        returndict[v] = (influence * fluxes_sel).sum()

    returndict += influence * get_nastya_flux_constant("Finland")

    return returndict

def get_fpfile(station, date):
    """Get the footprint filename 
    Return the string if present, return None if no file is present"""
    filepath = f'{fpdir}/footprint_{station}_{date.year}x{date.month:02}x{date.day:02}x{date.hour:02}x*.nc'
    filelst = glob.glob(filepath)
    # Check if there's only one file 
    if len(filelst) == 1:
        fpfile = filelst[0]
        return fpfile
    else:
        print(f'No file {filepath} found. ')
        return
    
def get_last_times(t, path, lat, lon, agl, npars=250):
    """Get last recorded time of every particle for a certain station, using 
    the RData files from the STILT output.

    Input variables:
    t: datetime object
    path: path to RData files

    Output variables:
    last_times: dataframe with last times of particles at station

    """ 
    # Open the .RDatafile in python as pandas dataframe
    f = f'{path}/.RData{t.year}x{t.month:02}x{t.day:02}x{t.hour:02}x{lat}x{lon}x{agl}x{npars}'
    data = pyreadr.read_r(f)
    varname = f.split('/')[-1][6:]
    foot = data[varname]
    # Get the last time for each particle, as the minimum (most negative) time
    last_times = foot.iloc[foot.groupby('index')['time'].idxmin()]
    return last_times

def get_hour_idx(t):
    """Get 3-hourly index from datetime object.
    
    Input variables:
        t: datetime object
    Output variables:
        3-hourly index
    """
    # This probably could be cleaner. 
    # The thing we're achieving here is that CTE mole fractions are in 3-hourly 'bins'
    # And we need to get the index of that bin.
    if t.hour >= 22:
        if t.hour == 22 and t.minute > 30 or t.hour == 23:
            return t.hour//3
    if t.hour % 3 == 0 :
        return t.hour//3
    elif t.hour % 3 == 2:
        return t.hour // 3 + 1
    else:
        if t.minute > 30:
            return  t.hour // 3 + 1
        else: return t.hour//3


def get_bg(start, row, bgdir='/projects/0/ctdas/PARIS/DATA/background/STILT/'):
    """Load background concentration into netCDF Dataset at a certain location and time. 
    GLOBAL_LONS and GLOBAL_LATS are the global lon and lat arrays from the TM5 model.
    
    Input variables:
        start: datetime object
        row: row from last_times dataframe
    Output variables:
        bg: background concentration at location and time
    
    """
    # Calculate the start time of the particle
    dt = row['time'] / 60 # hours
    time = start + dtm.timedelta(hours=dt)
    # Get the background file to open
    # And get the indices to read. Reading only certain indices is much, much faster
    # Than loading everything and selecting later (netCDF uses 'lazy loading')
    bgfile = f'{bgdir}/3d_molefractions_1x1_{time.year}{time.month:02}{time.day:02}.nc'
    lat_idx = np.argmin(abs(row['lat'] - GLOBAL_LATS))
    lon_idx = np.argmin(abs(row['lon'] - GLOBAL_LONS))
    time_idx = get_hour_idx(time)
    # Read in the CTE background for the location of the particle
    with nc.Dataset(bgfile) as ds:
        pressures = ds['pressure'][time_idx, :, lat_idx, lon_idx]
        height_idx = np.argmin(abs(pressures - row['pres'] * 100))
        bg = ds['co2'][time_idx, height_idx, lat_idx, lon_idx]
    return bg

def calculate_mean_bg(fpfile):
    """Calculate mean background concentration for a certain footprint.
    Uses the get_last_times() and get_bg() functions.
    Input:
        fpfile: str: path location of the footprint file. Required to get the Rdata file location
    Returns: 
        float: background: the mean background for the particles released for this fpfile"""
    # Get the name of the footprintfile
    fpname = fpfile.split('/')[-1]
    # Parse to get the location and number of particles
    lat, lon, agl, npars = fpname.split('x')[4:8]
    # Number of particles is stripped of any zeros. (could be better coded!)
    npars = npars[:-3]
    # Parse date into a pandas datetime
    time = pd.to_datetime('-'.join(fpname.split('_')[2].split('x')[:4]))
    # Get the last time (i.e. the last known location of all particles released)
    last_times = get_last_times(t=time, path=fpdir, npars=npars, lat=lat, lon=lon, agl=agl)
    # Get the background concentration (from CTE)
    # for each last time and location of the released particles
    # And average them
    bg = []
    for y, row in last_times.iterrows():
        bg.append(get_bg(time, row, bgdir=bgpath))
    
    return np.mean(bg)

def get_mfdict(obsdate, station):
    """Combine the functions to get a dictionary of background and enhancements
    for the current date and station
    Input:
        obsdate: pd.Datetime: date of the observation for which to calculate the sensitivities
        station: str: 3-letter code of the station
    Returns:
        tuple: tuple of (station (str), enhncements (dict)). The station is returned for easier post-processing"""
    
    print(f'Getting mole fractions for {obsdate} {station}')
    # First, get the footprint and footprint file. 
    # This allows us to do some selection
    fpfile = get_fpfile(station, obsdate)
    fp = get_fp(fpfile)
    print('Got footprint')
    if fp is not None:
        # If we have a footprint, we calculate the enhancements and background
        # And add it to a dict to return
        resdict = convolute(fp, fluxdict_all)
        print('Calculated mole fractions')
        bg = calculate_mean_bg(fpfile)
        print('Calculated background')
        resdict['background'] = bg
    else:
        # If there's no footprint, just return an empty dict.
        # Generally, no footprint means that we're skipping the observation anyway.
        resdict = {}
        print('No footprint!')
    return station, resdict
        

# Here, we set some things for the run. 
What stations to run, what dates?

In [30]:
times_loaded = pd.DatetimeIndex([])
obsdates = pd.date_range('2021-03-01', '2021-03-03', freq='H')

# Shorter time and stationlist for testing!
stationlist = stationlist[:10]

# Initialise fluxes. Overwritten (filled) later.
fluxes = np.empty((MAX_LEN_FP, NLATS, NLONS)) * np.nan
fluxdict_all = {v: fluxes.copy() for v in variables}

# Initialise the return format: a dict of dataframes
resdf = pd.DataFrame(columns=variables, index=obsdates)
resdict_all = {k: resdf for k in stationlist}

# This is the main loop. It calculates everything and takes long!
In some tests I did on the login node, I did about 2mins/hour (3 stations, MP-ing over 3 cores). The real speed will depend on the (amount of) selected stations and amount of processors asked. 

Never ask more processors than you have stations. 

Always ask as many processors in your python script as in your bash script.

In [31]:
for obsdate in obsdates:
    # First load in the fluxes. We DO NOT do this in parallel
    # As we use the same fluxes for all stations
    # And parellising this would only result in a lot of memory used
    fluxdict_all, times_loaded = load_fluxes(fluxdict_all, obsdate, times_loaded)
    
    # Parallelise over stations. These all use the same fluxes, but different footprints
    func = partial(get_mfdict, obsdate)
    with Pool(1) as pool:
        result = pool.map(func, stationlist)
    # This automatically waits for all stations for that day to be processed.
    print(f'Got results for {obsdate}')
    
    # Put the result in some nicer format.
    for station, resdict in result:
        for k, v in resdict.items():
            resdict_all[station].loc[obsdate, k] = v


Loading fluxes for 240 hours
Loaded fluxes for 2021-02-19 01:00:00
Loaded fluxes for 2021-02-19 02:00:00
Loaded fluxes for 2021-02-19 03:00:00
Loaded fluxes for 2021-02-19 04:00:00
Loaded fluxes for 2021-02-19 05:00:00
Loaded fluxes for 2021-02-19 06:00:00
Loaded fluxes for 2021-02-19 07:00:00
Loaded fluxes for 2021-02-19 08:00:00
Loaded fluxes for 2021-02-19 09:00:00
Loaded fluxes for 2021-02-19 10:00:00
Loaded fluxes for 2021-02-19 11:00:00
Loaded fluxes for 2021-02-19 12:00:00
Loaded fluxes for 2021-02-19 13:00:00
Loaded fluxes for 2021-02-19 14:00:00
Loaded fluxes for 2021-02-19 15:00:00
Loaded fluxes for 2021-02-19 16:00:00
Loaded fluxes for 2021-02-19 17:00:00
Loaded fluxes for 2021-02-19 18:00:00
Loaded fluxes for 2021-02-19 19:00:00
Loaded fluxes for 2021-02-19 20:00:00
Loaded fluxes for 2021-02-19 21:00:00
Loaded fluxes for 2021-02-19 22:00:00
Loaded fluxes for 2021-02-19 23:00:00
Loaded fluxes for 2021-02-20 00:00:00
Loaded fluxes for 2021-02-20 01:00:00
Loaded fluxes for 202

In [32]:
# Write the result to some csv files for processing
for k, d in resdict_all.items():
    d.to_csv(f'atmopsheric_molefractions_AI2_{k}.csv')