# Development of Archive-Upload Script

This notebook is used for the initial development of script features
and for various experiments.  The final script can be found in
`archive-upload.py`.

In [2]:
import time
from pathlib import Path, PurePosixPath
import yaml
import pickle
import bz2
from urllib.parse import urlparse
from pprint import pprint
import logging, logging.handlers
import boto3

## Notes on Design

* When a finished file is found, compressed, and archived, immediately try
  to upload.  If the upload fails, add the file to the "database" of files
  that need to be uploaded.  That database of pending uploads should hold
  the local file path and the destination S3 bucket.  It could just be a list
  of two-tuples that is pickled to disk in the ~/.archive-upload directory.
* For the newly finished file, may just want to append it to the "upload pending"
  list, and then after that start trying to work through the list, uploading
  the oldest file first.
* The AWS sync command may not be the best because if someone cleans out the S3
  bucket, the script will try to re-upload all of the local files in the archive
  directory.  The approach above solves that problem by only uploading a file once
  to S3.  It also has the advantage of only requiring Python boto3 methods (not
  needing an AWS command line utility).
* Archiving and Uploading application log files may be tricky.  Rotating file handler will
  keep producing files of the same names: archive-upload.log, archive-upload.log.1, etc.  One idea
  might be to take the last X lines of log file at the end of the day (or all lines) and copy
  those lines into a day-specific log file: 2019-07-23_archive-upload.log. This could
  be done with a separate Cron job.  Then this archive-upload utility
  could archive and upload those files.

In [26]:
# Read in the configuration file that controls execution of the script.
# The file is opened in a `with` block so the handle is closed as soon as
# parsing finishes instead of lingering until garbage collection.
cfg_fn = 'archive-config-example.yaml'
with open(cfg_fn, 'r') as cfg_file:
    config = yaml.safe_load(cfg_file)
pprint(config)

{'directories': [{'archive-dir': '/home/tabb99/arch-test/archive/data',
                  'bucket-and-key': 'dataacq.analysisnorth.com/powerhouse/kwethluk/data',
                  'delete-after': 365,
                  'directory': '/home/tabb99/arch-test/data',
                  'file-patterns': [{'finished-secs': 60, 'pattern': '*.csv'},
                                    {'pattern': '*.txt'}]},
                 {'archive-dir': '/home/tabb99/arch-test/archive/daily-logs',
                  'bucket-and-key': 'dataacq.analysisnorth.com/powerhouse/kwethluk/daily-logs',
                  'delete-after': 30,
                  'directory': '/home/tabb99/arch-test/daily-logs',
                  'file-patterns': [{'finished-secs': 10,
                                     'pattern': '*.log'}]}],
 'log-file-dir': '/home/tabb99/arch-test/log',
 'log-level': 'INFO'}


## Sample Files to Upload

In [27]:
%%bash
# Create empty sample files for the archive loop to pick up.  The two 'junk'
# files match none of the *.csv / *.txt / *.log patterns in the sample
# configuration printed earlier in this notebook.
# Stop if the test directory is missing; otherwise the touch commands below
# would run in whatever directory the kernel happens to be in.
cd /home/tabb99/arch-test || exit 1
# Make sure the watched sub-directories exist so this cell works on a fresh
# test tree.
mkdir -p data daily-logs
touch data/2019-07-06_data.csv
touch data/2019-07-07_data.csv
touch daily-logs/2019-07-07_errors.log
touch daily-logs/2019-07-06_errors.log
touch data/junk
touch daily-logs/junk

## Initialization

In [4]:
# Set the log level on the root logger.  A level set on the logger is checked
# before a record is handed to any handler, so it applies to both handlers
# created below (a handler can still be given its own, stricter level).
logging.root.setLevel(getattr(logging, config['log-level']))

# Create the log file directory if it does not exist.
p_log_dir = Path(config['log-file-dir'])
p_log_dir.mkdir(parents=True, exist_ok=True)
p_log = p_log_dir / 'archive-upload.log'

# Re-running this cell must not stack additional handlers on the root logger,
# because every extra handler emits each message one more time.  The handlers
# installed here are tagged with a name so copies left by an earlier run of
# this cell can be found and removed first.
file_handler_name = 'archive-upload-file'
console_handler_name = 'archive-upload-console'
for old_handler in list(logging.root.handlers):
    if old_handler.get_name() in (file_handler_name, console_handler_name):
        logging.root.removeHandler(old_handler)
        old_handler.close()

# Create a rotating file handler: rotate at 200000 bytes, keep 5 backups.
fh = logging.handlers.RotatingFileHandler(p_log, maxBytes=200000, backupCount=5)
fh.set_name(file_handler_name)

# Create the formatter and add it to the file handler.
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(module)s - %(message)s')
fh.setFormatter(formatter)

# Create a handler that will print to the console as well.
console_h = logging.StreamHandler()
console_h.set_name(console_handler_name)
console_h.setFormatter(formatter)

# Add the handlers to the root logger.
logging.root.addHandler(fh)
logging.root.addHandler(console_h)

logging.info('Script start.')

# Application working directory under the user's home; created on first use.
p_app = Path('~/.archive-upload').expanduser()
p_app.mkdir(exist_ok=True)

# Pickle file that persists the list of uploads that have not succeeded yet.
p_up_pending = p_app / 'upload_pending.pkl'

# Start from an empty list when no pickle has been written yet; otherwise
# restore the list saved by a previous run.
# NOTE(review): pickle.load executes whatever the file contains, so this file
# must only ever be written by this utility.
if not p_up_pending.exists():
    upload_pending = []
else:
    with p_up_pending.open('rb') as pkl_in:
        upload_pending = pickle.load(pkl_in)
upload_pending

2019-07-07 15:39:31,082 - INFO - <ipython-input-4-1769626a71b6> - Script start.


[]

In [21]:
# Start over with an empty pending list (discards whatever was held before).
upload_pending = list()

In [57]:
# Loop through the list of directories, looking for completed files.  Each
# completed file is compressed into the archive directory, queued for upload,
# and then removed from its source directory.  Afterwards, archive files that
# are older than 'delete-after' days and are not waiting to be uploaded are
# deleted.

# Seconds a file must have gone unmodified before it is considered finished,
# used when a file pattern does not supply 'finished-secs'.
default_finished_secs = 5

for dr in config['directories']:
    # Path to directory holding data files
    p_dr = Path(dr['directory'])

    # Path to directory where finished, compressed files will be archived
    p_archive = Path(dr['archive-dir'])

    # Make the archive directory if it does not exist.
    p_archive.mkdir(parents=True, exist_ok=True)

    # Loop through file patterns
    for pat in dr['file-patterns']:
        # 'finished-secs' is optional in the configuration (the '*.txt'
        # pattern in the sample config omits it), so fall back to the default
        # rather than raising KeyError.
        finished_secs = pat.get('finished-secs', default_finished_secs)

        for p_f in p_dr.glob(pat['pattern']):
            # p_f is a Path to something matching the pattern.
            p_arch_fn = p_archive / (p_f.name + '.bz2')
            try:
                # Only regular files can be archived; a pattern may also
                # match a directory.
                if not p_f.is_file():
                    continue

                # Test to see if it is a completed file.  The stat call is
                # inside the try so a file that vanishes between the glob and
                # here is logged instead of aborting the whole run.
                file_age = time.time() - p_f.stat().st_mtime
                if file_age <= finished_secs:
                    continue

                with bz2.open(p_arch_fn, "wb") as fout:
                    fout.write(p_f.read_bytes())

                # Add the archive file to the upload list. The Path objects are
                # converted to strings so the pickle is more straightforward.
                new_upload = (str(p_arch_fn), str(PurePosixPath(dr['bucket-and-key']) / p_arch_fn.name))
                # Skip the append when the entry is already queued, e.g. when
                # an earlier run archived the file but could not delete it.
                if new_upload not in upload_pending:
                    upload_pending.append(new_upload)

                # Delete the source file
                p_f.unlink()

                logging.info(f'Archived {p_f}')

            except Exception:
                logging.exception(f'Error attempting to archive {p_f}')

    # Check for files to delete in the archive directory,
    # but only delete if the file is not in the upload pending list.
    if 'delete-after' in dr and dr['delete-after'] > 0:
        max_age = dr['delete-after'] * 24 * 3600.
        # Local paths still waiting to be uploaded; a set gives quick lookups.
        pending_files = {entry[0] for entry in upload_pending}
        for p_f in p_archive.glob('*.bz2'):
            if str(p_f) in pending_files:
                continue
            try:
                file_age = time.time() - p_f.stat().st_mtime
                if file_age > max_age:
                    p_f.unlink()
                    logging.info(f'{p_f} deleted due to exceeding max age.')
            except OSError:
                # One archive file that cannot be removed should not stop the
                # processing of the remaining files and directories.
                logging.exception(f'Error attempting to delete {p_f}')

pprint(upload_pending)

2019-07-07 11:49:51,780 - INFO - <ipython-input-57-30f3160d6c3c> - /home/tabb99/arch-test/archive/xyz/another01.txt.bz2 deleted due to exceeding max age.
2019-07-07 11:49:51,782 - INFO - <ipython-input-57-30f3160d6c3c> - /home/tabb99/arch-test/archive/xyz/another02.txt.bz2 deleted due to exceeding max age.


[]


## To Do

* In code, supply default values for many of the configuration entries,
  such as 'finished-secs' (5 seconds).
* Copy the log file once daily to a location that is watched by this
  utility.

In [63]:
# Upload every file in the pending list to S3, removing each entry from the
# list once it has been uploaded (or once its local file is found to be gone).
s3 = boto3.resource('s3')

# Iterate over a copy of the list because this code deletes items from the
# original list.
for fn, bucket_key in upload_pending.copy():
    # If this file no longer exists, delete it from the upload list and move
    # on; there is nothing to upload, so attempting it would only log a
    # spurious error.
    if not Path(fn).exists():
        upload_pending.remove((fn, bucket_key))
        logging.info(f'{fn} does not exist, so will not be uploaded.')
        continue

    # Split the bucket + key into a bucket and a key.  The urlparse
    # function does this well, except for leaving a leading slash on the
    # key.
    parts = urlparse('s3://' + bucket_key)
    bucket = parts.netloc
    key = parts.path[1:]   # remove leading slash
    try:
        s3.meta.client.upload_file(fn, bucket, key)
    except Exception:
        # Leave the entry in the list so the upload is retried on a later run.
        logging.exception(f'Error attempting to upload {fn}')
    else:
        upload_pending.remove((fn, bucket_key))
        logging.info(f'Uploaded {fn}')
print(upload_pending)

[]


In [35]:
# Save the upload pending list.
# Really should put this in a Finally clause so that with any weird errors
# this list will be saved.
# The list is written to a temporary file in the same directory and then moved
# over the real file, so an interruption part-way through the write cannot
# leave a truncated pickle behind.
p_up_tmp = p_up_pending.with_name(p_up_pending.name + '.tmp')
with p_up_tmp.open('wb') as fout:
    pickle.dump(upload_pending, fout)
p_up_tmp.replace(p_up_pending)


In [24]:
# Scratch check: pull the first element out of every tuple in a list.
l1 = [(1, 2), (3, 4), (5, 6)]
tuple(first for first, _second in l1)

(1, 3, 5)