In [1]:
import os
from lxml import etree
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from dotenv import load_dotenv
import os
load_dotenv()


True

In [2]:
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from pydantic import ValidationError
from sqlalchemy import Sequence


# ORM Class

In [3]:
Base = declarative_base()

class MetaDataOrm(Base):
    __tablename__ = 'metadata'

    id = Column(Integer, Sequence('metadata_table_id_seq'), primary_key=True, autoincrement=True)
    title = Column(String)
    publisher = Column(String, default=2023)
    availability_status = Column(String, default="")
    analytic = Column(String, default="Not Available")
    imprinted_date = Column(String)
    abstract = Column(String, default="")


  Base = declarative_base()


In [4]:
# Load environment variables
user = os.getenv('SNOWFLAKE_USER')
password = os.getenv('SNOWFLAKE_PASSWORD')
account = os.getenv('SNOWFLAKE_ACCOUNT')
warehouse = os.getenv('SNOWFLAKE_WAREHOUSE')
database = os.getenv('SNOWFLAKE_DATABASE')
schema = os.getenv('SNOWFLAKE_SCHEMA')

# Extract Data from XML

In [5]:
def extract(input_path):
    """
    Extracts metadata from an XML file using XPath expressions and appends the information to a list.

    Parameters:
    - input_path (str): The path to the XML file.
    - pdf_content_list (list): A list to which extracted metadata dictionaries will be appended.
    - bucket_links (dict): A dictionary containing links to S3 buckets corresponding to XML files.

    Returns:
    None
    """
    # Get the absolute path of the XML file
    xml_file_path = os.path.abspath(input_path)
    # Parse the XML file
    if os.path.exists(xml_file_path):
        tree = etree.parse(xml_file_path)
        root = tree.getroot()
        # Define XML namespaces
        namespaces = {
            'tei': 'http://www.tei-c.org/ns/1.0',
            'xlink': 'http://www.w3.org/1999/xlink'
        }

        def get_first_item(xpath_result):
            """
            Helper function to get the first item from an XPath result.

            Parameters:
            - xpath_result (list): List of XPath results.

            Returns:
            str: The first item or "No Data" if the list is empty.
            """
            if xpath_result:
                # Remove newline and tab characters and return the first item
                xpath_result[0] = xpath_result[0].replace('\n', '').replace('\t','')
                return f"{xpath_result[0]}"  
            else:
                return "Not Available"

        # Extract metadata using XPath expressions
        metadata_dict = {
            "Title": get_first_item(root.xpath('//tei:titleStmt/tei:title[@level="a" and @type="main"]/text()', namespaces=namespaces)),
            "Publisher": get_first_item(root.xpath('//tei:publicationStmt/tei:publisher/text()', namespaces=namespaces)),
            "AvailabilityStatus": get_first_item(root.xpath('//tei:availability/@status', namespaces=namespaces)),
            "Analytic": get_first_item(root.xpath('//tei:analytic/text()', namespaces=namespaces)),
            "ImprintedDate": get_first_item(root.xpath('//tei:imprint/tei:date/text()', namespaces=namespaces)),
            "AppInfoDescription": get_first_item(root.xpath('//tei:application/tei:desc/text()', namespaces=namespaces)),
            "Abstract": get_first_item(root.xpath('//tei:profileDesc/tei:abstract/tei:p/text()', namespaces=namespaces)),
        }
        # Append the metadata dictionary to the list
        return metadata_dict
    else:
        print(f"The file {xml_file_path} does not exist.")



# Create Csv and transform data

In [6]:
import pandas as pd
raw_csv_path = '../resources/grobid_xml_data/raw_metadata.csv'
def extract_content(paths,raw_csv_path):
    pdf_content_list=[]
    for path in paths:
        pdf_content_list.append(extract(path))

    md = pd.DataFrame(pdf_content_list)
    
    md.to_csv(raw_csv_path, index=False)


Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [7]:
paths = ['../resources/grobid_xml_data/Level1_combined.grobid.tei.xml',
         '../resources/grobid_xml_data/Level2_combined.grobid.tei.xml',
         '../resources/grobid_xml_data/Level3_combined.grobid.tei.xml',
        ]
extract_content(paths,raw_csv_path)

In [8]:
def process_data(input_path,processed_csv_path):
    md = pd.read_csv(input_path)
    md = md.fillna('Not Available')
    md = md.applymap(lambda x: x.replace('\n', '').replace('\t', '') if isinstance(x, str) else x)
    md.to_csv(processed_csv_path, index=False)

processed_csv_path = '../resources/clean_csv/processed_metadata.csv'

process_data(raw_csv_path,processed_csv_path)

  md = md.applymap(lambda x: x.replace('\n', '').replace('\t', '') if isinstance(x, str) else x)


# Create ORM Instances

In [9]:
def create_orm_instances(df):
    orm_instances = []
    for _, row in df.iterrows():
        orm_instance = MetaDataOrm(
                title=row['Title'],
                publisher= row['Publisher'],
                analytic=row['Analytic'],
                imprinted_date=row['ImprintedDate'],
                abstract=row['AppInfoDescription'],
                availability_status=row['Abstract']
        )
        orm_instances.append(orm_instance)
    return orm_instances

# Use Pydantic to validate

In [10]:
from pydantic import BaseModel, Field, validator, ValidationError

class MetaData(BaseModel):
    title: str = Field(alias='Title', default="Unknown")  
    publisher: str = Field(default="Unknown")
    availability_status: str = Field(default="", alias='availability_status')
    analytic: str = Field(default="Not Available", alias='analytic', min_length=2)
    imprinted_date: str = Field(..., alias='imprinted_date')  
    abstract: str = Field(default="", min_length=2)

    @validator('title', 'publisher', 'availability_status', 'analytic', 'imprinted_date', 'abstract', pre=True, each_item=False)
    def check_empty_string(cls, v,values):
        if v == "":
            raise ValueError(f'empty string not allowed')

        return v

/var/folders/6q/q891flcj0r375hpjwrt2wtbm0000gn/T/ipykernel_12910/601115387.py:11: PydanticDeprecatedSince20: Pydantic V1 style `@validator` validators are deprecated. You should migrate to Pydantic V2 style `@field_validator` validators, see the migration guide for more details. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.6/migration/
  @validator('title', 'publisher', 'availability_status', 'analytic', 'imprinted_date', 'abstract', pre=True, each_item=False)


In [11]:
from pydantic import BaseModel, Field, validator, ValidationError

def orm_instance_to_meta_pydantic(orm_instance):
    return MetaData(**orm_instance.__dict__)
def convert_to_pydantic_instances(orm_instances):
        return [orm_instance_to_meta_pydantic(orm_instance) for orm_instance in orm_instances]


# create orm instances from csv and then validate with pydantic model

In [12]:
def create_orm_instances_and_validate_using_pydantic(path):
    try:
        df = pd.read_csv(path)
        orm_instances = create_orm_instances(df)
        for i in orm_instances:
            print(i.title)
        pydantic_instances = convert_to_pydantic_instances(orm_instances)
        print(f"{len(pydantic_instances)} are validated")
        return orm_instances
    except Exception as e:
        print(str(e))
        print("Error in validation")


In [13]:
orm_instances = create_orm_instances_and_validate_using_pydantic(processed_csv_path)

Not Available
Not Available
2024 Level III Topic Outlines Economics LEARNING OUTCOMES Capital Market Expectations, Part 1: Framework and Macro Considerations
3 are validated


# Upload to snowflake

In [14]:
def create_database_if_not_exists(engine, database):
    connection = engine.connect()
    connection.execute("CREATE DATABASE IF NOT EXISTS {}".format(database))
    # connection.execute("USE schema cfa_rr_list")
    connection.close()
def upload_to_snowflake(engine, orm_instances, database):
        create_database_if_not_exists(engine, database)
        Base.metadata.bind = engine
        if not engine.dialect.has_table(engine, MetaDataOrm.__tablename__):
            MetaDataOrm.__table__.create(bind=engine)
        else:
            print(f"Table '{MetaDataOrm.__tablename__}' already exists.")
        
        SessionClass = sessionmaker(bind= engine)
        session = SessionClass()

        # Get the total number of ORM instances to insert
        total_instances = len(orm_instances)

        # Set a threshold for printing progress updates (adjust as needed)
        progress_threshold = 1

        for idx, orm_instance in enumerate(orm_instances, start=1):
            session.add(orm_instance)
            print(orm_instance.id)
            # Commit the changes periodically to the database
            if idx % progress_threshold == 0:
                session.commit()
                print(f"Inserted {idx}/{total_instances} records.")

        # Commit any remaining changes
        session.commit()
        print(f"Inserted {total_instances}/{total_instances} records.")


In [15]:
engine = create_engine(
    f'snowflake://{user}:{password}@{account}/{database}/{schema}?warehouse={warehouse}'
)
upload_to_snowflake(engine=engine, orm_instances=orm_instances, database=database)

None
Inserted 1/3 records.
None
Inserted 2/3 records.
None
Inserted 3/3 records.
Inserted 3/3 records.
