# Debug manifest aggregation functions

## Imports

In [1]:
from impresso_essentials.versioning.aggregators import tunique
from impresso_essentials.versioning.data_manifest import DataManifest
from impresso_essentials.versioning.helpers import validate_stage, DataStage
from impresso_essentials.versioning import compute_manifest
import git
import dask.bag as db
import dask.dataframe as dd
from impresso_essentials.io.s3 import fixed_s3fs_glob, IMPRESSO_STORAGEOPT
from impresso_essentials.utils import KNOWN_JOURNALS
import json
import os
from typing import Any
from ast import literal_eval
from collections import Counter
from tqdm import tqdm
import pandas as pd

## 1. Agregators for text-reuse (from text-reuse passages)

In [4]:
manifest_config = {
    "data_stage": "text-reuse",
    "output_bucket": "41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages",
    "input_bucket": "s3://31-passim-rebuilt-staging/passim",
    "git_repository": "/home/piconti/impresso-passim",
    "newspapers": [],
    "temp_directory": "/scratch/piconti/impresso/git_temp_folder",
    "previous_mft_s3_path": "",
    "is_staging": True,
    "is_patch": False,
    "patched_fields": [],
    "push_to_git": False,
    "only_counting": False,
    "notes": "Computing the manifest for text-reuse passages.",
    "file_extensions": "jsonl.bz2",
    "compute_altogether": True
}

passages_s3_path = "s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/"

In [5]:
s3_files = compute_manifest.get_files_to_consider(manifest_config)
s3_files

{'000000.jsonl.bz2': ['s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000000.jsonl.bz2'],
 '000001.jsonl.bz2': ['s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000001.jsonl.bz2'],
 '000002.jsonl.bz2': ['s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000002.jsonl.bz2'],
 '000003.jsonl.bz2': ['s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000003.jsonl.bz2'],
 '000004.jsonl.bz2': ['s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000004.jsonl.bz2'],
 '000005.jsonl.bz2': ['s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000005.jsonl.bz2'],
 '000006.jsonl.bz2': ['s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000006.jsonl.bz2'],
 '000007.jsonl.bz2': ['s3://41-processed-data-staging/t

In [6]:
len(s3_files)

4323

In [13]:
repo = git.Repo(manifest_config["git_repository"])
stage = validate_stage(manifest_config["data_stage"])
stage

<DataStage.TEXT_REUSE: 'text-reuse'>

In [14]:
manifest = DataManifest(
    data_stage=stage,
    s3_output_bucket=manifest_config["output_bucket"],
    s3_input_bucket=manifest_config["input_bucket"],
    git_repo=repo,
    temp_dir=manifest_config["temp_directory"],
    staging=manifest_config["is_staging"],
    is_patch=manifest_config["is_patch"],
    only_counting=manifest_config["only_counting"],
)
manifest

<impresso_essentials.versioning.data_manifest.DataManifest at 0x7f573dd23fe0>

### Implement the aggregator for the text-reuse

In [17]:
first_file = '000000.jsonl.bz2'
first_s3_path = s3_files[first_file]
first_s3_path

['s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000000.jsonl.bz2']

In [20]:
processed_files = db.read_text(
        first_s3_path, storage_options=IMPRESSO_STORAGEOPT
    ).map(json.loads)

In [21]:
processed_files.take(1)

({'id': 'c101-oecaen-1940-10-16-a-i0062@759:1352',
  'begin': 759,
  'ci_id': 'oecaen-1940-10-16-a-i0062',
  'cluster_id': 'tr-all-v1-24-c101',
  'date': '1940-10-16',
  'end': 1352,
  'pages': [{'id': 'oecaen-1940-10-16-a-p0004',
    'seq': 4,
    'regions': [{'start': 759,
      'length': 593,
      'coords': {'x': 2405, 'y': 1707, 'w': 549, 'h': 407}}]}],
  'cluster_size': 217,
  'text': 'N. 5% 38. 100,90.\nObligations Trésor 4 1/2 1933. 925 ;\nobligations Trésor 4% 1934. 920 ; obli-\ngations Trésor 5% 1935. 980 ; obliga-\ntions Trésor 4% 1936 B. 130 ; Outil-\nlage National 4 1/2 1932, 900 ; P. T. T.\n4 1/2 1929, 503 ; P. T. T. 5% 1934-35,\n995 ; P. T. T. 6% 1938, 1016 ; Caisse\nAutonome 4 1/2 29. 857.\nBons du Trésor 5% 1934, 1005 ; bons\ndu Trésor 4 1/2 1934, 985 ; bons du\nTrésor 4% 1935, 1035 ; bons du Trésor\n5% septembre 1937, 1005 ; oons du\nTrésor 5% décembre 1937. 1026 : bons\ndu Trésor 5 1/2 1938. 1020 ; Crédit Na-\ntional 5% 1919, 515 ; Crédit National\n5% 1920. 502 ; Cré

In [23]:
count_df = (
    processed_files.map(
        lambda passage: {
            "np_id": passage["ci_id"].split("-")[0],
            "year": passage["ci_id"].split("-")[1],
            "issues": "-".join(passage["id"].split("-")[:-1]),
            "content_items_out": passage["ci_id"],
            "text_reuse_passages": 1,
            "text_reuse_clusters": passage["cluster_id"]
        }
    ).to_dataframe(
        meta={
            "np_id": str,
            "year": str,
            "issues": str,
            "content_items_out": str,
            "text_reuse_passages": int,
            "text_reuse_clusters": str,
        }
    )
    # .explode("ne_entities")
    .persist()
)
count_df.head()

Unnamed: 0,np_id,year,issues,content_items_out,text_reuse_passages,text_reuse_clusters
0,oecaen,1940,c101-oecaen-1940-10-16-a,oecaen-1940-10-16-a-i0062,1,tr-all-v1-24-c101
1,oecaen,1940,c101-oecaen-1940-11-15-a,oecaen-1940-11-15-a-i0104,1,tr-all-v1-24-c101
2,oecaen,1940,c101-oecaen-1940-11-16-a,oecaen-1940-11-16-a-i0081,1,tr-all-v1-24-c101
3,oecaen,1940,c101-oecaen-1940-11-27-a,oecaen-1940-11-27-a-i0030,1,tr-all-v1-24-c101
4,oecaen,1940,c101-oecaen-1940-11-28-a,oecaen-1940-11-28-a-i0080,1,tr-all-v1-24-c101


In [25]:
aggregated_df = (
    count_df.groupby(by=["np_id", "year"])
    .agg(
        {
            "issues": tunique,
            "content_items_out": tunique,
            "text_reuse_passages": sum,
            "text_reuse_clusters": tunique,
        }
    )
    .reset_index()
).persist()
aggregated_df.head()

Unnamed: 0,np_id,year,issues,content_items_out,text_reuse_passages,text_reuse_clusters
0,BNN,1892,1,1,1,1
1,CDV,1843,1,1,1,1
2,CDV,1844,1,1,1,1
3,CDV,1849,1,1,1,1
4,CDV,1851,1,1,1,1


In [37]:
len(count_df[count_df['np_id']=='oecaen']), count_df[count_df['np_id']=='oecaen'].head()

(154,
     np_id  year                    issues          content_items_out  \
 0  oecaen  1940  c101-oecaen-1940-10-16-a  oecaen-1940-10-16-a-i0062   
 1  oecaen  1940  c101-oecaen-1940-11-15-a  oecaen-1940-11-15-a-i0104   
 2  oecaen  1940  c101-oecaen-1940-11-16-a  oecaen-1940-11-16-a-i0081   
 3  oecaen  1940  c101-oecaen-1940-11-27-a  oecaen-1940-11-27-a-i0030   
 4  oecaen  1940  c101-oecaen-1940-11-28-a  oecaen-1940-11-28-a-i0080   
 
    text_reuse_passages text_reuse_clusters  
 0                    1   tr-all-v1-24-c101  
 1                    1   tr-all-v1-24-c101  
 2                    1   tr-all-v1-24-c101  
 3                    1   tr-all-v1-24-c101  
 4                    1   tr-all-v1-24-c101  )

In [32]:
aggregated_df[aggregated_df['np_id']=='oecaen'].head(15)

Unnamed: 0,np_id,year,issues,content_items_out,text_reuse_passages,text_reuse_clusters
1600,oecaen,1912,6,6,6,6
1601,oecaen,1913,1,1,1,1
1602,oecaen,1914,2,2,2,2
1603,oecaen,1916,1,1,1,1
1604,oecaen,1917,2,2,2,2
1605,oecaen,1918,2,2,2,2
1606,oecaen,1919,2,2,2,2
1607,oecaen,1921,3,3,3,3
1608,oecaen,1922,2,2,2,2
1609,oecaen,1923,5,5,5,5


In [40]:
aggregated_df[aggregated_df['np_id']=='oecaen']['issues'].sum().compute()

np.int64(154)

In [26]:
agg_bag = aggregated_df.to_bag(format="dict").compute()
agg_bag

[{'np_id': 'BNN',
  'year': '1892',
  'issues': 1,
  'content_items_out': 1,
  'text_reuse_passages': 1,
  'text_reuse_clusters': 1},
 {'np_id': 'CDV',
  'year': '1843',
  'issues': 1,
  'content_items_out': 1,
  'text_reuse_passages': 1,
  'text_reuse_clusters': 1},
 {'np_id': 'CDV',
  'year': '1844',
  'issues': 1,
  'content_items_out': 1,
  'text_reuse_passages': 1,
  'text_reuse_clusters': 1},
 {'np_id': 'CDV',
  'year': '1849',
  'issues': 1,
  'content_items_out': 1,
  'text_reuse_passages': 1,
  'text_reuse_clusters': 1},
 {'np_id': 'CDV',
  'year': '1851',
  'issues': 1,
  'content_items_out': 1,
  'text_reuse_passages': 1,
  'text_reuse_clusters': 1},
 {'np_id': 'CDV',
  'year': '1853',
  'issues': 2,
  'content_items_out': 2,
  'text_reuse_passages': 2,
  'text_reuse_clusters': 2},
 {'np_id': 'CL',
  'year': '1885',
  'issues': 2,
  'content_items_out': 2,
  'text_reuse_passages': 2,
  'text_reuse_clusters': 2},
 {'np_id': 'CON',
  'year': '1902',
  'issues': 2,
  'content_i

In [27]:
for d in agg_bag:
    if 'oecaen' in d['np_id']:
        print(d)

{'np_id': 'oecaen', 'year': '1912', 'issues': 6, 'content_items_out': 6, 'text_reuse_passages': 6, 'text_reuse_clusters': 6}
{'np_id': 'oecaen', 'year': '1913', 'issues': 1, 'content_items_out': 1, 'text_reuse_passages': 1, 'text_reuse_clusters': 1}
{'np_id': 'oecaen', 'year': '1914', 'issues': 2, 'content_items_out': 2, 'text_reuse_passages': 2, 'text_reuse_clusters': 2}
{'np_id': 'oecaen', 'year': '1916', 'issues': 1, 'content_items_out': 1, 'text_reuse_passages': 1, 'text_reuse_clusters': 1}
{'np_id': 'oecaen', 'year': '1917', 'issues': 2, 'content_items_out': 2, 'text_reuse_passages': 2, 'text_reuse_clusters': 2}
{'np_id': 'oecaen', 'year': '1918', 'issues': 2, 'content_items_out': 2, 'text_reuse_passages': 2, 'text_reuse_clusters': 2}
{'np_id': 'oecaen', 'year': '1919', 'issues': 2, 'content_items_out': 2, 'text_reuse_passages': 2, 'text_reuse_clusters': 2}
{'np_id': 'oecaen', 'year': '1921', 'issues': 3, 'content_items_out': 3, 'text_reuse_passages': 3, 'text_reuse_clusters': 3}


### apply logic to more files

In [None]:
for filename, s3_filepath in s3_files.items():

    processed_files = db.read_text(
        s3_filepath, storage_options=IMPRESSO_STORAGEOPT
    ).map(json.loads)



In [2]:
input_bucket_name = "s3://31-passim-rebuilt-staging"

in_bucket = input_bucket_name.replace("s3://", "")
split_path = in_bucket.split('/')
split_path[0],'/'.join(split_path[1:])

('31-passim-rebuilt-staging', '')

In [11]:
s3_fs = [j for part_j in s3_files.values() for j in part_j]
s3_fs

['s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000000.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000001.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000002.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000003.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000004.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000005.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000006.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/000007.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_passages/00

In [12]:
processed_files = db.read_text(
    s3_fs, storage_options=IMPRESSO_STORAGEOPT
).map(json.loads)

In [13]:
processed_files.take(1)

({'id': 'c101-oecaen-1940-10-16-a-i0062@759:1352',
  'begin': 759,
  'ci_id': 'oecaen-1940-10-16-a-i0062',
  'cluster_id': 'tr-all-v1-24-c101',
  'date': '1940-10-16',
  'end': 1352,
  'pages': [{'id': 'oecaen-1940-10-16-a-p0004',
    'seq': 4,
    'regions': [{'start': 759,
      'length': 593,
      'coords': {'x': 2405, 'y': 1707, 'w': 549, 'h': 407}}]}],
  'cluster_size': 217,
  'text': 'N. 5% 38. 100,90.\nObligations Trésor 4 1/2 1933. 925 ;\nobligations Trésor 4% 1934. 920 ; obli-\ngations Trésor 5% 1935. 980 ; obliga-\ntions Trésor 4% 1936 B. 130 ; Outil-\nlage National 4 1/2 1932, 900 ; P. T. T.\n4 1/2 1929, 503 ; P. T. T. 5% 1934-35,\n995 ; P. T. T. 6% 1938, 1016 ; Caisse\nAutonome 4 1/2 29. 857.\nBons du Trésor 5% 1934, 1005 ; bons\ndu Trésor 4 1/2 1934, 985 ; bons du\nTrésor 4% 1935, 1035 ; bons du Trésor\n5% septembre 1937, 1005 ; oons du\nTrésor 5% décembre 1937. 1026 : bons\ndu Trésor 5 1/2 1938. 1020 ; Crédit Na-\ntional 5% 1919, 515 ; Crédit National\n5% 1920. 502 ; Cré

In [22]:
print(KNOWN_JOURNALS[0])
filtered = processed_files.filter(lambda x: KNOWN_JOURNALS[0] in x['ci_id']).persist()

oecaen


In [25]:
filtered.take(10)

({'id': 'c101-oecaen-1940-10-16-a-i0062@759:1352',
  'begin': 759,
  'ci_id': 'oecaen-1940-10-16-a-i0062',
  'cluster_id': 'tr-all-v1-24-c101',
  'date': '1940-10-16',
  'end': 1352,
  'pages': [{'id': 'oecaen-1940-10-16-a-p0004',
    'seq': 4,
    'regions': [{'start': 759,
      'length': 593,
      'coords': {'x': 2405, 'y': 1707, 'w': 549, 'h': 407}}]}],
  'cluster_size': 217,
  'text': 'N. 5% 38. 100,90.\nObligations Trésor 4 1/2 1933. 925 ;\nobligations Trésor 4% 1934. 920 ; obli-\ngations Trésor 5% 1935. 980 ; obliga-\ntions Trésor 4% 1936 B. 130 ; Outil-\nlage National 4 1/2 1932, 900 ; P. T. T.\n4 1/2 1929, 503 ; P. T. T. 5% 1934-35,\n995 ; P. T. T. 6% 1938, 1016 ; Caisse\nAutonome 4 1/2 29. 857.\nBons du Trésor 5% 1934, 1005 ; bons\ndu Trésor 4 1/2 1934, 985 ; bons du\nTrésor 4% 1935, 1035 ; bons du Trésor\n5% septembre 1937, 1005 ; oons du\nTrésor 5% décembre 1937. 1026 : bons\ndu Trésor 5 1/2 1938. 1020 ; Crédit Na-\ntional 5% 1919, 515 ; Crédit National\n5% 1920. 502 ; Cré

In [24]:
mapped = processed_files.map(lambda x: (x['ci_id'].split('-')[0], x)).persist()
mapped.take(4)

Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x7f2ef2fcdca0>>
Traceback (most recent call last):
  File "/scratch/piconti/.conda/envs/essentials/lib/python3.12/site-packages/ipykernel/ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(

KeyboardInterrupt: 


KeyboardInterrupt: 

In [28]:
passage_id = 'oecaen-1940-10-16-a-i0062'
"-".join(passage_id.split("-")[:-1])

'oecaen-1940-10-16-a'

In [29]:
count_df = (
    filtered.map(
        lambda passage: {
            "np_id": passage["ci_id"].split("-")[0],
            "year": passage["ci_id"].split("-")[1],
            "issues": "-".join(passage["ci_id"].split("-")[:-1]),
            "content_items_out": passage["ci_id"],
            "text_reuse_passages": 1,
            "text_reuse_clusters": passage["cluster_id"]
        }
    ).to_dataframe(
        meta={
            "np_id": str,
            "year": str,
            "issues": str,
            "content_items_out": str,
            "text_reuse_passages": int,
            "text_reuse_clusters": str,
        }
    )
    .persist()
)
count_df

Unnamed: 0_level_0,np_id,year,issues,content_items_out,text_reuse_passages,text_reuse_clusters
npartitions=4323,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
,string,string,string,string,int64,string
,...,...,...,...,...,...
...,...,...,...,...,...,...
,...,...,...,...,...,...
,...,...,...,...,...,...


In [33]:
aggregated_df = (
    count_df.groupby(by=["np_id", "year"])
    .agg(
        {
            "issues": tunique,
            "content_items_out": tunique,
            "text_reuse_passages": sum,
            "text_reuse_clusters": tunique,
        }
    )
    .reset_index()
    .sort_values('year')
).persist()

In [34]:
aggregated_df.head(npartitions=-1)

Unnamed: 0,np_id,year,issues,content_items_out,text_reuse_passages,text_reuse_clusters
0,oecaen,1912,290,4260,9087,8245
0,oecaen,1913,351,5670,12300,11158
0,oecaen,1914,344,5291,11782,11355
0,oecaen,1915,356,6666,14641,14369
0,oecaen,1916,353,6972,16154,15736


In [32]:
aggregated_df.sort_values('year').head(npartitions=-1)

Unnamed: 0,np_id,year,issues,content_items_out,text_reuse_passages,text_reuse_clusters
0,oecaen,1912,290,4260,9087,8245
0,oecaen,1913,351,5670,12300,11158
0,oecaen,1914,344,5291,11782,11355
0,oecaen,1915,356,6666,14641,14369
0,oecaen,1916,353,6972,16154,15736


## 2. Agregators for topics

In [9]:
manifest_config = {
    "data_stage": "topics",
    "output_bucket": "41-processed-data-staging/topics/topics-tm-mallet_infer_seed42_v2.0.1-multilingual_v2-0-1",
    "input_bucket": "s3://22-rebuilt-final",
    "git_repository": "/home/piconti/impresso-essentials",
    "newspapers": [],
    "temp_directory": "/scratch/piconti/impresso/git_temp_folder",
    "previous_mft_s3_path": "",
    "is_staging": True,
    "is_patch": False,
    "patched_fields": [],
    "push_to_git": False,
    "relative_git_path": "",
    "only_counting": False,
    "model_id": "topics-tm-mallet_infer_seed42_v2.0.1",
    "run_id": "topics-tm-mallet_infer_seed42_v2.0.1-multilingual_v2-0-1",
    "notes": "Debug manifest on the topics - would need to be recomputed with the correct git repo.",
    "file_extensions": "jsonl.bz2",
    "compute_altogether": False
}

topics_s3_path = "s3://41-processed-data-staging/topics/topics-tm-mallet_infer_seed42_v2.0.1-multilingual_v2-0-1/"

In [3]:
s3_files = compute_manifest.get_files_to_consider(manifest_config)
s3_files

{'ACI': ['s3://41-processed-data-staging/topics/topics-tm-mallet_infer_seed42_v2.0.1-multilingual_v2-0-1/ACI/ACI-1832.jsonl.bz2'],
 'AV': ['s3://41-processed-data-staging/topics/topics-tm-mallet_infer_seed42_v2.0.1-multilingual_v2-0-1/AV/AV-1880.jsonl.bz2',
  's3://41-processed-data-staging/topics/topics-tm-mallet_infer_seed42_v2.0.1-multilingual_v2-0-1/AV/AV-1881.jsonl.bz2',
  's3://41-processed-data-staging/topics/topics-tm-mallet_infer_seed42_v2.0.1-multilingual_v2-0-1/AV/AV-1886.jsonl.bz2'],
 'BDC': ['s3://41-processed-data-staging/topics/topics-tm-mallet_infer_seed42_v2.0.1-multilingual_v2-0-1/BDC/BDC-1839.jsonl.bz2'],
 'BLB': ['s3://41-processed-data-staging/topics/topics-tm-mallet_infer_seed42_v2.0.1-multilingual_v2-0-1/BLB/BLB-1845.jsonl.bz2',
  's3://41-processed-data-staging/topics/topics-tm-mallet_infer_seed42_v2.0.1-multilingual_v2-0-1/BLB/BLB-1846.jsonl.bz2',
  's3://41-processed-data-staging/topics/topics-tm-mallet_infer_seed42_v2.0.1-multilingual_v2-0-1/BLB/BLB-1847.json

In [4]:
repo = git.Repo(manifest_config["git_repository"])
stage = validate_stage(manifest_config["data_stage"])
stage

<DataStage.TOPICS: 'topics'>

In [10]:
manifest = DataManifest(
    data_stage=stage,
    s3_output_bucket=manifest_config["output_bucket"],
    s3_input_bucket=manifest_config["input_bucket"],
    git_repo=repo,
    temp_dir=manifest_config["temp_directory"],
    staging=manifest_config["is_staging"],
    is_patch=manifest_config["is_patch"],
    only_counting=manifest_config["only_counting"],
    push_to_git=manifest_config["push_to_git"],
    model_id=manifest_config["model_id"],
    run_id=manifest_config["run_id"],
)
manifest

<impresso_essentials.versioning.data_manifest.DataManifest at 0x7f6feeb7fc20>

In [None]:
def compute_stats_in_topics_bag(
    s3_topics
) -> list[dict[str, Any]]:
    """Compute stats on a dask bag of topic modeling output content-items.

    Args:
        s3_topics (db.core.Bag): Bag with the contents of topics files.
        client (Client | None, optional): Dask client. Defaults to None.

    Returns:
        list[dict[str, Any]]: List of counts that match topics DataStatistics keys.
    """

    def flatten_lists(list_elem):
        final_list = []
        for str_list in list_elem:
            assert isinstance(str_list, str)
            if str_list == '[]':
                final_list.append('no-topic')
            else:
                for elem in literal_eval(str_list):
                    final_list.append(elem)
        
        return final_list
        

    def freq(x, col=('topics_fd')):
        x[col] = dict(Counter(literal_eval(x[col])))
        return x
    
    def to_freq_dict(list_elem):
        #print(f"'[]' in list_elem: {'[]' in list_elem}")
        #print(f"type before: {type(list_elem)}, {isinstance(list_elem, str)}, {isinstance(list_elem, list)}, {isinstance(list_elem, dict)}")
        #print(f"type first elem: {type(list_elem[0])}, {isinstance(list_elem[0], str)}, {isinstance(list_elem[0], list)}, {isinstance(list_elem[0], dict)}")
        flat_list = flatten_lists(list_elem)
        #print(f"'no-topic' in flat_list: {'no-topic' in flat_list}")
        #print(f"type middle: {type(flat_list)}, {isinstance(flat_list, str)}, {isinstance(flat_list, list)}, {isinstance(flat_list, dict)}")
        #print(f"type first elem: {type(flat_list[0])}, {isinstance(flat_list[0], str)}, {isinstance(flat_list[0], list)}, {isinstance(flat_list[0], dict)}")
        fd =  dict(Counter(flat_list))
        #print(f"type middle: {type(fd)}, {isinstance(fd, str)}, {isinstance(fd, list)}, {isinstance(fd, dict)}")
        return fd

    
    count_df = s3_topics.map(
        lambda ci: {
            "np_id": ci["ci_id"].split("-")[0],
            "year": ci["ci_id"].split("-")[1],
            "issues": ci["ci_id"].split("-i")[0],
            "content_items_out": 1,
            "topics": sorted(
                [
                    t["t"]
                    for t in ci["topics"]
                    if "t" in t
                ]
            ),  # sorted list to ensure all are the same
        }
    ).to_dataframe(
        meta={
            "np_id": str,
            "year": str,
            "issues": str,
            "content_items_out": int,
            "topics": object,
        }
    )

    count_df["topics"] = count_df["topics"].apply(
        lambda x: x if isinstance(x, list) else [x], meta=('topics', 'object')
    )

    # cum the counts for all values collected
    aggregated_df = (
        count_df
        .explode("topics")
        .groupby(by=["np_id", "year"])
        .agg(
            {
                "issues": tunique,
                "content_items_out": sum,  
                "topics": [tunique, list]
            }
        )
        #.reset_index()
    )#.persist()

    print(aggregated_df.columns)
    aggregated_df.columns = aggregated_df.columns.to_flat_index()
    #aggregated_df =.reset_index()
    aggregated_df = aggregated_df.reset_index().rename(columns = {
        ("np_id", ""): "np_id", 
        ("year", ""):"year", 
        ("issues", "tunique"):"issues", 
        ("content_items_out", "sum"):"content_items_out", 
        ("topics", "tunique"):"topics",
        ("topics", "list"):"topics_fd",
    })
    print(aggregated_df.columns)

    aggregated_df['topics_fd'] = aggregated_df['topics_fd'].apply(to_freq_dict, meta=('topics_fd', 'object'))

    print(aggregated_df.head())
    print("Finished grouping and aggregating stats by title and year.")
    #logger.info("Finished grouping and aggregating stats by title and year.")

    #if client is not None:
        # only add the progress bar if the client is defined
        #progress(aggregated_df)

    # return as a list of dicts
    return aggregated_df.to_bag(format="dict").compute()#.map(freq).compute()


In [13]:
def compute_stats_in_topics_bag(
    s3_topics
) -> list[dict[str, Any]]:
    """Compute stats on a dask bag of topic modeling output content-items.

    Args:
        s3_topics (db.core.Bag): Bag with the contents of topics files.
        client (Client | None, optional): Dask client. Defaults to None.

    Returns:
        list[dict[str, Any]]: List of counts that match topics DataStatistics keys.
    """

    def flatten_lists(list_elem):
        final_list = []
        for str_list in list_elem:
            assert isinstance(str_list, str), "Inside topic aggregator flatten_list, and provided list is not str!"
            if str_list == "[]":
                final_list.append("no-topic")
            else:
                for elem in literal_eval(str_list):
                    final_list.append(elem)

        return final_list

    def to_freq_dict(list_elem):
        flat_list = flatten_lists(list_elem)
        return dict(Counter(flat_list))
    
    def cast(x, col='topics_fd'):
        x[col] = literal_eval(x[col])
        return x

    count_df = s3_topics.map(
        lambda ci: {
            "np_id": ci["ci_id"].split("-")[0],
            "year": ci["ci_id"].split("-")[1],
            "issues": ci["ci_id"].split("-i")[0],
            "content_items_out": 1,
            "topics": sorted(
                [t["t"] for t in ci["topics"] if "t" in t]
            ),  # sorted list to ensure all are the same
        }
    ).to_dataframe(
        meta={
            "np_id": str,
            "year": str,
            "issues": str,
            "content_items_out": int,
            "topics": object,
        }
    )

    count_df["topics"] = count_df["topics"].apply(
        lambda x: x if isinstance(x, list) else [x], meta=("topics", "object")
    )

    # cum the counts for all values collected
    aggregated_df = (
        count_df.explode("topics")
        .groupby(by=["np_id", "year"])
        .agg({"issues": tunique, "content_items_out": sum, "topics": [tunique, list]})
    )

    #aggregated_df = raw_aggregated_df.copy()
    aggregated_df.columns = aggregated_df.columns.to_flat_index()
    aggregated_df = (
        aggregated_df
        .reset_index()
        .rename(
            columns={
                ("np_id", ""): "np_id",
                ("year", ""): "year",
                ("issues", "tunique"): "issues",
                ("content_items_out", "sum"): "content_items_out",
                ("topics", "tunique"): "topics",
                ("topics", "list"): "topics_fd",
            }
        ) 
        .sort_values("year")
    )

    aggregated_df["topics_fd"] = aggregated_df["topics_fd"].apply(
        to_freq_dict, meta=("topics_fd", "object")
    )

    print("Finished grouping and aggregating stats by title and year.")
    #logger.info("Finished grouping and aggregating stats by title and year.")

    #if client is not None:
        # only add the progress bar if the client is defined
        #progress(aggregated_df)

    if len(aggregated_df.head()) != 0:
        # return as a list of dicts
        final_dict = aggregated_df.to_bag(format="dict").map(cast)#.map(freq).compute()
        
        return final_dict.compute()
    else:
        return {}

In [14]:
comp_stats_sum = []
for np_title, np_s3_files in tqdm(s3_files.items()):
    print(f"---- {np_title} ----")
    processed_files = db.read_text(
        np_s3_files, storage_options=IMPRESSO_STORAGEOPT
    ).map(json.loads)

    if len(processed_files.take(1, npartitions=-1))!=0:
        computed_stats = compute_stats_in_topics_bag(processed_files)
        print(computed_stats)
        comp_stats_sum.append(computed_stats)
        manifest = compute_manifest.add_stats_to_mft(manifest, np_title, computed_stats)

  0%|          | 0/135 [00:00<?, ?it/s]

---- ACI ----
Finished grouping and aggregating stats by title and year.


  1%|          | 1/135 [00:03<07:09,  3.20s/it]

[{'np_id': 'ACI', 'year': '1832', 'issues': 1, 'content_items_out': 100, 'topics': 59, 'topics_fd': {'tm-fr-all-v2.0_tp07_fr': 1, 'tm-fr-all-v2.0_tp22_fr': 70, 'tm-fr-all-v2.0_tp27_fr': 1, 'tm-fr-all-v2.0_tp34_fr': 21, 'tm-fr-all-v2.0_tp49_fr': 3, 'tm-fr-all-v2.0_tp82_fr': 23, 'tm-fr-all-v2.0_tp84_fr': 86, 'tm-fr-all-v2.0_tp40_fr': 8, 'tm-fr-all-v2.0_tp52_fr': 3, 'tm-fr-all-v2.0_tp20_fr': 3, 'tm-fr-all-v2.0_tp36_fr': 2, 'tm-fr-all-v2.0_tp83_fr': 1, 'tm-fr-all-v2.0_tp58_fr': 9, 'tm-fr-all-v2.0_tp99_fr': 2, 'tm-fr-all-v2.0_tp14_fr': 1, 'tm-fr-all-v2.0_tp61_fr': 2, 'tm-fr-all-v2.0_tp68_fr': 1, 'tm-fr-all-v2.0_tp79_fr': 2, 'tm-fr-all-v2.0_tp81_fr': 2, 'tm-fr-all-v2.0_tp47_fr': 2, 'tm-fr-all-v2.0_tp90_fr': 1, 'tm-fr-all-v2.0_tp01_fr': 2, 'tm-fr-all-v2.0_tp17_fr': 1, 'tm-fr-all-v2.0_tp97_fr': 2, 'tm-fr-all-v2.0_tp09_fr': 3, 'tm-fr-all-v2.0_tp87_fr': 1, 'tm-fr-all-v2.0_tp89_fr': 2, 'tm-fr-all-v2.0_tp26_fr': 4, 'tm-fr-all-v2.0_tp43_fr': 3, 'tm-fr-all-v2.0_tp02_fr': 25, 'tm-fr-all-v2.0_tp93_fr'

  1%|▏         | 2/135 [00:06<07:24,  3.34s/it]

[{'np_id': 'AV', 'year': '1880', 'issues': 1, 'content_items_out': 247, 'topics': 164, 'topics_fd': {'tm-fr-all-v2.0_tp31_fr': 15, 'tm-fr-all-v2.0_tp47_fr': 15, 'tm-fr-all-v2.0_tp58_fr': 71, 'tm-fr-all-v2.0_tp78_fr': 2, 'tm-fr-all-v2.0_tp80_fr': 7, 'tm-fr-all-v2.0_tp22_fr': 69, 'tm-fr-all-v2.0_tp02_fr': 22, 'tm-fr-all-v2.0_tp46_fr': 11, 'tm-fr-all-v2.0_tp56_fr': 3, 'tm-fr-all-v2.0_tp83_fr': 26, 'tm-fr-all-v2.0_tp23_fr': 13, 'tm-fr-all-v2.0_tp38_fr': 7, 'tm-fr-all-v2.0_tp45_fr': 1, 'tm-fr-all-v2.0_tp59_fr': 1, 'tm-fr-all-v2.0_tp67_fr': 15, 'tm-fr-all-v2.0_tp82_fr': 106, 'tm-fr-all-v2.0_tp84_fr': 152, 'tm-fr-all-v2.0_tp97_fr': 21, 'tm-fr-all-v2.0_tp01_fr': 4, 'tm-fr-all-v2.0_tp69_fr': 20, 'tm-fr-all-v2.0_tp51_fr': 13, 'tm-fr-all-v2.0_tp57_fr': 17, 'tm-fr-all-v2.0_tp40_fr': 36, 'tm-fr-all-v2.0_tp49_fr': 2, 'tm-fr-all-v2.0_tp11_fr': 13, 'tm-fr-all-v2.0_tp20_fr': 7, 'tm-fr-all-v2.0_tp43_fr': 8, 'tm-fr-all-v2.0_tp91_fr': 7, 'tm-fr-all-v2.0_tp17_fr': 5, 'tm-fr-all-v2.0_tp99_fr': 21, 'tm-fr-al

  2%|▏         | 3/135 [00:09<06:56,  3.16s/it]

[{'np_id': 'BDC', 'year': '1839', 'issues': 23, 'content_items_out': 128, 'topics': 122, 'topics_fd': {'tm-fr-all-v2.0_tp58_fr': 77, 'tm-fr-all-v2.0_tp64_fr': 112, 'tm-fr-all-v2.0_tp90_fr': 6, 'tm-fr-all-v2.0_tp16_fr': 33, 'tm-fr-all-v2.0_tp70_fr': 30, 'tm-fr-all-v2.0_tp88_fr': 43, 'tm-fr-all-v2.0_tp44_fr': 4, 'tm-fr-all-v2.0_tp78_fr': 9, 'tm-fr-all-v2.0_tp02_fr': 5, 'tm-fr-all-v2.0_tp54_fr': 14, 'tm-fr-all-v2.0_tp56_fr': 3, 'tm-fr-all-v2.0_tp83_fr': 9, 'tm-fr-all-v2.0_tp95_fr': 11, 'tm-fr-all-v2.0_tp36_fr': 7, 'tm-fr-all-v2.0_tp00_fr': 29, 'tm-fr-all-v2.0_tp18_fr': 14, 'tm-fr-all-v2.0_tp55_fr': 6, 'tm-fr-all-v2.0_tp85_fr': 18, 'tm-fr-all-v2.0_tp06_fr': 3, 'tm-fr-all-v2.0_tp28_fr': 9, 'tm-fr-all-v2.0_tp34_fr': 1, 'tm-fr-all-v2.0_tp57_fr': 1, 'tm-fr-all-v2.0_tp66_fr': 1, 'tm-fr-all-v2.0_tp37_fr': 25, 'tm-fr-all-v2.0_tp52_fr': 43, 'tm-fr-all-v2.0_tp22_fr': 1, 'tm-fr-all-v2.0_tp73_fr': 2, 'tm-fr-all-v2.0_tp87_fr': 20, 'tm-fr-all-v2.0_tp99_fr': 2, 'tm-fr-all-v2.0_tp25_fr': 8, 'tm-fr-all-v2

  3%|▎         | 4/135 [00:13<07:19,  3.35s/it]

[{'np_id': 'BLB', 'year': '1845', 'issues': 1, 'content_items_out': 2, 'topics': 2, 'topics_fd': {'tm-de-all-v2.0_tp03_de': 1, 'tm-de-all-v2.0_tp12_de': 2, 'tm-de-all-v2.0_tp54_de': 1, 'tm-de-all-v2.0_tp68_de': 2, 'tm-de-all-v2.0_tp78_de': 1, 'tm-de-all-v2.0_tp23_de': 1, 'tm-de-all-v2.0_tp89_de': 1}}, {'np_id': 'BLB', 'year': '1846', 'issues': 52, 'content_items_out': 170, 'topics': 139, 'topics_fd': {'tm-de-all-v2.0_tp12_de': 64, 'tm-de-all-v2.0_tp24_de': 52, 'tm-de-all-v2.0_tp47_de': 62, 'tm-de-all-v2.0_tp54_de': 57, 'tm-de-all-v2.0_tp68_de': 123, 'tm-de-all-v2.0_tp83_de': 6, 'tm-de-all-v2.0_tp23_de': 73, 'tm-de-all-v2.0_tp53_de': 4, 'tm-de-all-v2.0_tp22_de': 3, 'tm-de-all-v2.0_tp61_de': 2, 'tm-de-all-v2.0_tp82_de': 1, 'tm-de-all-v2.0_tp87_de': 3, 'tm-de-all-v2.0_tp13_de': 1, 'tm-de-all-v2.0_tp44_de': 2, 'tm-de-all-v2.0_tp99_de': 4, 'tm-de-all-v2.0_tp07_de': 2, 'tm-de-all-v2.0_tp43_de': 5, 'tm-de-all-v2.0_tp14_de': 9, 'tm-de-all-v2.0_tp81_de': 8, 'tm-de-all-v2.0_tp63_de': 5, 'tm-de-a

  3%|▎         | 4/135 [00:14<07:56,  3.63s/it]

Finished grouping and aggregating stats by title and year.





KeyboardInterrupt: 

In [158]:
comp_stats_sum

[[{'np_id': 'BDC',
   'year': '1839',
   'issues': 23,
   'content_items_out': 141,
   'topics': 115,
   'topics_fd': {'tm-fr-all-v2.0_tp10_fr': 17,
    'tm-fr-all-v2.0_tp64_fr': 117,
    'tm-fr-all-v2.0_tp90_fr': 14,
    'tm-fr-all-v2.0_tp16_fr': 54,
    'tm-fr-all-v2.0_tp88_fr': 54,
    'no-topic': 9,
    'tm-fr-all-v2.0_tp78_fr': 11,
    'tm-fr-all-v2.0_tp85_fr': 22,
    'tm-fr-all-v2.0_tp02_fr': 8,
    'tm-fr-all-v2.0_tp54_fr': 14,
    'tm-fr-all-v2.0_tp83_fr': 22,
    'tm-fr-all-v2.0_tp95_fr': 20,
    'tm-fr-all-v2.0_tp70_fr': 49,
    'tm-fr-all-v2.0_tp21_fr': 4,
    'tm-fr-all-v2.0_tp32_fr': 6,
    'tm-fr-all-v2.0_tp46_fr': 1,
    'tm-fr-all-v2.0_tp55_fr': 1,
    'tm-fr-all-v2.0_tp80_fr': 6,
    'tm-fr-all-v2.0_tp00_fr': 8,
    'tm-fr-all-v2.0_tp28_fr': 13,
    'tm-fr-all-v2.0_tp86_fr': 4,
    'tm-fr-all-v2.0_tp12_fr': 1,
    'tm-fr-all-v2.0_tp34_fr': 1,
    'tm-fr-all-v2.0_tp35_fr': 12,
    'tm-fr-all-v2.0_tp56_fr': 10,
    'tm-fr-all-v2.0_tp57_fr': 2,
    'tm-fr-all-v2.0_tp03_f

In [43]:
comp_stats_count = []
for np_title, np_s3_files in s3_files.items():
    if np_title == 'EDA':
        break
    processed_files = db.read_text(
        np_s3_files, storage_options=IMPRESSO_STORAGEOPT
    ).map(json.loads)

    #computed_stats = compute_stats_in_topics_bag(processed_files)
    #print(computed_stats)
    #comp_stats_count.append(computed_stats)

In [None]:
from ast import literal_eval
from collections import Counter

def flatten_lists(list_elem):
    final_list = []
    for str_list in list_elem:
        assert isinstance(str_list, str)
        if str_list == '[]':
            print("Empty list, appending None")
            final_list.append('None')
        else:
            for elem in literal_eval(str_list):
                final_list.append(elem)
    
    return final_list
    

def freq(x, col=('topics_fd')):
    x[col] = dict(Counter(literal_eval(x[col])))
    return x


count_df = (
    processed_files.map(
        lambda ci: {
            "np_id": ci["id"].split("-")[0],
            "year": ci["id"].split("-")[1],
            "issues": ci["id"].split("-i")[0],
            "content_items_out": 1,
            "topics": sorted(
                [
                    t["t"]
                    for t in ci["topics"]
                    if "t" in t
                ]
            ),  # sorted list to ensure all are the same
        }
    ).to_dataframe(
        meta={
            "np_id": str,
            "year": str,
            "issues": str,
            "content_items_out": int,
            "topics": object,
        }
    )
    # .explode("ne_entities")
    # .persist()
)


count_df["topics"] = count_df["topics"].apply(
    lambda x: x if isinstance(x, list) else [x], meta=('topics', 'object')
)
#count_df = count_df.explode("topics").persist()


aggregated_df = (
    count_df
    .explode("topics")
    .groupby(by=["np_id", "year"])
    .agg(
        {
            "issues": tunique,
            "content_items_out": sum,
            "topics": [tunique, list]
        }
    )
    .reset_index()
).persist()

aggregated_df.columns = aggregated_df.columns.to_flat_index()
aggregated_df = aggregated_df.rename(columns = {
    ("np_id", ""): "np_id", 
    ("year", ""):"year", 
    ("issues", "tunique"):"issues", 
    ("content_items_out", "sum"):"content_items_out", 
    ("topics", "tunique"):"topics",
    ("topics", "list"):"topics_fd",
})

aggregated_df['topics_fd'] = aggregated_df['topics_fd'].apply(flatten_lists, meta=('topics_fd', 'object'))

result2 = aggregated_df.to_bag(format="dict").map(freq).compute()

Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
Empty list, appending None
E

In [143]:
result

[{'np_id': 'DVF',
  'year': '1878',
  'issues': 1,
  'content_items_out': 19,
  'topics': 19,
  'topics_fd': {'tm-de-all-v2.0_tp11_de': 4,
   'tm-de-all-v2.0_tp12_de': 4,
   'tm-de-all-v2.0_tp30_de': 5,
   'tm-de-all-v2.0_tp40_de': 2,
   'tm-de-all-v2.0_tp65_de': 3,
   'tm-de-all-v2.0_tp81_de': 3,
   'tm-de-all-v2.0_tp29_de': 2,
   'tm-de-all-v2.0_tp43_de': 2,
   'tm-de-all-v2.0_tp88_de': 2,
   'tm-de-all-v2.0_tp99_de': 4,
   'tm-de-all-v2.0_tp10_de': 3,
   'tm-de-all-v2.0_tp14_de': 1,
   'tm-de-all-v2.0_tp38_de': 2,
   'tm-de-all-v2.0_tp51_de': 1,
   'tm-de-all-v2.0_tp56_de': 2,
   'tm-de-all-v2.0_tp68_de': 2,
   'tm-de-all-v2.0_tp79_de': 1,
   'tm-de-all-v2.0_tp94_de': 1,
   'tm-de-all-v2.0_tp44_de': 1,
   'tm-de-all-v2.0_tp54_de': 4,
   'tm-de-all-v2.0_tp89_de': 3,
   'tm-de-all-v2.0_tp02_de': 6,
   'tm-de-all-v2.0_tp04_de': 1,
   'tm-de-all-v2.0_tp06_de': 2,
   'tm-de-all-v2.0_tp19_de': 2,
   'tm-de-all-v2.0_tp23_de': 6,
   'tm-de-all-v2.0_tp24_de': 1,
   'tm-de-all-v2.0_tp33_de': 

In [152]:
result==result2

True

In [44]:
processed_files.take(1)

({'ci_id': 'DVF-1878-12-28-a-i0001',
  'ts': '2024-12-08T23:46:13Z',
  'lg': 'de',
  'topic_count': 100,
  'topics': [{'t': 'tm-de-all-v2.0_tp12_de', 'p': 0.239},
   {'t': 'tm-de-all-v2.0_tp30_de', 'p': 0.218},
   {'t': 'tm-de-all-v2.0_tp65_de', 'p': 0.198},
   {'t': 'tm-de-all-v2.0_tp87_de', 'p': 0.071},
   {'t': 'tm-de-all-v2.0_tp11_de', 'p': 0.061},
   {'t': 'tm-de-all-v2.0_tp81_de', 'p': 0.054}],
  'min_p': 0.05,
  'topic_model_id': 'tm-de-all-v2.0',
  'topics_git': 'a03a89f',
  'lingproc_run_id': 'lingproc-pos-spacy_v3.6.0-multilingual_v1-0-3',
  'model_id': 'tm-mallet_infer_seed42_v2.0.1-multilingual'},)

In [45]:
count_df = (
    processed_files.map(
        lambda ci: {
            "np_id": ci["ci_id"].split("-")[0],
            "year": ci["ci_id"].split("-")[1],
            "issues": ci["ci_id"].split("-i")[0],
            "content_items_out": 1,
            "topics": sorted(
                [
                    t["t"]
                    for t in ci["topics"]
                    if "t" in t
                ]
            ),  # sorted list to ensure all are the same
        }
    ).to_dataframe(
        meta={
            "np_id": str,
            "year": str,
            "issues": str,
            "content_items_out": int,
            "topics": object,
        }
    )
    # .explode("ne_entities")
    # .persist()
)

In [46]:
count_df["topics"] = count_df["topics"].apply(
    lambda x: x if isinstance(x, list) else [x], meta=('topics', 'object')
)
count_df = count_df.explode("topics").persist()


In [47]:
count_df.head()

Unnamed: 0,np_id,year,issues,content_items_out,topics
0,DVF,1878,DVF-1878-12-28-a,1,"['tm-de-all-v2.0_tp11_de', 'tm-de-all-v2.0_tp1..."
1,DVF,1878,DVF-1878-12-28-a,1,"['tm-de-all-v2.0_tp12_de', 'tm-de-all-v2.0_tp5..."
2,DVF,1878,DVF-1878-12-28-a,1,"['tm-de-all-v2.0_tp11_de', 'tm-de-all-v2.0_tp1..."
3,DVF,1878,DVF-1878-12-28-a,1,"['tm-de-all-v2.0_tp12_de', 'tm-de-all-v2.0_tp5..."
4,DVF,1878,DVF-1878-12-28-a,1,"['tm-de-all-v2.0_tp23_de', 'tm-de-all-v2.0_tp8..."


In [48]:

from ast import literal_eval
from collections import Counter

def freq(x, col=('topics_fd')):
    #print(x[col])
    #concat_list = [elem for l in x[col] for elem in literal_eval(l)]
    x[col] = dict(Counter(literal_eval(x[col])))
    return x

In [53]:
aggregated_df = (
    count_df.groupby(by=["np_id", "year"])
    .agg(
        {
            "issues": tunique,
            "content_items_out": 'count',
            "topics": [tunique, list]
        }
    )
    #.reset_index()
).persist()

In [60]:
aggregated_df.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,issues,content_items_out,topics,topics
Unnamed: 0_level_1,Unnamed: 1_level_1,tunique,count,tunique,list
np_id,year,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2
DVF,1878,1,6,6,"[['tm-de-all-v2.0_tp11_de', 'tm-de-all-v2.0_tp..."
DVF,1879,103,432,377,"[['tm-de-all-v2.0_tp57_de', 'tm-de-all-v2.0_tp..."
DVF,1880,103,501,393,"[['tm-de-all-v2.0_tp11_de', 'tm-de-all-v2.0_tp..."
DVF,1881,64,278,250,"[['tm-de-all-v2.0_tp23_de', 'tm-de-all-v2.0_tp..."
DVF,1882,300,2106,1669,"[['tm-de-all-v2.0_tp23_de', 'tm-de-all-v2.0_tp..."


In [61]:
aggregated_df.columns

MultiIndex([(           'issues', 'tunique'),
            ('content_items_out',   'count'),
            (           'topics', 'tunique'),
            (           'topics',    'list')],
           )

In [62]:
aggregated_df_test = aggregated_df.copy()
aggregated_df_test.columns = aggregated_df_test.columns.to_flat_index()
aggregated_df_test = aggregated_df_test.reset_index()
aggregated_df_test.head()

Unnamed: 0,np_id,year,"(issues, tunique)","(content_items_out, count)","(topics, tunique)","(topics, list)"
0,DVF,1878,1,6,6,"[['tm-de-all-v2.0_tp11_de', 'tm-de-all-v2.0_tp..."
1,DVF,1879,103,432,377,"[['tm-de-all-v2.0_tp57_de', 'tm-de-all-v2.0_tp..."
2,DVF,1880,103,501,393,"[['tm-de-all-v2.0_tp11_de', 'tm-de-all-v2.0_tp..."
3,DVF,1881,64,278,250,"[['tm-de-all-v2.0_tp23_de', 'tm-de-all-v2.0_tp..."
4,DVF,1882,300,2106,1669,"[['tm-de-all-v2.0_tp23_de', 'tm-de-all-v2.0_tp..."


In [64]:
#aggregated_df.columns = aggregated_df.columns.to_flat_index()
aggregated_df_test = aggregated_df_test.rename(columns = {
    ("np_id", ""): "np_id", 
    ("year", ""):"year", 
    ("issues", "tunique"):"issues", 
    ("content_items_out", "count"):"content_items_out", 
    ("topics", "tunique"):"topics",
    ("topics", "list"):"topics_fd",
})
aggregated_df_test.head()

Unnamed: 0,np_id,year,issues,content_items_out,topics,topics_fd
0,DVF,1878,1,6,6,"[['tm-de-all-v2.0_tp11_de', 'tm-de-all-v2.0_tp..."
1,DVF,1879,103,432,377,"[['tm-de-all-v2.0_tp57_de', 'tm-de-all-v2.0_tp..."
2,DVF,1880,103,501,393,"[['tm-de-all-v2.0_tp11_de', 'tm-de-all-v2.0_tp..."
3,DVF,1881,64,278,250,"[['tm-de-all-v2.0_tp23_de', 'tm-de-all-v2.0_tp..."
4,DVF,1882,300,2106,1669,"[['tm-de-all-v2.0_tp23_de', 'tm-de-all-v2.0_tp..."


In [136]:
aggregated_df['topics_fd'][3].head()

['[]',
 "['tm-de-all-v2.0_tp15_de', 'tm-de-all-v2.0_tp23_de', 'tm-de-all-v2.0_tp24_de', 'tm-de-all-v2.0_tp26_de', 'tm-de-all-v2.0_tp55_de', 'tm-de-all-v2.0_tp57_de', 'tm-de-all-v2.0_tp79_de']",
 '[]',
 "['tm-de-all-v2.0_tp23_de', 'tm-de-all-v2.0_tp71_de', 'tm-de-all-v2.0_tp96_de']",
 "['tm-de-all-v2.0_tp02_de', 'tm-de-all-v2.0_tp36_de', 'tm-de-all-v2.0_tp42_de', 'tm-de-all-v2.0_tp43_de', 'tm-de-all-v2.0_tp74_de', 'tm-de-all-v2.0_tp76_de']",
 "['tm-de-all-v2.0_tp23_de', 'tm-de-all-v2.0_tp43_de', 'tm-de-all-v2.0_tp74_de', 'tm-de-all-v2.0_tp76_de']",
 "['tm-de-all-v2.0_tp02_de', 'tm-de-all-v2.0_tp23_de', 'tm-de-all-v2.0_tp30_de', 'tm-de-all-v2.0_tp43_de', 'tm-de-all-v2.0_tp63_de', 'tm-de-all-v2.0_tp81_de', 'tm-de-all-v2.0_tp96_de']",
 "['tm-de-all-v2.0_tp04_de', 'tm-de-all-v2.0_tp10_de', 'tm-de-all-v2.0_tp27_de', 'tm-de-all-v2.0_tp39_de', 'tm-de-all-v2.0_tp51_de', 'tm-de-all-v2.0_tp56_de', 'tm-de-all-v2.0_tp94_de']",
 "['tm-de-all-v2.0_tp07_de', 'tm-de-all-v2.0_tp08_de', 'tm-de-all-v2.0_t

In [137]:
def flatten_lists(list_elem):
    final_list = []
    for str_list in list_elem:
        assert isinstance(str_list, str)
        if str_list == '[]':
            print("Empty list, appending None")
            final_list.append('None')
        else:
            print("evaluating list")
            for elem in literal_eval(str_list):
                final_list.append(elem)
    
    return final_list
    

In [138]:
aggregated_df['topics_fd'] = aggregated_df['topics_fd'].apply(flatten_lists, meta=('topics_fd', 'object'))
#aggregated_df['topics_fd'] = aggregated_df['topics_fd'].apply(lambda x: [elem for l in x for elem in literal_eval(l)], meta=('topics_fd', 'object'))
#aggregated_df = aggregated_df.drop(('topics', 'list'), axis=1)
aggregated_df.head()

Unnamed: 0,np_id,year,issues,content_items_out,topics,topics_fd
0,DVF,1878,1,19,19,"[tm-de-all-v2.0_tp11_de, tm-de-all-v2.0_tp12_d..."
1,DVF,1879,103,909,820,"[tm-de-all-v2.0_tp13_de, tm-de-all-v2.0_tp15_d..."
2,DVF,1880,103,1114,952,"[tm-de-all-v2.0_tp42_de, tm-de-all-v2.0_tp50_d..."
3,DVF,1881,64,509,458,"[None, tm-de-all-v2.0_tp15_de, tm-de-all-v2.0_..."
4,DVF,1882,300,4020,3586,"[tm-de-all-v2.0_tp01_de, tm-de-all-v2.0_tp23_d..."


In [123]:
test_list = aggregated_df['topics_fd'][3].head()#[0], aggregated_df[('topics', 'list')][0].head()[1]

dict(Counter(test_list))

{'None': 43,
 'tm-de-all-v2.0_tp15_de': 14,
 'tm-de-all-v2.0_tp23_de': 181,
 'tm-de-all-v2.0_tp24_de': 22,
 'tm-de-all-v2.0_tp26_de': 20,
 'tm-de-all-v2.0_tp55_de': 38,
 'tm-de-all-v2.0_tp57_de': 92,
 'tm-de-all-v2.0_tp79_de': 27,
 'tm-de-all-v2.0_tp71_de': 37,
 'tm-de-all-v2.0_tp96_de': 90,
 'tm-de-all-v2.0_tp02_de': 70,
 'tm-de-all-v2.0_tp36_de': 9,
 'tm-de-all-v2.0_tp42_de': 14,
 'tm-de-all-v2.0_tp43_de': 38,
 'tm-de-all-v2.0_tp74_de': 37,
 'tm-de-all-v2.0_tp76_de': 58,
 'tm-de-all-v2.0_tp30_de': 81,
 'tm-de-all-v2.0_tp63_de': 58,
 'tm-de-all-v2.0_tp81_de': 31,
 'tm-de-all-v2.0_tp04_de': 29,
 'tm-de-all-v2.0_tp10_de': 14,
 'tm-de-all-v2.0_tp27_de': 14,
 'tm-de-all-v2.0_tp39_de': 20,
 'tm-de-all-v2.0_tp51_de': 34,
 'tm-de-all-v2.0_tp56_de': 9,
 'tm-de-all-v2.0_tp94_de': 33,
 'tm-de-all-v2.0_tp07_de': 76,
 'tm-de-all-v2.0_tp08_de': 65,
 'tm-de-all-v2.0_tp12_de': 116,
 'tm-de-all-v2.0_tp54_de': 165,
 'tm-de-all-v2.0_tp97_de': 64,
 'tm-de-all-v2.0_tp34_de': 45,
 'tm-de-all-v2.0_tp68_de'

In [139]:
result = aggregated_df.to_bag(format="dict").map(freq).compute()
result

evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
Empty list, appending None
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
Empty list, appending None
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
Em

[{'np_id': 'DVF',
  'year': '1878',
  'issues': 1,
  'content_items_out': 19,
  'topics': 19,
  'topics_fd': {'tm-de-all-v2.0_tp11_de': 4,
   'tm-de-all-v2.0_tp12_de': 4,
   'tm-de-all-v2.0_tp30_de': 5,
   'tm-de-all-v2.0_tp40_de': 2,
   'tm-de-all-v2.0_tp65_de': 3,
   'tm-de-all-v2.0_tp81_de': 3,
   'tm-de-all-v2.0_tp29_de': 2,
   'tm-de-all-v2.0_tp43_de': 2,
   'tm-de-all-v2.0_tp88_de': 2,
   'tm-de-all-v2.0_tp99_de': 4,
   'tm-de-all-v2.0_tp10_de': 3,
   'tm-de-all-v2.0_tp14_de': 1,
   'tm-de-all-v2.0_tp38_de': 2,
   'tm-de-all-v2.0_tp51_de': 1,
   'tm-de-all-v2.0_tp56_de': 2,
   'tm-de-all-v2.0_tp68_de': 2,
   'tm-de-all-v2.0_tp79_de': 1,
   'tm-de-all-v2.0_tp94_de': 1,
   'tm-de-all-v2.0_tp44_de': 1,
   'tm-de-all-v2.0_tp54_de': 4,
   'tm-de-all-v2.0_tp89_de': 3,
   'tm-de-all-v2.0_tp02_de': 6,
   'tm-de-all-v2.0_tp04_de': 1,
   'tm-de-all-v2.0_tp06_de': 2,
   'tm-de-all-v2.0_tp19_de': 2,
   'tm-de-all-v2.0_tp23_de': 6,
   'tm-de-all-v2.0_tp24_de': 1,
   'tm-de-all-v2.0_tp33_de': 

In [42]:
literal_eval(aggregated_df[('topics', 'list')][0].head()[0])

['tm-de-all-v2.0_tp11_de',
 'tm-de-all-v2.0_tp12_de',
 'tm-de-all-v2.0_tp30_de',
 'tm-de-all-v2.0_tp40_de',
 'tm-de-all-v2.0_tp65_de',
 'tm-de-all-v2.0_tp81_de']

In [45]:
concat_list = [elem for l in aggregated_df[('topics', 'list')][0].head()[:5] for elem in literal_eval(l)]
counter_list = dict(Counter(concat_list))
counter_list

{'tm-de-all-v2.0_tp11_de': 1,
 'tm-de-all-v2.0_tp12_de': 2,
 'tm-de-all-v2.0_tp30_de': 2,
 'tm-de-all-v2.0_tp40_de': 2,
 'tm-de-all-v2.0_tp65_de': 1,
 'tm-de-all-v2.0_tp81_de': 1,
 'tm-de-all-v2.0_tp29_de': 1,
 'tm-de-all-v2.0_tp43_de': 1,
 'tm-de-all-v2.0_tp88_de': 2,
 'tm-de-all-v2.0_tp99_de': 1,
 'tm-de-all-v2.0_tp10_de': 2,
 'tm-de-all-v2.0_tp14_de': 1,
 'tm-de-all-v2.0_tp38_de': 1,
 'tm-de-all-v2.0_tp51_de': 1,
 'tm-de-all-v2.0_tp56_de': 2,
 'tm-de-all-v2.0_tp68_de': 1,
 'tm-de-all-v2.0_tp79_de': 1,
 'tm-de-all-v2.0_tp94_de': 1,
 'tm-de-all-v2.0_tp44_de': 1,
 'tm-de-all-v2.0_tp54_de': 1,
 'tm-de-all-v2.0_tp89_de': 1,
 'tm-de-all-v2.0_tp02_de': 1,
 'tm-de-all-v2.0_tp04_de': 1,
 'tm-de-all-v2.0_tp06_de': 1,
 'tm-de-all-v2.0_tp19_de': 1,
 'tm-de-all-v2.0_tp23_de': 1,
 'tm-de-all-v2.0_tp24_de': 1,
 'tm-de-all-v2.0_tp33_de': 1,
 'tm-de-all-v2.0_tp50_de': 1,
 'tm-de-all-v2.0_tp59_de': 1,
 'tm-de-all-v2.0_tp76_de': 1,
 'tm-de-all-v2.0_tp96_de': 1}

In [128]:
result = aggregated_df.to_bag(format="dict").compute()#.map(freq).compute()
result

evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
Empty list, appending None
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
Empty list, appending None
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
evaluating list
Em

[{'np_id': 'DVF',
  'year': '1878',
  'issues': 1,
  'content_items_out': 19,
  'topics': 19,
  'topics_fd': "['tm-de-all-v2.0_tp11_de', 'tm-de-all-v2.0_tp12_de', 'tm-de-all-v2.0_tp30_de', 'tm-de-all-v2.0_tp40_de', 'tm-de-all-v2.0_tp65_de', 'tm-de-all-v2.0_tp81_de', 'tm-de-all-v2.0_tp29_de', 'tm-de-all-v2.0_tp43_de', 'tm-de-all-v2.0_tp88_de', 'tm-de-all-v2.0_tp99_de', 'tm-de-all-v2.0_tp10_de', 'tm-de-all-v2.0_tp14_de', 'tm-de-all-v2.0_tp38_de', 'tm-de-all-v2.0_tp51_de', 'tm-de-all-v2.0_tp56_de', 'tm-de-all-v2.0_tp68_de', 'tm-de-all-v2.0_tp79_de', 'tm-de-all-v2.0_tp94_de', 'tm-de-all-v2.0_tp12_de', 'tm-de-all-v2.0_tp44_de', 'tm-de-all-v2.0_tp54_de', 'tm-de-all-v2.0_tp88_de', 'tm-de-all-v2.0_tp89_de', 'tm-de-all-v2.0_tp02_de', 'tm-de-all-v2.0_tp04_de', 'tm-de-all-v2.0_tp06_de', 'tm-de-all-v2.0_tp10_de', 'tm-de-all-v2.0_tp19_de', 'tm-de-all-v2.0_tp23_de', 'tm-de-all-v2.0_tp24_de', 'tm-de-all-v2.0_tp30_de', 'tm-de-all-v2.0_tp33_de', 'tm-de-all-v2.0_tp40_de', 'tm-de-all-v2.0_tp50_de', 'tm-d

In [129]:
for r in result:
    r['topics_fd'] = dict(Counter(literal_eval(r['topics_fd'])))

In [130]:
result

[{'np_id': 'DVF',
  'year': '1878',
  'issues': 1,
  'content_items_out': 19,
  'topics': 19,
  'topics_fd': {'tm-de-all-v2.0_tp11_de': 4,
   'tm-de-all-v2.0_tp12_de': 4,
   'tm-de-all-v2.0_tp30_de': 5,
   'tm-de-all-v2.0_tp40_de': 2,
   'tm-de-all-v2.0_tp65_de': 3,
   'tm-de-all-v2.0_tp81_de': 3,
   'tm-de-all-v2.0_tp29_de': 2,
   'tm-de-all-v2.0_tp43_de': 2,
   'tm-de-all-v2.0_tp88_de': 2,
   'tm-de-all-v2.0_tp99_de': 4,
   'tm-de-all-v2.0_tp10_de': 3,
   'tm-de-all-v2.0_tp14_de': 1,
   'tm-de-all-v2.0_tp38_de': 2,
   'tm-de-all-v2.0_tp51_de': 1,
   'tm-de-all-v2.0_tp56_de': 2,
   'tm-de-all-v2.0_tp68_de': 2,
   'tm-de-all-v2.0_tp79_de': 1,
   'tm-de-all-v2.0_tp94_de': 1,
   'tm-de-all-v2.0_tp44_de': 1,
   'tm-de-all-v2.0_tp54_de': 4,
   'tm-de-all-v2.0_tp89_de': 3,
   'tm-de-all-v2.0_tp02_de': 6,
   'tm-de-all-v2.0_tp04_de': 1,
   'tm-de-all-v2.0_tp06_de': 2,
   'tm-de-all-v2.0_tp19_de': 2,
   'tm-de-all-v2.0_tp23_de': 6,
   'tm-de-all-v2.0_tp24_de': 1,
   'tm-de-all-v2.0_tp33_de': 

In [14]:
result = aggregated_df.to_bag(format="dict").compute()
result

[{'np_id': 'BDC',
  'year': '1839',
  'issues': 23,
  'content_items_out': 141,
  'topics': 115}]

## 3. Testing for newsagencies

In [2]:
config_dict_newsagencies = {
    "data_stage": "newsagencies",
    "output_bucket": "42-processed-data-final/newsagencies/newsagencies-bert-base-historic-multilingual_v1-0-0",
    "input_bucket": "22-rebuilt-final",
    "git_repository": "/home/piconti/impresso-essentials",
    "temp_directory": "/home/piconti/temp",
    "newspapers": [],
    "previous_mft_s3_path": "",
    "is_staging": True,
    "is_patch": False,
    "patched_fields": [],
    "push_to_git": True,
    "file_extensions": "jsonl.bz2",
    "log_file": "/local/path/to/log_file.log",
    "notes": """First NER/EL models: 2024-02-01, 
        - NER: dbmdz_bert_base_historic_multilingual_cased trained on DE and FR data (https://github.com/impresso/newsagency-classification)
    """,
}

In [3]:
repo = git.Repo(config_dict_newsagencies["git_repository"])
stage = validate_stage(config_dict_newsagencies["data_stage"])
stage

<DataStage.NEWS_AGENCIES: 'newsagencies'>

In [4]:
manifest = DataManifest(
    data_stage=stage,
    s3_output_bucket=config_dict_newsagencies["output_bucket"],
    s3_input_bucket=config_dict_newsagencies["input_bucket"],
    git_repo=repo,
    temp_dir=config_dict_newsagencies["temp_directory"],
    staging=config_dict_newsagencies["is_staging"],
    is_patch=config_dict_newsagencies["is_patch"],
    push_to_git=config_dict_newsagencies["push_to_git"]
)
manifest

Note that once generated, this manifest will be pushed to the following path data-processing-versioning/data-processing/newsagencies/newsagencies-bert-base-historic-multilingual_v1-0-0 within the git repository.


<impresso_essentials.versioning.data_manifest.DataManifest at 0x7f35d89e3680>

In [5]:
manifest.branch

'staging'

In [6]:
s3_files = compute_manifest.get_files_to_consider(config_dict_newsagencies)
s3_files

{'ACI': ['s3://42-processed-data-final/newsagencies/newsagencies-bert-base-historic-multilingual_v1-0-0/ACI/ACI-1832.jsonl.bz2'],
 'AV': ['s3://42-processed-data-final/newsagencies/newsagencies-bert-base-historic-multilingual_v1-0-0/AV/AV-1880.jsonl.bz2',
  's3://42-processed-data-final/newsagencies/newsagencies-bert-base-historic-multilingual_v1-0-0/AV/AV-1881.jsonl.bz2',
  's3://42-processed-data-final/newsagencies/newsagencies-bert-base-historic-multilingual_v1-0-0/AV/AV-1886.jsonl.bz2'],
 'BDC': ['s3://42-processed-data-final/newsagencies/newsagencies-bert-base-historic-multilingual_v1-0-0/BDC/BDC-1839.jsonl.bz2'],
 'BLB': ['s3://42-processed-data-final/newsagencies/newsagencies-bert-base-historic-multilingual_v1-0-0/BLB/BLB-1846.jsonl.bz2',
  's3://42-processed-data-final/newsagencies/newsagencies-bert-base-historic-multilingual_v1-0-0/BLB/BLB-1847.jsonl.bz2'],
 'BNN': ['s3://42-processed-data-final/newsagencies/newsagencies-bert-base-historic-multilingual_v1-0-0/BNN/BNN-1885.json

In [7]:
for np_title, np_s3_files in s3_files.items():

    processed_files = db.read_text(
        np_s3_files, storage_options=IMPRESSO_STORAGEOPT
    ).map(json.loads)
    computed_stats = compute_manifest.compute_stats_for_stage(processed_files, stage)

    manifest = compute_manifest.add_stats_to_mft(manifest, np_title, computed_stats)
    break

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('ne_entities', 'object'))



Finished grouping and aggregating stats by title and year.


In [8]:
manifest.compute(export_to_git_and_s3=False)

Updated media information for ACI


In [9]:
manifest.manifest_data['notes'] = "debug try, will be deleted after"

In [12]:
manifest.manifest_data

{'mft_version': 'v0.1.0',
 'mft_generation_date': '2024-10-09 15:52:54',
 'mft_s3_path': 's3://42-processed-data-final/newsagencies/newsagencies-bert-base-historic-multilingual_v1-0-0/newsagencies_v0-1-0.json',
 'input_mft_s3_path': 's3://22-rebuilt-final/rebuilt_v1-1-1.json',
 'input_mft_git_path': 'data-processing-versioning/data-preparation/rebuilt_v1-1-1.json',
 'code_git_commit': 'https://github.com/impresso/impresso-essentials/commit/2a82933c28c6dbc1cd669b30b778a2ba035ac642',
 'media_list': [{'media_title': 'ACI',
   'last_modification_date': '2024-10-09 15:52:54',
   'update_type': 'modification',
   'update_level': 'title',
   'updated_years': [],
   'updated_fields': [],
   'code_git_commit': 'https://github.com/impresso/impresso-essentials/commit/2a82933c28c6dbc1cd669b30b778a2ba035ac642',
   'media_statistics': [{'stage': 'newsagencies',
     'granularity': 'title',
     'element': 'ACI',
     'nps_stats': {'content_items_out': 142,
      'issues': 1,
      'ne_mentions': 5,


In [13]:
manifest.validate_and_export_manifest(push_to_git=True)

Git repository impresso/impresso-data-release had already been cloned, pulling from branch staging.
abs_path: /home/piconti/temp/impresso-data-release/data-processing-versioning/data-processing/newsagencies/newsagencies-bert-base-historic-multilingual_v1-0-0, filename: newsagencies_v0-1-0.json


True

## 4. Aggregators for OCRQA

In [None]:
def compute_stats_in_ocrqa_bag(
    s3_ocrqas,
    client,
    title: str | None = None,
) -> list[dict[str, int | str]]:
    """Compute stats on a dask bag of OCRQA outputs.

    Args:
        s3_solr_ing_cis (db.core.Bag): Bag with the contents of the OCRQA files.
        client (Client | None, optional): Dask client. Defaults to None.
        title (str, optional): Media title for which the stats are being computed.
            Defaults to None.

    Returns:
        list[dict[str, Union[int, str]]]: List of counts that match OCRQA output
            DataStatistics keys.
    """
    # when called in the rebuilt, all the rebuilt articles in the bag
    # are from the same newspaper and year
    print(f"{title} - Fetched all files, gathering desired information.")
    #logger.info("%s - Fetched all files, gathering desired information.", title)

    # define the list of columns in the dataframe
    count_df = (
        s3_ocrqas.map(
            lambda ci: {
                "np_id": ci["ci_id"].split("-")[0],
                "year": ci["ci_id"].split("-")[1],
                "issues": "-".join(ci["ci_id"].split("-")[:-1]),
                "content_items_out": 1,
                "avg_ocrqa": ci['ocrqa'],
                "avg_ocrqa_per_lang_fd": {ci["lg"]: ci['ocrqa']},
            }
        )
        .to_dataframe(
            meta={
                "np_id": str,
                "year": str,
                "issues": str,
                "content_items_out": int,
                "avg_ocrqa": float,
                "avg_ocrqa_per_lang_fd": object
            }
        )
        .persist()
    )


    
    aggregated_df = (
        count_df.groupby(by=["np_id", "year"])
        .agg(
            {
                "issues": tunique,
                "content_items_out": sum,
                "avg_ocrqa": 'mean',

            }
        )
        .reset_index()
        .sort_values("year")
    ).persist()

    print(f"{title} - Finished grouping and aggregating stats by title and year.")
    """logger.info(
        "%s - Finished grouping and aggregating stats by title and year.", title
    )"""

    #if client is not None:
        # only add the progress bar if the client is defined
    #    progress(aggregated_df)

    # return as a list of dicts
    return aggregated_df.to_bag(format="dict").compute()

In [31]:
# define locally the nunique() aggregation function for dask
def chunk(s):
    """The function applied to the individual partition (map).
    Part of the ggregating function(s) implementing np.nunique()
    """
    return s.apply(lambda x: list(set(x)))


def agg(s):
    """The function which will aggregate the result from all the partitions (reduce).
    Part of the ggregating function(s) implementing np.nunique()
    """
    s = s._selected_obj
    # added apply(list) because in newer versions of pandas, it was ndarrays.
    return s.apply(list).groupby(level=list(range(s.index.nlevels))).sum()


def finalize(s):
    """The optional function that will be applied to the result of the agg_tu functions.
    Part of the ggregating function(s) implementing np.nunique()
    """
    return s.apply(lambda x: len(set(x)))


# aggregating function implementing np.nunique()
tunique = dd.Aggregation("tunique", chunk, agg, finalize)

In [2]:
manifest_config = {
    "data_stage": "ocrqa",
    "output_bucket": "42-processed-data-final/ocrqa/ocrqa-ocrqa-wp_v1.0.6_v1-0-0/",
    "input_bucket": "22-rebuilt-final",
    "git_repository": "/home/piconti/impresso-essentials",
    "newspapers": [],
    "temp_directory": "/scratch/piconti/impresso/git_temp_folder",
    "previous_mft_s3_path": "",
    "is_staging": True,
    "is_patch": False,
    "patched_fields": [],
    "push_to_git": False,
    "relative_git_path": "",
    "only_counting": False,
    "model_id": "OCR-quality-assessment-unigram-ocrqa-wp_v1.0.6",
    "run_id": "ocrqa-ocrqa-wp_v1.0.6_v1-0-0",
    "notes": "Debug manifest on the OCRQA - would need to be recomputed with the correct git repo.",
    "file_extensions": "jsonl.bz2",
    "compute_altogether": False,
    "check_s3_archives": False
}

ocrqa_s3_path = "s3://42-processed-data-final/ocrqa/ocrqa-ocrqa-wp_v1.0.6_v1-0-0/"

In [3]:
repo = git.Repo(manifest_config["git_repository"])
stage = validate_stage(manifest_config["data_stage"])
stage

<DataStage.OCRQA: 'ocrqa'>

In [4]:
ocrqa_manifest = DataManifest(
    data_stage=stage,
    s3_output_bucket=manifest_config["output_bucket"],
    s3_input_bucket=manifest_config["input_bucket"],
    git_repo=repo,
    temp_dir=manifest_config["temp_directory"],
    staging=manifest_config["is_staging"],
    is_patch=manifest_config["is_patch"],
    push_to_git=manifest_config["push_to_git"]
)
ocrqa_manifest

<impresso_essentials.versioning.data_manifest.DataManifest at 0x7fa0003bb980>

In [5]:
ocrqa_s3_files = compute_manifest.get_files_to_consider(manifest_config)
ocrqa_s3_files

Not checking for any corrupted S3 archives before launching the manifest computation. If you encounter any problems regarding the reading of the archives, please set `check_s3_archives` to True.


{'ACI': ['s3://42-processed-data-final/ocrqa/ocrqa-ocrqa-wp_v1.0.6_v1-0-0/ACI/ACI-1832.jsonl.bz2'],
 'AV': ['s3://42-processed-data-final/ocrqa/ocrqa-ocrqa-wp_v1.0.6_v1-0-0/AV/AV-1880.jsonl.bz2',
  's3://42-processed-data-final/ocrqa/ocrqa-ocrqa-wp_v1.0.6_v1-0-0/AV/AV-1881.jsonl.bz2',
  's3://42-processed-data-final/ocrqa/ocrqa-ocrqa-wp_v1.0.6_v1-0-0/AV/AV-1886.jsonl.bz2'],
 'BDC': ['s3://42-processed-data-final/ocrqa/ocrqa-ocrqa-wp_v1.0.6_v1-0-0/BDC/BDC-1839.jsonl.bz2'],
 'BLB': ['s3://42-processed-data-final/ocrqa/ocrqa-ocrqa-wp_v1.0.6_v1-0-0/BLB/BLB-1845.jsonl.bz2',
  's3://42-processed-data-final/ocrqa/ocrqa-ocrqa-wp_v1.0.6_v1-0-0/BLB/BLB-1846.jsonl.bz2',
  's3://42-processed-data-final/ocrqa/ocrqa-ocrqa-wp_v1.0.6_v1-0-0/BLB/BLB-1847.jsonl.bz2'],
 'BNN': ['s3://42-processed-data-final/ocrqa/ocrqa-ocrqa-wp_v1.0.6_v1-0-0/BNN/BNN-1885.jsonl.bz2',
  's3://42-processed-data-final/ocrqa/ocrqa-ocrqa-wp_v1.0.6_v1-0-0/BNN/BNN-1886.jsonl.bz2',
  's3://42-processed-data-final/ocrqa/ocrqa-ocrq

In [15]:
count_dfs = {}
for np_title, np_s3_files in ocrqa_s3_files.items():

    if np_title in ['ACI', 'AV', 'BDC', 'BLB', 'BNN', 'Bombe', 'CDV']:
        
        processed_files = db.read_text(
            np_s3_files, storage_options=IMPRESSO_STORAGEOPT
        ).map(json.loads)
        
        count_df = (
            processed_files.map(
                lambda ci: {
                    "np_id": ci["ci_id"].split("-")[0],
                    "year": ci["ci_id"].split("-")[1],
                    "issues": "-".join(ci["ci_id"].split("-")[:-1]),
                    "content_items_out": 1,
                    "avg_ocrqa": ci['ocrqa'],
                    "avg_ocrqa_per_lang_fd": {ci["lg"]: ci['ocrqa']},
                }
            )
            .to_dataframe(
                meta={
                    "np_id": str,
                    "year": str,
                    "issues": str,
                    "content_items_out": int,
                    "avg_ocrqa": float,
                    "avg_ocrqa_per_lang_fd": object
                }
            )
            .persist()
        )
        
        count_dfs[np_title] = count_df

In [6]:
break_now = False
for np_title, np_s3_files in ocrqa_s3_files.items():

    processed_files = db.read_text(
        np_s3_files, storage_options=IMPRESSO_STORAGEOPT
    ).map(json.loads)
    computed_stats = compute_manifest.compute_stats_for_stage(processed_files, stage)

    ocrqa_manifest = compute_manifest.add_stats_to_mft(ocrqa_manifest, np_title, computed_stats)
    if break_now:
        break
    break_now = True

None - Fetched all files, gathering desired information.
None - Finished grouping and aggregating stats by title and year.
None - Fetched all files, gathering desired information.
None - Finished grouping and aggregating stats by title and year.


In [7]:
ocrqa_manifest.compute(export_to_git_and_s3=False)

update_media_stats - Adding new key 1832-ACI.
update_media_stats - Adding new key 1880-AV.
update_media_stats - Adding new key 1881-AV.
update_media_stats - Adding new key 1886-AV.


In [8]:
ocrqa_manifest.manifest_data

{'mft_version': 'v0.0.1',
 'mft_generation_date': '2025-03-06 18:04:19',
 'mft_s3_path': 's3://42-processed-data-final/ocrqa/ocrqa-ocrqa-wp_v1.0.6_v1-0-0/ocrqa_v0-0-1.json',
 'input_mft_s3_path': 's3://22-rebuilt-final/rebuilt_v1-1-1.json',
 'input_mft_git_path': 'data-processing-versioning/data-preparation/rebuilt_v1-1-1.json',
 'code_git_commit': 'https://github.com/impresso/impresso-essentials/commit/4b44bc0a132051a2cc8bce3d0da9fbac8acdcdea',
 'model_id': '',
 'run_id': '',
 'media_list': [{'media_title': 'ACI',
   'last_modification_date': '2025-03-06 18:04:19',
   'update_type': 'addition',
   'update_level': 'title',
   'updated_years': [],
   'updated_fields': [],
   'code_git_commit': 'https://github.com/impresso/impresso-essentials/commit/4b44bc0a132051a2cc8bce3d0da9fbac8acdcdea',
   'media_statistics': [{'stage': 'ocrqa',
     'granularity': 'title',
     'element': 'ACI',
     'nps_stats': {'content_items_out': 139, 'issues': 1, 'avg_ocrqa': 0.92}},
    {'stage': 'ocrqa',
  

## 5. Aggregators for Doc Embeddings

In [24]:
manifest_config = {
    "data_stage": "embeddings-docs",
    "output_bucket": "42-processed-data-final/embeddings/docs/doc-embeddings_v1-1-0/",
    "input_bucket": "22-rebuilt-final",
    "git_repository": "/home/piconti/impresso-essentials",
    "newspapers": [],
    "temp_directory": "/scratch/piconti/impresso/git_temp_folder",
    "previous_mft_s3_path": "",
    "is_staging": True,
    "is_patch": False,
    "patched_fields": [],
    "push_to_git": False,
    "relative_git_path": "",
    "only_counting": False,
    "model_id": "emb-docs_gte-multilingual_v1.0.0",
    "run_id": "emb-docs_gte-multilingual_v1-1-0",
    "notes": "Debug manifest on the doc embeddings - would need to be recomputed with the correct git repo.",
    "file_extensions": "jsonl.bz2",
    "compute_altogether": False,
    "check_s3_archives": False
}

emb_doc_s3_path = "s3://42-processed-data-final/embeddings/docs/doc-embeddings_v1-1-0/"

In [25]:
repo = git.Repo(manifest_config["git_repository"])
stage = validate_stage(manifest_config["data_stage"])
stage

<DataStage.EMB_DOCS: 'embeddings-docs'>

In [26]:
emb_doc_manifest = DataManifest(
    data_stage=stage,
    s3_output_bucket=manifest_config["output_bucket"],
    s3_input_bucket=manifest_config["input_bucket"],
    git_repo=repo,
    temp_dir=manifest_config["temp_directory"],
    staging=manifest_config["is_staging"],
    is_patch=manifest_config["is_patch"],
    push_to_git=manifest_config["push_to_git"]
)
emb_doc_manifest

<impresso_essentials.versioning.data_manifest.DataManifest at 0x7fa005ed5430>

In [27]:
emb_doc_s3_files = compute_manifest.get_files_to_consider(manifest_config)
emb_doc_s3_files

Not checking for any corrupted S3 archives before launching the manifest computation. If you encounter any problems regarding the reading of the archives, please set `check_s3_archives` to True.


{'ACI': ['s3://42-processed-data-final/embeddings/docs/doc-embeddings_v1-1-0/ACI/ACI-1832.jsonl.bz2'],
 'AV': ['s3://42-processed-data-final/embeddings/docs/doc-embeddings_v1-1-0/AV/AV-1880.jsonl.bz2',
  's3://42-processed-data-final/embeddings/docs/doc-embeddings_v1-1-0/AV/AV-1881.jsonl.bz2',
  's3://42-processed-data-final/embeddings/docs/doc-embeddings_v1-1-0/AV/AV-1886.jsonl.bz2'],
 'BDC': ['s3://42-processed-data-final/embeddings/docs/doc-embeddings_v1-1-0/BDC/BDC-1839.jsonl.bz2'],
 'BLB': ['s3://42-processed-data-final/embeddings/docs/doc-embeddings_v1-1-0/BLB/BLB-1845.jsonl.bz2',
  's3://42-processed-data-final/embeddings/docs/doc-embeddings_v1-1-0/BLB/BLB-1846.jsonl.bz2',
  's3://42-processed-data-final/embeddings/docs/doc-embeddings_v1-1-0/BLB/BLB-1847.jsonl.bz2'],
 'BNN': ['s3://42-processed-data-final/embeddings/docs/doc-embeddings_v1-1-0/BNN/BNN-1885.jsonl.bz2',
  's3://42-processed-data-final/embeddings/docs/doc-embeddings_v1-1-0/BNN/BNN-1886.jsonl.bz2',
  's3://42-process

In [29]:
break_now = False
for np_title, np_s3_files in emb_doc_s3_files.items():

    processed_files = db.read_text(
        np_s3_files, storage_options=IMPRESSO_STORAGEOPT
    ).map(json.loads)
    computed_stats = compute_manifest.compute_stats_for_stage(processed_files, stage)

    emb_doc_manifest = compute_manifest.add_stats_to_mft(emb_doc_manifest, np_title, computed_stats)
    if break_now:
        break
    break_now = True

None - Fetched all files, gathering desired information.
None - Finished grouping and aggregating stats by title and year.
None - Fetched all files, gathering desired information.
None - Finished grouping and aggregating stats by title and year.


In [30]:
emb_doc_manifest.compute(export_to_git_and_s3=False)
emb_doc_manifest.manifest_data

update_media_stats - Adding new key 1832-ACI.
update_media_stats - Adding new key 1880-AV.
update_media_stats - Adding new key 1881-AV.
update_media_stats - Adding new key 1886-AV.


{'mft_version': 'v0.0.1',
 'mft_generation_date': '2025-03-06 18:35:34',
 'mft_s3_path': 's3://42-processed-data-final/embeddings/docs/doc-embeddings_v1-1-0/embeddings-docs_v0-0-1.json',
 'input_mft_s3_path': 's3://22-rebuilt-final/rebuilt_v1-1-1.json',
 'input_mft_git_path': 'data-processing-versioning/data-preparation/rebuilt_v1-1-1.json',
 'code_git_commit': 'https://github.com/impresso/impresso-essentials/commit/4b44bc0a132051a2cc8bce3d0da9fbac8acdcdea',
 'model_id': '',
 'run_id': '',
 'media_list': [{'media_title': 'ACI',
   'last_modification_date': '2025-03-06 18:35:34',
   'update_type': 'addition',
   'update_level': 'title',
   'updated_years': [],
   'updated_fields': [],
   'code_git_commit': 'https://github.com/impresso/impresso-essentials/commit/4b44bc0a132051a2cc8bce3d0da9fbac8acdcdea',
   'media_statistics': [{'stage': 'embeddings-docs',
     'granularity': 'title',
     'element': 'ACI',
     'nps_stats': {'content_items_out': 96, 'issues': 1}},
    {'stage': 'embeddi

## 6. Aggregators for Solr Text ingestion

In [39]:
manifest_config = {
    "data_stage": "solr-text-ingestion",
    "output_bucket": "71-ingestion-staging/solr-ingestion/rebuilt/",
    "input_bucket": "22-rebuilt-final",
    "git_repository": "/home/piconti/impresso-essentials",
    "newspapers": [],
    "temp_directory": "/scratch/piconti/impresso/git_temp_folder",
    "previous_mft_s3_path": "",
    "is_staging": True,
    "is_patch": False,
    "patched_fields": [],
    "push_to_git": False,
    "relative_git_path": "",
    "only_counting": False,
    "notes": "Debug manifest on the Solr text ingestion - would need to be recomputed with the correct git repo.",
    "file_extensions": "json.bz2",
    "compute_altogether": False,
    "check_s3_archives": False
}

solr_ing_s3_path = "s3://71-ingestion-staging/solr-ingestion/rebuilt/"

In [33]:
repo = git.Repo(manifest_config["git_repository"])
stage = validate_stage(manifest_config["data_stage"])
stage

<DataStage.SOLR_TEXT: 'solr-text-ingestion'>

In [35]:
solr_ing_manifest = DataManifest(
    data_stage=stage,
    s3_output_bucket=manifest_config["output_bucket"],
    s3_input_bucket=manifest_config["input_bucket"],
    git_repo=repo,
    temp_dir=manifest_config["temp_directory"],
    staging=manifest_config["is_staging"],
    is_patch=manifest_config["is_patch"],
    push_to_git=manifest_config["push_to_git"]
)
solr_ing_manifest

<impresso_essentials.versioning.data_manifest.DataManifest at 0x7fa0062ceed0>

In [40]:
solr_s3_files = compute_manifest.get_files_to_consider(manifest_config)
solr_s3_files

Not checking for any corrupted S3 archives before launching the manifest computation. If you encounter any problems regarding the reading of the archives, please set `check_s3_archives` to True.


{'ACI': ['s3://71-ingestion-staging/solr-ingestion/rebuilt/ACI/ACI-1832.json.bz2'],
 'AV': ['s3://71-ingestion-staging/solr-ingestion/rebuilt/AV/AV-1880.json.bz2',
  's3://71-ingestion-staging/solr-ingestion/rebuilt/AV/AV-1881.json.bz2',
  's3://71-ingestion-staging/solr-ingestion/rebuilt/AV/AV-1886.json.bz2'],
 'BDC': ['s3://71-ingestion-staging/solr-ingestion/rebuilt/BDC/BDC-1839.json.bz2'],
 'BLB': ['s3://71-ingestion-staging/solr-ingestion/rebuilt/BLB/BLB-1845.json.bz2',
  's3://71-ingestion-staging/solr-ingestion/rebuilt/BLB/BLB-1846.json.bz2',
  's3://71-ingestion-staging/solr-ingestion/rebuilt/BLB/BLB-1847.json.bz2'],
 'BNN': ['s3://71-ingestion-staging/solr-ingestion/rebuilt/BNN/BNN-1885.json.bz2',
  's3://71-ingestion-staging/solr-ingestion/rebuilt/BNN/BNN-1886.json.bz2',
  's3://71-ingestion-staging/solr-ingestion/rebuilt/BNN/BNN-1887.json.bz2',
  's3://71-ingestion-staging/solr-ingestion/rebuilt/BNN/BNN-1888.json.bz2',
  's3://71-ingestion-staging/solr-ingestion/rebuilt/BNN/

In [41]:
break_now = False
for np_title, np_s3_files in solr_s3_files.items():

    processed_files = db.read_text(
        np_s3_files, storage_options=IMPRESSO_STORAGEOPT
    ).map(json.loads)
    computed_stats = compute_manifest.compute_stats_for_stage(processed_files, stage)

    solr_ing_manifest = compute_manifest.add_stats_to_mft(solr_ing_manifest, np_title, computed_stats)
    if break_now:
        break
    break_now = True

None - Fetched all files, gathering desired information.
None - Finished grouping and aggregating stats by title and year.
None - Fetched all files, gathering desired information.
None - Finished grouping and aggregating stats by title and year.


In [43]:
solr_ing_manifest.compute(export_to_git_and_s3=False)
solr_ing_manifest.manifest_data

update_media_stats - Adding new key 1832-ACI.
update_media_stats - Adding new key 1880-AV.
update_media_stats - Adding new key 1881-AV.
update_media_stats - Adding new key 1886-AV.


{'mft_version': 'v0.0.1',
 'mft_generation_date': '2025-03-06 18:55:50',
 'mft_s3_path': 's3://71-ingestion-staging/solr-ingestion/rebuilt/solr-text-ingestion_v0-0-1.json',
 'input_mft_s3_path': 's3://22-rebuilt-final/rebuilt_v1-1-1.json',
 'input_mft_git_path': 'data-processing-versioning/data-preparation/rebuilt_v1-1-1.json',
 'code_git_commit': 'https://github.com/impresso/impresso-essentials/commit/4b44bc0a132051a2cc8bce3d0da9fbac8acdcdea',
 'model_id': '',
 'run_id': '',
 'media_list': [{'media_title': 'ACI',
   'last_modification_date': '2025-03-06 18:55:50',
   'update_type': 'addition',
   'update_level': 'title',
   'updated_years': [],
   'updated_fields': [],
   'code_git_commit': 'https://github.com/impresso/impresso-essentials/commit/4b44bc0a132051a2cc8bce3d0da9fbac8acdcdea',
   'media_statistics': [{'stage': 'solr-text-ingestion',
     'granularity': 'title',
     'element': 'ACI',
     'nps_stats': {'content_items_out': 142, 'issues': 1, 'ft_tokens': 24382}},
    {'stage