In [61]:
import os
from dotenv import load_dotenv
load_dotenv(override=True)
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from typing import List
from langchain_community.document_loaders import PyPDFLoader

In [62]:
# Anchor every path at the kernel's current working directory.
root_dir = os.getcwd()
# Input files are looked up under <root_dir>/data.
data_dir = os.path.join(root_dir, 'data')

In [63]:

file_name = 'The_Oxford_3000_by_CEFR_Level.pdf'
file_path = os.path.join(data_dir, file_name)

loader = PyPDFLoader(file_path)
pages = []
async for page in loader.alazy_load():
    pages.append(page)

In [64]:
# Sanity check: how many items the loader yielded.
len(pages)

12

In [66]:
# One vocabulary entry extracted from a page.
# NOTE: documented with `#` comments on purpose -- a class docstring would be
# folded into the pydantic schema and change what is sent to the model.
class WordSchema(BaseModel):
    # The word itself.
    word: str = Field(
        description="The word",
    )
    # Part-of-speech label for the word.
    pos: str = Field(
        description="The part of speech",
    )
    # CEFR level; the description instructs the model to infer it when absent.
    cefr_level: str = Field(
        description="The CEFR level of the word. A1, A2, B1, B2, C1, C2. If the CEFR level is not available in the document, you must infer it based on your knowledge of the word. Never leave this field empty.",
    )

In [67]:
# Container for a single extraction call: the list of WordSchema entries found.
# (Kept as `#` comments so the pydantic schema description is unchanged.)
class OutputSchema(BaseModel):
    words: List[WordSchema] = Field(
        description="The words extracted from the document",
    )

In [68]:
# Single-page smoke test of the structured-output chain before processing every page.
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
model_with_structure = model.with_structured_output(OutputSchema)
# `ainvoke` returns a coroutine that must be awaited; calling it bare leaves
# `structured_output` holding an un-awaited coroutine instead of a result.
# The synchronous `invoke` returns the parsed output directly.
structured_output = model_with_structure.invoke("extract the word and part of speech from the page: {}".format(pages[0].page_content))

In [69]:
import asyncio

model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
model_with_structure = model.with_structured_output(OutputSchema)

async def extract_data_from_page(page_content):
    return await model_with_structure.ainvoke("extract the word and part of speech from the page: {}".format(page_content))

async def main():
    tasks = [extract_data_from_page(page.page_content) for page in pages]
    results = await asyncio.gather(*tasks)
    return results

results = await main()

In [70]:
# Inspect the raw extraction results (one entry per item in `pages`).
results

[OutputSchema(words=[WordSchema(word='a', pos='indefinite article', cefr_level='A1'), WordSchema(word='about', pos='prep., adv.', cefr_level='A1'), WordSchema(word='above', pos='prep., adv.', cefr_level='A1'), WordSchema(word='across', pos='prep., adv.', cefr_level='A1'), WordSchema(word='action', pos='n.', cefr_level='A1'), WordSchema(word='activity', pos='n.', cefr_level='A1'), WordSchema(word='actor', pos='n.', cefr_level='A1'), WordSchema(word='actress', pos='n.', cefr_level='A1'), WordSchema(word='add', pos='v.', cefr_level='A1'), WordSchema(word='address', pos='n.', cefr_level='A1'), WordSchema(word='adult', pos='n.', cefr_level='A1'), WordSchema(word='advice', pos='n.', cefr_level='A1'), WordSchema(word='afraid', pos='adj.', cefr_level='A1'), WordSchema(word='after', pos='prep.', cefr_level='A1'), WordSchema(word='afternoon', pos='n.', cefr_level='A1'), WordSchema(word='again', pos='adv.', cefr_level='A1'), WordSchema(word='age', pos='n.', cefr_level='A1'), WordSchema(word='ago'

In [71]:
# Collect every extracted word whose CEFR level is not one of the six accepted labels.
valid_cefr_levels = {'A1', 'A2', 'B1', 'B2', 'C1', 'C2'}
invalid_entries = [
    entry
    for page_result in results
    for entry in page_result.words
    if entry.cefr_level not in valid_cefr_levels
]

# An empty list means every entry carries a valid level.
invalid_entries

[]

In [72]:
# Sanity check: there should be one result per input page.
len(results)

12

# Write to database 

In [74]:
def prepare_data(parsing_result, source: str = 'Oxford 3000') -> List[tuple]:
    """Flatten one extraction result into row tuples ready for a bulk insert.

    Args:
        parsing_result: Object exposing a ``words`` iterable whose items have
            ``word``, ``pos`` and ``cefr_level`` attributes (e.g. an ``OutputSchema``).
        source: Label stored as the fourth element of every row. Defaults to
            ``'Oxford 3000'`` so existing callers keep their behaviour.

    Returns:
        A list of ``(word, pos, cefr_level, source)`` tuples, in the same order
        as ``parsing_result.words``. Empty when there are no words.
    """
    return [
        (entry.word, entry.pos, entry.cefr_level, source)
        for entry in parsing_result.words
    ]

In [75]:
def insert_data_to_pg_db(data: List[tuple]):
    """Bulk-insert word rows into the ``predefined_word_bank`` table.

    Connects using the ``DATABASE_URL`` environment variable, inserts every row
    in a single transaction and commits. Failures are reported on stdout rather
    than raised, so a calling loop keeps going.

    Args:
        data: Rows as ``(word, word_type, cefr_level, from_source)`` tuples.

    Returns:
        None. Progress and errors are printed.
    """
    import psycopg2  # Example for PostgreSQL. Use the correct library for your DB.

    # Pre-bind both handles so the cleanup below is safe even when connect()
    # or cursor() raises before they are assigned.
    conn = None
    cur = None
    try:
        # Establish connection
        conn = psycopg2.connect(os.getenv('DATABASE_URL'))
        cur = conn.cursor()

        # SQL query (use parameterized queries to prevent SQL injection)
        sql = "INSERT INTO predefined_word_bank (word, word_type, cefr_level, from_source) VALUES (%s, %s, %s, %s)"

        # Execute the query for all rows of data
        cur.executemany(sql, data)

        # Commit the changes
        conn.commit()

        print("Data inserted successfully!")

    except Exception as error:  # psycopg2.Error is an Exception subclass
        # Nothing was committed; the connection is closed below without a commit.
        print(f"Error: {error}")

    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()
            print("Database connection closed.")

In [76]:
# Smoke-test the prepare -> insert pipeline on the first page only.
# NOTE(review): re-running this cell inserts the same rows again unless the
# table enforces uniqueness -- confirm against the schema.
test_page = results[0]
data = prepare_data(test_page)
insert_data_to_pg_db(data)

Data inserted successfully!
Database connection closed.


In [None]:
# Insert the remaining pages; results[0] was already written by the test cell above.
# Each call opens and closes its own database connection.
for result in results[1:]:
    data = prepare_data(result)
    insert_data_to_pg_db(data)

Data inserted successfully!
Database connection closed.
Data inserted successfully!
Database connection closed.
Data inserted successfully!
Database connection closed.
Data inserted successfully!
Database connection closed.
Data inserted successfully!
Database connection closed.
Data inserted successfully!
Database connection closed.
