In [None]:
# ---------------- Imports ----------------
import os
import sys

import pandas as pd
import yaml

from pypdf import PdfReader
from docx import Document



In [None]:
# ---------------- Config ----------------
# Load the project configuration. The path is relative to the current working
# directory, so this cell only works when the kernel runs from the notebook's
# own folder.
# The file is read explicitly as UTF-8 so that parsing does not depend on the
# platform's default text encoding.
with open("../../../config/config.yaml", "r", encoding="utf-8") as f:
    config = yaml.safe_load(f)

# Base data directory: <paths.proj_store>/data, taken from the config file.
data_folder = os.path.join(config["paths"]["proj_store"], "data")



In [None]:
# ---------------- Setup ----------------
# Folders to extract from, grouped by the file format they contain.
# Paths are relative; the order below is preserved in `folders_list`.

# PDF sources
pdf_folders = [
    # Manually collected
    'manually_collected/harvard_dataverse/ai_feedback_moving_beyond',
    'manually_collected/harvard_dataverse/biodiversity_offsetting',
    'manually_collected/harvard_dataverse/healthworker_interviews',
    'manually_collected/harvard_dataverse/leaders_leading_organizational_change',

    # Machine collected
    'machine_collected/jfk_library/returned_peace_corps_volunteers',
    'machine_collected/nara/assembly_oral_histories',
    'machine_collected/nara/nprc_oral_histories',
    'machine_collected/nara/oral_history_at_the_national_archives',
    'machine_collected/nara/veterans_oral_histories',
    # 'machine_collected/nara/wh_transition_interviews',  # currently disabled
]

# Word sources
word_folders = [
    # Manually collected
    'manually_collected/other_collections/flu_vaccination_interviews',
    'manually_collected/harvard_dataverse/covid_19_threshold',
    'manually_collected/harvard_dataverse/drivers_of_food_choice',
    'manually_collected/harvard_dataverse/relationship_building_around_farmers',
]

# Manual (if any, here for accounting purposes only)

# (relative folder, file type) pairs: all PDF folders first, then all Word folders.
folders_list = (
    [(source_folder, 'pdf') for source_folder in pdf_folders]
    + [(source_folder, 'word') for source_folder in word_folders]
)




In [None]:
def extract_text_from_pdf_files(input_folder, output_folder):
    """Extract the text of every PDF in ``input_folder`` into numbered ``.txt`` files.

    Each output file is named ``<output folder name>_<NNNNN>.txt`` (zero-padded
    counter starting at 0) and begins with a ``--- metadata ---`` header built
    from the file's row in ``metadata.csv`` (columns with missing values are
    omitted), followed by the text of all pages.

    Files are processed in case-insensitive alphabetical order. A file whose
    extraction fails is reported on stdout and skipped; it does not consume a
    counter value.

    Args:
        input_folder: Folder holding the PDF files and a ``metadata.csv`` that
            has an ``original_file_name`` column.
        output_folder: Folder the ``.txt`` files are written to; created if it
            does not exist.

    Raises:
        FileNotFoundError: If ``metadata.csv`` is missing from ``input_folder``.
        ValueError: If ``metadata.csv`` has no ``original_file_name`` column, or
            a PDF has no matching row in it.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_folder, exist_ok=True)

    # The last path component of the output folder prefixes every output file.
    output_folder_name = os.path.basename(output_folder.rstrip('/\\'))

    metadata_path = os.path.join(input_folder, 'metadata.csv')
    if not os.path.exists(metadata_path):
        raise FileNotFoundError(f"Please add 'metadata.csv' to {input_folder}")

    metadata_df = pd.read_csv(metadata_path)

    if 'original_file_name' not in metadata_df.columns:
        raise ValueError(f"The metadata.csv file in {input_folder} must contain an 'original_file_name' column.")

    # Match the extension case-insensitively so files such as "SCAN.PDF" are
    # not silently left out of the corpus.
    pdf_files = sorted(
        (f for f in os.listdir(input_folder) if f.lower().endswith('.pdf')),
        key=str.lower,
    )

    counter = 0  # Index of the next output file; advanced only on success

    for filename in pdf_files:
        input_path = os.path.join(input_folder, filename)
        output_filename = f"{output_folder_name}_{counter:05d}.txt"
        output_path = os.path.join(output_folder, output_filename)

        # A missing metadata row is a data-preparation error and stops the run.
        matched_metadata = metadata_df[metadata_df['original_file_name'] == filename]

        if matched_metadata.empty:
            raise ValueError(f"Metadata not found for file: {filename}. Please update 'metadata.csv'.")

        try:
            reader = PdfReader(input_path)

            # Pages that yield no text (None or "") are skipped; every other
            # page is terminated with a newline.
            page_texts = (page.extract_text() for page in reader.pages)
            text = "".join(f"{page_text}\n" for page_text in page_texts if page_text)

            # Only the first matching row is used if the file is listed twice.
            metadata_dict = matched_metadata.iloc[0].to_dict()
            metadata_text = "\n".join(
                f"{key}: {value}" for key, value in metadata_dict.items() if pd.notna(value)
            )

            formatted_text = f"--- metadata ---\n{metadata_text}\n\n{text}"

            with open(output_path, 'w', encoding='utf-8') as txt_file:
                txt_file.write(formatted_text)

            print(f"Extracted text saved to: {output_path}")

            counter += 1

        except Exception as e:
            # Best effort: report the failing file and continue with the rest.
            print(f"Failed to process {filename}: {e}")



In [None]:
def extract_text_from_word_files(input_folder, output_folder):
    """Extract the text of every ``.docx`` file in ``input_folder`` into numbered ``.txt`` files.

    Each output file is named ``<output folder name>_<NNNNN>.txt`` (zero-padded
    counter starting at 0) and begins with a ``--- metadata ---`` header built
    from the file's row in ``metadata.csv`` (columns with missing values are
    omitted), followed by the document's paragraphs joined with newlines.

    Files are processed in case-insensitive alphabetical order. Files whose
    name starts with ``~$`` are ignored. A file whose extraction fails is
    reported on stdout and skipped; it does not consume a counter value.

    Args:
        input_folder: Folder holding the Word files and a ``metadata.csv`` that
            has an ``original_file_name`` column.
        output_folder: Folder the ``.txt`` files are written to; created if it
            does not exist.

    Raises:
        FileNotFoundError: If ``metadata.csv`` is missing from ``input_folder``.
        ValueError: If ``metadata.csv`` has no ``original_file_name`` column, or
            a Word file has no matching row in it.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_folder, exist_ok=True)

    # The last path component of the output folder prefixes every output file.
    output_folder_name = os.path.basename(output_folder.rstrip('/\\'))

    metadata_path = os.path.join(input_folder, 'metadata.csv')
    if not os.path.exists(metadata_path):
        raise FileNotFoundError(f"Please add 'metadata.csv' to {input_folder}")

    metadata_df = pd.read_csv(metadata_path)

    if 'original_file_name' not in metadata_df.columns:
        raise ValueError(f"The metadata.csv file in {input_folder} must contain an 'original_file_name' column.")

    # Match the extension case-insensitively, and skip "~$..." files: Word
    # creates these owner/lock files next to a document that is open, they are
    # not real documents and would otherwise abort the run with a
    # "Metadata not found" error.
    word_files = sorted(
        (
            f for f in os.listdir(input_folder)
            if f.lower().endswith('.docx') and not f.startswith('~$')
        ),
        key=str.lower,
    )

    counter = 0  # Index of the next output file; advanced only on success

    for filename in word_files:

        # Progress output: name of the file about to be processed.
        print(filename)

        input_path = os.path.join(input_folder, filename)
        output_filename = f"{output_folder_name}_{counter:05d}.txt"
        output_path = os.path.join(output_folder, output_filename)

        # A missing metadata row is a data-preparation error and stops the run.
        matched_metadata = metadata_df[metadata_df['original_file_name'] == filename]

        if matched_metadata.empty:
            raise ValueError(f"Metadata not found for file: {filename}. Please update 'metadata.csv'.")

        try:
            doc = Document(input_path)
            # NOTE(review): `doc.paragraphs` appears to cover body paragraphs
            # only, so text placed inside tables would not be captured here --
            # confirm against the python-docx documentation.
            text = "\n".join(paragraph.text for paragraph in doc.paragraphs)

            # Only the first matching row is used if the file is listed twice.
            metadata_dict = matched_metadata.iloc[0].to_dict()
            metadata_text = "\n".join(
                f"{key}: {value}" for key, value in metadata_dict.items() if pd.notna(value)
            )

            formatted_text = f"--- metadata ---\n{metadata_text}\n\n{text}"

            with open(output_path, 'w', encoding='utf-8') as txt_file:
                txt_file.write(formatted_text)

            counter += 1

        except Exception as e:
            # Best effort: report the failing file and continue with the rest.
            print(f"Failed to process {filename}: {e}")




In [None]:
# Function Call
# Extractor to run for each declared file type; entries in `folders_list`
# with any other type are skipped.
extractors = {
    'pdf': extract_text_from_pdf_files,
    'word': extract_text_from_word_files,
}

for folder, file_type in folders_list:
    path_parts = folder.split('/')
    input_folder = f"{data_folder}/raw_data/{folder}"
    # The output location keeps only the penultimate and last components of `folder`.
    output_folder = f"{data_folder}/intermediate_data/01_txt_files/{path_parts[-2]}/{path_parts[-1]}"

    extract = extractors.get(file_type)
    if extract is not None:
        extract(input_folder, output_folder)

