# Explorations in Building Search Functionality for Local Chron Am

In [None]:
import os
import tarfile
import re
import pandas as pd
from tqdm.notebook import tqdm
import ast

## Method 1: Search by Selected Year Range, State, and Keyword

In [None]:
# Load the newspaper metadata table from the project's GitHub repository.
# Later cells read its 'State' and 'tarfiles' columns.
df = pd.read_csv('https://raw.githubusercontent.com/MatthewKollmer/chron_am_backup/refs/heads/main/newspapers.csv')

# be sure to change these filepaths accordingly
# directory: folder holding the bz2-compressed tar archives that the search cell opens
# search_results: folder where matching pages are written (created below if it is missing)
directory = '/Volumes/t5_evo_8tb/ChronAm/tarbiz2_files'
search_results = '/Volumes/t5_evo_8tb/ChronAm/search_results'
os.makedirs(search_results, exist_ok=True)

In [None]:
# change these variables to the state you want to search, the year range (start_year to end_year), and the keyword search
# you can search by multiple keywords. Just enter them as a list (e.g. search_terms = ['lynching', 'outrage', 'etc', 'etc'])

state = 'Alaska' # must be the full state name, since that's how states appear in newspapers.csv, column 'State' (letter case does not matter)
start_year = 1903 # first year of the range (inclusive)
end_year = 1904 # last year of the range (inclusive)
search_terms = ['lynching']

In [None]:
# some necessary functions

# this one reports whether any year in year_list falls inside the selected range
def overlapping_years(year_list):
    """Return True if at least one year in *year_list* lies in [start_year, end_year].

    The bounds are inclusive and are read from the notebook-level variables
    ``start_year`` and ``end_year`` at call time. An empty *year_list* gives False.
    """
    for candidate in year_list:
        if start_year <= candidate <= end_year:
            return True
    return False

# this one turns a search term into a regex that tolerates one misread character per word
# words shorter than four characters are kept as exact matches
def ocr_variations(term: str) -> str:
    """Build a regex source string matching *term* despite single-character OCR errors.

    The term is split on whitespace. Each word of four or more characters becomes a
    non-capturing group listing the exact word plus one variant per position in which
    that character is replaced by ``.`` (any character). Shorter words are only
    escaped. The per-word patterns are joined with ``\\W+`` so any run of non-word
    characters may separate them. A term with no words yields an empty string.
    """
    def _word_pattern(word: str) -> str:
        exact = re.escape(word)
        if len(word) < 4:
            return exact
        # one alternative per position, with that character swapped for a wildcard
        wildcarded = [
            re.escape(word[:pos]) + '.' + re.escape(word[pos + 1:])
            for pos in range(len(word))
        ]
        return '(?:' + '|'.join([exact, *wildcarded]) + ')'

    return r'\W+'.join(_word_pattern(word) for word in term.split())

In [None]:
# before running further, make sure the chosen state and year range actually appear in the data
# state names are compared case-insensitively, ignoring whitespace around the values in the csv
normalized_states = df['State'].str.strip().str.lower()
state_selection_filter = df[normalized_states == state.lower()]
if state_selection_filter.empty:
    raise SystemExit(f'your state selection does not appear in the newspaper data. Check spelling or results for: {state}')

# each 'tarfiles' cell is parsed with ast.literal_eval; dictionary entries whose 'years'
# overlap the selected range contribute their 'file_name' to the set of archives to search
target_tarfiles = set()
for raw_entries in state_selection_filter['tarfiles']:
    matching_names = (
        entry['file_name']
        for entry in ast.literal_eval(raw_entries)
        if isinstance(entry, dict) and overlapping_years(entry['years'])
    )
    target_tarfiles.update(matching_names)

if not target_tarfiles:
    raise SystemExit(f'there does not seem to be any files containing digitized pages from the state ({state}) you selected during the time period you selected ({start_year} - {end_year}).')

print(f'Number of files to search through: {len(target_tarfiles)}')

In [None]:
# first, use ocr_variations() to compile possible variations of your search terms.
# A bare string is treated as a single term (otherwise it would be searched letter by letter),
# and terms that produce an empty pattern are dropped: an empty alternative in the regex
# would match every page in the selected date range.
terms = [search_terms] if isinstance(search_terms, str) else search_terms
fuzzy_search_term = [pattern for pattern in map(ocr_variations, terms) if pattern]
if not fuzzy_search_term:
    raise SystemExit('no usable search terms were provided. Add at least one keyword to search_terms.')
keyword_regex = re.compile('|'.join(fuzzy_search_term), re.IGNORECASE)

# the year is taken from the first '/YYYY/' segment of a page's path inside the archive
year_regex = re.compile(r'/(\d{4})/')

# then run the search. Heads up: could take a while, especially if you're searching a large timeframe!
hit_count = 0
for file in tqdm(sorted(target_tarfiles), desc='Searching'):
    tarfile_path = os.path.join(directory, file)

    # best effort: an archive that cannot be read is reported and the search moves on
    try:
        with tarfile.open(tarfile_path, 'r:bz2') as tar:
            for page in tar:
                if not page.isfile() or not page.name.endswith('.txt'):
                    continue

                # pages outside the year range are skipped; pages whose path
                # shows no year segment are still searched
                year_match = year_regex.search(page.name)
                if year_match and not (start_year <= int(year_match.group(1)) <= end_year):
                    continue

                extracted = tar.extractfile(page)
                if extracted is None:
                    continue
                # close the member handle as soon as its bytes are read
                with extracted:
                    page_bytes = extracted.read()
                # undecodable bytes are dropped for matching only; the raw bytes are what gets saved
                page_text = page_bytes.decode('utf-8', 'ignore')

                if not keyword_regex.search(page_text):
                    continue

                # flatten the archive path into a single file name, prefixed by the archive name
                save_name = f'{file}-{page.name.replace("/", "-")}'
                save_path = os.path.join(search_results, save_name)

                with open(save_path, 'wb') as out_fh:
                    out_fh.write(page_bytes)

                hit_count += 1

    except Exception as e:
        print('Error processing', file, ':', e)

print()
print(f'Done! {hit_count} pages were found to contain relevant results.')

## Method 2: Search by Selected State, Newspaper, and Year Range

### And Attempting to Incorporate Widgets as Well

In [None]:
import os
import shutil
import tarfile
import re
import pandas as pd
from tqdm.notebook import tqdm
import ast
import itertools
import ipywidgets as w
from IPython.display import display, HTML

In [None]:
# batches_df: each row names an archive ('file_name') and lists what it holds ('contents').
# The converter runs ast.literal_eval on 'contents' so its lists of dictionaries are read
# as Python objects rather than as plain strings.
batches_df = pd.read_csv('https://raw.githubusercontent.com/MatthewKollmer/chron_am_backup/refs/heads/main/ocr_batches.csv', converters={'contents': ast.literal_eval})
# papers_df: newspaper metadata; later cells use its 'State', 'LCCN', and 'Title' columns
papers_df = pd.read_csv('https://raw.githubusercontent.com/MatthewKollmer/chron_am_backup/refs/heads/main/newspapers.csv')

# be sure to change these directories accordingly
# NOTE: these two names are also assigned in Method 1; running this cell replaces those values
directory = '/Volumes/t5_evo_8tb/ChronAm/tarbiz2_files'
search_results  = '/Volumes/t5_evo_8tb/ChronAm/state_year_results'
os.makedirs(search_results, exist_ok=True)

In [None]:
# we'll need a map of sn codes and the tarfiles containing them
# result shape: {sn_code: [{'file_name': <archive name>, 'years': <years for that sn code>}, ...]}
sn_to_tarfiles = {}
for tarball, contents in zip(batches_df['file_name'], batches_df['contents']):
    for item in contents:
        # only the first key/value pair of each item is read: sn code -> its info dictionary
        sn_code, info = next(iter(item.items()))
        record = {'file_name': tarball, 'years': info['years']}
        sn_to_tarfiles.setdefault(sn_code, []).append(record)

In [None]:
# assembling the widgets
state_options = sorted(papers_df['State'].dropna().unique())
state_selection_dropdown = w.Dropdown(options=state_options, description='State:')

# slider bounds: the earliest and latest year listed for any archive in sn_to_tarfiles
year_lists = (batch['years'] for batches in sn_to_tarfiles.values() for batch in batches)
all_years = list(itertools.chain.from_iterable(year_lists))
min_year = min(all_years)
max_year = max(all_years)

year_slider = w.IntRangeSlider(
    value=[min_year, max_year],
    min=min_year,
    max=max_year,
    step=1,
    description='Search Year Range:',
    continuous_update=False,
    layout=w.Layout(width='70%'),
)

# the newspaper list starts empty; update_newspaper_list() fills it in
paper_select = w.SelectMultiple(
    options=[],
    description='Newspapers:',
    layout=w.Layout(width='95%', height='200px'),
)
search_button = w.Button(description='Start Search')
out_box = w.Output()

In [None]:
# some necessary functions

# a function that checks whether any year in a list falls inside a year range
def full_year_range(year_list, year0, year1):
    """Return True if at least one year in *year_list* lies in [year0, year1].

    Both bounds are inclusive. An empty *year_list* gives False.
    """
    for year in year_list:
        if year0 <= year <= year1:
            return True
    return False

# a function that refreshes the newspaper list for the selected state and year range
def update_newspaper_list(*args):
    """Repopulate ``paper_select`` from the current dropdown and slider values.

    A newspaper is listed when its 'State' equals the dropdown value and at least one
    of its entries in ``sn_to_tarfiles`` has a year inside the slider range. Labels
    have the form '<LCCN> — <Title>' and are sorted; any previous selection is cleared.

    Args:
        *args: ignored; accepted so the function can be used as a widget observer
            and also called with no arguments.
    """
    year0, year1 = year_slider.value
    in_state = papers_df[papers_df['State'] == state_selection_dropdown.value]

    labels = []
    for sn_code, title in zip(in_state['LCCN'], in_state['Title']):
        # newspapers with no known archive get an empty list and are left out
        batches = sn_to_tarfiles.get(sn_code, [])
        if any(full_year_range(entry['years'], year0, year1) for entry in batches):
            labels.append(f'{sn_code} — {title}')

    labels.sort()
    paper_select.options = labels
    paper_select.value = ()

# function to run the search via the widgets created above
def run_search(_):
    """Copy every OCR text page matching the current widget selection into a results folder.

    Reads the selected newspapers, state, and year range from the widgets, works out
    which archives list those newspapers for overlapping years, and writes each
    matching '.txt' page into '<search_results>/<state>_<year0>-<year1>'. Progress
    and messages are shown in ``out_box``.

    Args:
        _: the clicked button, passed by ``on_click``; unused.
    """
    out_box.clear_output()
    chosen = paper_select.value
    if not chosen:
        with out_box:
            print('Select a newspaper')
        return

    year0, year1 = year_slider.value
    saved_results = os.path.join(search_results, f'{state_selection_dropdown.value}_{year0}-{year1}')
    os.makedirs(saved_results, exist_ok=True)

    # labels look like '<sn code> — <title>'; the part before the dash is the sn code
    chosen_sn_code = [label.split(' — ')[0] for label in chosen]
    tarfiles_needed = set()
    for sn_code in chosen_sn_code:
        for entry in sn_to_tarfiles[sn_code]:
            if full_year_range(entry['years'], year0, year1):
                tarfiles_needed.add(entry['file_name'])

    with out_box:
        print()
        print(f'Searching {len(tarfiles_needed)} batches ...')
        print()

    year_regex = re.compile(r'/(\d{4})/')
    with out_box:
        for file in tqdm(sorted(tarfiles_needed), desc='Searching'):
            tarfile_path = os.path.join(directory, file)
            # best effort: an archive that cannot be read is reported and the search moves on
            try:
                with tarfile.open(tarfile_path, 'r:bz2') as tar:
                    for page in tar:
                        if not page.isfile() or not page.name.endswith('.txt'):
                            continue

                        # keep only pages whose path mentions one of the chosen sn codes
                        if not any(sn_code in page.name for sn_code in chosen_sn_code):
                            continue

                        # unlike the keyword search, pages without a '/YYYY/' segment are skipped
                        year_match = year_regex.search(page.name)
                        if not year_match:
                            continue
                        year = int(year_match.group(1))
                        if not (year0 <= year <= year1):
                            continue

                        # extract before creating the output file so that a member which
                        # cannot be read does not leave an empty file behind
                        extracted = tar.extractfile(page)
                        if extracted is None:
                            continue

                        save_name = f'{file}-{page.name.replace("/", "-")}'
                        save_path = os.path.join(saved_results, save_name)
                        # both handles are closed once the copy finishes
                        with extracted, open(save_path, 'wb') as fh_out:
                            shutil.copyfileobj(extracted, fh_out)
            except Exception as e:
                # already inside the out_box context, so no nested context is needed
                print('Error processing', file, e)

    with out_box:
        print()
        print('Search complete.')
        display(HTML(f'<b>Files saved in:</b> {saved_results}'))

In [None]:
# if you're ready to pull pages by state, year range, and newspaper, run this code
# refresh the newspaper list whenever the state or the year range changes
for control in (state_selection_dropdown, year_slider):
    control.observe(update_newspaper_list, names='value')
# fill the list once for the initial widget values
update_newspaper_list()

search_button.on_click(run_search)
display(state_selection_dropdown, year_slider, paper_select, search_button, out_box)