# Dev notebook for patching code

Related to issue [#117](https://github.com/impresso/impresso-text-acquisition/issues/117)

This notebook contains the code used to perform some of the simpler patches necessary on the canonical data. 
In particular patches n°1 and n°6:
- n°1: Adding a property `iiif_img_base_uri` at the top level of all page JSONs for a given set of journals, with the base uri of the iiif image API for the specific page. 
    - This patch concerns the journals `FedGazDe`, `FedGazFr` and `NZZ`.
- n°6: Adding a property `iiif_manifest_uri` at the top level of all issue JSONs for a given set of journals, with the uri to the specific issue's manifest in the IIIF presentation API. 
    - This patch concerns the journals `arbeitgeber`, `handelsztg`.

The results of these patches will be logged and documented in the manifest files created alongside these patches, and stored on S3 as well as in the `impresso-data-release` GitHub repository.

### Imports

In [None]:
import os
import boto3
import json
import logging
import jsonlines
from impresso_commons.utils import s3
from impresso_commons.path.path_s3 import fetch_files, list_files, list_newspapers
from impresso_commons.utils.s3 import fixed_s3fs_glob
from impresso_commons.versioning.data_manifest import DataManifest
from text_importer.importers.core import upload_issues, upload_pages, remove_filelocks
from smart_open import open as smart_open_function
from impresso_commons.versioning.helpers import counts_for_canonical_issue
import dask.bag as db
from typing import Any, Callable
import git
from text_importer.utils import init_logger
import copy
from dask.distributed import Client
from filelock import FileLock
import shutil



In [None]:
# Storage options obtained from the impresso_commons S3 helper.
# NOTE(review): not referenced by any other cell visible in this notebook.
IMPRESSO_STORAGEOPT = s3.get_storage_options()

In [None]:
# Root logger (no name given); it is (re)configured with `init_logger` before each patch
# and used by all the helper functions defined below.
logger = logging.getLogger()

## Functions

In [None]:
def add_property(object_dict: dict[str, Any], prop_name: str, prop_function: Callable[[str], str], function_input: str):
    """Compute a property value and store it on the given dict.

    The dict is modified in place and is also returned, so the call can be
    used directly inside comprehensions or `map` pipelines.

    Args:
        object_dict (dict[str, Any]): Dict to patch; must contain an `id` key
            (used for the debug log line).
        prop_name (str): Name of the key to set.
        prop_function (Callable[[str], str]): Function computing the value.
        function_input (str): Single argument handed to `prop_function`.

    Returns:
        dict[str, Any]: The same `object_dict`, now holding `prop_name`.
    """
    new_value = prop_function(function_input)
    object_dict[prop_name] = new_value
    logger.debug("%s -> Added property %s: %s", object_dict['id'], prop_name, new_value)
    return object_dict

In [None]:
def write_error(
    thing_id: str,
    origin_function: str,
    error: Exception, 
    failed_log: str
) -> None:
    """Log an error and append it to the `failed_log` file.

    Args:
        thing_id (str): Identifier of the object for which the error occurred.
        origin_function (str): Name of the function in which the error occurred.
        error (Exception): Error that occurred and should be logged.
        failed_log (str): Path to the log file collecting failures; it is
            opened in append mode, one line is added per call.
    """
    message = f"Error in {origin_function} for {thing_id}: {error}"

    # `logger.exception` also records the traceback of the exception being handled, if any
    logger.exception(message)

    with open(failed_log, "a+") as log_file:
        log_file.write(f"{message}\n")

In [None]:
def write_jsonlines_file(filepath: str, contents: str | list[str] | list[dict[str, Any]], content_type: str, failed_log: str | None = None) -> None:
    """Append `contents` to a JSON-lines file, guarded by a file lock.

    The file is opened in append mode (`'ab'`), so calling this function
    several times with the same `filepath` adds lines to the existing file.
    Any exception raised while locking or writing is logged (and recorded in
    `failed_log` when provided) instead of being propagated.

    Args:
        filepath (str): Path of the JSON-lines file to write to.
        contents (str | list[str] | list[dict[str, Any]]): Items to write,
            one per line (the callers in this notebook pass lists of dicts).
        content_type (str): Label used in the log message (e.g. `issues`).
        failed_log (str | None, optional): Path to the file in which failures
            are recorded. Defaults to None (failures are only logged).
    """
    # `os.path.dirname` is '' for a bare filename and `os.makedirs('')` raises,
    # so only create the parent directory when there is one.
    parent_dir = os.path.dirname(filepath)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # put a file lock to avoid the overwriting of files due to parallelization
    lock = FileLock(filepath + ".lock", timeout=13)

    try:
        with lock, smart_open_function(filepath, 'ab') as fout:
            # the context manager closes the writer even if `write_all` fails
            with jsonlines.Writer(fout) as writer:
                writer.write_all(contents)

            logger.info(f'Written {len(contents)} {content_type} to {filepath}')
    except Exception as e:
        logger.error(f"Error for {filepath}")
        logger.exception(e)
        if failed_log is not None:
            write_error(os.path.basename(filepath), 'write_jsonlines_file()', e, failed_log)

In [None]:
def write_upload_issues(
    key: tuple[str, str],
    issues: list[dict[str, Any]],
    output_dir: str,
    bucket_name: str,
    failed_log: str | None = None
) -> tuple[str, str]:
    """Write the issues of one newspaper-year to a JSON-lines file and upload it to S3.

    The ``.bz2`` output file is a JSON-line file, where each line
    corresponds to an individual issue document in the canonical format.
    It is written to `<output_dir>/<NEWSPAPER>/<NEWSPAPER>-<YEAR>-issues.jsonl.bz2`.

    Args:
        key (tuple[str, str]): Newspaper ID and year of the input issues,
            e.g. `('GDL', '1900')`.
        issues (list[dict[str, Any]]): A list of issues as dicts.
        output_dir (str): Local output directory.
        bucket_name (str): Name of S3 bucket where to upload the file.
        failed_log (str | None, optional): Path to the file in which write
            failures are recorded. Defaults to None.

    Returns:
        tuple[str, str]: Whatever `upload_issues` returns for the label
            `<NEWSPAPER>-<YEAR>` and the written file.
            NOTE(review): callers in this notebook unpack it as
            `(success, path)` - confirm against `upload_issues`.
    """
    newspaper, year = key
    newspaper_dir = os.path.join(output_dir, newspaper)
    filepath = os.path.join(newspaper_dir, f'{newspaper}-{year}-issues.jsonl.bz2')
    logger.info(f'Compressing {len(issues)} JSON files into {filepath}')

    write_jsonlines_file(filepath, issues, 'issues', failed_log)

    # clean up the `.lock` files left behind by the write step
    remove_filelocks(newspaper_dir)

    label = '-'.join(key)
    return upload_issues(label, filepath, bucket_name)

In [None]:
def write_upload_pages(
    key: str,
    pages: list[dict[str, Any]],
    output_dir: str,
    bucket_name: str,
    failed_log: str | None = None,
) -> tuple[str, tuple[bool, str]]:
    """Write the pages of one issue to a JSON-lines file and upload it to S3.

    The ``.bz2`` output file is a JSON-line file, where each line
    corresponds to an individual page document in the canonical format.
    It is written to
    `<output_dir>/<NEWSPAPER>/<NEWSPAPER>-<YEAR>/<key>-pages.jsonl.bz2`.

    Args:
        key (str): Canonical ID of the newspaper issue (e.g. GDL-1900-01-02-a);
            it must be made of exactly five hyphen-separated parts.
        pages (list[dict[str, Any]]): The list of pages for the provided key.
        output_dir (str): Local output directory.
        bucket_name (str): Name of S3 bucket where to upload the file.
        failed_log (str | None, optional): Path to the file in which write
            failures are recorded. Defaults to None.

    Returns:
        tuple[str, tuple[bool, str]]: The issue `key` and the value returned
            by `upload_pages` for the written file.
    """
    # unpacking into five names also validates the shape of the key
    newspaper, year, _month, _day, _edition = key.split('-')
    year_dir = os.path.join(output_dir, newspaper, f'{newspaper}-{year}')
    filepath = os.path.join(year_dir, f'{key}-pages.jsonl.bz2')
    logger.info(f'Compressing {len(pages)} JSON files into {filepath}')

    # NOTE: the per title-year bookkeeping of uploaded page IDs is currently disabled.
    write_jsonlines_file(filepath, pages, 'pages', failed_log)

    # clean up the `.lock` files left behind by the write step
    remove_filelocks(os.path.dirname(filepath))

    upload_outcome = upload_pages(key, filepath, bucket_name)
    return key, upload_outcome

In [None]:
def to_pairs(pages: list[dict[str, Any]]) -> tuple[str, list[dict[str, Any]]]:
    """Pair a group of pages with the ID of the single issue they belong to.

    The issue ID is the page ID stripped of its last hyphen-separated part.

    Args:
        pages (list[dict[str, Any]]): Pages (each with an `id` key) that are
            all expected to belong to the same issue.

    Returns:
        tuple[str, list[dict[str, Any]]]: The issue ID and the unchanged pages.

    Raises:
        AssertionError: If the pages do not map to exactly one issue ID.
    """
    # `rpartition` keeps everything before the last hyphen ('' when there is none)
    issue_ids = {page['id'].rpartition('-')[0] for page in pages}
    assert len(issue_ids) == 1, "there should only be one issue"
    (issue_id,) = issue_ids
    return issue_id, pages

In [None]:
# adapted from https://github.com/impresso/impresso-data-sanitycheck/blob/master/sanity_check/contents/stats.py#L241
def canonical_stats_from_issue_bag(fetched_issues: db.core.Bag) -> list[dict[str, Any]]:
    """Aggregate canonical counts per newspaper and year from a bag of issues.

    Each issue is reduced to its number of distinct pages (`pp`), content
    items (`i`) and content items of type `image`; these counts are then
    summed per `(np_id, year)` pair, both taken from the issue ID.

    Args:
        fetched_issues (db.core.Bag): Bag of canonical issues, as dicts with
            at least the keys `id`, `pp` and `i`.

    Returns:
        list[dict[str, Any]]: One dict per newspaper-year, with the keys
            `np_id`, `year`, `pages`, `issues`, `content_items_out` and `images`.
    """
    def issue_counts(issue):
        # first two parts of the canonical ID are the newspaper ID and the year
        id_parts = issue["id"].split('-')
        content_items = issue['i']
        return {
            "np_id": id_parts[0],
            "year": id_parts[1],
            "id": issue['id'],
            "issue_id": issue['id'],
            "n_pages": len(set(issue['pp'])),
            "n_content_items": len(content_items),
            "n_images": sum(1 for item in content_items if item['m']['tp']=='image'),
        }

    column_types = {'np_id': str, 'year': str,
                    'id': str, 'issue_id': str,
                    "n_pages": int, 'n_images': int,
                    'n_content_items': int}

    # one row per issue, indexed by the issue ID
    per_issue_df = (
        fetched_issues.map(issue_counts)
        .to_dataframe(meta=column_types)
        .set_index('id')
        .persist()
    )

    # sum the counts for every newspaper-year; `issue_id` is counted to get the number of issues
    yearly_totals = per_issue_df.groupby(by=['np_id', 'year']).agg(
        {"n_pages": sum, 'issue_id': 'count', 'n_content_items': sum, 'n_images': sum}
    )
    renamed_totals = yearly_totals.rename(
        columns={'issue_id': 'issues', 'n_pages': 'pages',
                 'n_content_items': 'content_items_out', 'n_images':'images'}
    )
    aggregated_df = renamed_totals.reset_index()

    # return as a list of dicts
    return aggregated_df.to_bag(format='dict').compute()

In [None]:
def process_pages_of_issue(
    key: str, 
    pages: list[dict[str, Any]],
    manifest: DataManifest,
    issue_stats: list[dict],
    failed_log: str | None = None 
) -> tuple[bool, str]:
    """Register an issue's title-year stats in the manifest, then write and upload its pages.

    When the manifest has no entry yet for the issue's title-year, the
    matching entry is taken out of `issue_stats` (the list is modified in
    place), stripped of its `np_id` and `year` keys and set on the manifest.

    Args:
        key (str): Canonical issue ID, made of five hyphen-separated parts.
        pages (list[dict[str, Any]]): Pages of the issue.
        manifest (DataManifest): Manifest to update; also provides the local
            output directory and the output bucket name.
        issue_stats (list[dict]): Per title-year statistics, each with the
            keys `np_id` and `year`.
        failed_log (str | None, optional): Path to the file in which write
            failures are recorded. Defaults to None.

    Returns:
        The issue key, paired with a tuple holding the second element returned
        by `write_upload_pages` and the manifest.
        NOTE(review): this does not match the `tuple[bool, str]` annotation.

    Raises:
        IndexError: If the stats are needed but `issue_stats` has no entry
            for the title-year.
    """
    # unpacking into five names also validates the shape of the key
    newspaper, year, _month, _day, _edition = key.split('-')

    if not manifest.has_title_year_key(newspaper, year):
        matching_stats = [
            entry for entry in issue_stats
            if entry['np_id']==newspaper and entry['year']==year
        ]
        current_stats = matching_stats[0]
        # reduce the number of stats to consider at each step
        issue_stats.remove(current_stats)
        # remove unwanted keys from the dict
        for unwanted_key in ('np_id', 'year'):
            del current_stats[unwanted_key]
        if not manifest.replace_by_title_year(newspaper, year, current_stats):
            logger.warning("Problem encountered when trying to add %s for %s-%s", current_stats, newspaper, year)

    key, filepath = write_upload_pages(key, pages, manifest.temp_dir, manifest.output_bucket_name, failed_log)

    return key, (filepath, manifest)
        

In [None]:

def empty_folder(dir_path: str):
    """Make sure `dir_path` exists and is an empty directory.

    An existing directory is removed together with all its contents and then
    recreated. Missing parent directories are created as well, so the function
    also works the first time it is run on a fresh machine.

    Args:
        dir_path (str): Path of the directory to (re)create empty.
    """
    if os.path.exists(dir_path):
        shutil.rmtree(dir_path)
        logger.info("Emptied directory at %s", dir_path)
    # `makedirs` (unlike `mkdir`) does not fail when the parent directory is missing
    os.makedirs(dir_path)
    

# SWA - Patch 6

The patch consists of adding a new `iiif_manifest_uri` property mapping to the IIIF presentation API for the given issue.

In [None]:
# initialize values for patch
# Newspaper titles concerned by patch 6.
SWA_TITLES = ['arbeitgeber', 'handelsztg']
# Base URI of the IIIF presentation API, to which the issue-specific part is appended.
SWA_IIIF_BASE_URI = 'https://ub-iiifpresentation.ub.unibas.ch/impresso_sb'
# Name of the property added to every issue (reassigned further down for patch 1).
PROP_NAME = 'iiif_manifest_uri'

# File in which failures of the write step are recorded.
error_log = '/home/piconti/impresso-text-acquisition/text_importer/data/patch_logs/patch_6_swa_errors.log'

# DEBUG level so that the per-issue messages emitted by `add_property` are kept.
init_logger(logger, logging.DEBUG, '/home/piconti/impresso-text-acquisition/text_importer/data/patch_logs/patch_6_swa.log')
logger.info("Patching titles %s: adding %s property at issue level", SWA_TITLES, PROP_NAME)

In [None]:
# define patch function
def swa_manifest_uri(issue_id: str, swa_iiif: str = SWA_IIIF_BASE_URI) -> str:
    """Build the URI of the IIIF presentation manifest of an issue.

    The result follows the template
    https://ub-iiifpresentation.ub.unibas.ch/impresso_sb/[issue canonical ID]-issue/manifest

    Args:
        issue_id (str): Canonical ID of the issue.
        swa_iiif (str, optional): Base URI of the IIIF presentation API.
            Defaults to SWA_IIIF_BASE_URI.

    Returns:
        str: URI of the manifest for the given issue.
    """
    # URIs always use '/', whatever the OS: build the string explicitly instead of
    # relying on `os.path.join`, which is meant for filesystem paths.
    return f"{swa_iiif.rstrip('/')}/{issue_id}-issue/manifest"

In [None]:
# initialise manifest to keep track of updates
# Git repository handed to the manifest.
canonical_repo = git.Repo('/home/piconti/impresso-text-acquisition')
# Data is read from the input bucket; patched files are uploaded to the output bucket.
s3_input_bucket = 'canonical-data'
s3_output_bucket = 'canonical-staging'
# previous manifest is not in the output bucket --> provide it as argument
previous_manifest_path = 's3://canonical-data/canonical_v0-0-1.json' 
# Local directory in which the patched files are written before being uploaded.
temp_dir = '/scratch/piconti/impresso/patches_temp'
patched_fields=[PROP_NAME]
# JSON schema used when validating the manifest before its export.
schema_path = '/home/piconti/impresso-text-acquisition/text_importer/impresso-schemas/json/versioning/manifest.schema.json'

# empty the temp folder before starting processing to prevent duplication of content inside the files.
# (the JSON-lines files are opened in append mode, so leftovers would be appended to.)
empty_folder(temp_dir)

swa_patch_6_manifest = DataManifest(
    data_stage = 'canonical',
    s3_output_bucket = s3_output_bucket,
    s3_input_bucket = s3_input_bucket,
    git_repo = canonical_repo,
    temp_dir = temp_dir,
    patched_fields=patched_fields,
    previous_mft_path = previous_manifest_path
)

Perform the patch, track the updates and upload the results.

In [None]:
# download the issues of interest for this patch
# The second element of the returned pair is discarded: only issues are needed here.
# NOTE(review): the bucket name is hard-coded rather than taken from `s3_input_bucket`,
# and the `type` keyword should be checked against the current `fetch_files` signature.
swa_issues, _ = fetch_files('canonical-data', True, type='issues', newspapers_filter=SWA_TITLES)

In [None]:

# Patch the issues (in place), group them by title-year and record their counts in the
# manifest, keeping track of the data that's been modified.
yearly_patched_issues = {}

for issue in swa_issues:
    # key is title-year
    title, year = issue['id'].split('-')[:2]
    key = '-'.join([title, year])
    patched_issue = add_property(issue, PROP_NAME, swa_manifest_uri, issue['id'])
    yearly_patched_issues.setdefault(key, []).append(patched_issue)

    success = swa_patch_6_manifest.add_by_title_year(title, year, counts_for_canonical_issue(issue))
    if not success:
        # `print` does not interpolate %-style arguments: go through the logger instead
        logger.warning("counts not added for %s-%s", title, year)

# write and upload the updated issues to s3
for key, issues in yearly_patched_issues.items():
    write_upload_issues(key.split('-'), issues, temp_dir, s3_output_bucket, error_log)

# finalize the manifest and export it
note = f"Patching titles {SWA_TITLES}: adding {PROP_NAME} property at issue level"
swa_patch_6_manifest.append_to_notes(note)
# the export is done separately, after validation against the schema
swa_patch_6_manifest.compute(export_to_git_and_s3 = False)
swa_patch_6_manifest.validate_and_export_manifest(path_to_schema=schema_path, push_to_git=True)
    

# FedGaz + NZZ – Patch 1

The patch consists of adding a new `iiif_img_base_uri` property mapping to the base uri of the IIIF image API for the given page.

In [None]:
# Start a dask.distributed client with 16 workers of 2 threads each; the dask
# computations of the cells below are run while it is active.
client = Client(n_workers=16, threads_per_worker=2)
# Bare expression: shows the client summary when the cell is run in a notebook.
client

In [None]:
# initialize values for patch
# Newspaper titles concerned by patch 1.
UZH_TITLES = ['FedGazDe', 'FedGazFr', 'NZZ']
# Base URI of the IIIF image API, to which the page ID is appended.
IMPRESSO_IIIF_BASE_URI = "https://impresso-project.ch/api/proxy/iiif/"
# NOTE: overwrites the value that was set for patch 6 ('iiif_manifest_uri').
PROP_NAME = 'iiif_img_base_uri'
# Only referenced by commented-out code in this notebook.
UPLOADED_PAGES = {}

# File in which failures of the write step are recorded (for all of UZH_TITLES, NZZ included).
error_log = '/home/piconti/impresso-text-acquisition/text_importer/data/patch_logs/patch_1_fedgaz_errors.log'

# INFO level: the per-page debug messages emitted by `add_property` are not kept.
init_logger(logger, logging.INFO, '/home/piconti/impresso-text-acquisition/text_importer/data/patch_logs/patch_1_fedgaz_nzz.log')
logger.info("Patching titles %s: adding %s property at page level", UZH_TITLES, PROP_NAME)

In [None]:
# define patch function
def uzh_image_base_uri(page_id: str, impresso_iiif: str = IMPRESSO_IIIF_BASE_URI) -> str:
    """Build the base URI of a page in the IIIF image API.

    The result follows the template
    https://impresso-project.ch/api/proxy/iiif/[page canonical ID]

    Args:
        page_id (str): Canonical ID of the page.
        impresso_iiif (str, optional): Base URI of the IIIF image API, with
            or without a trailing slash. Defaults to IMPRESSO_IIIF_BASE_URI.

    Returns:
        str: Base URI of the image of the given page.
    """
    # URIs always use '/', whatever the OS: build the string explicitly instead of
    # relying on `os.path.join`, which is meant for filesystem paths.
    return f"{impresso_iiif.rstrip('/')}/{page_id}"

In [None]:
# initialise manifest to keep track of updates
# Git repository handed to the manifest.
canonical_repo = git.Repo('/home/piconti/impresso-text-acquisition')
# Data is read from the input bucket; patched files are uploaded to the output bucket.
s3_input_bucket = 'canonical-data'
s3_output_bucket = 'canonical-staging' #'canonical-sandbox'
# The previous manifest path is provided explicitly as an argument.
# NOTE(review): the path below IS located in the output bucket ('canonical-staging'),
# unlike what the comment inherited from the patch 6 cell stated.
previous_manifest_path = 's3://canonical-staging/canonical_v0-0-2.json' 
# Local directory in which the patched files are written before being uploaded.
temp_dir = '/scratch/piconti/impresso/patches_temp'
patched_fields=[PROP_NAME]
# JSON schema used when validating the manifest before its export.
schema_path = '/home/piconti/impresso-text-acquisition/text_importer/impresso-schemas/json/versioning/manifest.schema.json'

# Start from an empty temp folder: the JSON-lines files are opened in append mode,
# so leftovers from a previous run would end up duplicated inside the files.
empty_folder(temp_dir)

nzz_patch_1_manifest = DataManifest(
    data_stage = 'canonical',
    s3_output_bucket = s3_output_bucket,
    s3_input_bucket = s3_input_bucket,
    git_repo = canonical_repo,
    temp_dir = temp_dir,
    patched_fields=patched_fields,
    previous_mft_path = previous_manifest_path
)

Perform the patch, track the updates and upload the results.

In [None]:
logger.info("Fetching the page and issue files from S3...")
# download the issues and pages of interest for this patch
# (second argument False: the results are used as lazy bags in the following cells)
uzh_issues, uzh_pages = fetch_files('canonical-data', False, 'both', UZH_TITLES)

# compute the per title-year statistics that correspond to these issues
logger.info("Computing the canonical statistics on the issues...")
stats_from_issues = canonical_stats_from_issue_bag(uzh_issues)

In [None]:
logger.info("Updating the page files and uploading them to s3...")
# patch the pages and write them back to s3.
# Every step works on whole partitions; `to_pairs` asserts that a partition only holds
# the pages of one single issue.
# NOTE(review): presumably each partition corresponds to one pages file - confirm
# against `fetch_files`.
uzh_patched_pages = (
    uzh_pages
        # 1. add the new property to every page of the partition
        .map_partitions(
            lambda pages: [add_property(p, PROP_NAME, uzh_image_base_uri, p['id']) for p in pages]
        )
        # 2. turn the partition into an (issue ID, pages) pair
        .map_partitions(to_pairs)
        # 3. write the pages of the issue to disk and upload the file to s3
        .map_partitions(
            lambda issue: write_upload_pages(   
                issue[0], issue[1],
                output_dir=temp_dir,
                bucket_name=s3_output_bucket,
                failed_log=error_log,
            )
        )
).compute()
# The next cell reads this result as a flat sequence alternating issue IDs and
# (success, path) tuples, pairing even and odd positions back together.

In [None]:
# work on a copy: entries are removed from the list as they get added to the manifest
issue_stats = copy.deepcopy(stats_from_issues)

# Index the fetched issues by ID once, instead of scanning `uzh_issues` for every single
# issue: besides being quadratic, each scan of a lazy dask bag triggers a new computation.
issues_by_id = {}
for fetched_issue in uzh_issues:
    issues_by_id.setdefault(fetched_issue['id'], []).append(fetched_issue)

logger.info("Done uploading the page files to s3, filling in the manifest...")
# fill in the manifest statistics and prepare issues to be uploaded to their new s3 bucket.
issues_with_patched_pages = {}
# `uzh_patched_pages` is flat and alternates issue IDs with (success, path) tuples
for issue_id, (success, path) in zip(uzh_patched_pages[::2], uzh_patched_pages[1::2]):
    title, year, month, day, edition = issue_id.split('-')

    if not success:
        logger.warning("The pages for issue %s were not correctly uploaded", issue_id)
        continue

    if not nzz_patch_1_manifest.has_title_year_key(title, year):
        current_stats = [d for d in issue_stats if d['np_id']==title and d['year']==year][0]
        # reduce the number of stats to consider at each step
        issue_stats.remove(current_stats)
        # remove unwanted keys from the dict
        del current_stats['np_id']
        del current_stats['year']

        # NOTE: the check of the number of uploaded pages (UPLOADED_PAGES) is currently disabled.
        add_ok = nzz_patch_1_manifest.replace_by_title_year(title, year, current_stats)
        if not add_ok:
            logger.warning("Problem encountered when trying to add %s for %s-%s", current_stats, title, year)

    # if the patching was successful, the issue can be copied to the new bucket
    specific_issue = issues_by_id.get(issue_id, [])

    assert len(specific_issue) == 1, (
        f"Expected exactly one issue with id {issue_id}, found {len(specific_issue)}"
    )

    key = '-'.join([title, year])
    # extend a dedicated list so that the lists of `issues_by_id` are never modified
    issues_with_patched_pages.setdefault(key, []).extend(specific_issue)

logger.info("Uploading the issue files to the new bucket")
# write and upload the issues to the new s3 bucket
for key, issues in issues_with_patched_pages.items():
    success, issue_path = write_upload_issues(key.split('-'), issues, temp_dir, s3_output_bucket, error_log)
    if not success:
        logger.warning("The copy of issues %s had a problem", key)

logger.info("Finalizing, computing and exporting the manifest")
# finalize the manifest and export it
note = f"Patching titles {UZH_TITLES}: adding {PROP_NAME} property at page level"
nzz_patch_1_manifest.append_to_notes(note)
# the export is done separately, after validation against the schema
nzz_patch_1_manifest.compute(export_to_git_and_s3 = False)
nzz_patch_1_manifest.validate_and_export_manifest(path_to_schema=schema_path, push_to_git=True)