# MODS XML Analysis Notebook

This notebook retrieves and analyzes MODS XML metadata from an API endpoint.

In [1]:
import xml.etree.ElementTree as ET
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import logging
from datetime import datetime
import random
from pathlib import Path
import warnings
import openpyxl
from openpyxl.styles import Font, Alignment, PatternFill
from openpyxl.utils import get_column_letter
import re
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
import math

warnings.filterwarnings('ignore')

XML_DIR = "mods_records"  # Directory containing XML files

#SAMPLE=100
SAMPLE=None

## Set up Logging

In [2]:
def setup_logging():
    """Configure logging with both file and console handlers."""
    # Create logs directory if it doesn't exist
    os.makedirs('logs', exist_ok=True)
    
    # Create formatters for different levels of detail
    brief_formatter = logging.Formatter('%(message)s')
    verbose_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    
    # Create and configure handlers
    log_filename = f"logs/mods_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    
    # Clear any existing handlers
    logger = logging.getLogger()
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
        
    # File handler - gets everything with full detail
    file_handler = logging.FileHandler(log_filename)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(verbose_formatter)
    
    # Console handler - gets just the important stuff briefly
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(brief_formatter)
    
    # Configure root logger
    logger.setLevel(logging.DEBUG)
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger


## Data Processing Functions

For every record, extract the specific data you want.

In [3]:
def extract_publisher(record):
    """Extract publisher information from MODS record."""
    publisher = record.find(".//{*}publisher")
    if publisher is not None and publisher.text:
        return publisher.text.strip()
    return None

def extract_classifications(record):
    """Extract classification numbers from MODS record."""
    classifications = {
        'lcc': None,
        'ddc': None,
        'other': []  # For any other classification schemes
    }
    
    for classification in record.findall(".//{*}classification"):
        if classification.text:
            authority = classification.get('authority', '').lower()
            value = classification.text.strip()
            
            if authority == 'lcc':
                classifications['lcc'] = value
            elif authority == 'ddc':
                classifications['ddc'] = value
            else:
                # Store other classification schemes with their authority
                classifications['other'].append({
                    'authority': authority,
                    'value': value
                })
    
    return classifications

def extract_identifiers(record):
    """Extract standard identifiers from MODS record, excluding local identifiers."""
    identifiers = {
        'issn': None,
        'issn_l': None, 
        'lccn': None,
        'oclc': None,
        'doi': None,
        'isbn': None,
        'stock_number': None,
        'issn_other_online': None,
    }
    
    # Extract standard identifiers
    for identifier in record.findall(".//{*}identifier"):
        id_type = identifier.get('type', '')
        
        # Skip local identifiers
        if id_type == 'local':
            continue
            
        # Skip invalid identifiers
        if identifier.get('invalid') == 'yes':
            continue
            
        if identifier.text:
            value = identifier.text.strip()
            
            if id_type == 'issn':
                identifiers['issn'] = value
            elif id_type == 'issn-l':
                identifiers['issn_l'] = value
            elif id_type == 'lccn':
                identifiers['lccn'] = value
            elif id_type == 'oclc':
                identifiers['oclc'] = value
            elif id_type == 'doi':
                identifiers['doi'] = value
            elif id_type == 'isbn':
                identifiers['isbn'] = value
            elif id_type == 'stock number':
                identifiers['stock_number'] = value
    
    # Look for online version ISSN in relatedItems
    for related_item in record.findall(".//{*}relatedItem"):
        # Check if this is an online version
        is_online = False
        display_label = related_item.get('displayLabel', '').lower()
        other_type = related_item.get('otherType', '').lower()
        item_type = related_item.get('type', '').lower()
        
        if ('online' in display_label or 
            'online' in other_type or 
            'online' in item_type):
            is_online = True
        
        if is_online:
            # Look for ISSN within this relatedItem
            for identifier in related_item.findall(".//{*}identifier[@type='issn']"):
                if identifier.text:
                    identifiers['issn_other_online'] = identifier.text.strip()
                    break  # Take the first one we find
                
    return identifiers

    
def extract_genres(record):
    """Extract genre information from both direct genre tags and subject/genre elements,
    standardizing singular/plural forms."""
    genres = set()
    
    # Helper function to standardize genre text
    def standardize_genre(genre_text):
        genre_text = re.sub(r'\.+$', '', genre_text.strip().lower())
        
        if genre_text.lower() == "periodical":
            return "periodicals"
        return genre_text.lower()
    
    # Get genres from direct genre tags
    for genre in record.findall(".//{*}genre"):
        if genre is not None and genre.text:
            genres.add(standardize_genre(genre.text))
    
    # Get genres from subject/genre elements
    for subject in record.findall(".//{*}subject"):
        for genre in subject.findall(".//{*}genre"):
            if genre is not None and genre.text:
                genres.add(standardize_genre(genre.text))
    
    return sorted(set(genres))

def extract_start_date(origin_info):
    """Extract start date from originInfo element."""
    date_issued = origin_info.find(".//{*}dateIssued[@encoding='marc'][@point='start']")
    if date_issued is not None and date_issued.text:
        try:
            return int(date_issued.text)
        except ValueError:
            return None
    return None

def extract_end_date(origin_info):
    """Extract start date from originInfo element."""
    date_issued = origin_info.find(".//{*}dateIssued[@encoding='marc'][@point='end']")
    if date_issued is not None and date_issued.text:
        try:
            return int(date_issued.text)
        except ValueError:
            return None
    return None

def extract_lcsh_subjects(record):
    """Extract subject topics from LCSH."""
    subjects = set()
    for subject in record.findall(".//{*}subject[@authority='lcsh']"):
        topics = subject.findall(".//{*}topic")
        for topic in topics:
            if topic is not None and topic.text:
                subjects.add(topic.text.lower())
    return list(subjects)

def extract_fast_subjects(record):
    """Extract subject topics from LCSH."""
    subjects = set()
    for subject in record.findall(".//{*}subject[@authority='fast']"):
        topics = subject.findall(".//{*}topic")
        for topic in topics:
            if topic is not None and topic.text:
                subjects.add(topic.text.lower())
    return list(subjects)

def extract_record_identifier(record):
    """Extract ALMA identifier from MODS record with fallbacks."""
    try:
        # Try to get ALMA identifier
        record_info = record.find(".//{*}recordInfo/{*}recordIdentifier[@source='MH:ALMA']")
        if record_info is not None and record_info.text:
            return record_info.text
        
        # First fallback: any record identifier
        record_info = record.find(".//{*}recordInfo/{*}recordIdentifier")
        if record_info is not None and record_info.text:
            return f"record_{record_info.text}"
        
        # Second fallback: title
        title_info = record.find(".//{*}titleInfo/{*}title")
        if title_info is not None and title_info.text:
            return sanitize_filename(title_info.text)
            
        # Final fallback
        return "untitled_record"
    except Exception as e:
        print(f"Error extracting identifier: {e}")
        return "error_record"

def determine_format(record):
    """Determine if the resource is print, electronic, microfilm, microfiche, or online."""
    formats = set()
    
    # Check physical description
    phys_desc = record.find(".//{*}physicalDescription")
    if phys_desc is not None:
        formats.add('probably print')
        # Check all form elements
        for form in phys_desc.findall(".//{*}form"):
            if form is not None and form.text:
                formats.add(form.text.lower())

    # Check for microform information in reproduction notes
    for note in record.findall(".//{*}note[@type='reproduction']"):
        if note is not None and note.text:
            note_text = note.text.lower()
            if 'microfilm' in note_text:
                formats.add('microfilm')
            if 'microfiche' in note_text:
                formats.add('microfiche')
    
    # Check for microfilm in relatedItems
    for related_item in record.findall(".//{*}relatedItem"):
        # Check various attributes that might indicate microfilm version
        display_label = related_item.get('displayLabel', '').lower()
        other_type = related_item.get('otherType', '').lower()
        item_type = related_item.get('type', '').lower()
        
        # Also check the title within relatedItem
        title_element = related_item.find(".//{*}title")
        title_text = title_element.text.lower() if title_element is not None and title_element.text else ''
        
        # If any of these mention microfilm, add it to formats
        if any('microfilm' in x for x in [display_label, other_type, item_type, title_text]):
            formats.add('microfilm')
    
    # Check for any online-related information in relatedItems
    for related_item in record.findall(".//{*}relatedItem"):
        # Check various attributes that might indicate online version
        display_label = related_item.get('displayLabel', '').lower()
        other_type = related_item.get('otherType', '').lower()
        item_type = related_item.get('type', '').lower()
        
        if any('online' in x for x in [display_label, other_type, item_type]):
            formats.add("online")
            break

    # NEW: Check for specific genre values that indicate online format
    for genre in record.findall(".//{*}genre"):
        if genre is not None and genre.text:
            genre_text = genre.text.strip().lower()
            if (genre_text == "computer network resources." or 
                genre_text == "electronic journals."):
                formats.add("online")
                break
    
    return list(formats) if formats else ["unknown"]

def determine_resource_type(record):
    """Determine if the resource is a journal/serial or book."""
    origin_info = record.find(".//{*}originInfo")
    if origin_info is not None:
        issuance = origin_info.find(".//{*}issuance")
        if issuance is not None and issuance.text:
            return issuance.text.strip()  # Return the text value instead of Element
    return "unknown"

def extract_locations(record):
    """Extract physical locations and Harvard repositories from MODS record."""
    locations = set()
    
    # Get physical locations
    for location in record.findall(".//{*}physicalLocation"):
        if location is not None and location.text:
            locations.add(location.text.strip())
    
    # Get Harvard repositories from extension
    for repository in record.findall(".//{*}HarvardRepository"):
        if repository is not None and repository.text:
            locations.add(repository.text.strip())
    
    # Return as sorted list
    return sorted(list(locations)) if locations else []

def extract_subject_authorities(record):
    """Extract unique subject authorities from MODS record."""
    authorities = set()
    
    for subject in record.findall(".//{*}subject"):
        authority = subject.get('authority')
        if authority:  # Only add if authority exists
            authorities.add(authority.lower())  # Normalize to lowercase
    
    return sorted(list(authorities))  # Return as sorted list

def extract_place(record):
    """Extract place information from MODS record, preferring text version."""
    origin_info = record.find(".//{*}originInfo")
    if origin_info is not None:
        # First try to find a placeTerm with type="text"
        for place in origin_info.findall(".//{*}placeTerm[@type='text']"):
            if place is not None and place.text:
                return place.text.strip()
    
    return None

def extract_language(record):
    """Extract language information from MODS record, preferring text version."""
    language_element = record.find(".//{*}language")
    if language_element is not None:
        # Look for languageTerm with type="text"
        language_term = language_element.find(".//{*}languageTerm[@type='text']")
        if language_term is not None and language_term.text:
            return language_term.text.strip()
    
    return None

## Read in data from XML Files

In [4]:
def get_xml_files(directory, sample_size=None):
    """
    Get list of XML files from directory, optionally sampling random subset.
    
    Args:
        directory (str): Path to directory containing XML files
        sample_size (int, optional): Number of files to randomly sample. If None, use all files.
    
    Returns:
        list: Paths to XML files to process
    """
    # Get all XML files in directory
    xml_files = list(Path(directory).glob('*.xml'))
    logger.info(f"Found {len(xml_files)} XML files in {directory}")
    
    if not xml_files:
        logger.error(f"No XML files found in {directory}")
        return []
    
    # Sample if requested and possible
    if sample_size and sample_size < len(xml_files):
        xml_files = random.sample(xml_files, sample_size)
        logger.info(f"Randomly sampled {sample_size} files for processing")
    
    return xml_files


## Process the XML files

In [5]:
def process_xml_files(xml_files, chunk_size=100000):
    """
    Process XML files in chunks and save intermediate results.
    
    Args:
        xml_files (list): List of paths to XML files
        chunk_size (int): Number of files to process before saving intermediate results
    
    Returns:
        pandas.DataFrame: Combined data from all processed files
    """
    all_publications = []
    total_files = len(xml_files)
    chunk_number = 1
    
    # Process files in chunks
    for chunk in chunk_files(xml_files, chunk_size):
        chunk_publications = []
        chunk_start = (chunk_number - 1) * chunk_size
        
        # Use ThreadPoolExecutor for parallel processing of files within chunk
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = []
            for xml_file in chunk:
                future = executor.submit(process_single_file, xml_file)
                futures.append(future)
            
            # Process results as they complete
            for i, future in enumerate(futures):
                try:
                    result = future.result()
                    if result:
                        chunk_publications.append(result)
                    
                    # Log progress within chunk
                    current_file = chunk_start + i + 1
                    if current_file % 10000 == 0 or current_file == total_files:
                        logger.info(f"Processed {current_file}/{total_files} files")
                        
                except Exception as e:
                    logger.error(f"Error processing file: {str(e)}")
        
        # Create DataFrame for chunk and save intermediate result
        if chunk_publications:
            chunk_df = pd.DataFrame(chunk_publications)
            intermediate_file = f"mods_analysis_chunk_{chunk_number}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
            save_formatted_excel(chunk_df, intermediate_file)
            logger.info(f"Saved intermediate chunk {chunk_number} to {intermediate_file}")
        
        # Add chunk results to overall results
        all_publications.extend(chunk_publications)
        chunk_number += 1
    
    # Create final DataFrame from all processed records
    df = pd.DataFrame(all_publications)
    logger.info(f"Created final DataFrame with {len(df)} records from {total_files} files")
    
    return df

## Process a Single Record

In [6]:
def process_single_file(xml_file):
    """Process a single XML file and return publication data."""
    try:
        # Read and parse XML file
        tree = ET.parse(xml_file)
        root = tree.getroot()
        
        # Process the record
        publication_data = process_single_record(root, str(xml_file.name))
        logger.debug(f"Successfully processed {xml_file}")
        return publication_data
        
    except ET.ParseError as e:
        logger.error(f"Failed to parse {xml_file}: {str(e)}")
    except Exception as e:
        logger.error(f"Error processing {xml_file}: {str(e)}")
    return None


In [7]:
def process_single_record(record, filename):
    """Process a single MODS record and return structured data."""
    try:
        # Extract basic information
        title_info = record.find(".//{*}titleInfo/{*}title")
        title = title_info.text if title_info is not None else "Unknown Title"
        
        origin_info = record.find(".//{*}originInfo")
        
        # Use existing extraction functions
        publication_data = {
            'filename': filename,
            'title': title,
            'publisher': extract_publisher(record),
            'start_year': extract_start_date(origin_info) if origin_info is not None else None,
            'end_year': extract_end_date(origin_info) if origin_info is not None else None,
            'format': determine_format(record),
            'type': determine_resource_type(record),
            'lcsh_subjects': extract_lcsh_subjects(record),
            'fast_subjects': extract_fast_subjects(record),
            'genres': extract_genres(record),
            'locations': extract_locations(record),
            'authorities': extract_subject_authorities(record),
            'place': extract_place(record),  # Add new place field
            'language': extract_language(record),  # Add new language field
            **extract_identifiers(record),
            'record_identifier': extract_record_identifier(record),
            **extract_classifications(record)
        }
        
        return publication_data
        
    except Exception as e:
        logger.error(f"Error processing record: {str(e)}")
        return None


## Break the giant list of files up into smaller chunks

In [8]:
def chunk_files(files, chunk_size=100000):
    """Generator to split files into chunks."""
    iterator = iter(files)
    return iter(lambda: list(islice(iterator, chunk_size)), [])


## Format the output XLS

In [9]:
def save_formatted_excel(df, output_file):
    """Save DataFrame to Excel with consistent formatting."""
    # First save with pandas
    df.to_excel(output_file, index=False)
    
    # Then open with openpyxl for formatting
    wb = openpyxl.load_workbook(output_file)
    ws = wb.active
    
    # Define styles
    regular_font = Font(size=10)
    header_font = Font(size=10, bold=True)
    alignment = Alignment(wrap_text=True, vertical='top')
    header_fill = PatternFill(start_color='E0E0E0', end_color='E0E0E0', fill_type='solid')
    
    # Format header row
    for cell in ws[1]:
        cell.font = header_font
        cell.alignment = alignment
        cell.fill = header_fill
    
    # Format data rows
    for row in ws.iter_rows(min_row=2):  # Start from second row
        for cell in row:
            cell.font = regular_font
            cell.alignment = alignment
    
    # Set all column widths to 12
    for column in ws.columns:
        column_letter = get_column_letter(column[0].column)
        ws.column_dimensions[column_letter].width = 12
    
    # Freeze the header row
    ws.freeze_panes = 'A2'
    
    # Add filters to header row
    ws.auto_filter.ref = ws.dimensions
    
    # Save the formatted workbook
    wb.save(output_file)
    logger.info(f"\nSaved formatted analysis results to {output_file}")


## Main Analysis

In [10]:
global logger
logger = setup_logging()

logger.info(f"Starting MODS analysis on directory: {XML_DIR}")
    
# Get XML files to process
xml_files = get_xml_files(XML_DIR, SAMPLE)
if not xml_files:
    logger.error("No XML files found")

# Calculate expected processing time (assuming ~0.1s per file)
estimated_hours = len(xml_files) * 0.1 / 3600
logger.info(f"Estimated processing time: {estimated_hours:.1f} hours")

# Process files and create DataFrame
df = process_xml_files(xml_files)

# Save final results
output_file = f"mods_analysis_final_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
save_formatted_excel(df, output_file)
logger.info(f"Saved final analysis results to {output_file}")

Starting MODS analysis on directory: mods_records
Found 885565 XML files in mods_records
Estimated processing time: 24.6 hours
Processed 10000/885565 files
Processed 20000/885565 files
Processed 30000/885565 files
Processed 40000/885565 files
Processed 50000/885565 files
Processed 60000/885565 files
Processed 70000/885565 files
Processed 80000/885565 files
Processed 90000/885565 files
Processed 100000/885565 files

Saved formatted analysis results to mods_analysis_chunk_1_20250214_135356.xlsx
Saved intermediate chunk 1 to mods_analysis_chunk_1_20250214_135356.xlsx
Processed 110000/885565 files
Processed 120000/885565 files
Processed 130000/885565 files
Processed 140000/885565 files
Processed 150000/885565 files
Processed 160000/885565 files
Processed 170000/885565 files
Processed 180000/885565 files
Processed 190000/885565 files
Processed 200000/885565 files

Saved formatted analysis results to mods_analysis_chunk_2_20250214_135722.xlsx
Saved intermediate chunk 2 to mods_analysis_chunk

PermissionError: [Errno 13] Permission denied: 'mods_analysis_final_20250214_142225.xlsx'