In [1]:
##################################################
## Script to manually generate checksum for bags
## and check if internal checksum matches
##################################################
## GNU General Public License, version 2
## Author: UTSC DSU 
##################################################

In [2]:
import xml.etree.ElementTree as ET
import pandas as pd
import configparser
import os
import zipfile
import hashlib
import re

In [3]:
def find_latest_file_version(zip, datastream_ids_to_check):
    """Return, for each datastream id, the archive member with the highest version.

    A member name is split on '.'; the text before the first dot is compared
    with the datastream id and the text between the first and second dots is
    treated as the version (e.g. 'OBJ.2.jpg' -> id 'OBJ', version '2').

    Parameters
    ----------
    zip : object exposing ``namelist()`` (e.g. ``zipfile.ZipFile``)
        Archive whose member names are searched. The name shadows the
        ``zip`` builtin but is kept so existing callers keep working.
    datastream_ids_to_check : iterable of str
        Datastream ids to look up.

    Returns
    -------
    list of str
        One entry per requested id, in input order: the matching member with
        the highest version, or the id itself when no member matches.
    """
    def version_rank(member_name):
        # Versions are compared as integers so that '10' outranks '9'.
        # A missing or non-numeric version ranks below every numeric one
        # instead of raising.
        parts = member_name.split('.')
        if len(parts) > 1 and parts[1].isdecimal():
            return int(parts[1])
        return -1

    all_files = zip.namelist()
    latest_files = []
    for datastream_id in datastream_ids_to_check:
        #all members whose leading name component equals the id
        matches = [f for f in all_files if f.split('.')[0] == datastream_id]
        if matches:
            #max() keeps the first member when several share the top version
            latest_files.append(max(matches, key=version_rank))
        else:
            #if no file exists for this id, add the id itself
            latest_files.append(datastream_id)

    return latest_files

In [8]:
#Manually generate checksums for datastream_ids_to_check files 
#within the initial_zip data folder 
#(gets latest version of file or just the file you input)
def manually_generate_checksums(initial_zip, datastream_ids_to_check, bag_zip_location):
    """Compute MD5 checksums for datastream files held in a zip nested inside a zip.

    Parameters
    ----------
    initial_zip : path or file-like object accepted by ``zipfile.ZipFile``
        The outer zip archive.
    datastream_ids_to_check : iterable of str
        Datastream ids (or member names) to checksum; each one is resolved
        to a member name by ``find_latest_file_version``.
    bag_zip_location : str
        Member name, inside ``initial_zip``, of the nested zip to read.

    Returns
    -------
    dict
        Maps ``os.path.basename(member name)`` to the hex MD5 digest of that
        member's content. Members that cannot be read are left out and a
        message is printed for each of them.
    """
    #store the files and their checksums in a dict
    manual_checksum_list = {}

    #every handle is opened in a with-block so it is released on exit,
    #including when an exception propagates
    with zipfile.ZipFile(initial_zip) as init_zip, \
            init_zip.open(bag_zip_location) as inner_stream, \
            zipfile.ZipFile(inner_stream) as bag_zip:
        print("\nGenerating checksums for bag stored at " + bag_zip_location)

        #to look through every file in the nested zip instead,
        #iterate over bag_zip.namelist() here
        filenames = find_latest_file_version(bag_zip, datastream_ids_to_check)

        for filename in filenames:
            filename = filename.strip()
            hash_md5 = hashlib.md5()
            try:
                with bag_zip.open(filename) as member:
                    #hash in 4 KiB chunks to avoid loading the whole file
                    for chunk in iter(lambda: member.read(4096), b""):
                        hash_md5.update(chunk)
            except KeyError:
                #ZipFile.open raises KeyError when the member is not in the archive
                print('File named \'' + filename + '\' does not exist (ignoring it)')
                continue
            except Exception as err:
                #still best-effort, but do not report a read failure as a missing file
                print('File named \'' + filename + '\' could not be read (ignoring it): ' + str(err))
                continue

            manual_checksum_list[os.path.basename(filename)] = hash_md5.hexdigest()

    return manual_checksum_list

In [9]:
#Get the internal checksums stored in each atommanifest.xml file and compare them
#against the manually generated checksums (calls the manually_generate_checksums function)
def datastream_checksum_validator(initial_zip_loc, datastream_ids_to_check):
    """Compare stored datastream checksums with freshly generated MD5 checksums.

    Every member of the outer zip that lies under '<zip base name>/data/' and
    ends with '_foxml_atomzip.zip' is processed: checksums are generated with
    ``manually_generate_checksums`` and compared with the digests recorded in
    that nested zip's 'atommanifest.xml'. A PASS or FAIL line is printed for
    every manifest entry whose file name (without extension) equals a
    checksummed file name (without extension) and that carries a digest.

    Parameters
    ----------
    initial_zip_loc : str
        Path of the outer zip; '/' is used as the path separator when the
        base name is derived.
    datastream_ids_to_check : iterable of str
        Datastream ids passed through to ``manually_generate_checksums``.

    Returns
    -------
    bool
        False if at least one compared checksum differs, True otherwise.
        Files without a manifest entry, and entries without a stored digest,
        are not reported and do not affect the result.
    """
    status = True

    #path for bag contents within the outer zip: '<zip base name>/data/'
    bag_location = os.path.splitext(initial_zip_loc)[0].split('/')[-1] + '/data/'

    with zipfile.ZipFile(initial_zip_loc) as init_zip:
        #get list of foxml atomzips within the data folder
        foxml_file_list = [
            member for member in init_zip.namelist()
            if member.startswith(bag_location) and member.endswith('_foxml_atomzip.zip')
        ]

        #loop through every foxml atomzip
        for bag_zip_location in foxml_file_list:
            with init_zip.open(bag_zip_location) as inner_stream, \
                    zipfile.ZipFile(inner_stream) as bag_zip:
                #generate checksums for the files in datastream_ids_to_check
                manual_checksum_list = manually_generate_checksums(
                    initial_zip_loc, datastream_ids_to_check, bag_zip_location)

                #find the root of the manifest
                with bag_zip.open('atommanifest.xml') as manifest:
                    root = ET.parse(manifest).getroot()

            #the entries do not depend on the file being checked, so they are
            #extracted once per manifest rather than once per file
            manifest_entries = _manifest_checksum_entries(root)

            for filename, generated_checksum in manual_checksum_list.items():
                file_stem = os.path.splitext(filename)[0]
                for tag_file_name, tag_internal_checksum in manifest_entries:
                    #only compare entries that store a checksum for this file
                    if tag_internal_checksum == '' or tag_file_name != file_stem:
                        continue

                    if tag_internal_checksum == generated_checksum:
                        print("PASS: File " + tag_file_name + " with internal checksum \'"
                              + tag_internal_checksum + "\' matches manual checksum")
                    else:
                        #print the conflicting pair and remember the failure
                        print("FAIL: File " + tag_file_name +
                              " has a conflicting generated and internal checksum as follows:")
                        data = {'internally stored checksum': [tag_internal_checksum],
                                'manually generated checksum': [generated_checksum]}
                        df = pd.DataFrame(data)
                        print(df)
                        status = False
    return status


def _manifest_checksum_entries(root):
    """Return a list of (file name without extension, stored digest) per Atom entry.

    The file name comes from the ``src`` attribute of the entry's first
    ``content`` element ('' when there is no such element or no ``src``).
    The digest is the ``term`` attribute of the last ``category`` element
    whose ``scheme`` is the Fedora digest scheme ('' when there is none).
    """
    atom_ns = '{http://www.w3.org/2005/Atom}'
    digest_scheme = "info:fedora/fedora-system:def/model#digest"

    entries = []
    for entry in root.findall('./' + atom_ns + 'entry'):
        content_tags = entry.findall('./' + atom_ns + 'content')
        src = content_tags[0].get('src') if len(content_tags) != 0 else None
        #a missing src is treated as "no file name" instead of raising
        tag_file_name = os.path.splitext(src)[0] if src else ''

        tag_internal_checksum = ''
        for digest_tag in entry.findall('./' + atom_ns + 'category'):
            if digest_tag.get('scheme') == digest_scheme:
                tag_internal_checksum = digest_tag.get('term')

        entries.append((tag_file_name, tag_internal_checksum))
    return entries