In [None]:
import tarfile
from functools import reduce 
import io
import sys
from os.path import join
import pandas as pd

# Folder (relative to the notebook) from which the tar.gz link map, the page-name list
# and the .jpg scans are read in the cells below.
DHLABSRV_DATA_FOLDER = 'from_dhlabsrv4_iiif/'

def capture_output(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` and return what it wrote to ``sys.stdout``.

    The return value of ``func`` is discarded. ``sys.stdout`` is put back in place
    even when ``func`` raises; in that case the exception propagates to the caller.
    """
    buffer = io.StringIO()
    # Swap the buffer in while remembering the stream to restore afterwards.
    previous_stdout, sys.stdout = sys.stdout, buffer
    try:
        func(*args, **kwargs)
    finally:
        sys.stdout = previous_stdout
    return buffer.getvalue()

# Infer the path of the images related to the sommarioni text entries.
# The archive is opened in a `with` block so it is closed once every CSV has been read.
with tarfile.open(join(DHLABSRV_DATA_FOLDER, 'sommmarioni_links_map_iiif_dhlabsrv4.tar.gz')) as tarf:
    # Member names are read from the archive index rather than scraped from the printed
    # listing, so names containing spaces are kept intact.
    csv_files = [name for name in tarf.getnames() if name.endswith('csv')]
    # Parse every CSV once and concatenate in a single call (no repeated re-copying).
    all_csv_points = pd.concat([
        pd.read_csv(tarf.extractfile(name), names=['parcel_number', 'pages', 'uid1', 'geojson', 'id2'])
        for name in csv_files
    ])

# regex=False: the patterns are literal text; as a regex the '.' of '.json' would match any character.
all_csv_points['page_name'] = (
    all_csv_points['pages']
    .str.replace('sommarioni/', '', regex=False)
    .str.replace('pages/', '', regex=False)
    .str.replace('.json', '', regex=False)
)

In [None]:
from typing import Optional
# Read the list of scanned page names, dropping the line break, the '.jpg' extension
# and the './' prefix from every entry.
page_list_path = join(DHLABSRV_DATA_FOLDER, 'sommarioni_pages_name.txt')
files = []
with open(page_list_path, 'r') as f:
    for raw_line in f:
        files.append(raw_line.replace('\n', '').replace('.jpg', '').replace('./', ''))

all_pages = set(files)
reference_pages = set(all_csv_points['page_name'].unique())
# Pages listed in the text file but absent from the CSV link maps
# (evaluated for inspection only; the value is not stored).
sorted(all_pages - reference_pages)

# Every known page name, in sorted order.
ALL_ORDERED_PAGES = sorted(all_pages)

def guess_the_missing_parcel_number_range(page_label:str, reference_list: dict[str, tuple[int,int]]) -> Optional[tuple[int,int]]:
    '''
    Guess the parcel-number range of a page that is absent from ``reference_list``.

    A guess is only made for a single hole: the pages located right before and right
    after ``page_label`` in ``ALL_ORDERED_PAGES`` must both have a range in
    ``reference_list``. The guess is then
    ``(max of the previous page + 1, min of the next page - 1)``.

    Args:
        page_label: page name; looked up in ``ALL_ORDERED_PAGES`` when it is not a key
            of ``reference_list``.
        reference_list: mapping page name -> ``(min, max)`` parcel numbers.

    Returns:
        The guessed ``(min, max)`` tuple, or ``None`` when the page already has a range,
        is the first or last page, does not have both neighbours in ``reference_list``,
        or when the bounds cannot be computed from the neighbours' values.

    Raises:
        ValueError: if ``page_label`` is neither in ``reference_list`` nor in
            ``ALL_ORDERED_PAGES``.
    '''
    if page_label in reference_list:
        return None
    idx = ALL_ORDERED_PAGES.index(page_label)
    if idx == 0 or idx == len(ALL_ORDERED_PAGES) - 1:
        return None
    prev_in_list = ALL_ORDERED_PAGES[idx-1]
    next_in_list = ALL_ORDERED_PAGES[idx+1]
    if prev_in_list not in reference_list or next_in_list not in reference_list:
        return None
    try:
        # Single hole between two known pages: fill it. Returned as a tuple, as annotated.
        return (reference_list[prev_in_list][1] + 1, reference_list[next_in_list][0] - 1)
    except Exception as e:
        # Best effort: a malformed neighbour value is reported and the page is left unguessed.
        print(page_label, e)
        return None

# For every page name, the (min, max) of the parcel numbers found in the CSV link maps.
# The aggregation runs on every column; only 'parcel_number' is kept.
pages_nmbr = all_csv_points.groupby('page_name').agg(lambda g: (min(g), max(g)))['parcel_number'].to_dict()
# One row per known page, indexed by page name ('fn').
pn_df = pd.DataFrame(ALL_ORDERED_PAGES, columns=['fn']).set_index('fn')
# Assigning the dict while 'fn' is the index: presumably pandas aligns the dict keys on the
# index, leaving NaN for pages without a known range. TODO confirm on the pandas version in use.
pn_df['truth_pn'] = pages_nmbr
pn_df = pn_df.reset_index()
# Guess a range for pages missing from pages_nmbr (None when no guess can be made).
pn_df['guess_pn'] = pn_df['fn'].apply(lambda v: guess_the_missing_parcel_number_range(v, pages_nmbr))

In [None]:

from IPython.display import Image

from PIL import Image
import os
from pathlib import Path
# Note: the first 5 images can be skipped.
# Default lower / upper bounds used when no neighbouring page gives a tighter parcel-number range.
MIN = 1
MAX = 15700
# Page name -> known (min, max) parcel-number tuple; pages without a known range map to ''.
ref_dict = pn_df.set_index('fn')['truth_pn'].fillna('').to_dict()
def get_possible_range_for_img(img_name:str, d:dict) -> tuple[int,int]:
    """
    Bound the parcel numbers a page can hold, using its neighbours in ``d``.

    ``d`` maps page names to a ``(min, max)`` tuple, or to a falsy value when the range
    is unknown; its insertion order is taken as the page order. Any '.jpg' in
    ``img_name`` is removed before the lookup.

    The upper bound is element 0 of the closest following page with a known range
    (``MAX`` if there is none). The lower bound is element 1 of the closest preceding
    page with a known range (``MIN`` if there is none). The page's own entry is not
    used. NOTE(review): the original comment said the minimum of the preceding tuple
    was taken "as a buffer", but element 1 is what the code reads; confirm intent.

    Returns:
        ``(lower, upper)``, or ``(MIN, MAX)`` when the two bounds are inverted.

    Raises:
        Exception: if the page name is not a key of ``d``.
    """
    page_key = img_name.replace('.jpg', '')
    if page_key not in d:
        raise Exception(f'{page_key} not found in the reference set of value')

    known_ranges = list(d.values())
    position = list(d).index(page_key)

    # First known range after the page, then first known range before it (walking backwards).
    upper = next((rng[0] for rng in known_ranges[position + 1:] if rng), MAX)
    lower = next((rng[1] for rng in reversed(known_ranges[:position]) if rng), MIN)

    if lower > upper:
        return (MIN, MAX)
    return (lower, upper)


def fetch_corner_of_image_and_down_sample(img: Image,
                                          top_corner:bool = True,
                                          crop_factor:float = .33,
                                          downsample_factor: float = .5):
    """
    Crop a square from the top-left or bottom-left corner of an image and resize it.

    Args:
        img: image exposing ``size``, ``crop`` and ``resize``.
        top_corner: crop from the top-left corner when True, from the bottom-left otherwise.
        crop_factor: side of the cropped square, as a fraction of the image width.
        downsample_factor: factor applied to the crop side to get the output side.

    Returns:
        The cropped square resized with LANCZOS resampling. The output side is never
        below 512 pixels: a smaller target is replaced by 512.
    """
    img_width, img_height = img.size
    side = int(img_width * crop_factor)

    if top_corner:
        box = (0, 0, side, side)
    else:
        box = (0, img_height - side, side, img_height)

    # Output side, floored at 512 pixels.
    target = max(int(side * downsample_factor), 512)
    return img.crop(box).resize((target, target), Image.LANCZOS)

 

# Visual check: show both left corners of every local scan next to its candidate parcel range.
for img in Path(DHLABSRV_DATA_FOLDER).rglob('*.jpg'):
    # The context manager releases the file handle once the two crops have been produced.
    with Image.open(img) as img_obj:
        sample_img1 = fetch_corner_of_image_and_down_sample(img_obj, crop_factor=.5, downsample_factor=.1, top_corner=False)
        sample_img2 = fetch_corner_of_image_and_down_sample(img_obj, crop_factor=.5, downsample_factor=.1, top_corner=True)
    # Deliberately not named `min` / `max`: at module level that would rebind the builtins
    # for the rest of the notebook and break later `min(...)` / `max(...)` calls.
    pn_min, pn_max = get_possible_range_for_img(str(img), ref_dict)
    print(img)
    print(pn_min, pn_max)
    display(sample_img2)
    display(sample_img1)

In [None]:
# gpt_attempt
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import CommaSeparatedListOutputParser
import base64
from io import BytesIO

# The parser is only used here for its format instructions, which are injected into both prompts.
output_parser = CommaSeparatedListOutputParser()
format_instructions = output_parser.get_format_instructions()

# Prompt for the top corner of a page: asks for the values under the "della Mappa" header.
# {min} and {max} are filled per page with the candidate parcel-number range.
top_corner_identify_prompt = PromptTemplate.from_template(
    template='On this scan of an archival document, there is a column with the header reading "della Mappa". Knowing they are manually written and in a sequential order, within the range of number from {min} to {max}, what are the values appearing under this colum, reading it top to bottom? Answer 0 in case no values are found, or if a value is out of the proposed range. \n{format_instructions}',
    partial_variables={"format_instructions": format_instructions},
)

# Prompt for the bottom corner of a page: no header is visible there, so it asks for the
# left-most column holding numbers instead.
bottom_corner_identify_prompt = PromptTemplate.from_template(
    template='On this scan of an archival document, knowing they are manually written and in a sequential order, within the range of number from {min} to {max}, reading from top to bottom, what are the values appearing the left-most column holding numbers? Answer 0 in case no value are found, or if a value is out of the proposed range\n{format_instructions}',
    partial_variables={"format_instructions": format_instructions},
)


def generate_gpt_img_msg(prompt:str, image:Image) -> list[dict]:
    """
    Build a one-message chat payload holding a text prompt and an inline image.

    The image is saved as JPEG into memory, base64-encoded and embedded as a
    ``data:image/jpeg;base64,...`` URL with ``detail`` set to ``"low"``.

    Returns:
        A list with a single ``"user"`` message whose content is the text part
        followed by the image part.
    """
    jpeg_bytes = BytesIO()
    image.save(jpeg_bytes, format="JPEG")
    encoded = base64.b64encode(jpeg_bytes.getvalue()).decode('utf-8')

    text_part = {"type": "text", "text": prompt}
    image_part = {
        "type": "image_url",
        "image_url": {
            "url": f"data:image/jpeg;base64,{encoded}",
            "detail": "low"
        },
    }
    return [{"role": "user", "content": [text_part, image_part]}]

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
import os


# Credentials are read from a local `key=value` file, one pair per line.
with open('openaiapikey.txt', 'r') as f:
    cont = f.readlines()

# Split on the first '=' only, so a value that itself contains '=' is kept whole.
# Lines without '=' (blank lines, stray text) are skipped instead of breaking dict().
params = dict(
    line.strip().split('=', 1)
    for line in cont
    if '=' in line
)

# The key names below must match the ones used in openaiapikey.txt.
llm = ChatOpenAI(
  openai_organization=params['oraganization_id'],
  openai_api_key=params['api_key'],
  model='gpt-4o'
)

# prompt_and_model = prompt | model
# output = prompt_and_model.invoke({"query": "Tell me a joke."})
# parser.invoke(output)


t_img = 'from_dhlabsrv4_iiif/0006.jpg'

# Ask the model to read the parcel numbers in the top and bottom corner of every local scan.
for img in Path(DHLABSRV_DATA_FOLDER).rglob('*.jpg'):
    # Not named `min` / `max`, which would rebind the builtins for the rest of the notebook.
    pn_min, pn_max = get_possible_range_for_img(str(img), ref_dict)
    # `img` is a path: the file has to be opened before it can be cropped.
    with Image.open(img) as img_obj:
        img_top = fetch_corner_of_image_and_down_sample(img_obj, crop_factor=.5, downsample_factor=.1, top_corner=True)
        img_bottom = fetch_corner_of_image_and_down_sample(img_obj, crop_factor=.5, downsample_factor=.1, top_corner=False)
    msg_top = generate_gpt_img_msg(top_corner_identify_prompt.format(min=pn_min, max=pn_max), img_top)
    display(img_top)
    print(llm.invoke(msg_top).content)
    msg_bottom = generate_gpt_img_msg(bottom_corner_identify_prompt.format(min=pn_min, max=pn_max), img_bottom)
    display(img_bottom)
    print(llm.invoke(msg_bottom).content)


In [None]:
import paramiko
from tqdm.notebook import tqdm
import json
# Results are cached on disk as {page: [top answer, bottom answer] or null} so that an
# interrupted run can be resumed without querying the model again for finished pages.
cache_fp = 'gpt_page_reads_and_res.json'
res_dict = {v: None for v in ALL_ORDERED_PAGES if not v.endswith('0001') and not v.endswith('0002')}
if os.path.exists(cache_fp):
    with open(cache_fp, 'r') as f:
        res_dict = json.load(f)

client = paramiko.SSHClient()
client.load_system_host_keys()
client.connect('cdhvm0003.xaas.epfl.ch', username='viaccoz')
sftp_client = client.open_sftp()
try:
    for v in tqdm(res_dict.keys()):
        if res_dict[v]:
            # Already answered in a previous run.
            continue
        try:
            # Not named `min` / `max`, which would rebind the builtins for the rest of the notebook.
            pn_min, pn_max = get_possible_range_for_img(str(v), ref_dict)
            # The remote file is closed by its own context manager, so a failed open can
            # no longer hit a close() on an undefined or stale handle.
            with sftp_client.open(f'/mnt/ltm/data/venice/sommarioni/registry/{v}.jpg') as remote_file:
                img = Image.open(remote_file)
                img_top = fetch_corner_of_image_and_down_sample(img, crop_factor=.5, downsample_factor=.1, top_corner=True)
                img_bottom = fetch_corner_of_image_and_down_sample(img, crop_factor=.5, downsample_factor=.1, top_corner=False)
            msg_top = generate_gpt_img_msg(top_corner_identify_prompt.format(min=pn_min, max=pn_max), img_top)
            top_res = llm.invoke(msg_top).content
            msg_bottom = generate_gpt_img_msg(bottom_corner_identify_prompt.format(min=pn_min, max=pn_max), img_bottom)
            bottom_res = llm.invoke(msg_bottom).content
            res_dict[v] = (top_res, bottom_res)
        except Exception as e:
            # Best effort: report the failure and move on; the page stays unanswered in the cache.
            print(e)
        finally:
            # Persist after every page so progress survives an interruption.
            with open(cache_fp, 'w') as f:
                json.dump(res_dict, f, indent=2)
finally:
    sftp_client.close()
    client.close()

# Heuristics used to validate or reject the missing results

1. Il y a 40 lignes par page du Sommarioni. Il y a toujours au minimum 2 lignes par entrée, donc le maximum d'entrées par page est de 20. Toute page dont la plage de valeurs s'étend sur plus de 20 est écartée.
2. Enlever toutes les entrées qui sont simplement 0 
3. enlever les entrées non numérique
4. Dès que le min du min range est plus grand que le max du max range => poubelle. 
    * (4.5) tester aussi max(minr) > min(maxr) comme règle de nettoyage _j'ai testé, c'est trop radical, ça enlève des bons cas ou juste chatgpt a overshoot sans que les min max soient faux_.
5. tester que max(maxr) - min(minr) < 20

In [1]:
import json 
import pandas as pd
from ast import literal_eval
import numpy as np

def list_vals_to_list(v):
    """
    Parse a comma-separated string of Python literals into a list.

    ``v`` is wrapped in square brackets and evaluated with ``literal_eval``.
    When that fails for any reason, ``v`` is printed and ``None`` is returned.
    """
    try:
        parsed = literal_eval('[' + v + ']')
    except Exception:
        print(v)
        return None
    return parsed
    
def has_any_zero(l:list)->bool:
    """Return True when at least one element of ``l`` compares equal to 0."""
    for item in l:
        if item == 0:
            return True
    return False

def infer_missing_range(vals: list[list[int]], step_counter=1) -> list[list[int]]:
    """
    Fill the range of rows whose minimum is NaN from the two neighbouring rows.

    Each row is read as ``[page, min, max, inferred]``. For every row except the first
    and the last, when its min is NaN and the previous row's max and the next row's
    min are both non-zero, truthy and strictly ordered, the row receives
    ``min = previous max + 1``, ``max = next min - 1`` and
    ``inferred = "By Bounds <step_counter>"``.

    The result is ``vals.copy()`` with the fills applied. With a NumPy array the input
    is left untouched; with a list of lists the copy is shallow, so the input rows are
    the ones being modified. Any exception raised while handling a row is printed with
    that row and the row is skipped.
    """
    filled = vals.copy()
    for pos in range(1, len(vals) - 1):
        own_min = vals[pos][1]
        try:
            before_max = vals[pos - 1][2]
            after_min = vals[pos + 1][1]
            fillable = (
                np.isnan(own_min)
                and before_max != 0 and after_min != 0
                and before_max and after_min
                and before_max < after_min
            )
            if fillable:
                filled[pos][1] = before_max + 1
                filled[pos][2] = after_min - 1
                # Keeps a trace of which values were inferred, and during which pass.
                filled[pos][3] = "By Bounds " + str(step_counter)
        except Exception as err:
            print(vals[pos], err)
    return filled

In [None]:


cache_fp = 'gpt_page_reads_and_res.json'

with open(cache_fp, 'r') as f:
    res_dict = json.load(f)

# Parse the two raw answers of every page (top corner, bottom corner) into lists.
# Pages without an answer are kept as None.
page_reads = {}
for t, v in res_dict.items():
    if isinstance(v, list):
        page_reads[t] = [list_vals_to_list(v[0]), list_vals_to_list(v[1])]
    else:
        page_reads[t] = None

# One row per answered page: 'minr' holds the top-corner read, 'maxr' the bottom-corner read.
df = pd.DataFrame(columns=['page', 'minr', 'maxr'], data=[[v1, v2[0], v2[1]] for v1, v2 in page_reads.items() if v2])

# An answer that could not be parsed (None) or that is empty cannot be checked by the
# rules below (has_any_zero(None) and min([]) both raise), so it is discarded first.
df_parsed = df[df.apply(lambda v: bool(v['minr']) and bool(v['maxr']), axis=1)]
print('Removing unparsable or empty reads:', len(df_parsed))

# removing all zeros entries:
df_no_zeros = df_parsed[df_parsed.apply(lambda v: not has_any_zero(v['minr']) and not has_any_zero(v['maxr']) , axis=1)]
print('Removing 0 entries:', len(df_no_zeros))
df_less_than_20 = df_no_zeros[df_no_zeros.apply(lambda v: len(v['minr'])+len(v['maxr']) < 20, axis=1)]
print('Removing more than twenty values:', len(df_less_than_20))
df_numeric = df_less_than_20[df_less_than_20.apply(lambda v: all([type(vv) == int for vv in v['minr'] + v['maxr']]), axis=1)]
print('Removing entries no numerics:', len(df_numeric))
df_extreme_mins = df_numeric[df_numeric.apply(lambda v: min(v['minr']) < max(v['maxr']), axis=1)]
print('Removing when the min(minr) > max(maxr):', len(df_extreme_mins))
# Rule 4.5 of the notes (max(minr) > min(maxr)) was tried and judged too aggressive:
# df_inclusive_mins = df_extreme_mins[df_extreme_mins.apply(lambda v: max(v['minr']) < min(v['maxr']), axis=1)]
# print('Removing when the max(minr) > min(maxr):', len(df_inclusive_mins))
# Rows are KEPT when the spread is below 20, whatever the printed label suggests.
df_range_less_20 = df_extreme_mins[df_extreme_mins.apply(lambda v: max(v['maxr']) - min(v['minr']) < 20, axis=1)]
print('Removing when the max(maxr) - min(minr) < 20:', len(df_range_less_20))
df_range_less_20.to_csv('gpt_page_reads_filtered.csv', index=False)

# Variant used downstream: spread measured between the first top read and the last bottom read.
df_extreme_vals_less_20 = df_extreme_mins[df_extreme_mins.apply(lambda v: v['maxr'][-1] - v['minr'][0] < 20, axis=1)]
print('Instead when the maxr[-1] - minr[0] < 20:', len(df_extreme_vals_less_20))
df_filtered = df_extreme_vals_less_20.copy()

In [None]:
# Complete missing pages: when pages n and n+2 have min/max values but page n+1 does not,
# infer the missing range of page n+1 from its two neighbours.

import numpy as np
# Collapse the two reads of every retained page into one range:
# smallest top-corner value and largest bottom-corner value.
df_filtered['min'] = df_filtered['minr'].apply(lambda v: min(v))
df_filtered['max'] = df_filtered['maxr'].apply(lambda v: max(v))

# Back to one row per page of `df`; pages dropped by the filters get NaN bounds (left join).
df_aug = df[['page']].merge(df_filtered[['page', 'min', 'max']], on='page', how='left')
# Provenance of the range; infer_missing_range overwrites it for the rows it fills.
df_aug['inferred'] = "GPT"
vals = df_aug[['page', 'min', 'max', 'inferred']].values

# Number of pages with a range before and after the neighbour-based inference.
print(len(df_aug[~df_aug['min'].isna()]))
df_aug = pd.DataFrame(infer_missing_range(vals), columns=['page', 'min', 'max', 'inferred'])
print(len(df_aug[~df_aug['min'].isna()]))
# Not the last expression of the cell, so this line displays nothing.
df_aug[~df_aug['min'].isna()]
# Truthy, non-NaN bounds are cast to int; NaN and 0 (falsy) both become None.
df_aug['min'] = df_aug['min'].apply(lambda v: int(v) if v and not np.isnan(v) else None)
df_aug['max'] = df_aug['max'].apply(lambda v: int(v) if v and not np.isnan(v) else None)
# df_aug.to_csv('gpt_page_reads_augmented.csv', index=False)

In [5]:
# now keeping only the pages without min max that are after OR before a page with the values entered.
# IIIF URL of the full-size scan of a page; {reg} is the register folder, {pg} the page number.
FMT_val = 'https://image-timemachine.epfl.ch/iiif/3/venice%2Fsommarioni%2Fregistry%2F{reg}%2F{pg}.jpg/full/max/0/default.jpg'
def page_range_to_iiif(page_range: str) -> str:
    """
    Build the IIIF image URL of a page.

    Args:
        page_range: page label of the form ``'<register>/<page number>'``,
            e.g. ``'reg4/0076'`` (a string, despite the parameter name).

    Returns:
        ``FMT_val`` formatted with the register and the page number.

    Raises:
        ValueError: if ``page_range`` does not contain exactly one '/'.
    """
    reg, nmb = page_range.split('/')
    return FMT_val.format(reg=reg, pg=nmb)

# Select the pages without a range that are worth labelling by hand: either the page just
# before has a range, or both the page just after and the page two positions before have one.
vals_to_fill = []
df_inferred = df_aug.copy()
for i in range(2, len(df_aug.values)-1):
    if not np.isnan(df_inferred.iloc[i]['min']):
        # This page already has a range.
        continue
    prev_known = not np.isnan(df_inferred.iloc[i-1]['min'])
    if prev_known or (not np.isnan(df_inferred.iloc[i+1]['min']) and not np.isnan(df_inferred.iloc[i-2]['min'])):
        page = df_aug.iloc[i]['page']
        # min / max are left empty, to be filled manually.
        vals_to_fill.append([page, page_range_to_iiif(page), None, None])

#pd.DataFrame(vals_to_fill, columns=['page', 'iiif_link', 'min', 'max']).to_csv('gpt_page_reads_to_fill.csv', index=False)

In [None]:

# First manual pass: ranges typed in by hand for the pages selected above.
filled_manually = pd.read_csv('manual_labeling/gpt_page_reads_gap_manual_fills.csv')
filled_manually['inferred'] = "Manual"
# `.isnumeric()` is a str method, so both columns are assumed to be read as strings - TODO confirm.
# Values that are not purely numeric characters (e.g. '12.0') become None.
filled_manually['min'] = filled_manually['min'].apply(lambda v: float(v) if v.isnumeric() else None)
filled_manually['max'] = filled_manually['max'].apply(lambda v: float(v) if v.isnumeric() else None)
df_inferred2 = df_inferred.set_index('page')
# Overwrite range and provenance of every manually labelled page.
for i, row in filled_manually.iterrows():
    df_inferred2.loc[row['page'], 'min'] = row['min']
    df_inferred2.loc[row['page'], 'max'] = row['max']
    df_inferred2.loc[row['page'], 'inferred'] = row['inferred']

# Number of pages with a range before and after a new round of neighbour-based inference.
print(len(df_inferred2[~df_inferred2['min'].isna()]))
df_inferred2 = pd.DataFrame(infer_missing_range(df_inferred2.reset_index().values), columns=['page', 'min', 'max', 'inferred'])
print(len(df_inferred2[~df_inferred2['min'].isna()]))


In [7]:
# Attach the IIIF link of every page, then normalise both bounds:
# truthy, non-NaN values are cast to int, anything else (NaN, 0) becomes None.
df_inferred2['iiif_link'] = df_inferred2['page'].apply(page_range_to_iiif)
for bound_col in ('min', 'max'):
    df_inferred2[bound_col] = df_inferred2[bound_col].apply(lambda v: int(v) if v and not np.isnan(v) else None)
# df_inferred2[['iiif_link', 'min', 'max']].to_csv('gpt_page_reads_augmented_manually.csv', index=False)

In [None]:
# Fetch the second manual pass and convert each IIIF link back to a '<register>/<page>' label.
df_inferred3 = df_inferred2.copy()
df_manual2 = pd.read_csv('manual_labeling/gpt_page_reads_augmented_manually_2nd_pass.csv')
# The link is split on '%2F': the second-to-last piece is the register, and the last piece,
# cut at its first '/' and stripped of '.jpg', is the page number.
df_manual2['page'] = df_manual2['iiif_link'].apply(lambda v: v.split('%2F')[-2] + '/' + v.split('%2F')[-1].split('/')[0].replace('.jpg', ''))

df_manual2['min'] = df_manual2['min'].apply(lambda v: float(v))
df_manual2['max'] = df_manual2['max'].apply(lambda v: float(v))
df_manual2['inferred'] = "Manual"
df_inferred3 = df_inferred3.set_index('page')
# Overwrite range and provenance of every page labelled during the second pass.
for i, row in df_manual2.iterrows():
    df_inferred3.loc[row['page'], 'min'] = row['min']
    df_inferred3.loc[row['page'], 'max'] = row['max']
    df_inferred3.loc[row['page'], 'inferred'] = row['inferred']

cols = ['page', 'min', 'max', 'inferred']
# Number of pages with a range before and after the second round of neighbour-based
# inference (rows filled here are tagged "By Bounds 2").
print(len(df_inferred3[~df_inferred3['min'].isna()]))
df_inferred3 = pd.DataFrame(infer_missing_range(df_inferred3.reset_index()[cols].values, step_counter=2), columns=cols)
print(len(df_inferred3[~df_inferred3['min'].isna()]))
df_inferred3['iiif_link'] = df_inferred3['page'].apply(lambda v: page_range_to_iiif(v))
df_inferred3[['iiif_link','page', 'min', 'max', 'inferred']].to_csv('final_manual_pass_on_all_data.csv', index=False)

In [None]:
# Check every GPT read that looks out of bounds with respect to its two neighbouring pages.

df_3rd_pass = pd.read_csv('manual_labeling/3rd_pass_on_all_data.csv')
df3gpt = df_3rd_pass[df_3rd_pass['inferred'] == 'GPT']
gpt_wrong_idx = []
n_pages = len(df_3rd_pass)
for i, row in df3gpt.iterrows():
    # `i` is a row label of df_3rd_pass (default integer index from read_csv), so the
    # guard must use the length of the full table, not of the GPT-only subset;
    # otherwise every GPT row located past len(df3gpt) is silently skipped.
    if i < 2 or i > n_pages - 2:
        continue
    cmin, cmax = row['min'], row['max']
    prev_max = df_3rd_pass.iloc[i-1]['max']
    next_min = df_3rd_pass.iloc[i+1]['min']
    # Suspicious when the range overlaps the previous page's max or the next page's min.
    if cmin < prev_max or cmax > next_min:
        gpt_wrong_idx.append(i)

print(len(gpt_wrong_idx))
# gpt_wrong_idx holds row labels, hence .loc:
# df3gpt.loc[gpt_wrong_idx].to_csv('wrong_gpt_values.csv', index=False)

In [10]:
# Fourth manual pass: apply the hand-corrected values of the GPT reads flagged as wrong.
df_4th_pass = pd.read_csv('manual_labeling/4th_pass_wrong_gpt_values.csv')
df_all = df_3rd_pass.copy().set_index('page')
for _, fix in df_4th_pass.iterrows():
    page = fix['page']
    df_all.loc[page, 'min'] = fix['min']
    df_all.loc[page, 'max'] = fix['max']
    df_all.loc[page, 'inferred'] = fix['inferred']

df_all = df_all.reset_index()
df_all.to_csv('test.csv')


In [None]:
# Finally, whenever the max of the previous page equals the min of the current page, the
# subparcels of that parcel are split between the two pages, so the subparcel on which
# the current page starts has to be checked manually.

df_5th = pd.read_csv('manual_labeling/5th_pass_over_all_data.csv')
print(len(df_5th))
# .copy(): a column is added right below, which must not be done on a filtered view
# of the original frame (pandas SettingWithCopy).
df_5th = df_5th[df_5th['min'] != "0.0"].copy()
print(len(df_5th))
df_5th['subparcel_start'] = 0.0
subparcels_rows_idx = []
# The first two rows and the last row are skipped, as before.
for i in range(2, len(df_5th) - 1):
    # An example where this applies: reg4/0076 & 0077
    cmin = df_5th.iloc[i]['min']
    prev_max = df_5th.iloc[i-1]['max']

    if cmin == prev_max:
        subparcels_rows_idx.append(i)
# df_5th.iloc[subparcels_rows_idx][['page', 'min', 'subparcel_start', 'iiif_link']].to_csv('subparcel_split_1st_pass.csv')

In [None]:
# Apply the manually checked subparcel starts (and corrected minima) to the fifth-pass table.
df_subparcels = pd.read_csv('manual_labeling/subparcel_split_1st_manual_pass.csv')
df_final = pd.read_csv('manual_labeling/5th_pass_over_all_data.csv').set_index('page')
for _, split_row in df_subparcels.iterrows():
    page = split_row['page']
    df_final.loc[page, 'min'] = split_row['min']
    df_final.loc[page, 'subparcel_start'] = split_row['subparcel_start']
df_final = df_final.reset_index()
# Drop the rows whose min is the string "0.0"; the former index is kept as an 'index' column.
keep_rows = df_final['min'] != "0.0"
df_final = df_final[keep_rows].reset_index()
df_final

In [None]:
# NOTE(review): this reloads the fifth-pass CSV as is and replaces any `df_final` built
# by an earlier cell. Presumably the CSV on disk already holds the 'subparcel_start'
# column read by find_page_from_parcel_number - confirm.
df_final = pd.read_csv('manual_labeling/5th_pass_over_all_data.csv')
df_final

In [None]:
from tqdm.notebook import tqdm

# Called before `DataFrame.progress_apply` is used further down in this cell.
tqdm.pandas()

from typing import Optional

def find_page_from_parcel_number(parcel_number:str, subparcel_number:str, df_pages:pd.DataFrame) -> Optional[str]:
    """
    Find the page whose parcel-number range contains a given parcel.

    Args:
        parcel_number: parcel number as a numeric string; anything else yields ``None``.
        subparcel_number: subparcel number as a numeric string. Only used when the parcel
            is split over two consecutive pages.
        df_pages: one row per page, in page order, with the columns 'page', 'min', 'max'
            and 'subparcel_start'. Rows whose 'min' or 'max' cannot be converted to
            float are ignored.

    Returns:
        The 'page' label of the matching row, or ``None`` (after printing the reason)
        when no page matches or when the match is ambiguous.

        * One candidate: that page.
        * Two consecutive candidates (parcel split over two pages): the second page's
          'subparcel_start' is the subparcel on which that page starts, so smaller
          subparcels belong to the first page and the others to the second one.
        * Any other case: the first candidate if its label contains 'reg1', else ``None``.
    """
    if not (isinstance(parcel_number, str) and parcel_number.isnumeric()):
        return None
    parcel_number = float(parcel_number)

    # Candidates are stored as row POSITIONS, so the .iloc lookups below stay valid
    # whatever the index of df_pages is.
    candidates = []
    for pos, (_, row) in enumerate(df_pages.iterrows()):
        try:
            if float(row['min']) <= parcel_number <= float(row['max']):
                candidates.append(pos)
        except ValueError:
            continue

    if len(candidates) == 0:
        print(f'No candidates for {parcel_number}')
        return None
    if len(candidates) == 1:
        return df_pages.iloc[candidates[0]]['page']
    if len(candidates) == 2 and candidates[1] - candidates[0] == 1:
        # subparcel case
        first_page = df_pages.iloc[candidates[0]]
        second_page = df_pages.iloc[candidates[1]]
        # A missing subparcel may come as '' / None, or as a NaN float from pandas.
        if not isinstance(subparcel_number, str) or not subparcel_number:
            print(f'Missing subparcel number for {parcel_number}, got two candidates for the value:', [df_pages.iloc[v]['page'] for v in candidates])
            return None
        if not subparcel_number.isnumeric():
            print(f'Weird subparcel number for {parcel_number}:', subparcel_number)
            return None
        # The split point is stored on the page where the parcel CONTINUES (the second one).
        if float(subparcel_number) < second_page['subparcel_start']:
            return first_page['page']
        return second_page['page']

    # I have to check with Isabella what's the matter with Reg7 parcel numbers
    if 'reg1' in df_pages.iloc[candidates[0]]['page']:
        return df_pages.iloc[candidates[0]]['page']
    print(f'Multiple candidates for {parcel_number}:', [df_pages.iloc[v]['page'] for v in candidates])
    return None
# Load the sommarioni text entries (dated export) and attach to each entry the page
# found for its parcel / subparcel number; entries without a match get None.
sommarioni = pd.read_json('../../1808_Sommarioni/sommarioni_text_data_20240709.json')
# Row-wise lookup with a progress bar.
sommarioni['page'] = sommarioni.progress_apply(lambda r: find_page_from_parcel_number(r['parcel_number'], r['sub_parcel_number'], df_final), axis=1)
sommarioni

In [282]:
# Export the entries for which no page could be found, for manual reconciliation.
pn_not_found = sommarioni.loc[sommarioni['page'].isna(), ['parcel_number', 'sub_parcel_number', 'page']]
pn_not_found.to_csv('sommarioni_pn_not_found.csv', index=False)

In [28]:
import pandas as pd
from pathlib import Path
# Finally, match all the manual work done on the parcels and subparcels.
df_matching_pn = pd.read_csv('manual_labeling/sommarioni_pn_manual_reconciliation.csv')
# Last path in sorted order among the 'sommarioni_text_data_with_pages_*.json' exports;
# with the YYYYMMDD suffix written further down, that is presumably the most recent one.
# Raises IndexError when no such file exists.
sommarioni_fp = sorted(list(Path('../../1808_Sommarioni/').rglob('sommarioni_text_data_with_pages_*.json')))[-1]
sommarioni = pd.read_json(sommarioni_fp)

In [29]:
import numpy as np
# Apply the manual reconciliation: each row assigns a page to a parcel, restricted to one
# subparcel when the row carries a subparcel number given as a string.
s_matched = sommarioni.copy()
for _, fix in df_matching_pn.iterrows():
    same_parcel = s_matched['parcel_number'] == fix['parcel_number']
    spn = fix['sub_parcel_number']
    if type(spn) is str:
        s_matched.loc[same_parcel & (s_matched['sub_parcel_number'] == spn), 'page'] = fix['page']
    else:
        s_matched.loc[same_parcel, 'page'] = fix['page']

# Page labels written without zero padding. Per the original note, they prevented some
# entries from being linked to the right canvas id; they are mapped to the padded form.
manual_correction = {
    'reg3/205': 'reg3/0205',
    'reg5/118': 'reg5/0118',
    'reg6bis/205': 'reg6bis/0205',
    'reg6bis/206': 'reg6bis/0206',
    'reg6bis/207': 'reg6bis/0207',
    'reg6bis/209': 'reg6bis/0209',
    'reg6bis/211': 'reg6bis/0211',
    'reg6bis/212': 'reg6bis/0212',
}
# Labels absent from the mapping are left untouched.
s_matched['page'] = s_matched['page'].apply(lambda v: manual_correction.get(v, v))

In [None]:
# Number of entries whose 'page' is still missing after the manual reconciliation.
print('entries not matched yet:', s_matched.page.isna().sum())
from datetime import datetime as dt

def today_date() -> str:
    """Return today's local date formatted as ``YYYYMMDD``."""
    return dt.today().strftime('%Y%m%d')
# Output path carries today's date, so each run on a new day writes a new export.
txt_file_path = f'../../1808_Sommarioni/sommarioni_text_data_with_pages_{today_date()}.json'

# The file is opened explicitly as UTF-8 and non-ASCII characters are written as is
# (force_ascii=False), to get rid of UTF-8 errors.
with open(txt_file_path, 'w', encoding='utf-8') as file:
    s_matched.to_json(file, orient='records', indent=4, force_ascii=False)

In [12]:
# Export the entries still without a page, with owner and place details to help the manual search.
unmatched_cols = ['place', 'owner', 'house_number', 'quality', 'parcel_number', 'sub_parcel_number', 'page']
still_unmatched = s_matched.loc[s_matched.page.isna(), unmatched_cols]
still_unmatched.to_csv('sommarioni_pn_not_found_with_owner_place.csv', index=False)