In [1]:
import os
import pandas as pd
import logging
from pathlib import Path
from typing import List, Optional
from datetime import datetime

from utils import  parse_contributors, parse_leitzahl

from constants import INPUT_DATA_PATH, OUT_BCP_PATH, ALL_COLUMNS_SORTED, PUBLICATION_COLUMNS, PUBLICATION_TABLE, AUTHORSHIP_COLUMNS, AUTHORSHIP_TABLE, MASTER_TABLE
from db.db_importer import dump_table

# Get current date in yyyy_mm_dd format
current_date = datetime.now().strftime('%Y_%m_%d')

# Configure logging
log_dir = 'logs'
os.makedirs(log_dir, exist_ok=True)
log_file_path = os.path.join(log_dir, f'{current_date}_dump_processing.log')

logger = logging.getLogger('metadata_logger')
if not logger.hasHandlers():
    logger.setLevel(logging.INFO)
    file_handler = logging.FileHandler(log_file_path)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

INFO - Connected to DB: <bound method PSQL_DB.__repr__ of Postgres('biblioowner', <password hidden>, 'id-hdb-psgr-cp46.ethz.ch', '5432', 'bibliometrics')>


In [2]:
def bulk_load_df() -> pd.DataFrame:
    """
    Load multiple CSV files from the input directory into a single DataFrame.

    Returns:
        pd.DataFrame: A concatenated DataFrame containing data from all CSV files.
    """
    df_list: List[pd.DataFrame] = []

    for file_name in sorted(Path(INPUT_DATA_PATH).iterdir()):
        if file_name.is_file() and file_name.suffix == '.csv':
            try:
                logging.info(f'Reading file: {file_name}')
                df_list.append(pd.read_csv(file_name, index_col=False, low_memory=False, dtype=str))
            except pd.errors.EmptyDataError:
                logging.error(f'EmptyDataError - {file_name} is empty and will be skipped.')
            except pd.errors.ParserError:
                logging.error(f'ParserError - {file_name} is malformed and will be skipped.')
            except Exception as error:
                logging.error(f'Error reading {file_name} => {error}')
    
    if df_list:
        logging.info('Concatenating dataframes')
        return pd.concat(df_list, ignore_index=True)
    else:
        logging.warning('No dataframes to concatenate, returning an empty DataFrame.')
        return pd.DataFrame()

In [3]:
OUT_BCP_PATH2 = Path(OUT_BCP_PATH)

def parse_and_dump(
    df: pd.DataFrame, 
    bcp_file_name: str, 
    table_name: str, 
    columns: Optional[List[str]] = None, 
    reload: bool = True, 
    drop_table: bool = False, 
    sep: str = '\t',
    extract_year: bool = False
) -> None:
    """
    Generic method to parse DataFrame and dump to a file with error handling.

    Parameters:
        df (pd.DataFrame): DataFrame to be processed.
        bcp_file_name (str): Name of the BCP file.
        table_name (str): Name of the table to dump data.
        columns (Optional[List[str]]): List of columns to reindex the DataFrame.
        reload (bool): Flag to reload data.
        drop_table (bool): Flag to drop the table before dumping.
        sep (str): Separator for the CSV file.
        extract_year (bool): Flag to extract the year from 'dc_date_issued' column.
    """
    bcp_file_path = OUT_BCP_PATH2 / bcp_file_name
    skipped_file_path = OUT_BCP_PATH2 / f'skipped_{bcp_file_name}'

    if reload:
        if columns:
            df = df[df.columns.intersection(set(columns))]
            df = df.reindex(columns=columns)
        if extract_year:
            df['rc_year'] = df['dc_date_issued'].str.extract(r'([0-9]{4})', expand=True)
        df.to_csv(bcp_file_path, sep=sep, index=False, index_label='\t', header=False)
        logging.info(f'== INFO - DONE - Wrote data to file {bcp_file_path}')

    try_again = -1
    count = 0

    while try_again != 0:
        logging.info(f"== START Try Again {count}")
        try_again = dump_table(table_name=table_name, bcp_file=bcp_file_path, columns=columns or df.columns.to_list(), drop_table=drop_table, sep=sep)
        logging.info(f'{try_again=}')
        if try_again > 0:
            count += 1
            with open(bcp_file_path, "r+") as f:
                lines = f.readlines()
                f.seek(0)
                skipped_row = lines.pop(try_again - 1)
                logging.info(f"INFO - Removed row: {skipped_row.strip()}")
                f.truncate()
                f.writelines(lines)
            with open(skipped_file_path, "a") as f2:
                f2.write(skipped_row)
        elif try_again == -1:
            logging.error("ERROR - Unrecoverable error during dumping table")
            break
    else:
        logging.info("== Try Again DONE")

In [4]:
# thse steps can be done year by year or as a one big dataset
big_df = bulk_load_df()
reordered_df = big_df.reindex(columns=ALL_COLUMNS_SORTED)
print(big_df.info())
big_df['ethz_size'] = big_df['ethz_size'].apply(lambda size: size.replace('\t', " ") if pd.notna(size) else size)
big_df['handle_id'] = big_df['dc_identifier_uri'].apply(lambda uri: uri.split("/")[-1] if pd.notna(uri) else uri)
big_df['dc_title'] = big_df['dc_title'].apply(lambda title: title.replace('\\', "") if pd.notna(title) else title)

reordered_df = big_df.reindex(columns=ALL_COLUMNS_SORTED)
print(reordered_df.duplicated(subset=['handle_id']).value_counts())
#print(reordered_df.duplicated(subset=['handle_id']))
reordered_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 266038 entries, 0 to 266037
Columns: 134 entries, dc_contributor_author to ethz_date_openbisupload
dtypes: object(134)
memory usage: 272.0+ MB
None


  big_df['handle_id'] = big_df['dc_identifier_uri'].apply(lambda uri: uri.split("/")[-1] if pd.notna(uri) else uri)


False    266036
True          2
Name: count, dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 266038 entries, 0 to 266037
Columns: 135 entries, dc_contributor to rc_item_id
dtypes: object(135)
memory usage: 274.0+ MB


In [5]:
leitzhal_df = reordered_df[['handle_id', 'rc_item_id','ethz_leitzahl', 'ethz_leitzahlidentifiers', 'ethz_leitzahl_certified', 'ethz_leitzahlidentifiers_certified']]
parse_leitzahl(leitzhal_df)

== INFO - Executed table creation for RCLeitzahl
== INFO - cursor and connection CLOSED.


In [6]:
AUTHORSHIP_BCP_FILE = 'authorship.bcp'
bcp_file_path = OUT_BCP_PATH+AUTHORSHIP_BCP_FILE
contributors_set, local_authorship_list = parse_contributors(reordered_df)
global_authorship_df = pd.DataFrame(local_authorship_list, columns=AUTHORSHIP_COLUMNS)
global_authorship_df.to_csv(bcp_file_path, sep='\t', index=False, index_label='\t', header=False)

parse_and_dump(global_authorship_df, AUTHORSHIP_BCP_FILE, AUTHORSHIP_TABLE, AUTHORSHIP_COLUMNS, drop_table=True)

== INFO - Executed table creation for RCAuthorship
== INFO - cursor and connection CLOSED.


In [7]:
parse_and_dump(reordered_df, 'publications.bcp', PUBLICATION_TABLE, PUBLICATION_COLUMNS, extract_year=True, drop_table=True)

== INFO - Executed table creation for RCPublication
== INFO - cursor and connection CLOSED.


In [8]:
parse_and_dump(reordered_df, 'master.bcp', MASTER_TABLE, drop_table=True)

== INFO - Executed table creation for RCMasterTable
== INFO - cursor and connection CLOSED.
