In [4]:
import os
import json
import signal
import logging
from typing import Callable, Iterable, Any
import pandas as pd
import re
import numpy as np
from dask import dataframe as dd
from dask import bag as db
from dask.distributed import Client, LocalCluster
# from dask_k8 import DaskCluster
from impresso_essentials.io.s3  import IMPRESSO_STORAGEOPT
from datetime import datetime, timedelta
from ast import literal_eval
import dask.config
from docopt import docopt
from impresso_essentials.utils  import init_logger
from tqdm import tqdm
from dask.bag import random as db_random

## Setup the dask cluster

In [5]:
memory_per_worker_gb = 28 #32
#cluster = LocalCluster(n_workers=12, threads_per_worker=2, memory_limit=f"{memory_per_worker_gb}GB")
cluster = LocalCluster(n_workers=10, threads_per_worker=1, memory_limit=f"{memory_per_worker_gb}GB")

Perhaps you already have a cluster running?
Hosting the HTTP server on port 42801 instead


In [6]:
client = cluster.get_client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:42801/status,

0,1
Dashboard: http://127.0.0.1:42801/status,Workers: 10
Total threads: 10,Total memory: 260.77 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:41759,Workers: 10
Dashboard: http://127.0.0.1:42801/status,Total threads: 10
Started: Just now,Total memory: 260.77 GiB

0,1
Comm: tcp://127.0.0.1:44393,Total threads: 1
Dashboard: http://127.0.0.1:33105/status,Memory: 26.08 GiB
Nanny: tcp://127.0.0.1:37591,
Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-qvbsllhg,Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-qvbsllhg

0,1
Comm: tcp://127.0.0.1:43587,Total threads: 1
Dashboard: http://127.0.0.1:38501/status,Memory: 26.08 GiB
Nanny: tcp://127.0.0.1:43895,
Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-m70og2vd,Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-m70og2vd

0,1
Comm: tcp://127.0.0.1:38327,Total threads: 1
Dashboard: http://127.0.0.1:41435/status,Memory: 26.08 GiB
Nanny: tcp://127.0.0.1:43727,
Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-g3i78jow,Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-g3i78jow

0,1
Comm: tcp://127.0.0.1:41077,Total threads: 1
Dashboard: http://127.0.0.1:46267/status,Memory: 26.08 GiB
Nanny: tcp://127.0.0.1:34385,
Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-9on19fgy,Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-9on19fgy

0,1
Comm: tcp://127.0.0.1:40185,Total threads: 1
Dashboard: http://127.0.0.1:39913/status,Memory: 26.08 GiB
Nanny: tcp://127.0.0.1:40711,
Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-8412y5t3,Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-8412y5t3

0,1
Comm: tcp://127.0.0.1:34531,Total threads: 1
Dashboard: http://127.0.0.1:36329/status,Memory: 26.08 GiB
Nanny: tcp://127.0.0.1:35915,
Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-x3wzrda_,Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-x3wzrda_

0,1
Comm: tcp://127.0.0.1:43797,Total threads: 1
Dashboard: http://127.0.0.1:33669/status,Memory: 26.08 GiB
Nanny: tcp://127.0.0.1:37731,
Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-e1rbltwl,Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-e1rbltwl

0,1
Comm: tcp://127.0.0.1:40449,Total threads: 1
Dashboard: http://127.0.0.1:33157/status,Memory: 26.08 GiB
Nanny: tcp://127.0.0.1:44367,
Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-qxmhddtq,Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-qxmhddtq

0,1
Comm: tcp://127.0.0.1:37023,Total threads: 1
Dashboard: http://127.0.0.1:35443/status,Memory: 26.08 GiB
Nanny: tcp://127.0.0.1:43329,
Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-s63xu9uf,Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-s63xu9uf

0,1
Comm: tcp://127.0.0.1:44591,Total threads: 1
Dashboard: http://127.0.0.1:44357/status,Memory: 26.08 GiB
Nanny: tcp://127.0.0.1:33191,
Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-q4412s6h,Local directory: /scratch/piconti/impresso/dask_tmp/dask-scratch-space/worker-q4412s6h


In [None]:
import dask.config
dask.config.set(temporary_directory='/scratch/piconti/impresso/dask_tmp')
dask.config.config

In [None]:
client.close()

In [None]:
client.shutdown()

## Functions

In [64]:
def format_passage_data(record: dict) -> dict:
    # ensure the record's columns correspond to the goal: with the title as the last one.
    # also add the necessary 

    if 'title' in record:
        record_keys = list(record.keys())
        if record_keys[-1] == 'title':
            # if 'title' is the last key, return the record as is.
            return record
        else:
            # otherwise modify the dict to place it at the end
            title = record['title']
            del record['title']
    else:
        title = ''

    if 'id' in record and 'begin' in record and 'end' in record:
        passages = "{}@{}:{}".format(record['id'], record['begin'], record['end'])
        #p_id = f"c{record['cluster']}-{passages}"
    else:
        passages = ''

    return {
        "id": f"c{record['cluster']}-{passages}",
        "begin": record['begin'],
        "ci_id": record['id'],
        "cluster_id": f"tr-all-v1-24-c{record['cluster']}",
        "date": record['date'],
        "end": record['end'],
        "pages": record['pages'],
        "cluster_size": record['size'],
        "text": record['text'],
        "title": title
    }
    

In [65]:
def remove_extra_cluster_cols(c_record: dict) -> dict:
    """del c_record['min_date']
    del c_record['max_date']
    del c_record['cluster_size']
    del c_record['newspapers']
    del c_record['passages']
    del c_record['doc_ids']"""
    return {
        'id': c_record['id'],
        'time_delta': c_record['time_delta'],
        'lexial_overlap': c_record['lexical_overlap']
    }

In [66]:
def unify_data(record: dict) -> dict:
    # ensure the record's columns correspond to the goal: with the title as the last one.
    if 'title' in record:
        record_keys = list(record.keys())
        if record_keys[-1] == 'title':
            # if 'title' is the last key, return the record as is.
            return record
        else:
            # otherwise modify the dict to place it at the end
            title = record['title']
            del record['title']
    else:
        title = ''
    
    record['title'] = title
    if 'id' in record and 'begin' in record and 'end' in record:
        record['passages'] = "{}@{}:{}".format(record['id'], record['begin'], record['end'])
    else:
        record['passages'] = ''
    
    return record

In [67]:
def add_cluster_info(p_record, clusters_bag):
    # find the cluster corresponding to this passage and add the corresponding info
    cluster_record = clusters_bag.filter(lambda x: x['id']==p_record["cluster_id"])#.compute() #.persist() #compute() instead?

    assert len(cluster_record) ==1, f"len(cluster_record) != 1 for {cluster_record}"

    p_record['cluster_lexical_overlap'] = cluster_record[0]['lexical_overlap']
    p_record['cluster_time_delta'] = cluster_record[0]['time_delta']

    # add here or reformat in another function?
    #p_record['ci_id'] = p_record['id']
    #p_record['id'] = f"c{cluster_num}-{p_record['passages']}"

    return p_record

## 1. Read in the data

In [53]:
input_bucket = "s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0"
run_prefix = "passim_output_run_2"
path_data = "passim_output_run_2/out.json/"
path_clusters = "passim_output_run_2/tr_clusters/"
s3_output_path = "passim_output_run_2/debug_tr_passages/"

In [10]:
clusters_data = db.read_text(
    f"{os.path.join(input_bucket, path_clusters)}*.jsonl.bz2", storage_options=IMPRESSO_STORAGEOPT
).map(json.loads).persist()#.compute()

In [54]:
passim_data = db.read_text(
    f"{os.path.join(input_bucket, path_data)}part-02*.json", storage_options=IMPRESSO_STORAGEOPT
).map(json.loads).persist()

### a. Minor check that the created clusters data is correct relative to previous results

In [7]:
clusters_data.take(1)

({'id': 'tr-all-v1-24-c467076',
  'min_date': '2001-10-05',
  'max_date': '2013-02-02',
  'cluster_size': 33,
  'time_delta': 4138,
  'newspapers': ['EXP', 'IMP'],
  'passages': ['IMP-2008-01-12-a-i0056@744:1245',
   'IMP-2011-02-19-a-i0044@787:1291',
   'EXP-2004-09-18-a-i0163@608:1302',
   'IMP-2007-09-08-a-i0084@640:1141',
   'IMP-2013-02-02-a-i0032@1111:1617',
   'IMP-2004-01-05-a-i0142@607:1075',
   'IMP-2004-09-18-a-i0144@977:1549',
   'EXP-2012-02-18-a-i0088@692:1305',
   'EXP-2010-11-13-a-i0102@787:1384',
   'EXP-2012-11-05-a-i0032@737:1311',
   'IMP-2003-09-13-a-i0034@1023:1718',
   'IMP-2005-01-21-a-i0087@1189:1806',
   'IMP-2008-11-08-a-i0186@776:1275',
   'EXP-2005-01-08-a-i0051@737:1370',
   'EXP-2004-01-03-a-i0116@588:1426',
   'EXP-2008-11-08-a-i0091@798:1369',
   'EXP-2003-09-27-a-i0132@558:1268',
   'EXP-2008-01-12-a-i0059@622:1272',
   'IMP-2010-02-22-a-i0058@824:1453',
   'IMP-2001-10-05-a-i0213@657:1310',
   'IMP-2010-11-13-a-i0076@796:1305',
   'EXP-2005-09-10-a-i0

In [8]:
clusters_data.count().compute()

8620946

In [16]:
clusters_data.pluck('cluster_size').mean().compute(), clusters_data.pluck('cluster_size').max().compute()

(2.6633012200749198, 42220)

In [13]:
sanity_check = clusters_data.filter(lambda x: "c8590277799" in x['id']).compute()
sanity_check 

[{'id': 'tr-all-v1-24-c8590277799',
  'min_date': '1994-10-15',
  'max_date': '1994-10-15',
  'cluster_size': 2,
  'time_delta': 0,
  'newspapers': ['GDL', 'JDG'],
  'passages': ['JDG-1994-10-15-a-i0086@357:937',
   'GDL-1994-10-15-a-i0256@2280:2810'],
  'doc_ids': ['GDL-1994-10-15-a-i0256', 'JDG-1994-10-15-a-i0086'],
  'lexical_overlap': 76.71232876712328}]

In [19]:
sanity_check = clusters_data.filter(lambda x: "tr-all-v1-24-c300" == x['id']).compute()
sanity_check 

[{'id': 'tr-all-v1-24-c300',
  'min_date': '1943-07-31',
  'max_date': '2002-05-04',
  'cluster_size': 7937,
  'time_delta': 21462,
  'newspapers': ['DTT',
   'EXP',
   'FZG',
   'GDL',
   'IMP',
   'JDG',
   'LCE',
   'LLE',
   'LSE',
   'NZZ'],
  'passages': ['FZG-1976-06-08-a-i0053@10:1967',
   'DTT-1975-11-06-a-i0090@26:1054',
   'LLE-1986-06-14-a-i0463@18:738',
   'LLE-1988-05-27-a-i0472@24:686',
   'FZG-1979-12-07-a-i0152@1414:3144',
   'LLE-1955-11-28-a-i0059@0:578',
   'LLE-1993-04-19-a-i0206@88:404',
   'LLE-1974-10-01-a-i0012@14:1168',
   'JDG-1964-04-30-a-i0054@1506:2418',
   'FZG-1993-07-31-a-i0075@859:2313',
   'JDG-1983-09-17-a-i0235@38:2282',
   'FZG-1984-10-06-a-i0017@1996:3369',
   'FZG-1984-10-06-a-i0017@3972:5242',
   'GDL-1974-12-16-a-i0009@48:1547',
   'LLE-1993-12-10-a-i0307@89:416',
   'FZG-1957-09-16-a-i0012@1321:1653',
   'JDG-1987-07-18-a-i0221@0:1769',
   'LLE-1983-12-22-a-i0268@22:433',
   'LLE-1989-03-23-a-i0469@0:850',
   'LSE-1945-01-13-a-i0005@10064:1024

In [21]:
sanity_check_c = clusters_data.filter(lambda x: x['cluster_size'] > 7000).compute()
sanity_check_c

[{'id': 'tr-all-v1-24-c300',
  'min_date': '1943-07-31',
  'max_date': '2002-05-04',
  'cluster_size': 7937,
  'time_delta': 21462,
  'newspapers': ['DTT',
   'EXP',
   'FZG',
   'GDL',
   'IMP',
   'JDG',
   'LCE',
   'LLE',
   'LSE',
   'NZZ'],
  'passages': ['FZG-1976-06-08-a-i0053@10:1967',
   'DTT-1975-11-06-a-i0090@26:1054',
   'LLE-1986-06-14-a-i0463@18:738',
   'LLE-1988-05-27-a-i0472@24:686',
   'FZG-1979-12-07-a-i0152@1414:3144',
   'LLE-1955-11-28-a-i0059@0:578',
   'LLE-1993-04-19-a-i0206@88:404',
   'LLE-1974-10-01-a-i0012@14:1168',
   'JDG-1964-04-30-a-i0054@1506:2418',
   'FZG-1993-07-31-a-i0075@859:2313',
   'JDG-1983-09-17-a-i0235@38:2282',
   'FZG-1984-10-06-a-i0017@1996:3369',
   'FZG-1984-10-06-a-i0017@3972:5242',
   'GDL-1974-12-16-a-i0009@48:1547',
   'LLE-1993-12-10-a-i0307@89:416',
   'FZG-1957-09-16-a-i0012@1321:1653',
   'JDG-1987-07-18-a-i0221@0:1769',
   'LLE-1983-12-22-a-i0268@22:433',
   'LLE-1989-03-23-a-i0469@0:850',
   'LSE-1945-01-13-a-i0005@10064:1024

In [23]:
def sort_passages(c):
    c['passages'] = sorted(c['passages'])
    return c

In [24]:
sanity_check_c = clusters_data.filter(lambda x: x['cluster_size'] > 7000).map(sort_passages).compute()
sanity_check_c

[{'id': 'tr-all-v1-24-c300',
  'min_date': '1943-07-31',
  'max_date': '2002-05-04',
  'cluster_size': 7937,
  'time_delta': 21462,
  'newspapers': ['DTT',
   'EXP',
   'FZG',
   'GDL',
   'IMP',
   'JDG',
   'LCE',
   'LLE',
   'LSE',
   'NZZ'],
  'passages': ['DTT-1957-01-12-a-i0022@1214:3074',
   'DTT-1957-01-12-a-i0022@525:1040',
   'DTT-1957-06-08-a-i0001@2807:3169',
   'DTT-1957-06-08-a-i0001@44:2728',
   'DTT-1957-08-31-a-i0032@10315:10880',
   'DTT-1957-08-31-a-i0032@7507:10271',
   'DTT-1957-09-14-a-i0012@609:1490',
   'DTT-1957-09-21-a-i0009@1909:2539',
   'DTT-1957-09-21-a-i0009@290:897',
   'DTT-1957-09-21-a-i0009@924:1754',
   'DTT-1957-10-12-a-i0023@0:551',
   'DTT-1957-10-12-a-i0023@1567:2243',
   'DTT-1957-10-12-a-i0023@2286:2997',
   'DTT-1957-10-12-a-i0023@3030:3537',
   'DTT-1958-01-18-a-i0019@1679:2449',
   'DTT-1958-01-18-a-i0019@207:972',
   'DTT-1958-01-18-a-i0019@2546:3003',
   'DTT-1958-03-01-a-i0138@171:1949',
   'DTT-1958-03-01-a-i0138@2562:3040',
   'DTT-195

### b. Create debug subsets of both datasets to prevent OOM errors while dev

In [55]:
passim_subset = db_random.sample(passim_data, k=27000).persist()

In [14]:
passim_sub_bag = client.gather(passim_subset)

## 2. Creating the index and passages data

### a. format the data and put it into dataframes

In [56]:
passages_sub_df = passim_subset.map(format_passage_data).to_dataframe().persist()
passages_sub_df.head()

Unnamed: 0,id,begin,ci_id,cluster_id,date,end,pages,cluster_size,text,title
0,c8590607949-JDG-1940-04-19-a-i0076@1107:1759,1107,JDG-1940-04-19-a-i0076,tr-all-v1-24-c8590607949,1940-04-19,1759,"[{'id': 'JDG-1940-04-19-a-p0008', 'seq': 8, 'r...",2,D'ailleurs il est possible (et on le souhaite ...,
1,c51540351932-IMP-1960-08-29-a-i0069@129:826,129,IMP-1960-08-29-a-i0069,tr-all-v1-24-c51540351932,1960-08-29,826,"[{'id': 'IMP-1960-08-29-a-p0007', 'seq': 7, 'r...",2,"PARIS, 29. — AFP — L'épave du Superconstellati...",Nouvelles de dernière heure
2,c25770487052-IMP-1941-03-07-a-i0005@0:565,0,IMP-1941-03-07-a-i0005,tr-all-v1-24-c25770487052,1941-03-07,565,"[{'id': 'IMP-1941-03-07-a-p0001', 'seq': 1, 'r...",2,Récupération des déchets en Qrande-Bretagne En...,Récupération des déchets en Qrande-Bretagne
3,c42950294638-oeuvre-1918-10-15-a-i0010@1050:2958,1050,oeuvre-1918-10-15-a-i0010,tr-all-v1-24-c42950294638,1918-10-15,2958,"[{'id': 'oeuvre-1918-10-15-a-p0002', 'seq': 2,...",2,"Un 11ttJt qui répand la terreur, mat que le ci...","Ce pelé, ce galeux..."
4,c423033-NZZ-1943-09-08-a-i0002@13598:14599,13598,NZZ-1943-09-08-a-i0002,tr-all-v1-24-c423033,1943-09-08,14599,"[{'id': 'NZZ-1943-09-08-a-p0002', 'seq': 2, 'r...",2,"Vosla, 7. Sept., g ( Agence Bulgare) ssine Rei...",


In [85]:
"""clusters_data = db.read_text(
    f"{os.path.join(input_bucket, path_clusters)}*.jsonl.bz2", storage_options=IMPRESSO_STORAGEOPT
).map(json.loads).persist()"""

clusters_df = clusters_data.map(remove_extra_cluster_cols).to_dataframe().set_index('id').persist()

In [17]:
clusters_df.head()

Unnamed: 0_level_0,time_delta,lexial_overlap
id,Unnamed: 1_level_1,Unnamed: 2_level_1
tr-all-v1-24-c0,2,43.877551
tr-all-v1-24-c1,2,50.0
tr-all-v1-24-c100,0,36.434109
tr-all-v1-24-c1000,3,54.83871
tr-all-v1-24-c10000,0,64.102564


In [18]:
clusters_df.columns

Index(['time_delta', 'lexial_overlap'], dtype='object')

### b. Join the two to integrate cluster info

In [86]:
joined_df = passages_sub_df.join(clusters_df, on='cluster_id').persist()
joined_df.head()

Unnamed: 0,id,begin,ci_id,cluster_id,date,end,pages,cluster_size,text,title,time_delta,lexial_overlap
96,c25770147833-IMP-2008-12-23-a-i0198@0:2668,0,IMP-2008-12-23-a-i0198,tr-all-v1-24-c25770147833,2008-12-23,2668,"[{'id': 'IMP-2008-12-23-a-p0023', 'seq': 23, '...",2,Les exportations japonaises s’effondrent CRISE...,Les exportations japonaises s’effondrent,0,100.0
100,c51539835874-JDG-1995-05-13-a-i0207@2242:2571,2242,JDG-1995-05-13-a-i0207,tr-all-v1-24-c51539835874,1995-05-13,2571,"[{'id': 'JDG-1995-05-13-a-p0031', 'seq': 31, '...",2,"Au Tibet avec Tintin, Centre des congrès Montr...",EXPOSITION Les adieux tibétains de Tintin à Mo...,0,93.103448
120,c25770496104-LLE-1969-09-05-a-i0095@3128:3381,3128,LLE-1969-09-05-a-i0095,tr-all-v1-24-c25770496104,1969-09-05,3381,"[{'id': 'LLE-1969-09-05-a-p0005', 'seq': 5, 'r...",2,Le parti conservateur et les votations des 13 ...,COMMISSION ROMANDE POUR LA LIGNE DU SIMP...,7864,27.777778
432,c801007-NV2-1878-08-07-a-i0003@2815:3252,2815,NV2-1878-08-07-a-i0003,tr-all-v1-24-c801007,1878-08-07,3252,"[{'id': 'NV2-1878-08-07-a-p0003', 'seq': 3, 'r...",2,du lit de la Broie. La commission reconnaît qu...,,1,28.813559
458,c51540113532-EXP-1999-02-13-a-i0039@0:1673,0,EXP-1999-02-13-a-i0039,tr-all-v1-24-c51540113532,1999-02-13,1673,"[{'id': 'EXP-1999-02-13-a-p0003', 'seq': 3, 'r...",2,1 er Mars La marche revient à la tradition Apr...,1er Mars La marche revient à la tradition,0,97.126437


In [57]:
def get_connected_clusters(passages_df):
    grouped_clusters = passages_df.groupby('ci_id')['cluster_id']
    conn_clusters = grouped_clusters.apply(list, meta=('connected_clusters', object)).persist()
    n_conn_clusters = grouped_clusters.apply('count', meta=('n_connected_clusters', int)).persist()
    conn_clusters_df = conn_clusters.to_frame()
    conn_clusters_df[n_conn_clusters.name] = n_conn_clusters
    conn_clusters_df = conn_clusters_df.reset_index()
    conn_clusters_df.columns = ['ci_id', 'connected_clusters', 'n_connected_clusters']
    return conn_clusters_df.astype({'ci_id': "string[pyarrow]"}).compute()


In [58]:
conn_clusters_df = get_connected_clusters(passages_sub_df)
print(conn_clusters_df.columns)
conn_clusters_df.head()



Index(['ci_id', 'connected_clusters', 'n_connected_clusters'], dtype='object')


Unnamed: 0,ci_id,connected_clusters,n_connected_clusters
0,BLB-1846-10-03-a-i0001,[tr-all-v1-24-c8590581047],1
1,BNN-1887-06-05-a-i0022,[tr-all-v1-24-c562386],1
2,BNN-1887-06-23-a-i0001,[tr-all-v1-24-c25770559776],1
3,BNN-1888-03-06-a-i0014,[tr-all-v1-24-c25770347169],1
4,BNN-1888-06-19-a-i0001,[tr-all-v1-24-c42949844571],1


In [89]:
conn_clusters_df.compute()

Unnamed: 0,ci_id,connected_clusters,n_connected_clusters
0,BDC-1839-03-21-a-i0004,[tr-all-v1-24-c272852],1
1,BLB-1847-01-30-a-i0003,[tr-all-v1-24-c42950428060],1
2,BLB-1847-08-28-a-i0003,[tr-all-v1-24-c53151],1
3,BNN-1887-01-06-a-i0001,[tr-all-v1-24-c42949847570],1
4,BNN-1887-04-06-a-i0026,[tr-all-v1-24-c34360385988],1
...,...,...,...
26890,waechtersauer-1863-09-27-a-i0002,[tr-all-v1-24-c8590272499],1
26891,waechtersauer-1863-11-03-a-i0004,[tr-all-v1-24-c42950050868],1
26892,waechtersauer-1864-03-10-a-i0002,[tr-all-v1-24-c249066],1
26893,waechtersauer-1864-09-29-a-i0017,[tr-all-v1-24-c8590516337],1


In [24]:
full_joined_df = joined_df.merge(conn_clusters_df, on='ci_id', how='left').persist()
full_joined_df.head()

Unnamed: 0,id,begin,ci_id,cluster_id,date,end,pages,cluster_size,text,title,time_delta,lexial_overlap,connected_clusters,n_connected_clusters
0,c85899588178-LSE-1940-12-30-a-i0067@3259:4019,3259,LSE-1940-12-30-a-i0067,tr-all-v1-24-c85899588178,1940-12-30,4019,"[{'id': 'LSE-1940-12-30-a-p0008', 'seq': 8, 'r...",3,"wagons d’objets perdus Paris, 29 décembre. (Ag...","Tout va très bien, Madame la marquise !",1,64.210526,[tr-all-v1-24-c85899588178],1
1,c8590332327-JDG-1993-01-19-a-i0038@673:1072,673,JDG-1993-01-19-a-i0038,tr-all-v1-24-c8590332327,1993-01-19,1072,"[{'id': 'JDG-1993-01-19-a-p0005', 'seq': 5, 'r...",3,ANGOLA Le CICR évacue la ville de Huambo Le mo...,PROCHE-ORIENT Trois Palestiniens abattus à Gaza,0,67.307692,[tr-all-v1-24-c8590332327],1
2,c51540059121-legaulois-1887-01-04-a-i0034@0:238,0,legaulois-1887-01-04-a-i0034,tr-all-v1-24-c51540059121,1887-01-04,238,"[{'id': 'legaulois-1887-01-04-a-p0004', 'seq':...",4,"Etude de H° Henry Mutel, avoué à Paris rue Sai...",Adv. 8 Page 4,1140,39.02439,[tr-all-v1-24-c51540059121],1
3,c25769855778-lematin-1930-11-25-a-i0029@51:374,51,lematin-1930-11-25-a-i0029,tr-all-v1-24-c25769855778,1930-11-25,374,"[{'id': 'lematin-1930-11-25-a-p0008', 'seq': 8...",3,Le commissaire du quartier Vendô- me a envoyé ...,Un mendiant était porteur de près de 25.000 fr...,0,43.902439,[tr-all-v1-24-c25769855778],1
4,c146029084093-IMP-1944-01-24-a-i0028@13223:13623,13223,IMP-1944-01-24-a-i0028,tr-all-v1-24-c146029084093,1944-01-24,13623,"[{'id': 'IMP-1944-01-24-a-p0003', 'seq': 3, 'r...",3,Sur proposition du comité de l'insigne sportif...,Chronique Sportive,3,79.069767,[tr-all-v1-24-c146029084093],1


In [25]:
def eval_str_lists(passage):
    passage['pages'] = literal_eval(passage['pages'])
    passage['connected_clusters'] = literal_eval(passage['connected_clusters'])
    #passage['doc_ids'] = literal_eval(passage['doc_ids'])
    
    return passage


full_passages_bag = full_joined_df.to_bag(format='dict').map(eval_str_lists).persist()
full_passages_bag.take(10)

({'id': 'c85899588178-LSE-1940-12-30-a-i0067@3259:4019',
  'begin': 3259,
  'ci_id': 'LSE-1940-12-30-a-i0067',
  'cluster_id': 'tr-all-v1-24-c85899588178',
  'date': '1940-12-30',
  'end': 4019,
  'pages': [{'id': 'LSE-1940-12-30-a-p0008',
    'seq': 8,
    'regions': [{'start': 3259,
      'length': 760,
      'coords': {'x': 543, 'y': 1498, 'w': 426, 'h': 376}}]}],
  'cluster_size': 3,
  'text': "wagons d’objets perdus\nParis, 29 décembre. (Ag.)\nAu cours du tragique exode de mai et juin der\xad\nniers, plus de 17,316 wagons d'objets divers\navaient disparu. C’est à Paris, sous l'immense\nhall de la gare des Batignolles, que toutes ces\nchoses précieuses ont peu à peu été regroupées\net classées. A journée faite, c'est dans le hall\nun incessant défilé de gens à la recherche de\nleurs biens. Sur 104,000 réclamations pour perte\nde bagages, 25,000 ont obtenu satisfaction. Cinq\ncentres fonctionnent encore à Paris dans le même\nbut et 27 autres sont ouverts dans les grandes\nvilles de 

In [30]:
passages_output_files = [
    f"{input_bucket}/{s3_output_path}{str(n).zfill(4)}.jsonl.bz2"
    for n in range(full_passages_bag.npartitions)
]
len(passages_output_files), passages_output_files[:10]

(87,
 ['s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/debug_tr_passages/0000.jsonl.bz2',
  's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/debug_tr_passages/0001.jsonl.bz2',
  's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/debug_tr_passages/0002.jsonl.bz2',
  's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/debug_tr_passages/0003.jsonl.bz2',
  's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/debug_tr_passages/0004.jsonl.bz2',
  's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/debug_tr_passages/0005.jsonl.bz2',
  's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/debug_tr_passages/0006.jsonl.bz2',
  's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/debug_tr_passages/0007.jsonl.bz2',
  's3://41-processed-data-staging/text-reuse/text-r

In [31]:
future = (
    full_passages_bag.map(json.dumps)
    .repartition(len(passages_output_files))
    .to_textfiles(passages_output_files, storage_options=IMPRESSO_STORAGEOPT)
)

## 3. Run all at once to see if still fails

In [112]:
def get_connected_clusters(passages_df):
    grouped_clusters = passages_df.groupby('ci_id')['cluster_id']
    conn_clusters = grouped_clusters.apply(list, meta=('connected_clusters', object)).persist()
    n_conn_clusters = grouped_clusters.apply('count', meta=('n_connected_clusters', int)).persist()
    conn_clusters_df = conn_clusters.to_frame()
    conn_clusters_df[n_conn_clusters.name] = n_conn_clusters
    conn_clusters_df = conn_clusters_df.reset_index()
    conn_clusters_df.columns = ['ci_id', 'connected_clusters', 'n_connected_clusters']
    return conn_clusters_df.astype({'ci_id': "string[pyarrow]"})

def eval_str_lists(passage):
    passage['pages'] = literal_eval(passage['pages'])
    passage['connected_clusters'] = literal_eval(passage['connected_clusters'])
    #passage['doc_ids'] = literal_eval(passage['doc_ids'])
    
    return passage

In [113]:
s3_bucket = "s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0"
run_prefix = "passim_output_run_2"
s3_passages_path = f"{s3_bucket}/passim_output_run_2/out.json/"
s3_clusters_path = f"{s3_bucket}/passim_output_run_2/tr_clusters/"
s3_output_path = f"{s3_bucket}/passim_output_run_2/debug_tr_passages/"
s3_passages_path,s3_clusters_path,s3_output_path

('s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/out.json/',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_clusters/',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/debug_tr_passages/')

In [114]:
passim_data = db.read_text(
    f"{s3_passages_path}part-02*.json", storage_options=IMPRESSO_STORAGEOPT
).map(json.loads).persist() 

# when there is the subset, with split_every=100, all in one partition --> no error, if split_every is not defined --> error
passim_subset = db_random.sample(passim_data, k=27000).persist()

passages_data_df = passim_data.map(format_passage_data).to_dataframe().persist()
#passages_data_df = passim_subset.map(format_passage_data).to_dataframe().persist()
#passages_data_df = client.gather(passages_data_df)

### Creating 
print("Finished the loading data, getting the connected clusters.")

# fetch the information on the connected clusters and reformat them]
grouped_clusters = passages_data_df.groupby('ci_id')['cluster_id']
conn_clusters = grouped_clusters.apply(list, meta=('connected_clusters', object)).persist()
n_conn_clusters = grouped_clusters.apply('count', meta=('n_connected_clusters', int)).compute()
conn_clusters_df = conn_clusters.to_frame().compute()
conn_clusters_df[n_conn_clusters.name] = n_conn_clusters
conn_clusters_df = conn_clusters_df.reset_index()
conn_clusters_df.columns = ['ci_id', 'connected_clusters', 'n_connected_clusters']
conn_clusters_df = conn_clusters_df.astype({'ci_id': "string[pyarrow]"})

### Reading clusters data in memory
clusters_data = db.read_text(
    f"{s3_clusters_path}*.jsonl.bz2", storage_options=IMPRESSO_STORAGEOPT
).map(json.loads).persist()

clusters_df = clusters_data.map(remove_extra_cluster_cols).to_dataframe().set_index('id').persist()


print("Merging the fetched data with the passages..")
joined_df = passages_data_df.join(clusters_df, on='cluster_id')

print(f"joined_df.dtypes: {joined_df.dtypes}, conn_clusters_df.dtypes: {conn_clusters_df.dtypes}")

full_joined_df = joined_df.merge(conn_clusters_df, on='ci_id', how='left').persist()

print("Loading the dataframe back into bags and dumping it to files on S3.")

full_passages_bag = full_joined_df.to_bag(format='dict').map(eval_str_lists).persist()

passages_output_files = [
    f"{s3_output_path}{str(n).zfill(4)}.jsonl.bz2"
    for n in range(full_passages_bag.npartitions)
]

print("Writing the passages to S3 files.")

future = (
    full_passages_bag.map(json.dumps)
    .repartition(len(passages_output_files))
    .to_textfiles(passages_output_files, storage_options=IMPRESSO_STORAGEOPT)
)

Finished the loading data, getting the connected clusters.




2024-10-04 12:33:47,330 - tornado.application - ERROR - Exception in callback <bound method Worker.trigger_profile of <Worker 'tcp://127.0.0.1:41851', name: 0, status: running, stored: 167, running: 1/1, ready: 114, comm: 0, waiting: 0>>
Traceback (most recent call last):
  File "/scratch/piconti/.conda/envs/tr_postprocessing/lib/python3.11/site-packages/tornado/ioloop.py", line 937, in _run
    val = self.callback()
          ^^^^^^^^^^^^^^^
  File "/scratch/piconti/.conda/envs/tr_postprocessing/lib/python3.11/site-packages/distributed/worker.py", line 2445, in trigger_profile
    profile.process(
  File "/scratch/piconti/.conda/envs/tr_postprocessing/lib/python3.11/site-packages/distributed/profile.py", line 192, in process
    and (stop is None or not prev.f_code.co_filename.endswith(stop))
                             ^^^^^^^^^^^
AttributeError: 'str' object has no attribute 'f_code'


Merging the fetched data with the passages..
joined_df.dtypes: id                string[pyarrow]
begin                       int64
ci_id             string[pyarrow]
cluster_id        string[pyarrow]
date              string[pyarrow]
end                         int64
pages             string[pyarrow]
cluster_size                int64
text              string[pyarrow]
title             string[pyarrow]
time_delta                  int64
lexial_overlap            float64
dtype: object, conn_clusters_df.dtypes: ci_id                   string[pyarrow]
connected_clusters               object
n_connected_clusters              int64
dtype: object


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Loading the dataframe back into bags and dumping it to files on S3.
Writing the passages to S3 files.


In [100]:
passim_data = db.read_text(
    f"{s3_passages_path}part-02*.json", storage_options=IMPRESSO_STORAGEOPT
).map(json.loads).persist() 

# when there is the subset, with split_every=100, all in one partition --> no error, if split_every is not defined --> error
passim_subset = db_random.sample(passim_data, k=27000).persist()

passages_data_df = passim_data.map(format_passage_data).to_dataframe().persist()
#passages_data_df = passim_subset.map(format_passage_data).to_dataframe().persist()
#passages_data_df = client.gather(passages_data_df)

### Creating 
print("Finished the loading data, getting the connected clusters.")

# fetch the information on the connected clusters and reformat them]
grouped_clusters = passages_data_df.groupby('ci_id')['cluster_id']
conn_clusters = grouped_clusters.apply(list, meta=('connected_clusters', object)).persist()
n_conn_clusters = grouped_clusters.apply('count', meta=('n_connected_clusters', int)).compute()
conn_clusters_df = conn_clusters.to_frame().compute()
conn_clusters_df[n_conn_clusters.name] = n_conn_clusters
conn_clusters_df = conn_clusters_df.reset_index()
conn_clusters_df.columns = ['ci_id', 'connected_clusters', 'n_connected_clusters']
conn_clusters_df = conn_clusters_df.astype({'ci_id': "string[pyarrow]"})
conn_clusters_df.head()

Finished the loading data, getting the connected clusters.


Unnamed: 0,ci_id,connected_clusters,n_connected_clusters
0,BNN-1887-11-15-a-i0017,[tr-all-v1-24-c17179974124],1
1,BNN-1891-06-05-a-i0039,"[tr-all-v1-24-c726188, tr-all-v1-24-c726189]",2
2,BNN-1892-09-10-a-i0003,[tr-all-v1-24-c42949732336],1
3,CDV-1844-05-04-a-i0004,"[tr-all-v1-24-c8590691942, tr-all-v1-24-c85906...",2
4,CDV-1850-03-16-a-i0014,"[tr-all-v1-24-c8590355618, tr-all-v1-24-c85903...",4


In [103]:
### Reading clusters data in memory
clusters_data = db.read_text(
    f"{s3_clusters_path}*.jsonl.bz2", storage_options=IMPRESSO_STORAGEOPT
).map(json.loads).persist()

clusters_df = clusters_data.map(remove_extra_cluster_cols).to_dataframe().set_index('id').persist()

print("Merging the fetched data with the passages..")
joined_df = passages_data_df.join(clusters_df, on='cluster_id')

Merging the fetched data with the passages..


In [104]:
full_joined_df = joined_df.merge(conn_clusters_df, on='ci_id', how='left').persist()

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


In [106]:
full_passages_bag = full_joined_df.to_bag(format='dict').map(eval_str_lists).persist()

passages_output_files = [
    f"{s3_output_path}{str(n).zfill(4)}.jsonl.bz2"
    for n in range(full_passages_bag.npartitions)
]

print("Writing the passages to S3 files.")

future = (
    full_passages_bag.map(json.dumps)
    .repartition(len(passages_output_files))
    .to_textfiles(passages_output_files, storage_options=IMPRESSO_STORAGEOPT)
)

Writing the passages to S3 files.


In [123]:
passages_output_files

['passim_output_run_2/debug_tr_passages/0000.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0001.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0002.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0003.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0004.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0005.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0006.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0007.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0008.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0009.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0010.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0011.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0012.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0013.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0014.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0015.jsonl.bz2',
 'passim_output_run_2/debug_tr_passages/0016.jsonl.bz2',
 'passim_output_run_2/debug_tr_

In [None]:
conn_clusters_df.compute()

In [121]:
passim_data = db.read_text(
    f"{s3_passages_path}part-02*.json", storage_options=IMPRESSO_STORAGEOPT
).map(json.loads).persist() 

passim_subset = db_random.sample(passim_data, k=27000, split_every=100).persist()

passages_data_df = passim_data.map(format_passage_data).to_dataframe().persist()

Task exception was never retrieved
future: <Task finished name='Task-11133068' coro=<Client._gather.<locals>.wait() done, defined at /scratch/piconti/.conda/envs/tr_postprocessing/lib/python3.11/site-packages/distributed/client.py:2385> exception=AllExit()>
Traceback (most recent call last):
  File "/scratch/piconti/.conda/envs/tr_postprocessing/lib/python3.11/site-packages/distributed/client.py", line 2394, in wait
    raise AllExit()
distributed.client.AllExit


KeyboardInterrupt: 

## 4. Create the CI-cluster index from the passages

In [8]:
input_bucket = "s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0"
run_prefix = "passim_output_run_2"
path_data = "passim_output_run_2/out.json/"
path_clusters = "passim_output_run_2/tr_clusters/"
passges_path = f"{input_bucket}/passim_output_run_2/tr_passages/"
output_index_path = f"{input_bucket}/passim_output_run_2/tr_doc_cluster_index_corrected/"

In [9]:
passages_data = db.read_text(
    f"{passges_path}*.jsonl.bz2", storage_options=IMPRESSO_STORAGEOPT
).map(json.loads).persist()

In [10]:
passages_data.count().compute()

22960176

In [11]:
passages_data.take(1)

({'id': 'c101-oecaen-1940-10-16-a-i0062@759:1352',
  'begin': 759,
  'ci_id': 'oecaen-1940-10-16-a-i0062',
  'cluster_id': 'tr-all-v1-24-c101',
  'date': '1940-10-16',
  'end': 1352,
  'pages': [{'id': 'oecaen-1940-10-16-a-p0004',
    'seq': 4,
    'regions': [{'start': 759,
      'length': 593,
      'coords': {'x': 2405, 'y': 1707, 'w': 549, 'h': 407}}]}],
  'cluster_size': 217,
  'text': 'N. 5% 38. 100,90.\nObligations Trésor 4 1/2 1933. 925 ;\nobligations Trésor 4% 1934. 920 ; obli-\ngations Trésor 5% 1935. 980 ; obliga-\ntions Trésor 4% 1936 B. 130 ; Outil-\nlage National 4 1/2 1932, 900 ; P. T. T.\n4 1/2 1929, 503 ; P. T. T. 5% 1934-35,\n995 ; P. T. T. 6% 1938, 1016 ; Caisse\nAutonome 4 1/2 29. 857.\nBons du Trésor 5% 1934, 1005 ; bons\ndu Trésor 4 1/2 1934, 985 ; bons du\nTrésor 4% 1935, 1035 ; bons du Trésor\n5% septembre 1937, 1005 ; oons du\nTrésor 5% décembre 1937. 1026 : bons\ndu Trésor 5 1/2 1938. 1020 ; Crédit Na-\ntional 5% 1919, 515 ; Crédit National\n5% 1920. 502 ; Cré

In [12]:
docs_clusters = passages_data.map(lambda x: {'docid': x['ci_id'], 'clusters': x['cluster_id']}).to_dataframe().persist()
docs_clusters.head()

Unnamed: 0,docid,clusters
0,oecaen-1940-10-16-a-i0062,tr-all-v1-24-c101
1,oecaen-1940-11-15-a-i0104,tr-all-v1-24-c101
2,oecaen-1940-11-16-a-i0081,tr-all-v1-24-c101
3,oecaen-1940-11-27-a-i0030,tr-all-v1-24-c101
4,oecaen-1940-11-28-a-i0080,tr-all-v1-24-c101


In [13]:
index_df = docs_clusters.groupby('docid')['clusters'].apply(list, meta=('clusters', object)).reset_index().persist()
index_df.head()



2024-11-05 16:20:05,222 - tornado.application - ERROR - Exception in callback <bound method Worker.trigger_profile of <Worker 'tcp://127.0.0.1:37023', name: 8, status: running, stored: 1498, running: 1/1, ready: 155, comm: 0, waiting: 0>>
Traceback (most recent call last):
  File "/scratch/piconti/.conda/envs/tr_postprocessing/lib/python3.11/site-packages/tornado/ioloop.py", line 937, in _run
    val = self.callback()
          ^^^^^^^^^^^^^^^
  File "/scratch/piconti/.conda/envs/tr_postprocessing/lib/python3.11/site-packages/distributed/worker.py", line 2445, in trigger_profile
    profile.process(
  File "/scratch/piconti/.conda/envs/tr_postprocessing/lib/python3.11/site-packages/distributed/profile.py", line 192, in process
    and (stop is None or not prev.f_code.co_filename.endswith(stop))
                             ^^^^^^^^^^^
AttributeError: 'str' object has no attribute 'f_code'


Unnamed: 0,docid,clusters
0,CDV-1854-01-15-a-i0003,"[tr-all-v1-24-c120259141200, tr-all-v1-24-c773..."
1,CON-1903-01-28-a-i0026,"[tr-all-v1-24-c180389032825, tr-all-v1-24-c163..."
2,DTT-1940-07-12-a-i0033,"[tr-all-v1-24-c103079566256, tr-all-v1-24-c103..."
3,DTT-1941-02-21-a-i0017,[tr-all-v1-24-c137439764294]
4,DTT-1941-10-10-a-i0033,"[tr-all-v1-24-c8589969750, tr-all-v1-24-c85899..."


In [14]:
index_df = index_df.repartition(npartitions=50)

In [18]:
index_output_files = [
    f"{output_index_path}tr-doc-clusters-{str(n).zfill(4)}.jsonl.bz2"
    for n in range(index_df.npartitions)
]
index_output_files[-10:]

['s3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_doc_cluster_index_corrected/tr-doc-clusters-0040.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_doc_cluster_index_corrected/tr-doc-clusters-0041.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_doc_cluster_index_corrected/tr-doc-clusters-0042.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_doc_cluster_index_corrected/tr-doc-clusters-0043.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_doc_cluster_index_corrected/tr-doc-clusters-0044.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_doc_cluster_index_corrected/tr-doc-clusters-0045.jsonl.bz2',
 's3://41-processed-data-staging/text-reuse/text-reuse_v1-0-0/passim_output_run_2/tr_doc_cluster_index_corrected/tr-doc-clus

In [19]:
index_future = (
    index_df
    .to_bag(format='dict')
    .map(json.dumps)
    .repartition(len(index_output_files))
    .to_textfiles(index_output_files, storage_options=IMPRESSO_STORAGEOPT)
)