In [6]:
import requests
import pandas as pd
import json
from bs4 import BeautifulSoup as bs
import re

class Page:
    def __init__(self, xml_path=None, xml_content=None):
        if xml_path:
            self.load_xml_path(xml_path)
        elif xml_content:
            self.load_xml(xml_content)
        else:
            raise ValueError("No xml path or content provided.")

    def load_xml_path(self, path):
        with open(path, "r", encoding="utf-8") as f:
            xml = f.read()
        self.load_xml(xml)

    def load_xml(self, xml):
        self.soup = bs(xml, features="xml")

    def sentence_from_keyword(self, keyword):
        token = self.soup.find("String", attrs={"CONTENT": keyword})
        yield self.token_to_sentence(token)
        while (token := token.find_next("String", attrs={"CONTENT": keyword})) is not None:
            yield self.token_to_sentence(token)

    def token_to_sentence(self, token):
        strings = token.parent.find_all("String")
        return " ".join(t["CONTENT"] for t in strings)

    def paragraph_from_keyword(self, keyword):
        token = self.soup.find("String", attrs={"CONTENT": keyword})
        yield self.token_to_composed_block(token)
        while (token := token.find_next("String", attrs={"CONTENT": keyword})) is not None:
            yield self.token_to_composed_block(token)

    def token_to_composed_block(self, token):
        composed_block = token.find_parent("ComposedBlock")
        if composed_block:
            text_blocks = composed_block.find_all("TextBlock")
            result = ""
            for text_block in text_blocks:
                text_lines = text_block.find_all("TextLine")
                for text_line in text_lines:
                    strings = text_line.find_all("String")
                    line_content = " ".join(string["CONTENT"] for string in strings)
                    result += f"{line_content}\n"
            return result.strip()
        return ""

def clean_json(text):
    return text.replace("\n", " ")

counter = 0

def read_system_message():
    try:
        with open('oldtimey_touringbot_prompt_for_deployment.txt', 'r') as file:
            return file.read().strip()
    except FileNotFoundError:
        return "You are a helpful assistant."

def row_to_json(row):
    global counter
    counter += 1

    system_message_content = read_system_message()
    system_message = {"role": "system", "content": system_message_content}

    user_content_parts = [str(row[col]) for col in row.index if col not in ['System Content', 'Package ID', 'Part', 'Page']]
    user_message = {"role": "user", "content": " ".join(user_content_parts)}

    custom_id = f"{row['Package ID']}-{row['Part']}-{row['Page']}-{counter}"

    return {
        "custom_id": custom_id,
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "gpt-3.5-turbo-0125",
            "messages": [system_message, user_message],
            "max_tokens": 1000
        }
    }

def search_swedish_newspapers(to_date, from_date, collection_id, query):
    base_url = 'https://data.kb.se/search'
    params = {
        'to': to_date,
        'from': from_date,
        'isPartOf.@id': collection_id,
        'q': query,
        'searchGranularity': 'part'
    }

    headers = {
        'Accept': 'application/json'
    }

    response = requests.get(base_url, params=params, headers=headers)

    if response.status_code == 200:
        try:
            return response.json()
        except ValueError:
            return {'error': 'Invalid JSON response'}
    else:
        return {'error': response.status_code, 'message': response.text}

def extract_urls(result):
    base_url = 'https://data.kb.se'
    details = []

    for hit in result['hits']:
        part_number = hit.get('part')
        page_number = hit.get('page')
        package_id = hit.get('hasFilePackage', {}).get('@id', '').split('/')[-1]

        if part_number and page_number and package_id:
            url = f"{base_url}/{package_id}/part/{part_number}/page/{page_number}"
            details.append({
                'part_number': part_number,
                'page_number': page_number,
                'package_id': package_id,
                'url': url
            })

    return details

def extract_xml_urls(api_response, page_numbers=None):
    xml_urls = {}
    parts_list = api_response.get('hasPart', [])

    if page_numbers is not None:
        page_numbers = [int(page) for page in page_numbers]

    for part in parts_list:
        pages_list = part.get('hasPartList', [])
        for page in pages_list:
            page_id = page['@id']
            page_number = int(page_id.split('/')[-1].replace('page', ''))

            if page_numbers is None or page_number in page_numbers:
                includes_list = page.get('includes', [])
                for include in includes_list:
                    if include['@id'].endswith('alto.xml'):
                        xml_urls[page_number] = include['@id']

    return xml_urls

def fetch_xml_content(xml_urls_by_package):
    xml_content_by_package = {}

    for package_id, parts in xml_urls_by_package.items():
        for part_number, xml_urls in parts.items():
            if package_id not in xml_content_by_package:
                xml_content_by_package[package_id] = {}

            if part_number not in xml_content_by_package[package_id]:
                xml_content_by_package[package_id][part_number] = {}

            for page_number, url in xml_urls.items():
                response = requests.get(url)
                if response.status_code == 200:
                    xml_content_by_package[package_id][part_number][page_number] = response.content
                else:
                    print(f"Failed to fetch XML content from {url}. Status code: {response.status_code}")

    return xml_content_by_package

def main():
    from_date = '1908-01-01'
    to_date = '1908-01-03'
    collection_id = 'https://libris.kb.se/2ldhmx8d4mcrlq9#it'  # Svenska dagbladet
    query = 'konsert'

    result = search_swedish_newspapers(to_date, from_date, collection_id, query)
    if 'error' in result:
        print(f"Search error: {result['error']}")
        return

    detailed_info = extract_urls(result)
    if not detailed_info:
        print("No URLs found.")
        return

    print(f"Total hits found: {len(detailed_info)}")

    api_responses = []
    for info in detailed_info:
        url = info['url']
        response = requests.get(url, headers={'Accept': 'application/json'})
        if response.status_code == 200:
            data = response.json()
            api_responses.append((data, info))
        else:
            print(f"Failed to fetch data from {url}. Status code: {response.status_code}")

    xml_urls_by_package = {}
    for data, info in api_responses:
        package_id = info['package_id']
        part_number = info['part_number']
        page_numbers = [info['page_number']]
        xml_urls = extract_xml_urls(data, page_numbers)
        if package_id not in xml_urls_by_package:
            xml_urls_by_package[package_id] = {}
        if part_number not in xml_urls_by_package[package_id]:
            xml_urls_by_package[package_id][part_number] = {}
        xml_urls_by_package[package_id][part_number].update(xml_urls)

    xml_content_by_package = fetch_xml_content(xml_urls_by_package)

    def get_xml_content(package_id, part_number, page_number):
        return xml_content_by_package.get(package_id, {}).get(part_number, {}).get(page_number, None)

    all_data_frames = []

    for info in detailed_info:
        package_id = info['package_id']
        part_number = info['part_number']
        page_number = info['page_number']

        retrieved_xml_content = get_xml_content(package_id, part_number, page_number)
        if not retrieved_xml_content:
            print(f"No XML content found for Package ID: {package_id}, Part: {part_number}, Page: {page_number}")
            continue

        xml_string = retrieved_xml_content.decode('utf-8')
        page = Page(xml_content=xml_string)
        
        print(f"Processing XML content for Package ID: {package_id}, Part: {part_number}, Page: {page_number}")
        
        formatted_date = extract_and_format_date(page)
        if not formatted_date:
            print("Date extraction failed.")
            continue

        matching_composed_blocks = extract_textblocks(page, query)
        if not matching_composed_blocks:
            print(f"No matching ComposedBlocks found in Package ID: {package_id}, Part: {part_number}, Page: {page_number}")
            continue

        contents = extract_textblock_content(matching_composed_blocks)

        df = pd.DataFrame(contents, columns=["ComposedBlock Content"])
        df['Date'] = formatted_date
        df['Package ID'] = package_id
        df['Part'] = part_number
        df['Page'] = page_number

        all_data_frames.append(df)

    if all_data_frames:
        final_df = pd.concat(all_data_frames, ignore_index=True)
        output_xls_path = "all_pages_output.xlsx"
        output_jsonl_path = "all_pages_output.jsonl"

        final_df.to_excel(output_xls_path, index=False)
        print(f"Data exported to {output_xls_path}")

        with open(output_jsonl_path, 'w') as jsonl_file:
            for _, row in final_df.iterrows():
                jsonl_file.write(json.dumps(row_to_json(row)) + '\n')
        print(f"Data exported to {output_jsonl_path}")
    else:
        print("No data to export.")

def extract_and_format_date(page):
    file_name_element = page.soup.find('fileName')
    if file_name_element:
        file_name = file_name_element.text
        date_match = re.search(r'_(\d{8})_', file_name)
        if date_match:
            date_str = date_match.group(1)
            formatted_date = f"{date_str[6:8]}.{date_str[4:6]}.{date_str[0:4]}"
            return formatted_date
    return None

def extract_textblocks(page, keyword):
    matching_composed_blocks = []
    for composed_block in page.paragraph_from_keyword(keyword):
        if composed_block:  # Check if composed block is not None
            print(f"Matching ComposedBlock found with keyword '{keyword}': {composed_block}")
            matching_composed_blocks.append(composed_block)
    return matching_composed_blocks

def extract_textblock_content(matching_composed_blocks):
    contents = []
    for composed_block_content in matching_composed_blocks:
        contents.append(composed_block_content.strip())
    return contents

if __name__ == "__main__":
    main()


Total hits found: 4
Processing XML content for Package ID: dark-77536, Part: 1, Page: 12


AttributeError: 'NoneType' object has no attribute 'find_parent'