<a href="https://colab.research.google.com/github/ieg-dhr/Notebooks4Historical_Newspapers/blob/main/Internet_Archive.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Accessing Historical Sources from the Internet Archive using the internetarchive.api

*Notebook created by Sarah Oberbichler (oberbichler@ieg-mainz.de)*

Internet Archive is a non-profit library of millions of free books, movies, software, music, websites, newspapers, and more.

`internetarchive` is a Python interface to archive.org. This notebook uses code provided by the Internet Archive (copyright 2012-2019 by Internet Archive; License AGPL 3). Documentation on how to use this Python interface is available here: https://archive.org/developers/internetarchive/internetarchive.html#module-internetarchive.api

### This notebook shows how:


*   to get the identifiers of documents (with or without a search keyword)
*   to download the documents with their metadata
*   to create a DataFrame bringing metadata and fulltext together
*   to reduce the full text to a specific context window
*   to export the DataFrame as an Excel file for further processing






# Setting up the requirements for the internetarchive.api

In [None]:
# Install the internetarchive package that the cells below import from.
pip install internetarchive

In [None]:
# Upgrade internetarchive to the newest release available to pip.
pip install --upgrade internetarchive

In [None]:
# @markdown ####  Get the identifiers of all documents in a collection from a specific creator. Write the name of the newspaper (e.g., La Stampa) and the collection (e.g., newspapers). You can also choose a date range by adding --> AND date:[1913-10-17 TO 1944-11-17]:
# Initialize an ArchiveSession
import pandas as pd
from internetarchive.session import ArchiveSession
from internetarchive.search import Search
import os
import pandas as pd
import xml.etree.ElementTree as ET
from internetarchive import get_item
s = ArchiveSession()
add = "title:(La patria del friuli) AND collection:(newspapers) AND date:[1909-01-01 TO 1912-01-01]" # @param {type:"string"}

# Run the query string from the form against archive.org
search = Search(s, add)

# Keep only the 'identifier' field of every hit
collection_identifiers = [result['identifier'] for result in search]

# Show the identifiers and how many were found
print(collection_identifiers)
print(len(collection_identifiers))

In [None]:

# @markdown #### Or get the identifiers of documents that contain a specific keyword.
# @markdown #####  Write the text of the first part of the identifier (e.g., lastampa):

Collection = "LaPatriadelFriuli" # @param {type:"string"}
# @markdown #####  Search the full text with a keyword:
Keyword = "terremoto" # @param {type:"string"}

# Open a session and run a full-text search for the keyword
s = ArchiveSession()
search = Search(s, Keyword, full_text_search = True)

# Materialise every hit returned by the search
results_list = list(search)

# The part of each hit's '_id' before the first '|' is used as the identifier;
# only identifiers that contain the Collection string are kept.
identifier_collection = Collection
hit_identifiers = (hit['_id'].split('|')[0] for hit in results_list)
keyword_identifiers = [name for name in hit_identifiers if identifier_collection in name]

print(keyword_identifiers)
print(len(keyword_identifiers))


In [None]:
import re
from datetime import datetime
# @markdown #### You can also select a date range for your keyword search
# @markdown #### Insert the Start Date:

# First year to keep (whole years only; the filter below treats it as inclusive).
Start_year = 1908 # @param {type:"integer"}
# @markdown #####  Insert the End Date:
# Last year to keep (also inclusive).
End_year = 1915 # @param {type:"integer"}


# Define the date range
# (lower-case aliases of the form values; these are the names the filter loop reads)
start_year = Start_year
end_year = End_year

import re
from datetime import datetime

# Function to extract the year from the identifier
def extract_year(identifier):
    """Return the year embedded in an identifier as a datetime, or None.

    The year is the first run of exactly four digits enclosed by underscores
    (``_1909_``). It is parsed with the ``%Y`` format, so the result is a
    datetime set to 1 January of that year. A warning is printed and None is
    returned when no such run exists or when it cannot be parsed.
    """
    found = re.search(r'_(\d{4})_', identifier)
    if not found:
        print(f"Warning: Could not find a valid year in identifier: {identifier}")
        return None

    digits = found.group(1)
    try:
        parsed = datetime.strptime(digits, '%Y')
    except ValueError:
        print(f"Warning: Invalid date format in identifier: {identifier}")
        return None
    return parsed



# Keep only the identifiers whose embedded year lies within
# start_year..end_year (both ends inclusive)
keyword_date_identifiers = []
for identifier in keyword_identifiers:
    year = extract_year(identifier)
    if year is None:
        # No usable year in this identifier; extract_year already printed a warning
        continue
    if year.year < start_year or year.year > end_year:
        continue
    keyword_date_identifiers.append(identifier)

print("Filtered Identifiers:", keyword_date_identifiers)
print("Count:", len(keyword_date_identifiers))

# Download the documents with their metadata and create a DataFrame

In [None]:
# @markdown #####  The documents can be downloaded as text files, xml, json, image files, pdf's, etc. The advanced search page of the internet archive gives an overview of the possible formats https://archive.org/
# @markdown #####  Select the list of indentifiers here:
# Which identifier list to download. The selectable names are the lists built by
# the search cells of this notebook, so the chosen one must already have been created.
Indentifier_List = collection_identifiers # @param ["collection_identifiers", "keyword_identifiers", "keyword_date_identifiers"] {type:"raw"}


# @markdown #####  Select the format here:
# File-name suffix that selects each item's full-text file; it is matched against
# the lower-cased file name with str.endswith.
Format = "djvu.txt" # @param {type:"string"}

# Create an empty list to store data
# (one dict per item: the metadata fields plus a 'content' key; turned into DataFrame rows below)
data = []


# Define a function to extract metadata from XML files
def extract_metadata(xml_file):
    """Read an XML file and return its top-level fields as a dict.

    Every direct child of the root element contributes one entry that maps the
    child's tag to its text (None for an element without text). When the same
    tag occurs more than once, the last occurrence wins.
    """
    root = ET.parse(xml_file).getroot()
    return {child.tag: child.text for child in root}


# Process each identifier: fetch the item's full-text file and its meta.xml file,
# and add one row (metadata fields + 'content') to `data`.
wanted_suffix = Format.lower()  # file names are lower-cased before the suffix test
for identifier in Indentifier_List:
    # Reset per item: otherwise an item without a matching text file would
    # silently inherit the text of the previously processed item.
    text = None
    metadata = None
    try:
        # Get the item from Internet Archive
        item = get_item(identifier)
        # Get all files associated with the item
        files = item.get_files()
        # Process each file
        for file in files:
            try:
                file_name = file.name.lower()
                local_path = f'./downloads/{file.name}'
                # Full-text file in the requested format
                if file_name.endswith(wanted_suffix):
                    print(f"Downloading text file: {file.name}")
                    file.download(local_path, verbose=True)
                    with open(local_path, 'r', encoding='utf-8') as f:
                        text = f.read()
                # Metadata file (name ends with "meta.xml")
                if file_name.endswith('meta.xml'):
                    print(f"Downloading metadata file: {file.name}")
                    file.download(local_path, verbose=True)
                    # Extract metadata from XML file
                    metadata = extract_metadata(local_path)
            except Exception as e:
                # Best effort: a single broken file must not abort the whole item
                print(f"Error processing file {file.name} for identifier {identifier}: {e}")
    except Exception as e:
        print(f"Error processing identifier {identifier}: {e}")
        continue

    # A row needs both parts; say explicitly which one is missing instead of
    # failing later with an unrelated-looking error.
    if text is None:
        print(f"Skipping identifier {identifier}: no readable '{Format}' file found")
        continue
    if metadata is None:
        print(f"Skipping identifier {identifier}: no readable metadata file found")
        continue

    # Add the content and metadata to the data list
    data.append({**metadata, 'content': text})

# Create a DataFrame from the collected data
df = pd.DataFrame(data)

# Newlines in 'content' are kept exactly as downloaded. The former
# `df['content'].replace("\n", "")` call only matched cells whose entire value
# was "\n" and discarded its result, so it never changed anything, and it
# raised KeyError when no rows had been collected.

# Display the DataFrame
df


In [None]:
# @markdown #### For the full text we can narrow down the text surrounding the keyword in order to reduce the input tokens for the model. Choose the size of the context window here:
# Passed below as window_size: the number of characters kept on each side of a keyword hit.
context_window = 8000 # @param {type:"number"}
import pandas as pd
from typing import List, Set, Tuple

def extract_unique_contexts(df: pd.DataFrame, keywords: List[str], window_size: int = 8000) -> pd.DataFrame:
    """
    Cut a context window around keyword hits in each row's 'content' text.

    For every row, each keyword is looked up with a case-sensitive
    ``str.find``, so only its first occurrence is considered. A window of
    ``window_size`` characters on each side of the hit is kept unless it
    overlaps a window already kept for an earlier keyword in the same row.

    Args:
        df: DataFrame with a 'content' column holding the full text. Rows whose
            'content' is not a string keep empty 'context' and 'keyword_found'
            values.
        keywords: Keywords to search for, in priority order. Empty entries are
            ignored.
        window_size: Number of characters kept on each side of the keyword
            (default: 8000). Non-integer numbers are truncated to int.

    Returns:
        A copy of ``df`` with two added columns: 'context' (the kept windows
        joined by ' ||| ', or 'No unique context found') and 'keyword_found'
        (the keywords that contributed a window joined by ', ', or
        'No keywords found'). The input DataFrame is not modified.
    """
    # The size may arrive as a float (e.g. from a numeric form field); slice
    # indices must be integers.
    window_size = int(window_size)

    # str.find('') returns 0 for every text, so an empty keyword would "match"
    # everywhere and its window could suppress the windows of real keywords.
    keywords = [keyword for keyword in keywords if keyword]

    def get_context_bounds(text: str, keyword: str, window_size: int) -> Tuple[int, int]:
        """Get start and end indices for context window, or (-1, -1) if the keyword is absent."""
        index = text.find(keyword)
        if index == -1:
            return (-1, -1)

        # Clamp the window to the limits of the text
        start_index = max(0, index - window_size)
        end_index = min(len(text), index + len(keyword) + window_size)
        return (start_index, end_index)

    def check_overlap(bounds: Tuple[int, int], existing_bounds: Set[Tuple[int, int]]) -> bool:
        """Check if new bounds overlap with any existing bounds."""
        start, end = bounds
        for existing_start, existing_end in existing_bounds:
            if not (end < existing_start or start > existing_end):
                return True
        return False

    # Create a copy of the DataFrame to avoid modifying the original
    result_df = df.copy()

    # Initialize columns for contexts
    result_df['context'] = ''
    result_df['keyword_found'] = ''

    # Process each row
    for idx, row in result_df.iterrows():
        text = row['content']
        if not isinstance(text, str):
            # Missing or non-text content: leave both new columns empty
            continue

        used_bounds: Set[Tuple[int, int]] = set()
        contexts = []
        keywords_found = []

        # Process each keyword
        for keyword in keywords:
            bounds = get_context_bounds(text, keyword, window_size)

            if bounds[0] == -1:  # Keyword not found
                continue

            # Keep the window only if it does not overlap one already kept
            if not check_overlap(bounds, used_bounds):
                used_bounds.add(bounds)
                context = text[bounds[0]:bounds[1]]
                contexts.append(context)
                keywords_found.append(keyword)

        # Join all unique contexts and keywords
        result_df.at[idx, 'context'] = ' ||| '.join(contexts) if contexts else 'No unique context found'
        result_df.at[idx, 'keyword_found'] = ', '.join(keywords_found) if keywords_found else 'No keywords found'

    return result_df

# Example usage:
keywords = ["terremoto", "erremoto", "seismo", "terremoti", "erremoti"]  # Add your keywords


# Uses the DataFrame `df` built in the download cell; the text is read from its 'content' column
processed_df = extract_unique_contexts(df, keywords, context_window)

In [None]:
# Display the DataFrame with the added 'context' and 'keyword_found' columns
processed_df

In [None]:
# Write the processed DataFrame to 'name.xlsx' in the working directory, without the index column
processed_df.to_excel('name.xlsx', index=False)