In [1]:
import pandas as pd
import sqlite3
import glob
from elasticsearch import Elasticsearch, helpers
import os 
import json
from tqdm import tqdm

In [None]:
ES_HOST = "http://elasticsearch:9200" 
ES_PASSWORD = "" 
DB_PATH = "/workspace/0-utils/1-data/pubmed/db/pubmed_n25_0000.db" 
INDEX_NAME = "pubmed"
BULK_SIZE = 5000

print(f"1. Reading data from SQLite: {DB_PATH}")
try:
    conn = sqlite3.connect(DB_PATH)
    df = pd.read_sql_query("SELECT * FROM raw", conn)
    conn.close()
    df = df.fillna("")
    print(f"   -> Data loaded. Total records: {len(df)}")
except Exception as e:
    print(f"   -> Error reading SQLite: {e}")
    exit()

es = Elasticsearch(
    ES_HOST, 
    basic_auth=("elastic", ES_PASSWORD),
    request_timeout=60
)

print(f"\n2. Connecting to Elasticsearch and creating index: {INDEX_NAME}")

MAPPING_BODY = {
  "settings": {
    "analysis": {
      "analyzer": {
        "english_exact": {
          "tokenizer": "standard",
          "filter": ["lowercase"]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "PMID":        { "type": "keyword" },
      "TITLE":       { "type": "text", "analyzer": "english_exact" },
      "ABST":        { "type": "text", "analyzer": "english_exact" },  
      "ABST_ENG":    { "type": "integer" },
      "TRUNCUTATED": { "type": "integer" },
      "JOURNAL":     { "type": "keyword" },
      "ISSN":        { "type": "keyword" },
      "PUB_YEAR":    { "type": "integer" },
      "PUB_MONTH":   { "type": "integer" }
    }
  }
}

es.indices.create(index=INDEX_NAME, body=MAPPING_BODY)
print(f"   -> Index '{INDEX_NAME}' created successfully with optimized mapping.")

print("\n3. Starting high-speed Bulk Indexing...")

def generate_actions(df):
    """pandas DataFrameからElasticsearchのBulk API用アクションを生成"""
    for index, row in df.iterrows():
        doc = row.to_dict()
        yield {
            "_index": INDEX_NAME,
            "_id": doc["PMID"],
            "_source": doc
        }

try:
    successes, errors = helpers.bulk(
        es, 
        generate_actions(df),
        chunk_size=BULK_SIZE,
        request_timeout=60 # タイムアウトを延長
    )
    print(f"\nBulk Indexing Complete.")
    print(f"   -> Total documents indexed: {len(df)}")
    print(f"   -> Successfully indexed: {successes}")
    if errors:
        print(f"   -> Errors encountered: {len(errors)}")

except Exception as e:
    print(f"   -> An error occurred during bulk indexing: {e}")

print("\n4. Final Index Count Check:")
try:
    count_res = es.count(index=INDEX_NAME)
    print(f"   -> Total documents in ES index '{INDEX_NAME}': {count_res['count']}")
except Exception as e:
    print(f"   -> Error checking document count: {e}")

1. Reading data from SQLite: /workspace/0-utils/1-data/pubmed/db/pubmed_n25_0000.db
   -> Data loaded. Total records: 100000

2. Connecting to Elasticsearch and creating index: pubmed
   -> Index 'pubmed' created successfully with optimized mapping.

3. Starting high-speed Bulk Indexing...


  successes, errors = helpers.bulk(



Bulk Indexing Complete.
   -> Total documents indexed: 100000
   -> Successfully indexed: 100000

4. Final Index Count Check:
   -> Total documents in ES index 'pubmed': 90000


In [None]:
DB_DIR = "/workspace/0-utils/1-data/pubmed/db/"

es = Elasticsearch(ES_HOST, basic_auth=("elastic", ES_PASSWORD), request_timeout=60)
db_files = glob.glob(os.path.join(DB_DIR, "pubmed_n25_*.db"))

if not db_files:
    print(f"Error: No .db files found in {DB_DIR}")
    exit()

print(f"1. Found {len(db_files)} database files to process.")

MAPPING_BODY = {
  "settings": {
    "analysis": {
      "analyzer": {
        "english_exact": {
          "tokenizer": "standard",
          "filter": ["lowercase"]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "PMID":        { "type": "keyword" },
      "TITLE":       { "type": "text", "analyzer": "english_exact" },
      "ABST":        { "type": "text", "analyzer": "english_exact" },
      "ABST_ENG":    { "type": "integer" },
      "TRUNCUTATED": { "type": "integer" },
      "JOURNAL":     { "type": "keyword" },
      "ISSN":        { "type": "keyword" },
      "PUB_YEAR":    { "type": "integer" },
      "PUB_MONTH":   { "type": "integer" }
    }
  }
}

print(f"\n2. Connecting to Elasticsearch and creating index: {INDEX_NAME}")
try:
    if es.indices.exists(index=INDEX_NAME):
        print(f"   -> Index '{INDEX_NAME}' already exists. Deleting...")
        es.indices.delete(index=INDEX_NAME)

    es.indices.create(index=INDEX_NAME, body=MAPPING_BODY)
    print(f"   -> Index '{INDEX_NAME}' created successfully with optimized mapping.")
except Exception as e:
    print(f"   -> Error creating index: {e}")
    exit()

total_docs_indexed = 0

def generate_actions(df_chunk, index_name):
    """pandas DataFrameからElasticsearchのBulk API用アクションを生成"""
    for index, row in df_chunk.iterrows():
        doc = row.to_dict()
        yield {
            "_index": index_name,
            "_id": doc["PMID"],
            "_source": doc
        }

for i, db_file in enumerate(tqdm(db_files)):
    
    try:
        conn = sqlite3.connect(db_file)
        df = pd.read_sql_query("SELECT * FROM raw", conn)
        conn.close()
        df = df.fillna("")
        
        num_records = len(df)
        print(f"   -> Total records in file: {num_records}")
        successes, errors = helpers.bulk(
            es, 
            generate_actions(df, INDEX_NAME),
            chunk_size=BULK_SIZE,
            request_timeout=120 # タイムアウトをさらに延長
        )
        
        if errors:
            print(f"   -> WARNING: {len(errors)} errors encountered in this batch.")
        
        total_docs_indexed += successes

    except Exception as e:
        print(f"   -> FATAL ERROR processing {os.path.basename(db_file)}: {e}")

print("\n--- Final Summary ---")
print(f"Total documents successfully processed (sent to ES): {total_docs_indexed}")

try:
    count_res = es.count(index=INDEX_NAME)
    print(f"Total unique documents in ES index '{INDEX_NAME}': {count_res['count']}")
except Exception as e:
    print(f"Error checking final document count: {e}")

In [None]:
ES_HOST = "http://elasticsearch:9200" 
ES_PASSWORD = "" 
INDEX_NAME = "pubmed"
BULK_SIZE = 5000

query_word = "Coproporphyrin I"
search_index = "pubmed"

es = Elasticsearch(
    ES_HOST, 
    basic_auth=("elastic", ES_PASSWORD),
    request_timeout=60 
)

print(f"\n--- 5. Searching for '{query_word}' in {search_index}/ABST ---")

search_body = {
  "query": {
    "multi_match": {
      "query": query_word,
      "fields": ["ABST"],
      "type": "phrase"
    }
  },
  "sort": [
    {"PUB_YEAR": {"order": "asc"}}
  ],
  "_source": ["PMID", "TITLE", "PUB_YEAR"],
  "size": 3
}

try:
    res = es.search(index=search_index, body=search_body)
    
    total_hits = res['hits']['total']['value']
    
    print(f"   -> Total documents found: {total_hits}")

    if total_hits > 0:
        print("\n--- Top 3 Results ---")
        for hit in res['hits']['hits'][:3]:
            source = hit['_source']
            print(f"   PMID: {source.get('PMID')}")
            print(f"   YEAR: {source.get('PUB_YEAR')}")
            print(f"   TITLE: {source.get('TITLE')}")
            print("-" * 30)

except Exception as e:
    print(f"   -> Error during search query: {e}")


--- 5. Searching for 'Coproporphyrin I' in pubmed/ABST ---
   -> Total documents found: 196

--- Top 3 Results ---
   PMID: 1578410
   YEAR: 0
   TITLE: Porphyrin rings and phospholipids: stimulators of cloning efficiency in certain species of Tetrahymena.
------------------------------
   PMID: 12830817
   YEAR: 0
   TITLE: Influence of linker unit on performance of palladium(II) coproporphyrin labelling reagent and its bioconjugates.
------------------------------
   PMID: 9861496
   YEAR: 0
   TITLE: Porphyrins in urine, plasma, erythrocytes, bile and faeces in a case of congenital erythropoietic porphyria (Gunther's disease) treated with blood transfusion and iron chelation: lack of benefit from oral charcoal.
------------------------------


# make sentence table

In [14]:
ES_HOST = "http://elasticsearch:9200" 
ES_PASSWORD = "micgm1Gemini" 
DB_PATH = "/workspace/0-utils/1-data/pubmed/db/pubmed_n25_0000.db" 
BULK_SIZE = 5000

INDEX_NAME = "pubmed_sentence"

MAPPING_BODY = {
  "settings": {
    "analysis": {
      "analyzer": {
        "english_exact": {
          "tokenizer": "standard",
          "filter": ["lowercase"]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "PMID": { "type": "keyword" }, 
      "SENTID":        { "type": "keyword" },
      "SENTENCE":    { "type": "text", "analyzer": "english_exact" },
    }
  }
}

def generate_actions(df):
    for index, row in df.iterrows():
        doc = row.to_dict()
        yield {
            "_index": INDEX_NAME,
            "_id": doc["SENTID"],
            "_source": doc
        }

es = Elasticsearch(
    ES_HOST, 
    basic_auth=("elastic", ES_PASSWORD),
    request_timeout=60
)
es.indices.create(index=INDEX_NAME, body=MAPPING_BODY)

conn = sqlite3.connect(DB_PATH)
df = pd.read_sql_query("SELECT * FROM sent_split", conn)
conn.close()
df = df.fillna("")

successes, errors = helpers.bulk(
    es, 
    generate_actions(df),
    chunk_size=BULK_SIZE,
    request_timeout=60
)

  successes, errors = helpers.bulk(


In [None]:
DB_DIR = "/workspace/0-utils/1-data/pubmed/db/"
db_files = glob.glob(os.path.join(DB_DIR, "pubmed_n25_*.db"))

def generate_actions(df, index_name):
    for row in df.itertuples(index=False):
        doc = row._asdict()
        yield {
            "_index": index_name,
            "_id": doc.get("SENTENCE_ID"), 
            "_source": doc
        }

print(f"\n2. Connecting to Elasticsearch and creating index: {INDEX_NAME}")
try:
    if es.indices.exists(index=INDEX_NAME):
        print(f"   -> Index '{INDEX_NAME}' already exists. Deleting...")
        es.indices.delete(index=INDEX_NAME)

    es.indices.create(index=INDEX_NAME, body=MAPPING_BODY)
    print(f"   -> Index '{INDEX_NAME}' created successfully with optimized mapping.")
except Exception as e:
    print(f"   -> Error creating index: {e}")
    exit()


for i, db_file in enumerate(tqdm(db_files)):
    try:
        conn = sqlite3.connect(db_file)
        df = pd.read_sql_query("SELECT * FROM sent_split", conn)
        conn.close()
        df = df.fillna("")
        
        num_records = len(df)
        successes, errors = helpers.bulk(
            es, 
            generate_actions(df, INDEX_NAME),
            chunk_size=BULK_SIZE,
            request_timeout=120 # タイムアウトをさらに延長
        )
        
        if errors:
            print(f"   -> WARNING: {len(errors)} errors encountered in this batch.")

    except Exception as e:
        print(f"   -> FATAL ERROR processing {os.path.basename(db_file)}: {e}")

print("\n--- Final Summary ---")
try:
    count_res = es.count(index=INDEX_NAME)
    print(f"Total unique documents in ES index '{INDEX_NAME}': {count_res['count']}")
except Exception as e:
    print(f"Error checking final document count: {e}")


2. Connecting to Elasticsearch and creating index: pubmed_sentence
   -> Index 'pubmed_sentence' already exists. Deleting...
   -> Index 'pubmed_sentence' created successfully with optimized mapping.


  successes, errors = helpers.bulk(
  3%|▎         | 13/382 [05:26<2:46:03, 27.00s/it]