# Rebuilt Manifest Recomputation

### Imports

In [None]:
import os
import shutil
import json
import boto3
import pandas as pd
import git
from dask import bag as db
from dask import dataframe as dd
import numpy as np
from pathlib import Path
from time import strftime
import copy
from typing import Any

from impresso_commons.utils.s3 import (fixed_s3fs_glob, alternative_read_text, upload,
                                       get_storage_options, get_boto3_bucket, IMPRESSO_STORAGEOPT)
from impresso_commons.versioning.helpers import (DataStage, read_manifest_from_s3, 
                                                 validate_stage, clone_git_repo,
                                                 write_and_push_to_git, write_dump_to_fs)
from impresso_commons.path import parse_canonical_filename
from impresso_commons.path.path_fs import IssueDir
from impresso_commons.path.path_s3 import read_s3_issues, list_newspapers
from impresso_commons.versioning.data_statistics import NewspaperStatistics, POSSIBLE_GRANULARITIES
from impresso_commons.versioning.data_manifest import DataManifest
from impresso_commons.versioning.helpers import counts_for_rebuilt
from collections import defaultdict

## Functions

In [None]:
def compute_rebuilt_stats_for_manifest(
    s3_rebuilt_files: db.core.Bag,
) -> list[dict[str, Any]]:
    """Compute the yearly rebuilt statistics of each newspaper title for the manifest.

    Every element of ``s3_rebuilt_files`` is passed to ``counts_for_rebuilt``
    (with ``include_np=True``). Its output is expected to provide the columns
    declared in the dataframe ``meta`` below: ``np_id``, ``year``, ``issue_id``,
    ``n_content_items`` and ``n_tokens``. The bag may mix several titles and
    years; the results are grouped by ``(np_id, year)``.

    Args:
        s3_rebuilt_files: Bag of rebuilt content-items (one parsed JSON record
            per element).

    Returns:
        One dict per ``(np_id, year)`` pair with the keys ``np_id``, ``year``,
        ``issues`` (number of distinct ``issue_id`` values),
        ``content_items_out`` (sum of ``n_content_items``) and ``ft_tokens``
        (sum of ``n_tokens``).
    """

    # Custom dask aggregation counting the distinct values of a column in each
    # group (the "nunique" pattern from the dask ``Aggregation`` documentation).
    def chunk(s):
        # map step, per partition: the distinct values of each group, as a list
        return s.apply(lambda x: list(set(x)))

    def agg(s):
        # reduce step: summing the per-partition lists of a group concatenates them
        s = s._selected_obj
        return s.groupby(level=list(range(s.index.nlevels))).sum()

    def finalize(s):
        # values may repeat across partitions, so deduplicate once more before counting
        return s.apply(lambda x: len(set(x)))

    tunique = dd.Aggregation("tunique", chunk, agg, finalize)

    # one row per rebuilt content-item with the information needed for the statistics
    rebuilt_count_df = (
        s3_rebuilt_files.map(lambda rf: counts_for_rebuilt(rf, include_np=True))
        .to_dataframe(meta={'np_id': str, 'year': str, 'issue_id': str,
                            'n_content_items': int, 'n_tokens': int})
        .persist()
    )

    # aggregate per title and year; the issues are counted as distinct issue IDs
    aggregated_df = (
        rebuilt_count_df
        .groupby(by=['np_id', 'year'])
        .agg({'issue_id': tunique, 'n_content_items': 'sum', 'n_tokens': 'sum'})
        .rename(columns={'issue_id': 'issues', 'n_content_items': 'content_items_out', 'n_tokens': 'ft_tokens'})
        .reset_index()
    )

    print("Obtaining the yearly rebuilt statistics for the entire corpus")
    # materialize the result as a list of dicts (one per title and year)
    return aggregated_df.to_bag(format='dict').compute()

## Rebuilt manifest of Run 2

### Information to initialize the Manifest object

In [None]:
# bucket from which the rebuilt files are fetched for the statistics
s3_rebuilt_bucket = "rebuilt-staging"

# local clone of impresso-pycommons, handed to the manifest as its `git_repo`
pycommons_repo = git.Repo('/home/piconti/impresso-pycommons')
# input bucket: the one holding the input data of the data currently in 'rebuilt-data'
mft_s3_input_bucket = 'canonical-staging'
# output bucket ('rebuilt-data' and 'rebuilt-sandbox' are the other options used so far)
mft_s3_output_bucket = 'rebuilt-staging'
# local working directory of the manifest
temp_dir = '/home/piconti/temp_rebuilt_v0-2-0'

# no previous manifest is given: this one is built from scratch
rebuilt_mft_run_2 = DataManifest(
    data_stage='rebuilt',  # DataStage.REBUILT is accepted as well
    s3_output_bucket=mft_s3_output_bucket,
    s3_input_bucket=mft_s3_input_bucket,
    git_repo=pycommons_repo,
    temp_dir=temp_dir,
    staging=True,
)

In [None]:
# titles processed in Run 2; the list (in this order) is also written into the manifest note
newspapers = [
    'arbeitgeber',
    'handelsztg',
    'actionfem',
    'deletz1893',
    'demitock',
    'diekwochen',
    'dunioun',
    'indeplux',
    'kommmit',
    'lunion',
    'luxembourg1935',
    'luxland',
]

In [None]:
# fetch the rebuilt files
rebuilt_files_2 = []

for np in newspapers:
    rebuilt_files_2.extend(fixed_s3fs_glob(os.path.join(s3_rebuilt_bucket, np, '*.jsonl.bz2')))

rebuilt_bag = db.read_text(rebuilt_files_2, storage_options=IMPRESSO_STORAGEOPT).map(json.loads)

In [None]:
# yearly statistics (one dict per title and year) of the Run 2 rebuilt files
run_2_stats = compute_rebuilt_stats_for_manifest(rebuilt_bag)

In [None]:
# work on a copy so that `run_2_stats` keeps its `np_id` and `year` entries
rebuilt_stats = copy.deepcopy(run_2_stats)

print("Populating the manifest with the resulting yearly statistics...")
for stats in rebuilt_stats:
    # title and year identify the entry; the remaining keys are the statistics themselves
    title, year = stats.pop('np_id'), stats.pop('year')
    rebuilt_mft_run_2.add_by_title_year(title, year, stats)

print("Finalizing the manifest, and computing the result...")

note = f"Rebuilt of newspaper articles for {newspapers}."
rebuilt_mft_run_2.append_to_notes(note)
# computed with `export_to_git_and_s3=True`, i.e. the result is meant to be exported
rebuilt_mft_run_2.compute(export_to_git_and_s3=True)

In [None]:
# inspect the computed Run 2 manifest (the last expression of a cell is displayed)
rebuilt_mft_run_2.manifest_data

### Compute the statistics based on the current rebuilt data

In [None]:
print(f"Fetching the rebuilt files from the bucket s3://{s3_rebuilt_bucket}")

rebuilt_files = fixed_s3fs_glob(os.path.join(s3_rebuilt_bucket, '*.jsonl.bz2'))

print("Reading the contents of the fetched files...")
# lazy bag of parsed content-items; nothing is read until a result is computed
rebuilt_bag = db.read_text(rebuilt_files, storage_options=IMPRESSO_STORAGEOPT).map(json.loads)


def _ci_counts(ci):
    """Return the counts contributed by a single rebuilt content-item.

    The title and year are the first two '-'-separated fields of the item ID,
    and the issue ID is the item ID without its last field. Tokens are the
    whitespace-separated words of ``ft`` (0 when the item has no ``ft``).
    """
    id_parts = ci["id"].split('-')
    return {
        "np_id": id_parts[0],
        "year": id_parts[1],
        "issue_id": '-'.join(id_parts[:-1]),
        "n_content_items": 1,  # every bag element is exactly one content-item
        "n_tokens": len(ci['ft'].split()) if 'ft' in ci else 0,
    }


counts_meta = {'np_id': str, 'year': str, 'issue_id': str,
               'n_content_items': int, 'n_tokens': int}
rebuilt_count_df = rebuilt_bag.map(_ci_counts).to_dataframe(meta=counts_meta).persist()

# Aggregate per issue first: each row of this table is then one distinct issue,
# so counting its rows per title and year yields the number of issues.
per_issue_df = (
    rebuilt_count_df
    .groupby(by=['np_id', 'year', 'issue_id'])
    .agg({'n_content_items': sum, 'n_tokens': sum})
    .reset_index()
)
aggregated_df = (
    per_issue_df
    .groupby(by=['np_id', 'year'])
    .agg({'issue_id': 'count', 'n_content_items': sum, 'n_tokens': sum})
    .rename(columns={'issue_id': 'issues', 'n_content_items': 'content_items_out', 'n_tokens': 'ft_tokens'})
    .reset_index()
)

print("Obtaining the yearly rebuilt statistics for the entire corpus")
# materialize the statistics as a list of dicts (one per title and year)
stats_as_dict = aggregated_df.to_bag(format='dict').compute()

In [None]:
# work on a copy so that `stats_as_dict` keeps its `np_id` and `year` entries
rebuilt_stats = copy.deepcopy(stats_as_dict)

# NOTE(review): `rebuilt_mft_0` is not defined in any earlier cell, so running the
# notebook top to bottom failed here with a NameError. Unless it already exists in
# the session, create it with the same configuration as the Run 2 manifest.
if 'rebuilt_mft_0' not in globals():
    rebuilt_mft_0 = DataManifest(
        data_stage='rebuilt',
        s3_output_bucket=mft_s3_output_bucket,
        s3_input_bucket=mft_s3_input_bucket,
        git_repo=pycommons_repo,
        temp_dir=temp_dir,
        staging=True,
    )

print("Populating the manifest with the resulting yearly statistics...")
# populate the manifest with these statistics
for stats in rebuilt_stats:
    title = stats['np_id']
    year = stats['year']
    del stats["np_id"]
    del stats["year"]
    rebuilt_mft_0.add_by_title_year(title, year, stats)

print("Finalizing the manifest, and computing the result...")

note = f"Initial Manifest computed retroactively on the rebuilt data present in the bucket s3://{s3_rebuilt_bucket}."
rebuilt_mft_0.append_to_notes(note)
# NOTE(review): placeholder note, presumably only meant for this dry run
# (nothing is exported to git/S3 below) -- remove it before a real export.
rebuilt_mft_0.append_to_notes("test", False)
rebuilt_mft_0.compute(export_to_git_and_s3=False)

# `schema_path` is not defined in any earlier cell either: only validate and
# export the manifest locally once it has been set in this session.
if 'schema_path' in globals():
    rebuilt_mft_0.validate_and_export_manifest(path_to_schema=schema_path, push_to_git=False)
else:
    print("`schema_path` is not defined: skipping the validation and export of the manifest.")