# RScript-Bridge

> Bridge between Stactics AICore framework and RScript prediction scripts

In [None]:
#| default_exp rscript

### Some things to set up first

Notebooks use the nbdev tooling, and `addroot` makes importing from
the repo directory more convenient.

In [None]:
from nbdev.showdoc import *
import logging
#import addroot

In [None]:
#| export

import os, logging, json, hashlib
import fcntl, subprocess

from collections import namedtuple

from corebridge.aicorebridge import AICoreModule
from corebridge.core import init_console_logging

Loading corebridge.aicorebridge 0.2.27 from /home/fenke/repos/corebridge/corebridge/aicorebridge.py


AICore uses an `assets` dir from which we can read files, like scripts,
and a `save` dir where modules can write and read files.

In [None]:
#| export

syslog = init_console_logging(__name__, logging.DEBUG, timestamp=False)

In [None]:

assets_dir = os.path.join(os.path.abspath(os.getcwd()), 'assets', 'rscript')
save_dir = os.path.join(os.path.abspath(os.getcwd()), 'saves', 'rscript')


In [None]:
#| exports

def get_asset_path(script_name, assets_dir:str):
    """Return the path of the asset `script_name` inside `assets_dir`."""
    asset_path = os.path.join(assets_dir, script_name)
    return asset_path
def get_rscript_libpath(save_dir:str):
    """Return the `libs` folder below `save_dir`, used as R library folder."""
    lib_path = os.path.join(save_dir, 'libs')
    return lib_path
def get_save_path(datafile_name:str, save_dir:str):
    """Return the path of the data file `datafile_name` inside `save_dir`."""
    data_path = os.path.join(save_dir, datafile_name)
    return data_path


In [None]:
#| hide
from tabulate import tabulate

In [None]:
#| hide
def markdown_table(feature_dict):
    """Render a sequence of row dicts as a GitHub-style Markdown table.

    Every row contributes its values in their own order; the keys of the
    first row are used as the column headers.
    """
    table_rows = [list(row.values()) for row in feature_dict]
    table_headers = list(feature_dict[0].keys())
    rendered = tabulate(table_rows, headers=table_headers, tablefmt='github')
    return Markdown(rendered)

def markdown_flow_table(flow_table):
    """Render a flow table as a Markdown table with one row per entry.

    `flow_table` maps a name to a dict of properties. The union of all
    property names forms the columns; a property that an entry lacks is
    shown as a single space. The entry's key is put in the `file` column.

    Returns whatever `markdown_table` returns for the assembled rows.
    """
    # Union of all property names used by any entry of the table.
    columns = {C for S in flow_table for C in flow_table[S]}
    print(columns)
    # Pass the rows as the argument of markdown_table; the previous
    # `markdown_table()(...)` called it without rows and then tried to call
    # its result, which could never succeed.
    return markdown_table(
        [
            dict(file=fn, **{c: rd.get(c, ' ') for c in columns})
            for fn, rd in flow_table.items()
        ]
    )

In [None]:
#| hide

from IPython.display import display
from IPython.display import Markdown


In [None]:
#| hide
def display_table(feature_dict):
    """Show the rows in `feature_dict` as a Markdown table in the notebook."""
    rendered_table = markdown_table(feature_dict)
    display(rendered_table)
    
data_file_flow = {}

def display_flow_table(flow_table):
    """Display a flow table as a Markdown table with one row per entry.

    `flow_table` maps a name to a dict of properties. The union of all
    property names forms the columns; a property that an entry lacks is
    shown as a single space. The entry's key is put in the `file` column.
    """
    # Derive the columns from the table that was passed in. Reading them
    # from the module-level `data_file_flow` instead made the result depend
    # on unrelated global state.
    columns = {C for S in flow_table for C in flow_table[S]}
    print(columns)
    display(markdown_table(
        [
            dict(file=fn, **{c: rd.get(c, ' ') for c in columns})
            for fn, rd in flow_table.items()
        ]
    ))


## Running R code

Scripts written in R can be run from a Python program using `subprocess` and `Rscript`.



### `Rscript`

A script can be run from the commandline with

    Rscript ascript.R



### `subprocess`

[Python's `subprocess` module](https://docs.python.org/3.11/library/subprocess.html#) has the tools to execute external programs like `Rscript`.


In [None]:

subprocess.run(['Rscript',get_asset_path('hello.R', assets_dir)], capture_output=True).stdout.decode('UTF-8')

'[1] "hello world"\n'

## Example: sapflow prediction scripts



### `Data_preparation.R`

#### Libraries

* lubridate
* stringr
* zoo

#### Input

* `Data/Meta_data.csv`
* `Data/Sapflux_Tilia_train.csv`
* `Data/Weather_Tilia_train.csv`
* `Data/Weather_Tilia_pred.csv`

#### Output

* `Modelling_data.RData`
* `Prediction_data.RData`


In [None]:
data_file_flow['Data_preparation.R'] = {
      "in": [
         "Data/Meta_data.csv",
         "Data/Sapflux_Tilia_train.csv",
         "Data/Weather_Tilia_train.csv",
         "Data/Weather_Tilia_pred.csv"
      ],
      "out": [
         "Modelling_data.RData",
         "Prediction_data.RData"
      ],
    'libs':['lubridate', 'stringr', 'zoo']
}


### `Prediction_part1.R`

#### Libraries

* lubridate
* stringr
* mgcv

#### Input

* `Modelling_data.RData`

#### Output

* `Fitted_models.RData`
* `Weights.RData`



In [None]:
data_file_flow['Prediction_part1.R'] = {
      "in": [
         "Modelling_data.RData"
      ],
      "out": [
         "Fitted_models.RData",
         "Weights.RData"
      ],
    'libs':['lubridate', 'stringr', 'mgcv']
}



### `Prediction_part2.R`

#### Libraries

* lubridate
* stringr
* mgcv

#### Input

* `Fitted_models.RData`
* `Weights.RData`
* `Modelling_data.RData`
* `Prediction_data.RData`

#### Output

* `Predicted_sapflux.RData`

In [None]:
data_file_flow['Prediction_part2.R'] = {
    "in":[
        'Fitted_models.RData',
        'Weights.RData',
        'Modelling_data.RData',
        'Prediction_data.RData'
    ],
    "out":[
        'Predicted_sapflux.RData'
    ],
    'libs':['lubridate', 'stringr', 'mgcv']
}


### `Prediction_part3.R`

#### Libraries

* lubridate
* stringr

#### Input

* `Predicted_sapflux.RData`

#### Output

* `Predicted_water_usage.RData`


In [None]:
data_file_flow['Prediction_part3.R'] = {
    'in':['Predicted_sapflux.RData'],
    'out':['Predicted_water_usage.RData'],
    'libs':['lubridate', 'stringr']
}

In [None]:
#| hide
from functools import reduce


In [None]:
#| hide
# Map every script name to its position in data_file_flow (insertion order).
script_order = dict(zip(data_file_flow.keys(), range(len(data_file_flow.keys()))))

# add the name to the objects
# The flow table is re-keyed by that position; the script name moves into
# the 'name' entry of each flow object.
data_file_flow = {
    script_order[k]:{**v, 'name':k}
    for k,v in data_file_flow.items()
}


# Ordered list of all data files mentioned as 'in' or 'out' by any script.
# The reduce keeps only the first occurrence of each file name.
data_files = reduce(
    lambda Y,X:Y if (X in Y) else [*Y,X],
    [
        f
        for S,P in data_file_flow.items() # patterns
        for D,F in P.items()
        for f in F
        if D in ['in','out']

    ],
    []
)
# One table row per data file and one column per script: 'in' when the
# script reads the file, 'out' when it writes it, '--' otherwise. Only the
# last path component of the file name is shown in the first column.
display(Markdown(tabulate(
    [
        [F.split('/')[-1]]+[
            'in' if F in P['in'] else 'out' if F in P['out'] else '--' 
            for S,P in data_file_flow.items()
        ] 
        for F in data_files
    ],
    headers=['data-file / script'] + [I['name'] for I in data_file_flow.values()],
    tablefmt='github'
)))

| data-file / script          | Data_preparation.R   | Prediction_part1.R   | Prediction_part2.R   | Prediction_part3.R   |
|-----------------------------|----------------------|----------------------|----------------------|----------------------|
| Meta_data.csv               | in                   | --                   | --                   | --                   |
| Sapflux_Tilia_train.csv     | in                   | --                   | --                   | --                   |
| Weather_Tilia_train.csv     | in                   | --                   | --                   | --                   |
| Weather_Tilia_pred.csv      | in                   | --                   | --                   | --                   |
| Modelling_data.RData        | out                  | in                   | in                   | --                   |
| Prediction_data.RData       | out                  | --                   | in                   | --                   |
| Fitted_models.RData         | --                   | out                  | in                   | --                   |
| Weights.RData               | --                   | out                  | in                   | --                   |
| Predicted_sapflux.RData     | --                   | --                   | out                  | in                   |
| Predicted_water_usage.RData | --                   | --                   | --                   | out                  |

In [None]:
data_files

['Data/Meta_data.csv',
 'Data/Sapflux_Tilia_train.csv',
 'Data/Weather_Tilia_train.csv',
 'Data/Weather_Tilia_pred.csv',
 'Modelling_data.RData',
 'Prediction_data.RData',
 'Fitted_models.RData',
 'Weights.RData',
 'Predicted_sapflux.RData',
 'Predicted_water_usage.RData']

In [None]:
data_file_flow

{0: {'in': ['Data/Meta_data.csv',
   'Data/Sapflux_Tilia_train.csv',
   'Data/Weather_Tilia_train.csv',
   'Data/Weather_Tilia_pred.csv'],
  'out': ['Modelling_data.RData', 'Prediction_data.RData'],
  'libs': ['lubridate', 'stringr', 'zoo'],
  'name': 'Data_preparation.R'},
 1: {'in': ['Modelling_data.RData'],
  'out': ['Fitted_models.RData', 'Weights.RData'],
  'libs': ['lubridate', 'stringr', 'mgcv'],
  'name': 'Prediction_part1.R'},
 2: {'in': ['Fitted_models.RData',
   'Weights.RData',
   'Modelling_data.RData',
   'Prediction_data.RData'],
  'out': ['Predicted_sapflux.RData'],
  'libs': ['lubridate', 'stringr', 'mgcv'],
  'name': 'Prediction_part2.R'},
 3: {'in': ['Predicted_sapflux.RData'],
  'out': ['Predicted_water_usage.RData'],
  'libs': ['lubridate', 'stringr'],
  'name': 'Prediction_part3.R'}}

## Import R libraries

Importing libraries can be done with

    Rscript -e 'install.packages("drat", repos="https://cloud.r-project.org")'

In [None]:
print(subprocess.run(['Rscript','--version', ], capture_output=True).stdout.decode('UTF-8'))


Rscript (R) version 4.2.2 (2022-10-31)



In [None]:
rversion = subprocess.run(['Rscript','--version', ], capture_output=True)
print(rversion.stdout.decode('UTF-8'))

Rscript (R) version 4.2.2 (2022-10-31)



### User library folder

In [None]:
#| exports
def get_rscript_env(libfolder:str):
    """Return a copy of the process environment suitable for running Rscript.

    A non-empty `R_LIBS_USER` that is already present in the environment is
    left untouched; otherwise `R_LIBS_USER` is set to `libfolder`.

    Parameters
    ----------
    libfolder : str
        path to use as R user library folder when none is configured

    Returns
    -------
    dict
        a new dict; `os.environ` itself is not modified
    """
    env = dict(os.environ)
    # An empty R_LIBS_USER counts as "not configured". Assigning by key
    # avoids the duplicate-keyword TypeError that
    # dict(**os.environ, R_LIBS_USER=...) raises when the variable exists
    # but is empty.
    if not env.get('R_LIBS_USER'):
        env['R_LIBS_USER'] = str(libfolder)
    return env

In [None]:
assert get_rscript_env(get_rscript_libpath(save_dir)).get('R_LIBS_USER') == get_rscript_libpath(save_dir), 'rscript environment not set as expected'

### Used libraries

In [None]:
list(set([L for V in data_file_flow.values() for L in V['libs']]))

['stringr', 'zoo', 'lubridate', 'mgcv']

In [None]:
run_script_result = subprocess.run(['Rscript','-e', "library(lubridate)"], capture_output=True)
print(run_script_result.stderr.decode('UTF-8'), run_script_result.returncode)

Error in library(lubridate) : there is no package called ‘lubridate’
Execution halted
 1


In [None]:
[os.path.exists(os.path.join(get_rscript_libpath(save_dir), L)) for L in list(set([L for V in data_file_flow.values() for L in V['libs']]))]

[True, True, True, False]

In [None]:
[os.path.join(get_rscript_libpath(save_dir), L) for L in list(set([L for V in data_file_flow.values() for L in V['libs']]))]

['/home/fenke/repos/corebridge/nbs/saves/rscript/libs/stringr',
 '/home/fenke/repos/corebridge/nbs/saves/rscript/libs/zoo',
 '/home/fenke/repos/corebridge/nbs/saves/rscript/libs/lubridate',
 '/home/fenke/repos/corebridge/nbs/saves/rscript/libs/mgcv']

In [None]:
#| exports
def check_rscript_libs(libs:list, libfolder:str):
    """Quick check: True when every R package named in libs has a folder in libfolder"""
    absent = [
        name for name in libs
        if not os.path.exists(os.path.join(libfolder, name))
    ]
    return not absent

def check_rscript_lib(lib:str, libfolder:str) -> bool:
    """Checks if an R package can be attached by Rscript

    Runs `library(<lib>)` through `Rscript -e` with the environment from
    `get_rscript_env(libfolder)`. When the attach fails, the captured
    stderr and stdout of Rscript are printed.

    Parameters
    ----------
    lib : str
        name of the package
    libfolder : str
        path to the library folder

    Returns
    -------
    bool
        True if Rscript exited with code 0, False otherwise
    """

    attach_command = ['Rscript', '-e', f"library({lib})"]
    attach_result = subprocess.run(
        attach_command,
        env=get_rscript_env(libfolder),
        capture_output=True,
    )
    attached = attach_result.returncode == 0
    if not attached:
        print('STDERR\n', attach_result.stderr.decode('UTF-8'))
        print('STDOUT\n', attach_result.stdout.decode('UTF-8'))
    return attached

In [None]:
check_rscript_libs(list(set([L for V in data_file_flow.values() for L in V['libs']])), get_rscript_libpath(save_dir))

False

In [None]:
check_rscript_lib('mgcv', get_rscript_libpath(save_dir))

True

In [None]:
check_rscript_lib('zoo', get_rscript_libpath(save_dir))

True

## Installing libraries

In [None]:
#| exports

def install_R_package_wait(pkg:str|list, workdir:str, repo='https://cloud.r-project.org'):
    """
    Checks and, if necessary, installs R packages, waiting for each install.

    Every package is first test-attached with `check_rscript_lib`. Only a
    package that fails to attach is installed with `install.packages` into
    the `libs` folder below `workdir`, after which the attach is tested
    again. Progress and, on failure, the captured install logs are printed.

    Parameters
    ----------
    pkg : str|list
        name(s) of the package(s)
    workdir : str
        working directory; packages go into `<workdir>/libs`, which is
        created when it does not exist
    repo : str
        URL of the package repository handed to `install.packages`

    Returns
    -------
    None
    """

    if isinstance(pkg, str):
        # Normalise a single name to a list. `workdir` must be passed on
        # here: the library folder is only derived from it further down, so
        # the previous use of `libfolder` at this point raised
        # UnboundLocalError.
        return install_R_package_wait([pkg], workdir, repo)

    libfolder = os.path.join(workdir, 'libs')
    os.makedirs(libfolder, exist_ok=True)
    syslog.debug(f"Using libfolder {libfolder} for packages")

    # The install always targets this libfolder, also when R_LIBS_USER is
    # already set in the calling environment.
    env = dict(os.environ)
    env['R_LIBS_USER'] = os.path.abspath(libfolder)
    syslog.debug(F"Using libfolder {env['R_LIBS_USER']} for R_LIBS_USER")

    def print_install_logs(install_result):
        # Dump the captured output of an install.packages run.
        print('STDOUT--------------\n', install_result.stdout.decode('UTF-8'))
        print('STDERR--------------\n', install_result.stderr.decode('UTF-8'))

    for pkg_i in pkg:
        print(f"\nInstalling package {pkg_i}, testing attach ...")
        if check_rscript_lib(pkg_i, libfolder):
            print(f"Attach successful. Library {pkg_i} appears to have been installed")
            continue

        print(f"Package {pkg_i} not attached. Installing {pkg_i}")
        run_script_install = subprocess.run([
                'Rscript', '-e',
                f"install.packages('{pkg_i}', repos='{repo}', lib='{libfolder}', dependencies=TRUE)"
            ], capture_output=True, env=env)

        if run_script_install.returncode != 0:
            print(f"installing {pkg_i}, returned code {run_script_install.returncode} ... ")
            print_install_logs(run_script_install)
        elif not check_rscript_lib(pkg_i, libfolder):
            # Rscript reported success but the package still cannot be attached.
            print(f"Attach after installing for {pkg_i} failed ... install logs below")
            print_install_logs(run_script_install)
        else:
            print(f"Attach after installation was successful. Library {pkg_i} appears to have been installed")
            



In [None]:
install_R_package_wait(['generics', 'timechange', 'rlang'], save_dir)

DEBUG	28210	root	1929856308.py	18	Using libfolder /home/fenke/repos/corebridge/nbs/saves/rscript/libs for packages
DEBUG	28210	root	1929856308.py	22	Using libfolder /home/fenke/repos/corebridge/nbs/saves/rscript/libs for R_LIBS_USER



Installing package generics, testing attach ...
Attach successful. Library generics appears to have been installed

Installing package timechange, testing attach ...
Attach successful. Library timechange appears to have been installed

Installing package rlang, testing attach ...
Attach successful. Library rlang appears to have been installed


In [None]:
# install_R_package_wait(
#     ['zoo'],
#     libfolder=get_rscript_libpath(save_dir))

In [None]:
# install_R_package_wait(
#     sorted(list(set([L for V in data_file_flow.values() for L in V['libs']]))),
#     libfolder=get_rscript_libpath(save_dir))

## Running the scripts

### Installing scripts

In [None]:
#| exports

def unpack_assets(assets_dir:str, save_dir:str):
    """
    Start unzipping the zip files in assets_dir into save_dir

    Launches `unzip -o -d <save_dir> <assets_dir>/*.zip` with `Popen` and
    returns the `Popen` object right away, without waiting for it. No shell
    is involved, so the `*.zip` pattern reaches `unzip` unexpanded. Both
    stdout and stderr of the process are pipes.
    """
    archive_pattern = os.path.join(assets_dir, '*.zip')
    unzip_command = ['unzip', '-o', '-d', save_dir, archive_pattern]
    return subprocess.Popen(
        unzip_command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

In [None]:
unpack_result = unpack_assets(assets_dir, save_dir)

In [None]:
unpack_result.args

['unzip',
 '-o',
 '-d',
 '/home/fenke/repos/corebridge/nbs/saves/rscript',
 '/home/fenke/repos/corebridge/nbs/assets/rscript/*.zip']

In [None]:
unpack_result.poll()

In [None]:
print(unpack_result.stdout.read().decode('UTF-8'))

Archive:  /home/fenke/repos/corebridge/nbs/assets/rscript/SapflowPredictionAssets.zip
  inflating: /home/fenke/repos/corebridge/nbs/saves/rscript/Data/Weather_Tilia_train.RData  
  inflating: /home/fenke/repos/corebridge/nbs/saves/rscript/Data/Weather_Tilia_train.csv  
 extracting: /home/fenke/repos/corebridge/nbs/saves/rscript/Data/Weather_Tilia_pred.RData  
  inflating: /home/fenke/repos/corebridge/nbs/saves/rscript/Data/Weather_Tilia_pred.csv  
  inflating: /home/fenke/repos/corebridge/nbs/saves/rscript/Data/Sapflux_Tilia_train.RData  
  inflating: /home/fenke/repos/corebridge/nbs/saves/rscript/Data/Sapflux_Tilia_train.csv  
 extracting: /home/fenke/repos/corebridge/nbs/saves/rscript/Data/Meta_data.RData  
  inflating: /home/fenke/repos/corebridge/nbs/saves/rscript/Data/Meta_data.csv  
  inflating: /home/fenke/repos/corebridge/nbs/saves/rscript/Data/Historical_data.RData  
  inflating: /home/fenke/repos/corebridge/nbs/saves/rscript/Prediction_part3.R  
  inflating: /home/fenke/repos

### Checksum calculation

Each script has its own set of input files and should be run to
update its output when either its inputs have changed or its
expected output does not exist.

We can check for filechanges using a hashing algorithm, for 
instance MD5 or SHA-256. These are available either in Python
or from the commandline.

Let's look at the command-line version of MD5 (on Linux this is
`md5sum`) with the input files for the preparation stage:

In [None]:
print(json.dumps(data_file_flow[list(data_file_flow.keys())[0]]['in'], indent=3))

[
   "Data/Meta_data.csv",
   "Data/Sapflux_Tilia_train.csv",
   "Data/Weather_Tilia_train.csv",
   "Data/Weather_Tilia_pred.csv"
]


md5sum will output hashes to stdout, which `subprocess.run` captures for us

In [None]:
flow_object_index = 0
input_files = data_file_flow[flow_object_index]['in']

print(json.dumps(input_files, indent=3))

[
   "Data/Meta_data.csv",
   "Data/Sapflux_Tilia_train.csv",
   "Data/Weather_Tilia_train.csv",
   "Data/Weather_Tilia_pred.csv"
]


In [None]:

md5_encode_result = subprocess.run(
    ['md5sum','-b']+
    input_files, 
    cwd=save_dir,
    capture_output=True)
print(md5_encode_result.stdout.decode('UTF-8'))

4bed61a77505bfd52032591d5c3a6050 *Data/Meta_data.csv
6d705d98caa6618a4a990c3742c16564 *Data/Sapflux_Tilia_train.csv
1232592f9488ce4fbb4ae11ba5be0349 *Data/Weather_Tilia_train.csv
366dac1bf64003d1d08fca6121c036bd *Data/Weather_Tilia_pred.csv



If we want to check the files we run it with the `-c` option and a file with the previously calculated checksums

In [None]:
script_name = data_file_flow[flow_object_index]['name']

checksum_file = get_save_path(f"input-checksum-{script_name.split('.')[0]}", save_dir)
with open(checksum_file, 'wt') as cf:
    cf.write(md5_encode_result.stdout.decode('UTF-8'))

In [None]:
md5_check_result = subprocess.run(
    ['md5sum', '-c', checksum_file], 
    cwd=save_dir,
    capture_output=True)
print(md5_check_result.stdout.decode('UTF-8'))
print(f"Run returned code {md5_check_result.returncode}")
if md5_check_result.returncode:
    print(md5_check_result.stderr.decode('UTF-8'))

Data/Meta_data.csv: OK
Data/Sapflux_Tilia_train.csv: OK
Data/Weather_Tilia_train.csv: OK
Data/Weather_Tilia_pred.csv: OK

Run returned code 0


Had there been a change to a file it would have looked like

In [None]:
md5_check_result = subprocess.run(
    ['md5sum', '-c', checksum_file+'-modified'], 
    cwd=save_dir,
    capture_output=True)
print(md5_check_result.stdout.decode('UTF-8'))
print(f"Run returned code {md5_check_result.returncode}")


Run returned code 1


We don't really need specifics, only the return code will
do for our purpose.

### Checking files


#### Generating names

In [None]:
#| exports

read_chunk_size = 1024 * 32
def calc_hash_from_flowobject(flow_object:dict)->str:
    '''Return the MD5 hex digest of the UTF-8 encoded `repr` of a flow object'''
    flow_repr = repr(flow_object)
    return hashlib.md5(flow_repr.encode('UTF-8')).hexdigest()

def calc_hash_from_files(files:list, save_dir:str)->str:
    '''Return one MD5 hex digest over the concatenated contents of files

    Names in `files` are taken relative to `save_dir` and processed in the
    given order. Names that are not an existing regular file are skipped.
    '''
    digest = hashlib.md5()

    for relative_name in files:
        candidate = os.path.join(save_dir, relative_name)
        if os.path.isfile(candidate):
            with open(candidate, 'rb') as stream:
                # feed the file in pieces of read_chunk_size bytes until
                # read() signals end-of-file with an empty bytes object
                for piece in iter(lambda: stream.read(read_chunk_size), b''):
                    digest.update(piece)

    return digest.hexdigest()

def calc_hash_from_input_files(flow_object:dict, save_dir:str)->str:
    '''Return the content hash of the files listed under 'in' of a flow object'''
    input_names = flow_object['in']
    return calc_hash_from_files(input_names, save_dir)

def calc_hash_from_data_files(flow_object:dict, save_dir:str)->str:
    '''Return the content hash of the 'in' files followed by the 'out' files of a flow object'''
    all_names = flow_object['in'] + flow_object['out']
    return calc_hash_from_files(all_names, save_dir)


In [None]:
calc_hash_from_flowobject(data_file_flow[flow_object_index])

'da4b2413f6a22c19a8a7823e6564e746'

In [None]:
calc_hash_from_input_files(data_file_flow[flow_object_index], save_dir)

'32095cd16a83a2c63f1ab51a58ed96c9'

In [None]:
calc_hash_from_data_files(data_file_flow[flow_object_index], save_dir)

'65e6d36ac0c8aadb1fb4ee48d4ff88f3'

#### Inputs

In [None]:
#| exports
def check_script_inputs(flow_object:dict, save_dir:str)->bool:
    """ 
    Check if the input files for a script are up-to-date, returns True if up-to-date.

    Runs `md5sum -c` from within `save_dir` on the checksum file of the
    flow object, named `input-checksum-<hash of the flow object>`.

    Parameters
    ----------
    flow_object : dict
        flow object; its 'name' entry is used in the log message
    save_dir : str
        directory that holds the checksum file and is used as working
        directory for md5sum

    Returns
    -------
    bool
        True if md5sum exited with code 0, False otherwise
    """

    # The file name was previously computed twice in a row; once is enough.
    checksum_file = get_save_path(f"input-checksum-{calc_hash_from_flowobject(flow_object)}", save_dir)
    md5_check_result = subprocess.run(
        ['md5sum', '-c', checksum_file], 
        cwd=save_dir,
        capture_output=True)
    syslog.debug(f"Checksum check result for Flow object: {flow_object['name']}: {md5_check_result.returncode}, checksum file: {checksum_file}")

    # returncode is already an int, so no conversion is needed.
    return md5_check_result.returncode == 0

In [None]:
check_script_inputs(data_file_flow[1], save_dir)

False

#### Outputs
The output is easily checked for existence with `isfile`.

In [None]:
#| exports
def check_script_output(flow_object:dict, save_dir:str)->bool:
    """ 
    Tell whether every file listed under 'out' of the flow object is an
    existing regular file in save_dir; True when none is missing.
    """

    output_paths = [get_save_path(name, save_dir) for name in flow_object['out']]
    present = [os.path.isfile(path) for path in output_paths]
    return False not in present

In [None]:
check_script_output(data_file_flow[1], save_dir)

True

#### Generating the checksum file

In [None]:
#| exports

def generate_checksum_file(flow_object:dict, save_dir:str)->bool:
    """Write the md5sum checksum file for the inputs of a flow object

    Runs `md5sum -b` on the 'in' files from within `save_dir` and stores its
    output in `input-checksum-<hash of the flow object>`. The file is written
    whatever the exit code of md5sum was. Returns True only when md5sum
    exited with code 0 and `check_script_inputs` then accepts the new file.
    """

    md5_command = ['md5sum', '-b'] + flow_object['in']
    md5_run = subprocess.run(md5_command, cwd=save_dir, capture_output=True)

    target_file = get_save_path(f"input-checksum-{calc_hash_from_flowobject(flow_object)}", save_dir)
    syslog.debug(f"Checksum file for Flow object: {flow_object['name']} created return {md5_run.returncode}, checksum file: {target_file}")
    checksum_text = md5_run.stdout.decode('UTF-8')
    with open(target_file, 'wt') as checksum_out:
        checksum_out.write(checksum_text)

    if md5_run.returncode != 0:
        return False
    return check_script_inputs(flow_object, save_dir)

In [None]:
generate_checksum_file(data_file_flow[0], save_dir)

DEBUG	28210	root	2564925326.py	14	Flow object: Data_preparation.R, checksum file: /home/fenke/repos/corebridge/nbs/saves/rscript/input-checksum-da4b2413f6a22c19a8a7823e6564e746


True

#### Running once

We don't need and don't _want_ to run the script more than once. This 
is not a problem when a script has finished and updated the checksum file, 
but we also want to prevent near-simultaneous runs in a multiprocessing 
environment.

We'll use file locking from fcntl directly


In [None]:

# Use fcntl file locking to prevent multiple processes from running the same code at the same time.
# see https://docs.python.org/3/library/fcntl.html#fcntl.flock

# Create a filename based on input-file contents
lock_file = get_save_path(f"lock-{calc_hash_from_input_files(data_file_flow[0], save_dir)}", save_dir)


Now we use `fcntl.flock` with flags `fcntl.LOCK_EX | fcntl.LOCK_NB` to lock the file for exclusive access, while
an exception is thrown if it's already locked.


In [None]:
with open(lock_file, 'wt') as cf:
    fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)
    with open(lock_file, 'wt') as cf2:
        try:
            fcntl.flock(cf2, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except BlockingIOError as locked_error:
            print(locked_error)

[Errno 11] Resource temporarily unavailable


The locks are removed when the file is closed, how convenient

In [None]:
with open(lock_file, 'wt') as cf:
    fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)
with open(lock_file, 'wt') as cf:
    fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)


## Putting it together

### Synchronous

We need to run a script when either any of its inputs have changed or any 
of its outputs do not exist. Return True if a follow-up script should be 
executed, False if nothing changed or executing the script failed.


In [None]:
#| exports

def run_rscript_wait(flow_object, assets_dir:str, save_dir:str):
    """ Run a script in R and wait for it to finish

        The script is skipped when all its outputs exist and its input
        checksums still verify. Otherwise it is run with
        `Rscript --vanilla` from within save_dir, guarded by a
        non-blocking exclusive lock on a per-flow-object lock file.

        args:
            flow_object: dict of flow object, uses the 'name' entry here
            assets_dir: directory that holds the R script
            save_dir: working directory for the script; also holds the
                lock file
        returns:
            bool: True if a follow-up script might need to be run, False if not.
                True when the script was skipped as up-to-date, or when it
                ran and both the output check and the checksum generation
                succeeded. False when the script exited non-zero, when the
                lock was held elsewhere, or when the final checks failed.

    """
    syslog.debug(f"Running script {flow_object['name']}")
    # Output exists and inputs have not changed: skip the run and return True.
    # NOTE(review): the comment here used to say "return False" while the
    # code returns True; the code is left as it is, confirm which is intended.
    if check_script_output(flow_object, save_dir) and check_script_inputs(flow_object, save_dir):
        return True
    
    # Create the lock file; mode 'wt' truncates an existing file, so the log
    # of an earlier failed run is cleared here, before the lock is taken
    lock_file = get_save_path(f"lock-{calc_hash_from_flowobject(flow_object)}", save_dir)
    with open(lock_file, 'wt') as cf:
        try:
            syslog.debug(f"Locking {lock_file}")
            # Get exclusive lock on the file, is released on file close.
            # LOCK_NB makes this raise BlockingIOError instead of waiting
            # when the lock is already held.
            fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)

            # run the script
            run_script_result = subprocess.run(
                ['Rscript', '--vanilla', get_asset_path(flow_object['name'], assets_dir)],
                cwd=save_dir,
                capture_output=True
            )
            
            # check the return code; on failure the captured output is kept
            # in the lock file
            if run_script_result.returncode:
                cf.write(f"Run returned code {run_script_result.returncode}\n")
                cf.write(f"STDOUT------------\n{run_script_result.stdout.decode('UTF-8')}\n")
                cf.write(f"STDERR------------\n{run_script_result.stderr.decode('UTF-8')}\n")
                return False

        except BlockingIOError as locked_error:
            # the lock is held elsewhere: do not run the script here
            syslog.debug(locked_error)
            return False

    
    # check the output and generate the checksum file; this happens after
    # the lock file has been closed
    return check_script_output(flow_object, save_dir) and generate_checksum_file(flow_object, save_dir)
    

In [None]:
#run_rscript_wait(data_file_flow[0], assets_dir, save_dir) 

In [None]:
#run_rscript_wait(data_file_flow[1], assets_dir, save_dir)

In [None]:
#run_rscript_wait(data_file_flow[2], assets_dir, save_dir)

In [None]:
def clear_results(flow_object :dict, save_dir:str):
    """Remove the files listed under 'out' of a flow object from save_dir.

    Outputs that do not exist are skipped silently.
    """
    for output_name in flow_object['out']:
        target = get_save_path(output_name, save_dir)
        try:
            os.remove(target)
        except FileNotFoundError:
            # already absent, nothing to clear for this output
            continue

In [None]:
[clear_results(O, save_dir) for O in data_file_flow.values()]

[None, None, None, None]

### Asynchronous

In the API we can not wait for a script to finish and we'll use Popen instead. This
means we'll have to keep track of the process.

In [None]:
#| exports

# Record that bundles one R script run: the flow object it belongs to, its
# lock file, its stdout and stderr, the arguments used for Popen and the
# Popen object itself. run_rscript_nowait closes lock_file, stdout and stderr
# and polls popen, which may be None.
RScriptProcess = namedtuple('RScriptProcess', ['flow_object', 'lock_file', 'stdout','stderr', 'popen_args', 'popen'])

#### Asynchronous RScript processing ------------------------------------------------

def _r_string_literal(value) -> str:
    """Return `value` as a single-quoted R string literal, escaping backslashes and single quotes."""
    escaped = str(value).replace('\\', '\\\\').replace("'", "\\'")
    return f"'{escaped}'"


def _close_files(*files) -> None:
    """Close every given file object, skipping the ones that were never opened (None)."""
    for file_object in files:
        if file_object is not None:
            file_object.close()


def _finish_rscript_process(flow_object, lock_object, lock_name:str, workdir:str) -> None:
    """Close the files of a finished process, log its result and, when it was a
    successful script run, generate the checksum file."""
    returncode = lock_object.popen.returncode
    syslog.debug(f"Script has finished for {flow_object['name']} ({lock_name}), returned {returncode}")

    # poll() returned not-None so the process has finished; closing the lock file
    # also releases the lock held on it.
    _close_files(lock_object.lock_file, lock_object.stdout, lock_object.stderr)

    if returncode != 0:
        syslog.error(f"Script failed for {flow_object['name']} ({lock_name}), returned {returncode}")
        syslog.error(f"Args were: {lock_object.popen_args}")
        # errors='replace' so undecodable output cannot make the error report itself fail
        with open(lock_object.stdout.name, 'rb') as so:
            syslog.error(f"STDOUT\n{so.read().decode('UTF-8', errors='replace')}")
        with open(lock_object.stderr.name, 'rb') as se:
            syslog.error(f"STDERR\n{se.read().decode('UTF-8', errors='replace')}")
        return

    if list(lock_object.popen_args[1:2]) == ['-e']:
        # The finished process was a package installation (started with `Rscript -e ...`),
        # not the script itself. Generating the checksum file here would mark the
        # inputs as processed although the script has not run on them yet.
        syslog.debug(f"Package installation was successful for {flow_object['name']} ({lock_name})")
        return

    syslog.debug(f"Script was successful for {flow_object['name']} ({lock_name})")
    generate_checksum_file(flow_object, workdir)
    # NOTE: the stdout/stderr capture files are left in the temp folder.


def run_rscript_nowait(flow_object, workdir:str, pkg_repo:str='https://cloud.r-project.org') -> RScriptProcess:
    """ Start a script in R without waiting for it to finish

        Each call does one step and returns immediately:
        - a process started earlier for this flow object that is still running
          is returned as is,
        - a process that has finished is cleaned up (files closed, result logged,
          checksum file generated after a successful script run),
        - when output exists and inputs have not changed nothing is started,
        - otherwise the first missing R package is installed or, when all
          packages are present, the script itself is started.

        args:
            flow_object: dict of flow object, uses the entries 'name' and 'libs'
            workdir: working directory, the process runs with this as cwd and
                the folders 'temp' and 'libs' are created in it
            pkg_repo: CRAN package repository
        returns:
            RScriptProcess: Popen container object for the script, this is
                the last registered container for the flow object which is
                None when no process was ever started for it.
    """
    
    syslog.debug(f"Starting rscript for {flow_object['name']}")

    # lockfile -------------------------------------------------------------------
    os.makedirs(os.path.join(workdir, 'temp'), exist_ok=True)
    def get_temp_path(lname):
        return os.path.join(workdir, 'temp', lname)
    
    lock_name = 'run_flow_object-'+calc_hash_from_flowobject(flow_object)
    lock_objects = run_rscript_nowait.lock_objects

    # lock maintenance
    lock_object = lock_objects.get(lock_name)
    if lock_object and not lock_object.lock_file.closed:
        syslog.debug(f"Lockfile is open for {flow_object['name']} ({lock_name})")
        # If the lockfile is open, check if the process is still running
        if lock_object.popen is None:
            syslog.debug(f"No process running for {flow_object['name']} ({lock_name})")
        elif lock_object.popen.poll() is None:
            syslog.debug(f"Script is still running for {flow_object['name']} ({lock_name})")
            return lock_object
        else:
            _finish_rscript_process(flow_object, lock_object, lock_name, workdir)

    # Check if output exists and inputs have not changed, nothing has to run
    # in that case
    if check_script_output(flow_object, workdir) and check_script_inputs(flow_object, workdir):
        syslog.debug(f"Output and inputs are up-to-date for {flow_object['name']}")
        return lock_objects.get(lock_name)

    # Create the lock file -----------------------------------------------------------
    syslog.debug(f"Preparing to run scripts for {flow_object['name']}, creating lockfile ({lock_name})")
    cf = open(get_temp_path(f"lock-{lock_name}"), 'wt')
    so = se = None
    
    try:
        # Set lock on lockfile, raises BlockingIOError when it is already locked
        fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)

        so = open(get_temp_path(f"stdout-{lock_name}"), 'wt')
        se = open(get_temp_path(f"stderr-{lock_name}"), 'wt')

        # check libs
        libfolder=os.path.join(workdir, 'libs')
        os.makedirs(libfolder, exist_ok=True)
        syslog.debug(f"Using libfolder {libfolder} for packages")
        
        env = dict(os.environ)
        env['R_LIBS_USER'] = os.path.abspath(libfolder) 
        syslog.debug(F"Using libfolder {env['R_LIBS_USER']} for R_LIBS_USER")
        
        if not check_rscript_libs(flow_object['libs'], libfolder):
            for pkg_i in flow_object['libs']:
                syslog.debug(f"Checking lib {pkg_i} for {flow_object['name']} ({lock_name})")
                if not check_rscript_lib(pkg_i, libfolder):
                    syslog.debug(f"Starting installation of {pkg_i} for {flow_object['name']} ({lock_name})")
                    # The process runs with cwd=workdir, so R must get the absolute
                    # library path; a relative one would be resolved against workdir.
                    install_expr = (
                        f"install.packages({_r_string_literal(pkg_i)}, "
                        f"repos={_r_string_literal(pkg_repo)}, "
                        f"lib={_r_string_literal(env['R_LIBS_USER'])}, dependencies=TRUE)"
                    )
                    popen_args = ['Rscript', '-e', install_expr]
                    run_script_install = subprocess.Popen(
                        popen_args, 
                        cwd=workdir,
                        stdout=so,
                        stderr=se,
                        encoding='UTF-8',
                        env=env,
                    )
                    # One installation per call, the next call continues from here
                    lock_objects[lock_name] = RScriptProcess(flow_object, cf, so, se, popen_args, run_script_install)
                    return lock_objects.get(lock_name)
        
        syslog.debug(f"Libs are up-to-date, starting script for {flow_object['name']} ({lock_name})")
        # run the script
        popen_args = ['Rscript', flow_object['name']]
        popen_run = subprocess.Popen(
            popen_args,
            cwd=workdir,
            stdout=so,
            stderr=se,
            encoding='UTF-8',
            env=env,
        )

        lock_objects[lock_name] = RScriptProcess(flow_object, cf, so, se, popen_args, popen_run)
            
    except BlockingIOError as locked_error:
        # The lock is held elsewhere, do not start anything
        _close_files(cf, so, se)
        syslog.debug(locked_error)
    except BaseException:
        # No process was registered, do not leak the open files (and the lock)
        _close_files(cf, so, se)
        raise

    syslog.debug(f"Done with {flow_object['name']}.")

    return lock_objects.get(lock_name)

run_rscript_nowait.lock_objects = {}

In [None]:
# Call run_rscript_nowait once for every flow object in `data_file_flow`, using
# save_dir as working directory. The result is kept in `startresult`, which is
# only used by the commented-out print below.
for flow_object in data_file_flow.values():
    syslog.info(f"{flow_object['name']} --------------------")
    startresult = run_rscript_nowait(flow_object, workdir=save_dir)
    
    #print(f"Args: {startresult.popen_args if startresult else None}")

INFO	28210	root	2844065539.py	2	Data_preparation.R --------------------
DEBUG	28210	root	503119086.py	17	Starting rscript for Data_preparation.R
DEBUG	28210	root	2579484379.py	13	Checksum check result for Flow object: Data_preparation.R: 0, checksum file: /home/fenke/repos/corebridge/nbs/saves/rscript/input-checksum-da4b2413f6a22c19a8a7823e6564e746
DEBUG	28210	root	503119086.py	62	Output and inputs are up-to-date for Data_preparation.R
INFO	28210	root	2844065539.py	2	Prediction_part1.R --------------------
DEBUG	28210	root	503119086.py	17	Starting rscript for Prediction_part1.R
DEBUG	28210	root	2579484379.py	13	Checksum check result for Flow object: Prediction_part1.R: 0, checksum file: /home/fenke/repos/corebridge/nbs/saves/rscript/input-checksum-6ea0e8b3895468772af891ee7b90a11e
DEBUG	28210	root	503119086.py	62	Output and inputs are up-to-date for Prediction_part1.R
INFO	28210	root	2844065539.py	2	Prediction_part2.R --------------------
DEBUG	28210	root	503119086.py	17	Starting rscrip

In [None]:
# Show the state of every registered process that has a Popen object:
# poll() gives None while the process is still running, otherwise its return code.
for name, process in run_rscript_nowait.lock_objects.items():
    if process.popen:
        print(f"process {name} for {process.flow_object['name']} is done? {process.popen.poll()}")
        print(f"args: {process.popen_args}")

process run_flow_object-6ea0e8b3895468772af891ee7b90a11e for Prediction_part1.R is done? 0
args: ['Rscript', 'Prediction_part1.R']
process run_flow_object-0ae0a2f9f9ce7fe6231402e57d595772 for Prediction_part3.R is done? 0
args: ['Rscript', 'Prediction_part3.R']
process run_flow_object-70dfaa2a3c99ed6ac2c53df7e144b4d8 for Prediction_part2.R is done? 0
args: ['Rscript', 'Prediction_part2.R']


In [None]:
# Check every flow object; for the ones that are not reported up-to-date print
# which of their output files exist plus the result of `md5sum -c` on their
# input checksum file.
for flow_object in data_file_flow.values():
    syslog.info(f"Checking {flow_object['name']}")
    if check_script_output(flow_object, save_dir) and check_script_inputs(flow_object, save_dir):
        syslog.info(f"Output and inputs are up-to-date for {flow_object['name']}")
    else:
        # (output file name, file exists) for every declared output
        print([
            (F,os.path.isfile(get_save_path(F, save_dir)) )
            for F in flow_object['out']
        ])
        # the checksum file is named after the hash of the flow object
        checksum_file = get_save_path(f"input-checksum-{calc_hash_from_flowobject(flow_object)}", save_dir)
        md5_check_result = subprocess.run(
            ['md5sum', '-c', checksum_file], 
            cwd=save_dir,
            capture_output=True)
        
        print(md5_check_result.returncode, md5_check_result.stderr.decode('UTF-8')) 

INFO	28210	root	1138821752.py	2	Checking Data_preparation.R
INFO	28210	root	1138821752.py	4	Output and inputs are up-to-date for Data_preparation.R
INFO	28210	root	1138821752.py	2	Checking Prediction_part1.R
INFO	28210	root	1138821752.py	2	Checking Prediction_part2.R
INFO	28210	root	1138821752.py	2	Checking Prediction_part3.R


[('Fitted_models.RData', True), ('Weights.RData', True)]
1 md5sum: /home/fenke/repos/corebridge/nbs/saves/rscript/input-checksum-6ea0e8b3895468772af891ee7b90a11e: No such file or directory

[('Predicted_sapflux.RData', True)]
1 md5sum: /home/fenke/repos/corebridge/nbs/saves/rscript/input-checksum-70dfaa2a3c99ed6ac2c53df7e144b4d8: No such file or directory

[('Predicted_water_usage.RData', True)]
1 md5sum: /home/fenke/repos/corebridge/nbs/saves/rscript/input-checksum-0ae0a2f9f9ce7fe6231402e57d595772: No such file or directory



In [None]:
# Close the lock file of every registered process that has finished
# (poll() is not None). The stdout/stderr files are not closed here.
for process in run_rscript_nowait.lock_objects.values():
    if process.popen and process.popen.poll() is not None:
        print(f"Closing lockfile {process.lock_file.name}")
        process.lock_file.close()

Closing lockfile /home/fenke/repos/corebridge/nbs/saves/rscript/temp/lock-run_flow_object-da4b2413f6a22c19a8a7823e6564e746


### AICore module class

In [None]:
class AICoreRModule(AICoreModule):
    """AICoreModule subclass for R scripts; adds nothing to AICoreModule yet."""

### References

In [None]:
#| hide
# Run the nbdev export
import nbdev
nbdev.nbdev_export()