In [1]:
import os
import glob
import re
import json
from transformers import AutoModelForCausalLM, AutoTokenizer
import numpy as np

In [2]:
# Input folder: the Markdown files that will be fed to the model.
md_paths = "../converted"
# Output folder: one JSON file per processed record is written here.
json_paths = "../extracted"

In [3]:
# Make sure the output folder exists before any result is written.
os.makedirs(json_paths, exist_ok=True)

# Collect every .md file in the input folder, then randomise the
# processing order in place.
md_pattern = os.path.join(md_paths, "*.md")
markdown_files = glob.glob(md_pattern)
np.random.shuffle(markdown_files)

# Model setup: load the tokenizer and the language model

In [None]:
# Path to the local Hugging Face-format LLaMA 3 model directory.
# Set the LLAMA_MODEL_PATH environment variable to point somewhere else;
# the previous hard-coded location is kept as the default.
model_path = os.environ.get("LLAMA_MODEL_PATH", "/Users/nunocalaim/.llama/llama-hf")

# Load the tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_path)

# Ensure the pad token is set: reuse the EOS token when the tokenizer
# does not define a dedicated pad token.
if tokenizer.pad_token is None:
    tokenizer.add_special_tokens({'pad_token': tokenizer.eos_token})

model = AutoModelForCausalLM.from_pretrained(model_path)

# Only grow the embedding matrix when the tokenizer really has more entries
# than the model. Resizing unconditionally would shrink a checkpoint whose
# embedding matrix is larger than the tokenizer vocabulary.
embedding_rows = model.get_input_embeddings().weight.shape[0]
if len(tokenizer) > embedding_rows:
    model.resize_token_embeddings(len(tokenizer))

# Set the pad_token_id and eos_token_id in model configuration
model.config.pad_token_id = tokenizer.pad_token_id
model.config.eos_token_id = tokenizer.eos_token_id

# Print token IDs to confirm they are integers
print("pad_token_id:", model.config.pad_token_id)
print("eos_token_id:", model.config.eos_token_id)

# Make sure eos_token_id and pad_token_id are valid integers
assert isinstance(model.config.pad_token_id, int), "pad_token_id should be an integer."
assert isinstance(model.config.eos_token_id, int), "eos_token_id should be an integer."

In [5]:
def generate_response(prompt, json_path, max_new_tokens=300):
    """Run the model on ``prompt`` and save the JSON object it produces.

    Uses the module-level ``tokenizer`` and ``model``. Only the newly
    generated tokens are decoded, and the first JSON object found in that
    text is parsed and written to ``json_path``.

    Args:
        prompt: Full prompt text passed to the tokenizer.
        json_path: Destination file for the parsed JSON. It is only created
            when a JSON object was successfully parsed.
        max_new_tokens: Upper bound on the number of generated tokens.
            Defaults to 300, the previously hard-coded value.

    Returns:
        None. The outcome (saved file, decode error, or no JSON found) is
        reported with ``print``.
    """
    # Tokenize the input text and create an attention mask
    inputs = tokenizer(prompt, return_tensors="pt", padding=True)

    # Generate a response using the model
    output = model.generate(
        inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        max_new_tokens=max_new_tokens,
        num_return_sequences=1,
        pad_token_id=model.config.pad_token_id  # Use pad_token_id from the model's config
    )

    # The returned sequence starts with the prompt tokens. Drop them by
    # token count rather than by character offset, because the decoded text
    # is not guaranteed to have the same length as the prompt string.
    prompt_token_count = inputs["input_ids"].shape[1]
    new_tokens = output[0][prompt_token_count:]
    generated_text = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

    json_start = generated_text.find("{")
    if json_start == -1:
        print("No JSON found in the response")
        print(f"generated text is {generated_text}")
        return

    try:
        # raw_decode parses exactly one JSON value starting at json_start and
        # ignores whatever follows it, so trailing text after the object
        # (including stray braces) no longer breaks parsing.
        json_data, _ = json.JSONDecoder().raw_decode(generated_text, json_start)
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        print(f"generated text is {generated_text}")
        return

    # Save the JSON data to a file (indented for readability)
    with open(json_path, 'w', encoding='utf-8') as json_file:
        json.dump(json_data, json_file, indent=4)

    print(f"JSON data saved to '{json_path}'")
    print("Extracted JSON:", json_data)

In [6]:
def run_model(content: str, json_path: str) -> None:
    """Build the extraction prompt for one tax record and run the model on it.

    The record text is interpolated into a fixed prompt that describes the
    expected JSON structure and lists the fields to extract. The prompt is
    then handed to ``generate_response`` together with ``json_path``.

    Args:
        content: Text of the tax record, inserted verbatim into the prompt.
        json_path: Output path forwarded unchanged to ``generate_response``.
    """
    # The doubled braces ({{ and }}) are f-string escapes and appear as
    # single braces in the final prompt; only {content} is interpolated.
    # NOTE(review): the <|system|>, <|user|> and <|assistant|> markers are
    # plain text in this string - confirm they match the chat format the
    # loaded model expects.
    prompt = f"""
    <|system|> You are a helpful assistant. You process tax records of Norwegian companies and extract specific information from them. This is the tax record you will base your answers on: {content}

    You provide your response only in a predefined JSON format and nothing else. Do not include any explanation, comments, or extra information—only the JSON.

    The JSON structure should strictly follow this format:

    "json"
    {{
        "company_name": "string",          
        "company_id": "string",            
        "leadership": {{
            "CEO": "string or null",       
            "board_members": [
                "string or null"           
            ],
            "chairman_of_the_board": "string or null" 
        }},
        "subsidiaries": [
            "string or null"               
        ],
        "parent_company": "string or null", 
        "profit_status": "boolean or null", 
        "property_status": "string",        
        "number_of_employees": "integer or null" 
    }}

    <|user|> I want to extract the following information from the tax record:

    - Company name
    - Company ID
    - Leadership: names of CEO, board members, and chairman of the board
    - A list of subsidiaries if they exist
    - The parent company if it exists
    - Whether the company made a profit
    - Whether the company owns or rents property
    - The number of employees

    Remember, only return the JSON. No explanations.
    <|assistant|>
    """

    # Generation, JSON parsing and writing the file all happen in the callee.
    generate_response(prompt, json_path)

In [7]:
def process_md(path):
    """Extract JSON for one Markdown file unless its output already exists.

    The record id is the run of digits directly before ``_arso_2020`` in the
    file name. The result is written to ``<json_paths>/<id>.json``; files
    whose output is already present are skipped.

    Args:
        path: Path to the Markdown file. The id may appear at the very start
            of the string or after a ``/`` or ``\\`` separator.

    Returns:
        None in every case, including when the path does not contain a
        recognisable id or the output file already exists.
    """
    # Accept a bare file name as well as either path separator, so the id is
    # found regardless of platform or how the path was built.
    match = re.search(r"(?:^|[\\/])(\d+)_arso_2020", path)
    if match is None:
        return None
    record_id = match.group(1)  # digits immediately before "_arso_2020"

    json_path = os.path.join(json_paths, f"{record_id}.json")
    if os.path.exists(json_path):
        # Already processed on a previous run; nothing to do.
        return None

    print(f"processing {path}...")
    with open(path, 'r', encoding='utf-8') as file:
        content = file.read()
    run_model(content, json_path)

In [None]:
# Walk the shuffled file list, echoing each path before handing it to
# process_md.
for md_file in markdown_files:
    print(md_file)
    process_md(md_file)