## Generate BagIt Archives for OBIS-USA Data

#### Tristan P. Wellman<br>Science Analytics and Synthesis (SAS)<br>U.S. Geological Survey, Denver, Colorado
#### last modified 2/17/2019

Script creates BagIt archives for the OBIS-USA ScienceBase Collection


In [1]:
# Python packages
#
import os
import re
import tarfile
import bagit
import tempfile
import shutil
import json
import pandas as pd
import datetime
import requests
import uuid
import collections

In [2]:
# Function to set up the optional archive arguments
#
# It:
# 1) specifies the archive metadata,
# 2) specifies the search criteria used to select data files from ScienceBase items, and
# 3) customizes the default inputs via kwargs

def archive_options(sbitem, **kwargs):
    '''Build the default keyword arguments used to archive a ScienceBase item.

    Args:
        sbitem: ScienceBase item record (mapping). ``sbitem['link']['url']``
            and ``sbitem['title']`` are read to fill the data-source tags.
        **kwargs: Optional ``custom_meta`` mapping. For each top-level key
            that already exists in the defaults ('archive_meta', 'search'),
            its sub-keys overwrite or extend the default values; any other
            top-level key is added to the result as-is.

    Returns:
        collections.OrderedDict with an 'archive_meta' entry (BagIt tag
        name -> value) and a 'search' entry (file selection criteria), plus
        any extra top-level keys supplied through ``custom_meta``.

    Raises:
        KeyError: if ``sbitem`` lacks the 'link'/'url' or 'title' entries.
    '''

    archive_func = collections.OrderedDict()

    # Archive metadata (defaults) -
    #
    archive_func['archive_meta'] = collections.OrderedDict([
        ('Archive-Tag-Name', 'Archive of ScienceBase Item'),
        # Tag name was previously misspelled 'Archive-Prcessing-Date'
        ('Archive-Processing-Date', datetime.datetime.now().isoformat()),
        ('Archive-Host-Machine', str(uuid.uuid1())),
        ('Archive-Job-Number', str(uuid.uuid4())),
        ('Source-Agency-Name', 'United States Geological Survey'),
        ('Source-Agency-Physical-Address', 'Denver Federal Center, Building 810, Lakewood, Colorado, USA'),
        ('Source-Agency-Group', 'Science Analytics and Synthesis (SAS), Core Science Systems'),
        ('Source-Agency-Contact-Name', 'John Doe'),
        ('Source-Agency-Contact-Phone', '999-999-9999'),
        ('Source-Agency-Contact-Email', 'jdoe@usgs.gov'),
        ('Source-Agency-Data-Source', sbitem['link']['url']),
        ('Source-Agency-Data-Title', sbitem['title']),
    ])

    # Search criteria -
    #
    #   Inputs: "include" and "exclude" keys with search parameters.
    #           The keys are applied in the order they are listed, and the
    #           last criterion that matches a file name decides its fate.
    #
    #   Search parameters:
    #       1) 'ignore': do not use
    #       2) 'all': selects all files
    #       3) custom (text, regex) search term, e.g. r'\.nc' selects files
    #          whose name contains ".nc"
    #
    #   Default: include all item files except those matching r'\.nc'.
    #   A raw string is used so the backslash is not read as a string escape.
    #
    archive_func['search'] = collections.OrderedDict([
        ('include', 'all'),
        ('exclude', r'\.nc'),
    ])

    # Overwrite or add key-values via **kwargs (optional);
    # a missing or None 'custom_meta' leaves the defaults untouched.
    #
    custom_meta = kwargs.get('custom_meta') or {}
    for key, value in custom_meta.items():
        if key in archive_func:
            for sub_key, sub_value in value.items():
                archive_func[key][sub_key] = sub_value
        else:
            archive_func[key] = value

    return archive_func

In [3]:
# Python class to archive ScienceBase item using BagIt
#
class archive_sbitem():

    '''Archive a ScienceBase item as a gzip-compressed BagIt bag.

       Instantiating the class runs the whole workflow: register inputs,
       create a BagIt workspace in a temporary folder, select the item's
       files, download them, add archive metadata, validate the bag, and
       write ./Archives/ScienceBase_Archive_<item id>.tgz.

       Keyword arguments (all optional):
         search       - mapping with 'include' and 'exclude' keys; each value
                        is a string or a list of strings. 'all' matches every
                        file, 'ignore' disables the entry, anything else is
                        used as a regular expression against the file name.
                        Keys are applied in order and the last matching
                        criterion decides whether a file is archived. If the
                        mapping is missing either key, the default (include
                        everything, exclude nothing) is used instead.
         archive_meta - mapping of BagIt tag names to values added to the
                        bag information.

       Attributes set by the workflow: search, archive_meta, bagit_folder,
       data_folder, archive, file_select, file_name, download_failures,
       validate, tar_folder.'''

    # Outcome recorded for a file name matched by each kind of search key
    _RULES = {'include': True, 'exclude': False}

    # Run the archive workflow
    #
    def __init__(self, sbitem, **kwargs):

        '''Performs all processing steps in sequence to create BagIt archive'''

        # task sequence (workflow)
        #
        self.sbitem = sbitem
        self._inputs(**kwargs)
        self._gen_archive()
        self._sbfile_select()
        self._get_datafiles()
        self._add_meta()
        self._validate()
        self._tar_archive()

        print("\t{}: {}".format('Archive completed for ScienceBase item: ', sbitem['id']))

    # Access get_item capabilities
    #
    def __getitem__(self, item):

        '''Allows attribute access with subscript syntax (obj['name'])'''

        return getattr(self, item)

    # Create Bagit object and temporary workspace
    #
    def _gen_archive(self):

        '''Structures BagIt folder object'''

        self.bagit_folder = tempfile.mkdtemp()
        self.data_folder = os.path.join(self.bagit_folder, 'data')
        self.archive = bagit.make_bag(self.bagit_folder, checksum=['sha256'])

    # Translate one search entry into a list of regular expressions
    #
    @staticmethod
    def _normalize_criteria(criteria):

        '''Returns the regex patterns for one search entry.
           None and 'ignore' contribute no pattern; 'all' matches any name.'''

        if criteria is None:
            return []
        if isinstance(criteria, str):
            criteria = [criteria]
        patterns = []
        for term in criteria:
            if term is None or term.lower() == 'ignore':
                # Dropped rather than stored as None: a stored None would
                # later be formatted into the regex "None" and match names.
                continue
            patterns.append("(.*?)" if term.lower() == 'all' else term)
        return patterns

    # Register search criteria and archive metadata
    #
    def _inputs(self, **kwargs):

        '''Stores input search criteria and archive metadata'''

        # search criteria (include, exclude tags)
        #
        requested = kwargs.get('search')
        if requested is None or 'include' not in requested or 'exclude' not in requested:
            requested = {'include': 'all', 'exclude': None}

        # Build a new mapping so the caller's dictionary is not modified;
        # key order is kept because it defines the order of evaluation.
        self.search = collections.OrderedDict(
            (key, self._normalize_criteria(criteria))
            for key, criteria in requested.items()
        )

        # archive metadata record
        #
        self.archive_meta = kwargs.get('archive_meta')

    # Decide whether a single file name passes the search criteria
    #
    def _is_selected(self, name):

        '''Returns True when the last matching criterion is an include'''

        verdict = None
        for mode, patterns in self.search.items():
            outcome = self._RULES[mode.lower()]
            for pattern in patterns or ():
                if pattern is not None and re.search(pattern, name):
                    verdict = outcome
        return bool(verdict)

    # Select ScienceBase file content using quasi-flexible search criteria
    #
    def _sbfile_select(self):

        '''Identifies ScienceBase files to archive based on search criteria'''

        # gather file records from the item facets first, then the item itself
        #
        candidates = []
        for facet in self.sbitem.get('facets') or []:
            candidates.extend(facet.get('files') or [])
        candidates.extend(self.sbitem.get('files') or [])

        self.file_select = []
        self.file_name = []
        for dfile in candidates:
            if self._is_selected(dfile['name']):
                self.file_select.append(dfile['downloadUri'])
                self.file_name.append(dfile['name'])

    # stream file retrieve, file insertion into archive folder
    #
    def _get_datafiles(self):

        '''Streams ScienceBase files into BagIt data folder.
           Downloads that do not return HTTP 200 are skipped and recorded
           in self.download_failures as (file name, status code).'''

        self.download_failures = []
        for name, uri in zip(self.file_name, self.file_select):
            # the context manager releases the streamed connection
            with requests.get(uri, stream=True) as response:
                if response.status_code != 200:
                    self.download_failures.append((name, response.status_code))
                    print("\tWarning: download of {} failed (HTTP {})".format(
                        name, response.status_code))
                    continue
                response.raw.decode_content = True
                # basename keeps a remote name with path separators inside
                # the bag's data folder
                bag_path = os.path.join(self.data_folder, os.path.basename(name))
                with open(bag_path, 'wb') as f:
                    shutil.copyfileobj(response.raw, f)

        # store the ScienceBase item record alongside the data files
        #
        sbitem_fname = os.path.join(
            self.data_folder, 'ScienceBase_record_' + self.sbitem['id'] + '.json')
        with open(sbitem_fname, 'w') as f:
            json.dump(self.sbitem, f)
        self.archive.save(manifests=True, processes=3)

    # Customize archive metadata
    #
    def _add_meta(self):

        '''Adds metadata to BagIt folder'''

        if self.archive_meta:
            self.archive.info.update(self.archive_meta)
            self.archive.save(manifests=True, processes=4)

    # Validate archive
    #
    def _validate(self):

        '''Validation check for BagIt folder; messages go to self.validate'''

        self.validate = []
        try:
            # one validation pass provides both the verdict and the details
            self.archive.validate()
        except bagit.BagValidationError as e:
            self.validate.append("Bagit archive is structurally invalid")
            for d in e.details:
                if isinstance(d, bagit.ChecksumMismatch):
                    self.validate.append("expected %s to have %s checksum of %s but found %s" %
                                         (d.path, d.algorithm, d.expected, d.found))
                else:
                    self.validate.append(str(d))
        else:
            self.validate.append("Bagit archive is structurally valid")

    # Package archive folder as a gzip-compressed tar file (*.tgz)
    #
    def _tar_archive(self):

        '''Compresses BagIt folder and saves it to the Archives folder
           in the current working directory'''

        # Ensure archive folder exists
        #
        dirname = 'Archives'
        tar_directory = os.path.join(os.getcwd(), dirname)
        try:
            os.makedirs(tar_directory, exist_ok=True)
        except OSError:
            print("Creation of the directory %s failed" % tar_directory)
            raise

        # Save archive as *.tgz file
        #
        # The name inside the archive is built directly; str.strip('.tgz')
        # removes any of the characters '.', 't', 'g', 'z' from both ends
        # and would truncate ids ending in one of them.
        bag_name = 'ScienceBase_Archive_' + self.sbitem['id']
        tar_filename = bag_name + '.tgz'
        with tarfile.open(os.path.join(tar_directory, tar_filename), "w:gz") as tar:
            tar.add(self.bagit_folder, arcname=bag_name)
        self.tar_folder = tar_filename

    # Remove the temporary workspace (optional, not part of the workflow)
    #
    def cleanup(self):

        '''Deletes the temporary BagIt folder. Call once the object's
           archive attribute is no longer needed; the *.tgz is unaffected.'''

        shutil.rmtree(self.bagit_folder, ignore_errors=True)

In [4]:
# Query the ScienceBase API for the child items of the OBIS-USA parent record
# (the request asks for at most 1000 items in JSON format)
#
sb_url = 'https://www.sciencebase.gov/catalog/items?parentId=579b64c6e4b0589fa1c98118&max=1000&format=json&fields=id,title,'
sb_response = requests.get(sb_url)
# Stop on an HTTP error instead of trying to parse an error page as JSON
sb_response.raise_for_status()
OBIS_sbinfo = sb_response.json()
total_items = len(OBIS_sbinfo['items'])
print('{}: {}\n'.format('OBIS-USA Item record count: ', total_items))

OBIS-USA Item record count: : 139



In [5]:
# TEST: Archive multiple ScienceBase items (associated files + item information) in OBIS-USA collection
#

# Customize default BagIt (archive) metadata for OBIS-USA collection

aux_dict = {}
aux_dict['archive_meta'] = collections.OrderedDict([
        ('Archive-Tag-Name', 'OBIS_USA Archive'),
        ('Source-Agency-Contact-Name', 'Abby Benson'),
        ('Source-Agency-Contact-Email', 'abenson@usgs.gov'),
        ('Source-Agency-Contact-Phone', '303.202.4087'),
    ])


# For testing - limit number of ScienceBase records (items) to process.
# switch for loop (below) or set to ~1e6 to process all archives in parent collection
max_rec = 3


# Iterate through ScienceBase items, create an archive for each one that
# does not already have a *.tgz file in the local Archives folder
#
#for indx, sbitem in enumerate(OBIS_sbinfo['items']):
for indx, sbitem in enumerate(OBIS_sbinfo['items'][0:min(max_rec, total_items)]):
    # Use the loop variable here: there is no `self` at notebook (module) level
    filename = 'ScienceBase_Archive_' + sbitem['id'] + '.tgz'
    tarpath = os.path.join(os.path.join(os.getcwd(), 'Archives'), filename)
    if os.path.exists(tarpath):
        print('{} {}\n\t{}\n\t{}'.format('Ignoring Archive (exists):', indx+1,
                                          sbitem['title'][0:80], sbitem['link']['url']))
    else:
        print('{} {}\n\t{}\n\t{}'.format('Creating Archive:', indx+1,
                                          sbitem['title'][0:80], sbitem['link']['url']))
        # The collection listing holds summary fields only; request the item record itself
        url = str(sbitem['link']['url']) + '?format=json'
        item_response = requests.get(url)
        # Stop on an HTTP error instead of archiving an error page
        item_response.raise_for_status()
        sb_item = item_response.json()
        archive_func = archive_options(sbitem, custom_meta = aux_dict)
        BagIt_Archive = archive_sbitem(sb_item, **archive_func)

print("\n *** Archiving complete ***")


Creating Archive: 1
	NMFS-COPEPOD: The Global Plankton Database, WEBSEC Sub-Collection
	https://www.sciencebase.gov/catalog/item/570e9a18e4b0ef3b7ca253aa
	Archive completed for ScienceBase item: : 570e9a18e4b0ef3b7ca253aa
Creating Archive: 2
	USGS South Florida Fish and Invertebrate Assessment Network- Harvest
	https://www.sciencebase.gov/catalog/item/53a1cc5fe4b0403a441545a5
	Archive completed for ScienceBase item: : 53a1cc5fe4b0403a441545a5
Creating Archive: 3
	National Museum of Natural History - Invertebrate Zoology
	https://www.sciencebase.gov/catalog/item/570d64fee4b0ef3b7ca14e33
	Archive completed for ScienceBase item: : 570d64fee4b0ef3b7ca14e33

 *** Archiving complete ***


In [6]:
# Send Bagit archives(s) somewhere in the world .....

# do something amazing here ...
