In [11]:
from pathlib import Path
from collections import defaultdict
import yaml
import re
import string
import math
import pandas as pd
import duckdb
from unidecode import unidecode
import iteround
from utils.pandas_setup import pandas_setup
pandas_setup()

# Load the folder configuration shared by the project. The file is expected to
# sit one directory above the notebook's working directory.
with open('../folders.yaml', 'r') as in_file:
    folders = yaml.safe_load(in_file)
# DuckDB folder locations. `.get` yields None for a missing key, in which case
# the `Path(...)` calls in the prints below raise TypeError.
snapshot_duckdb = folders.get('snapshot_duckdb')
working_duckdb = folders.get("working_duckdb")
temp_duckdb = folders.get("temp_duckdb")
# Echo each configured folder together with whether it exists on disk.
print(f'{snapshot_duckdb = } {Path(snapshot_duckdb).exists() = }')
print(f'{working_duckdb = } {Path(working_duckdb).exists() = }')
print(f'{temp_duckdb = } {Path(temp_duckdb).exists() = }')

# Project name; later cells use it as the working database file stem and as
# the catalog name in SQL.
project = folders.get("project")
print(f'{project = }')
# ASCII punctuation characters that clean_name() replaces with spaces.
PUNC = string.punctuation
print(f'{PUNC = }')

# NOTE(review): presumably the OpenAlex work `type` values of interest; the list
# is not referenced by any cell visible here - confirm it is still needed.
type_types = ['erratum', 'retraction', 'letter', 'libguides', 'editorial', 'article', 'preprint',
          'report', 'review', 'standard', 'other', 'reference-entry', 'book', 'dissertation', 
          'paratext', 'supplementary-materials', 'book-chapter', 'dataset', 'grant', 'peer-review']

def reduce_works_topics(lst: list) -> str:
    """Join the ``id`` of the first three entries of ``lst`` with ``'; '``.

    Args:
        lst: list of dict-like topic entries, each exposing ``.get('id')``.

    Returns:
        The joined ids of at most the first three entries, or ``None`` when
        ``lst`` is not a list or is empty.
    """
    if isinstance(lst, list) and lst:
        # Only the leading three entries are used; the score > 0.5 filter
        # mentioned in the original notes is not applied.
        return '; '.join(topic.get('id') for topic in lst[:3])
    return None

def clean_name(s) -> str:
    """Normalise a name to lower-case ASCII words with a capitalised first letter.

    The input is transliterated with ``unidecode`` and lower-cased, every
    character found in ``PUNC`` becomes a space, runs of spaces are collapsed,
    the result is stripped and finally ``str.capitalize`` is applied.

    Args:
        s: the name to clean; ``None`` and empty values are returned unchanged.

    Returns:
        The cleaned name, or ``s`` itself when it is ``None`` or empty.
    """
    if s is None or len(s) == 0:
        return s
    lowered = unidecode(s).lower()
    # Punctuation is replaced (not removed) so adjacent words stay separated.
    spaced = ''.join(' ' if ch in PUNC else ch for ch in lowered)
    collapsed = re.sub(' +', ' ', spaced)
    return collapsed.strip().capitalize()

def _prune_for_weights(weights, min_ratio=0.1):
    """Keep only the entries whose weight exceeds ``min_ratio`` of the largest weight.

    Args:
        weights: mapping of code -> numeric weight; may be ``None`` or empty.
        min_ratio: an entry is kept when ``weight / max_weight > min_ratio``.

    Returns:
        A new dict with the surviving entries. Empty when ``weights`` is
        missing/empty or when the largest weight is zero (the ratio is
        undefined in that case).
    """
    if not weights:
        return {}
    # The maximum is computed once per mapping, not once per entry.
    peak = max(weights.values())
    if peak == 0:
        return {}
    return {code: weight for code, weight in weights.items() if weight / peak > min_ratio}


# Read the topic -> weights table from the project's working database. The
# connection is only needed for the query; `.df()` materialises the result.
with duckdb.connect(f'{working_duckdb}/{project}.duckdb') as db:
    topic_for_map = db.sql(f"SELECT * FROM {project}.topic_to_for").df()
print(f'{topic_for_map.shape}\n{topic_for_map.head()}')

# dd: topic_id -> {code: weight}, with minor weights pruned and topics left
# with no weights dropped. transform_topics_to_for() looks topics up in it.
dd = dict(zip(topic_for_map.topic_id, topic_for_map.for_weights))
dd = {topic_id: _prune_for_weights(weights) for topic_id, weights in dd.items()}
dd = {topic_id: weights for topic_id, weights in dd.items() if weights}
print(len(dd.keys()), dd)

def transform_topics_to_for(lst: list) -> list[dict]:
    """Convert a list of scored topics into rounded per-code shares.

    Each topic's score is spread over the codes that the module-level ``dd``
    mapping holds for that topic id (presumably Field of Research codes -
    confirm), the accumulated values are rescaled so they sum to 10, rounded
    to whole numbers with ``iteround.saferound`` and zero shares are dropped.

    Args:
        lst: list of dicts, each with an ``'id'`` (topic id) and a numeric
            ``'score'``; may be ``None`` or empty.

    Returns:
        A list of ``{'topic_id': code, 'topic_score': share}`` dicts. Note that
        ``'topic_id'`` holds a key of the ``dd`` weights, not the input topic
        id. An empty list is returned when ``lst`` is empty/``None``, when any
        topic has no entry in ``dd``, or when the accumulated total is zero.
        The empty result is ``[]`` so that every return path matches the
        declared list type.

    Raises:
        KeyError: if an item lacks ``'id'`` or ``'score'``.
    """
    if not lst:
        return []
    accumulated = defaultdict(float)
    for item in lst:
        score = item['score']
        for_weights = dd.get(item['id'])
        if not for_weights:
            # NOTE(review): a single unmapped topic discards the whole input,
            # as in the original code - confirm this is intended, not `continue`.
            return []
        for code, weight in for_weights.items():
            accumulated[code] += weight * score
    # The total is computed once; a zero total (e.g. all scores are 0) would
    # otherwise divide by zero.
    total = sum(accumulated.values())
    if total == 0:
        return []
    shares = {code: 10 * value / total for code, value in accumulated.items()}
    rounded = iteround.saferound(shares, 0)
    # After rounding to 0 places the `> 0.1` test simply removes zero shares.
    return [{'topic_id': code, 'topic_score': share}
            for code, share in rounded.items() if share > 0.1]

snapshot_duckdb = '/home/lc/m/openalex_dec24/duckdb' Path(snapshot_duckdb).exists() = True
working_duckdb = '/home/lc/m/working' Path(working_duckdb).exists() = True
temp_duckdb = '/home/lc/m/.tmp' Path(temp_duckdb).exists() = True
project = 'ERA'
PUNC = '!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
(4516, 2)
                      topic_id                                                                                                                                      for_weights
0  https://openalex.org/T10299  {'5102': 149994.4779999996, '4006': 117460.00689999975, '4009': 107355.16840000029, '4008': 37415.87250000001, '5108': 26377.08370000006, '5...
1  https://openalex.org/T10325  {'5101': 140388.8558999996, '5109': 105560.57780000023, '5107': 72654.96279999992, '3705': 10892.295500000004, '3706': 9184.778599999989, '5...
2  https://openalex.org/T10172  {'3201': 135760.53630000015, '3202': 77431.21989999998, '4207': 8689.351800000004, '4003': 5556.420299999982, '4203': 3938.0520999999862, 

In [12]:
class SetUp:
    """Open and configure the DuckDB connection used by the loader classes.

    Constructing an instance connects to an in-memory DuckDB database,
    registers the ``transform_topics_to_for`` Python function, applies session
    settings, attaches the project database and the snapshot databases, and
    prints the resulting table catalog.

    Attributes:
        project: project name taken from the module-level ``project``.
        db: the DuckDB connection.
    """

    # Session settings applied in _set_database().
    MEMORY_LIMIT = '48GB'
    THREADS = 20
    # Snapshot database files (``<name>.duckdb``) attached by _attach_oa_database().
    OA_DATABASES = ('sources', 'topics', 'works')

    def __init__(self):
        self.project = project
        self._set_database()
        df = self.db.sql("SHOW ALL TABLES").df()
        print(df.head(999))
        print("SETUP IS COMPLETED")

    @staticmethod
    def _sql_literal(value) -> str:
        """Return ``value`` as a single-quoted SQL string literal (quotes doubled)."""
        return "'" + str(value).replace("'", "''") + "'"

    def close(self):
        """Close the DuckDB connection opened by this instance, if any."""
        db = getattr(self, 'db', None)
        if db is not None:
            db.close()

    def _set_database(self):
        """Connect, register the UDF, apply settings and attach all databases.

        Returns:
            ``self``.
        """
        # NOTE(review): the " AS memory" suffix appears to make this a *named*
        # in-memory database that outlives a single connection object - the
        # recorded cell output shows the UDF "already exists" on a re-run.
        # Confirm this is intended rather than plain ":memory:".
        self.db = duckdb.connect(":memory: AS memory")
        try:
            self.db.create_function('transform_topics_to_for',
                                    transform_topics_to_for,
                                    return_type=duckdb.duckdb.typing.DuckDBPyType(list[dict[str,float]]),
                                    null_handling='special',
                                    # exception_handling='return_null'
                                    )
        except Exception as e:
            # Best effort: a registration failure (e.g. the function already
            # exists) is reported and setup continues.
            print(f'{e = }')
        self.db.sql(f"""
                    SET memory_limit = '{self.MEMORY_LIMIT}';
                    SET threads = {self.THREADS};
                    -- SET partitioned_write_flush_threshold = 1024;
                    SET preserve_insertion_order = false;
                    SET temp_directory = {self._sql_literal(temp_duckdb)};
                    """)
        self._attach_project_database()
        self._attach_oa_database()
        return self

    def _attach_project_database(self):
        """Attach ``<working_duckdb>/<project>.duckdb`` to the connection."""
        db_file = f'{working_duckdb}/{self.project}.duckdb'
        print(f'{db_file = } {Path(db_file).exists() = }')
        self.db.sql(f"ATTACH IF NOT EXISTS {self._sql_literal(db_file)}")
        return

    def _attach_oa_database(self, snapshot_dir=None):
        """Attach the snapshot databases listed in ``OA_DATABASES``.

        Args:
            snapshot_dir: folder holding the ``*.duckdb`` snapshot files;
                defaults to the configured module-level ``snapshot_duckdb``
                instead of a hard-coded absolute path.

        Returns:
            ``self``.
        """
        base = snapshot_duckdb if snapshot_dir is None else snapshot_dir
        for name in self.OA_DATABASES:
            db_file = f'{base}/{name}.duckdb'
            self.db.sql(f"ATTACH IF NOT EXISTS {self._sql_literal(db_file)}")
        return self

In [13]:
class LoadJournal(SetUp):
    """Loader built on the ``SetUp`` connection; currently only previews ``works.works``."""

    def __init__(self):
        """Run the inherited ``SetUp`` initialisation unchanged."""
        super().__init__()

    def extract_all_journal(self, issn=None):
        """Display the result of ``SELECT *`` over ``works.works``.

        Args:
            issn: accepted for callers, but not used by the query.

        Returns:
            ``None``; the relation is only shown, not returned.
        """
        query = """
                SELECT * 
                    FROM works.works
            """
        self.db.sql(query).show()

In [14]:
def main():
    """Build a ``LoadJournal`` (which runs the full setup) and show the works table."""
    LoadJournal().extract_all_journal()

In [15]:
# Entry point: run the whole pipeline, then print a completion marker.
if __name__ == "__main__":
    main()
    print("DONE!")

e = CatalogException('Catalog Error: Scalar Function with name "transform_topics_to_for" already exists!')
db_file = '/home/lc/m/working/ERA.duckdb' Path(db_file).exists() = True
   database schema                     name                                                                                                                                     column_names                                                                                                                                     column_types  temporary
0       ERA   main                  authors  [for_codes, for_full_count, author_id, orcid, display_name, display_name_alternatives, works_count, cited_by_count, most_cited_work, summary...  [VARCHAR[], BIGINT[], VARCHAR, VARCHAR, VARCHAR, VARCHAR[], BIGINT, BIGINT, VARCHAR, STRUCT("2yr_mean_citedness" DOUBLE, h_index BIGINT, i10...      False
1       ERA   main              authors_for                                                                                        