In [1]:
%pwd

'c:\\Users\\PrinciaFernandes\\Mresult\\Phenomix\\notebooks'

In [2]:
import os
os.chdir("../")

In [3]:
%pwd

'c:\\Users\\PrinciaFernandes\\Mresult\\Phenomix'

In [12]:
import requests
from bs4 import BeautifulSoup
import pdfplumber
import io
import json
from models.llm_model.gemini_model import json_response
from models.prompts.Prompts import prompt_overview



In [None]:

def overview_extract(pdf_path):
    """Collect the overview pages of a PDF and send them to the LLM.

    Only the first four pages are inspected. A page is selected when it
    contains 'Overview', or when it contains ' Request Send Date' and does
    not contain 'Glossary'.

    Args:
        pdf_path: Anything ``pdfplumber.open`` accepts (a path or a binary
            stream).

    Returns:
        Whatever ``json_response`` returns for ``prompt_overview`` applied to
        the selected page text.
    """
    selected_pages = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages[:4]:
            # Defensive: treat a missing text layer as an empty page.
            text = page.extract_text() or ''
            is_overview = 'Overview' in text
            is_request_page = ' Request Send Date' in text and 'Glossary' not in text
            # A page that satisfies both rules is taken once; two independent
            # ``if`` statements previously appended it twice.
            if is_overview or is_request_page:
                selected_pages.append(text)
    # Keep a line break between pages so the last line of one page does not
    # fuse with the first line of the next one.
    overview_page = '\n'.join(selected_pages)
    return json_response(prompt_overview(overview_page))

def description_extract(pdf_path, chunk_size=4, max_words=1800, start_page=3):
    """Extract code/description records from the body pages of a PDF via the LLM.

    Pages from ``start_page`` onwards are processed in groups of
    ``chunk_size``. Pages containing 'Glossary', 'Generic Name' or
    'Brand Name' are skipped. Inside a group, consecutive pages are packed in
    page order into batches of fewer than ``max_words`` whitespace-separated
    words (a single page above the limit forms its own batch). Each batch is
    sent through ``prompt_description`` and ``json_response``; the reply is
    expected to be a JSON list whose first element is a '|'-separated header
    and whose remaining elements are '|'-separated rows.

    Args:
        pdf_path: Anything ``pdfplumber.open`` accepts.
        chunk_size: Number of pages per group.
        max_words: Upper bound (exclusive) on words per LLM request.
        start_page: Zero-based index of the first page to read.

    Returns:
        list[dict]: One dict per row, keyed by the header columns.

    Raises:
        json.JSONDecodeError: If a reply is not valid JSON (for example when
            the reply was cut short).
    """
    description_dict = []
    with pdfplumber.open(pdf_path) as pdf:
        pages = len(pdf.pages)
        for chunk_start in range(start_page, pages, chunk_size):
            chunk_end = min(chunk_start + chunk_size, pages)
            batches = []
            current = ''
            for page_number in range(chunk_start, chunk_end):
                # Defensive: treat a missing text layer as an empty page.
                text = pdf.pages[page_number].extract_text() or ''
                if not text.strip():
                    continue
                if 'Glossary' in text or 'Generic Name' in text or 'Brand Name' in text:
                    continue
                # Join pages with a line break so the last row of one page
                # does not merge with the first row of the next.
                candidate = f"{current}\n{text}" if current else text
                if len(candidate.split()) < max_words:
                    current = candidate
                else:
                    # Flush what was collected so far and start a new batch;
                    # this keeps the batches in page order.
                    if current:
                        batches.append(current)
                    current = text
            if current:
                batches.append(current)

            for batch in batches:
                description = json_response(prompt_description(batch))
                rows = json.loads(description.replace("\n", ''))
                # A reply holding only the header (or nothing) has no records.
                if len(rows) > 1:
                    columns = rows[0].split("|")
                    description_dict += [
                        dict(zip(columns, row.split("|"))) for row in rows[1:]
                    ]
    return description_dict

def extract_exceptionl(pdf_path):
    """Build a phenotype dict for a PDF that is handled as a single LLM request.

    Every page is read. Pages that contain ' Request Send Date' and do not
    contain 'Glossary' are left out; all other pages are joined and sent
    through ``prompt_description`` / ``json_response`` in one call. The
    overview comes from ``overview_extract``.

    Args:
        pdf_path: Anything ``pdfplumber.open`` accepts.

    Returns:
        dict: The parsed overview JSON with an added 'Code_Description' key
        holding a list of row dicts (empty when the reply carries no rows).
    """
    body_pages = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            # Defensive: treat a missing text layer as an empty page.
            text = page.extract_text() or ''
            # NOTE(review): the original also had `if 'Overview' in text: pass`,
            # which had no effect, so pages mentioning 'Overview' are still
            # included here. Confirm whether they were meant to be skipped.
            if ' Request Send Date' in text and 'Glossary' not in text:
                continue
            if text:
                body_pages.append(text)
    # Separate pages with a line break so rows at page boundaries stay apart.
    extracted_text = '\n'.join(body_pages)

    overview = overview_extract(pdf_path)

    description = json_response(prompt_description(extracted_text))
    description_json = json.loads(description.replace("\n", ''))

    # Always bound: previously this name was only assigned when the reply had
    # more than one element, so a header-only reply raised UnboundLocalError.
    code_rows = []
    if len(description_json) > 1:
        columns = description_json[0].split("|")
        code_rows = [dict(zip(columns, row.split("|"))) for row in description_json[1:]]

    phenotype_dict = json.loads(overview)
    phenotype_dict["Code_Description"] = code_rows
    return phenotype_dict

def phenotype(pdf_url):
    """Download one PDF and turn it into a phenotype dict.

    The PDF named 'algorithm_Critical_COVID_updated.pdf' is routed to
    ``extract_exceptionl``; every other PDF goes through ``overview_extract``
    and ``description_extract``.

    Args:
        pdf_url: URL of the PDF to download.

    Returns:
        dict: Parsed overview JSON plus a 'Code_Description' entry, or the
        result of ``extract_exceptionl`` for the special-cased file.

    Raises:
        requests.HTTPError: If the download answers with an error status.
    """
    response = requests.get(pdf_url)
    # Fail on an HTTP error instead of handing an error page to the PDF parser.
    response.raise_for_status()
    pdf_path = io.BytesIO(response.content)

    # The file name has to be looked up in the URL. The previous check ran
    # `in` against the BytesIO object, which never matches a str, so the
    # special-case branch could not be reached.
    if 'algorithm_Critical_COVID_updated.pdf' in pdf_url:
        return extract_exceptionl(pdf_path)

    overview = overview_extract(pdf_path)
    # Rewind so the second reader starts from the beginning of the stream.
    pdf_path.seek(0)
    description = description_extract(pdf_path)
    phenotype_dict = json.loads(overview)
    phenotype_dict['Code_Description'] = description
    return phenotype_dict


def sentinel_scrapping(base_url, max_pages=2):
    """Scrape phenotype PDFs linked from the Sentinel outcome pages.

    The index page at ``base_url`` is scanned for links whose href contains
    'health-outcomes-interest/'. Each linked page that answers with status 200
    and contains the text "Outcomes Assessed in Inferential Analyses" is
    used: every link on it whose href contains '.pdf' is passed to
    ``phenotype``.

    Args:
        base_url: URL of the index page.
        max_pages: Maximum number of qualifying outcome pages to process;
            ``None`` processes all of them. Defaults to 2, the limit that was
            previously hard-coded.

    Returns:
        list: One ``phenotype`` result per PDF link, in page order.
    """
    index_response = requests.get(base_url)
    index_soup = BeautifulSoup(index_response.content, 'html.parser')

    sentinel = []
    seen_links = set()
    pages_used = 0
    for anchor in index_soup.find_all('a', href=True):
        if max_pages is not None and pages_used >= max_pages:
            break
        if 'health-outcomes-interest/' not in anchor['href']:
            continue
        full_link = requests.compat.urljoin(base_url, anchor['href'])
        # The same page can be linked several times from the index; visit it once.
        if full_link in seen_links:
            continue
        seen_links.add(full_link)

        page_response = requests.get(full_link)
        if page_response.status_code != 200:
            continue
        # The page is parsed once and reused for the PDF links. The earlier
        # second download checked the status of an unrelated response.
        soup_page = BeautifulSoup(page_response.text, 'html.parser')
        if "Outcomes Assessed in Inferential Analyses" not in soup_page.get_text():
            continue

        pages_used += 1
        for pdf_anchor in soup_page.find_all('a', href=True):
            if ".pdf" in pdf_anchor['href']:
                # Resolve against the page the link was found on.
                pdf_link = requests.compat.urljoin(full_link, pdf_anchor['href'])
                sentinel.append(phenotype(pdf_link))
    return sentinel



In [5]:
from src.utils import save_json
from src.config import RAW_DIR,SENTINEL_URL

In [None]:

def main():
    """Scrape the Sentinel site, store the raw result and return it."""
    scraped = sentinel_scrapping(SENTINEL_URL)
    save_json(RAW_DIR, scraped, 'SENTINEL')
    return scraped

sentinel = main()

In [None]:

from datetime import datetime
import re


In [None]:

def _parse_query_endpoint(token):
    """Parse one end of a query period into a ``datetime``.

    Accepts a full month name ("January 1, 2000"), an abbreviated month name
    ("Jan 1, 2000") or a bare year ("2000"); raises ``ValueError`` otherwise.
    """
    cleaned = token.strip()
    for fmt in ("%B %d, %Y", "%b %d, %Y", "%Y"):
        try:
            return datetime.strptime(cleaned, fmt)
        except ValueError:
            continue
    raise ValueError(f"Unrecognised query date: {cleaned!r}")


def get_query_date(date):
    """Convert a free-text query period into start and end timestamps.

    Recognised periods are "<Month> <d>, <yyyy> - <Month> <d>, <yyyy>" and
    "<yyyy>-<yyyy>" (spaces around the hyphen allowed). " to " and " – " are
    accepted as separators. Several periods in one string are all converted.

    Args:
        date: Text containing the period(s); may be empty or ``None``.

    Returns:
        tuple[str, str]: Start and end timestamps formatted as
        ``%Y-%m-%dT%H:%M:%S.00Z``, joined with ", " when there are several
        periods. ``('NA', 'NA')`` for empty input and ``('', '')`` when no
        period is found in the text.
    """
    if not date:
        return 'NA', 'NA'

    # Chain the replacements: the second one previously started again from
    # `date`, which discarded the " to " normalisation.
    query_period = date.replace(" to ", " - ").replace(" – ", " - ")

    query_dates = re.findall(r"\b[A-Za-z]+\s\d{1,2},\s\d{4}\s-\s[A-Za-z]+\s\d{1,2},\s\d{4}|\b\d{4}\s*-\s*\d{4}\b",query_period)

    start_date = []
    end_date = []
    for period in query_dates:
        # Every match holds exactly one hyphen: the separator. Each side is
        # parsed on its own, so "2000 - 2018" is handled as a year range
        # instead of being pushed through the month/day format.
        first, _, last = period.partition("-")
        start_date.append(_parse_query_endpoint(first))
        end_date.append(_parse_query_endpoint(last))

    stamp_format = "%Y-%m-%dT%H:%M:%S.00Z"
    Query_start_date = ", ".join(moment.strftime(stamp_format) for moment in start_date)
    Query_end_date = ", ".join(moment.strftime(stamp_format) for moment in end_date)
    return Query_start_date, Query_end_date

def get_request_date(date):
    """Convert "<Month> <d>, <yyyy>" dates found in a text into timestamps.

    Args:
        date: Text containing zero or more dates; may be empty or ``None``.

    Returns:
        str: The dates formatted as ``%Y-%m-%dT%H:%M:%S.00Z`` and joined with
        ", "; 'NA' when ``date`` is empty or ``None``.
    """
    if not date:
        return 'NA'

    matches = re.findall(r"\b[A-Za-z]+\s\d{1,2},\s\d{4}", date)
    # Parse everything first, then format, as two separate passes.
    parsed = list(map(lambda raw: datetime.strptime(raw, "%B %d, %Y"), matches))
    stamps = [moment.strftime("%Y-%m-%dT%H:%M:%S.00Z") for moment in parsed]
    return ", ".join(stamps)

def get_detail(sentinel):
    """Merge phenotype overviews into one detail record per outcome.

    The first phenotype seen for an outcome creates the record. Later
    phenotypes with the same outcome append their values to the text fields,
    separated by ' \\n '. Records are sorted by outcome and numbered with a
    'PID' of the form 'SP000001'.

    Args:
        sentinel: Iterable of phenotype dicts. Each needs an 'Overview' dict
            with the keys 'Outcome', 'Title', 'Request IDs', 'Query period',
            'Description', 'Algorithm to define outcome' and
            'Request to send dates'.

    Returns:
        list[dict]: Detail records sorted by 'Outcome'.
    """
    # Keyed by outcome for direct lookup; dicts keep insertion order.
    details_by_outcome = {}

    for phenotype in sentinel:
        overview = phenotype['Overview']
        outcome = overview['Outcome']
        # The 'NA' fallback is applied in both branches now; merged records
        # used to receive the text 'None' for a missing request id.
        request_id = overview['Request IDs'] if overview['Request IDs'] else 'NA'
        # Query dates always come from 'Query period'. The merge branch used
        # to read 'Request to send dates' here.
        query_start_date, query_end_date = get_query_date(overview['Query period'])
        request_send_date = get_request_date(overview['Request to send dates'])

        detail = details_by_outcome.get(outcome)
        if detail is None:
            details_by_outcome[outcome] = {
                'Outcome': outcome,
                'Title': overview['Title'],
                'Request_id': request_id,
                'Query_start_date': query_start_date,
                'Query_end_date': query_end_date,
                'Description': overview['Description'],
                'Algorithm_to_define_outcome': overview['Algorithm to define outcome'],
                'Request_send_date': request_send_date,
            }
            continue

        # Built by concatenation rather than nested same-quote f-strings,
        # which only parse on Python 3.12 and later.
        separator = ' \n '
        detail['Title'] = detail['Title'] + separator + str(overview['Title'])
        detail['Request_id'] = detail['Request_id'] + separator + str(request_id)
        detail['Query_start_date'] = detail['Query_start_date'] + separator + query_start_date
        detail['Query_end_date'] = detail['Query_end_date'] + separator + query_end_date
        detail['Description'] = detail['Description'] + separator + str(overview['Description'])
        detail['Algorithm_to_define_outcome'] = (
            detail['Algorithm_to_define_outcome'] + separator
            + str(overview['Algorithm to define outcome'])
        )
        detail['Request_send_date'] = detail['Request_send_date'] + separator + request_send_date

    sorted_detail = sorted(details_by_outcome.values(), key=lambda record: record['Outcome'])
    for position, detail in enumerate(sorted_detail, start=1):
        detail['PID'] = f'SP{position:06d}'

    return sorted_detail

def get_concept(sentinel,detail):
    """Merge the code rows of all phenotypes into one concept record per code.

    The first row seen for a code creates the record. Later rows with the
    same code append their care setting, code type, code category, principal
    diagnosis, outcome, request id and PIDs to the record's lists. Records
    are sorted by code and numbered with a 'CID' of the form 'SC000001'.

    Args:
        sentinel: Iterable of phenotype dicts with an 'Overview' dict
            ('Outcome', 'Request IDs') and a 'Code_Description' list of row
            dicts. Each row needs 'Code' and 'Description'; 'Care_setting',
            'Code_Type', 'Code_Category' and 'Principal_diagnosis' are
            optional and default to 'NA'.
        detail: Detail records carrying 'Outcome' and 'PID'.

    Returns:
        list[dict]: Concept records sorted by 'Code'.
    """
    # Keyed by code for direct lookup; dicts keep insertion order.
    concepts_by_code = {}

    for phenotype in sentinel:
        overview = phenotype['Overview']
        outcome = overview['Outcome']
        # Same 'NA' fallback whether the code is new or already known.
        request_id = overview['Request IDs'] if overview['Request IDs'] else 'NA'
        pids = [d['PID'] for d in detail if d['Outcome'] == outcome]

        for codes in phenotype['Code_Description']:
            code = codes['Code']
            care_setting = codes.get('Care_setting', 'NA')
            code_type = codes.get('Code_Type', 'NA')
            code_category = codes.get('Code_Category', 'NA')
            # Read the key that is tested for. The original tested
            # 'Principal_diagnosis' but read 'Principal diagnosis', which
            # raised KeyError.
            principal_diagnosis = codes.get('Principal_diagnosis', 'NA')

            concept = concepts_by_code.get(code)
            if concept is None:
                concepts_by_code[code] = {
                    'Code': code,
                    # NOTE(review): the fallback is a list while a present
                    # description is a plain value; kept as in the original.
                    'Description': codes['Description'] if codes['Description'] else ['NA'],
                    'Care_setting': [care_setting],
                    'Code_type': [code_type],
                    'Code_category': [code_category],
                    'Principal_diagnosis': [principal_diagnosis],
                    'Outcome': [outcome],
                    'Request_id': [request_id],
                    # Copy so later extensions do not alter the shared list.
                    'PIDs': list(pids),
                }
            else:
                concept['Care_setting'].append(care_setting)
                concept['Code_type'].append(code_type)
                concept['Code_category'].append(code_category)
                concept['Principal_diagnosis'].append(principal_diagnosis)
                concept['Outcome'].append(outcome)
                concept['Request_id'].append(request_id)
                concept['PIDs'].extend(pids)

    sorted_concept = sorted(concepts_by_code.values(), key=lambda record: record['Code'])
    for position, concept in enumerate(sorted_concept, start=1):
        concept['CID'] = f'SC{position:06d}'

    return sorted_concept


In [None]:
from src.utils import save_detail,save_concept
from src.config import SENTINEL_DIR

In [12]:

def main():
    """Derive detail and concept records from ``sentinel`` and save both.

    Reads the module-level ``sentinel`` list, so the scraping cell has to run
    first.

    Returns:
        tuple: ``(detail, concept)`` as produced by ``get_detail`` and
        ``get_concept``.
    """
    # Alternative input, kept for reference: load the phenotypes from the
    # saved 'sentinel_phenotypes.json' under 'Sentinel/' with json.loads.
    detail_rows = get_detail(sentinel)
    concept_rows = get_concept(sentinel, detail_rows)
    save_detail(SENTINEL_DIR, detail_rows, 'SENTINEL')
    save_concept(SENTINEL_DIR, concept_rows, 'SENTINEL')
    return detail_rows, concept_rows

sentinel_detail,sentinel_concept = main()

In [14]:
from src.utils import save_json
from src.config import RAW_DIR,SENTINEL_URL
from src.utils import save_detail,save_concept
from src.config import SENTINEL_DIR

from src.scraping.sentinel_webscrapping import sentinel_scrapping

from src.processing.sentinel_concept_detail import get_detail, get_concept



In [16]:


class SentinalPipeline:
    """Runs the Sentinel scrape and processing steps in one call."""

    def __init__(self):
        # Filled by main() with the result of get_detail().
        self.detail = None

    def main(self):
        """Scrape, save the raw data, then build and save detail and concept records."""
        scraped = sentinel_scrapping(SENTINEL_URL)
        save_json(RAW_DIR, scraped, 'SENTINEL')

        self.detail = get_detail(scraped)
        concept_rows = get_concept(scraped, self.detail)

        save_detail(SENTINEL_DIR, self.detail, 'SENTINEL')
        save_concept(SENTINEL_DIR, concept_rows, 'SENTINEL')



In [1]:
from models.llm_model.gemini_model import json_response

json_response("What is gemini?")

'{\n  "gemini": "Gemini may refer to several things, most notably Google\'s Gemini AI model. It could also refer to the constellation Gemini, or other entities that use the name. To provide a more specific definition, please clarify which Gemini you are asking about."\n}'

In [5]:
pdf_url = "https://www.sentinelinitiative.org/sites/default/files/documents/thromboembolic_stroke_algorithm_v1.0.pdf"

import requests
import io

response = requests.get(pdf_url)
pdf_content = response.content
pdf_path = io.BytesIO(pdf_content)


In [4]:
pdf_path

<_io.BytesIO at 0x22cb1913970>

In [50]:
pdf_url = 'https://www.sentinelinitiative.org/sites/default/files/surveillance-tools/validations-literature/any-fractures_codelist.pdf'

response = requests.get(pdf_url)
pdf_content = response.content
pdf_path = io.BytesIO(pdf_content)

In [51]:
import pdfplumber

In [52]:

def prompt_description(description_page):
    """Build the LLM prompt that turns extracted page text into '|'-separated rows.

    Args:
        description_page: Text extracted from the PDF pages; it is embedded
            in the first sentence of the prompt.

    Returns:
        str: The complete prompt. It asks for a list of strings whose first
        element is the header "Code|Description|Code_Type|Code_Category" and
        whose remaining elements are one row each.
    """
    # Wording fixes only: "separate", "provide", "italic formatting" and
    # "Each subsequent row" replace misspelled or wrong instruction text.
    return f'''
    Your task is to return only a list of rows from the text {description_page}.

    Input Details:
    The input consists of rows containing different diseases extracted from a PDF file. Your task is to create a list of multiple strings following the given structure:

    1. Extract text from "code description" or "codes description" up to the end of the page.
    2. Take the first row as a header values i.e column names. 
    3. The elements of the list should be multiple strings.

    Output Structure:
    Each element or row in the list should follow this format:
    "Code|Description|Code_Type|Code_Category"
    These four columns are mandatory in each row. Use "|" to separate the values in each element of the list, where each element is the data in a row.

    - Code (Example: "K57.20" or "18.79", it can be either fully numeric or alphanumeric, but not fully alphabets like 'bleeding')
    - Description (Example: "Occlusion and stenosis of basilar artery with cerebral infarction")
    - Code_Type (Example: "ICD-9-CM")
    - Code_Category ( "Diagnosis" or "Procedure")

    Handling Multiline Descriptions:
    If a description spans multiple lines, do not split it into a new row, consider it as a single row.
    Example:
    "433.31 Occlusion and stenosis of multiple and bilateral precerebral arteries with cerebral ICD-9-CM Diagnosis infarction"
    The word "infarction" should remain in the same string as the code "433.31" and not move to the next row.

    Handling Missing Columns:
    If any of the four mandatory columns (Code, Description, Code_Type, Code_Category) is missing in a row, replace it with "None".
    Example: If Code_Category is missing:
    "433.01|Occlusion and stenosis of basilar artery with cerebral infarction|ICD-9-CM|None"
    If Code_Category column is missing in a row then, identify the code category from text before the words "code description" or "codes description" .
    If this is a text "International Classification of Diseases, Ninth Edition, Clinical Modification (ICD-9-CM) Diagnosis Codes, Generic Names, 
    and Healthcare Common Procedure Coding System (HCPCS) Codes Used to Define Clostridium Difficile in this Request", then Code_Category will be 'Diagnosis'.
    Thus find the code_category in the texts before the words "code description" or "codes description".

    Handling Extra Columns:
    If additional columns are present, include them.
    The minimum number of columns must always be four, but there could be five or more if extra columns exist.

    Special Cases:
    If Principal_Diagnosis and Care_Setting are present as column headers, include them in the output.
    If a row does not start with a code (e.g., "433.01"), assume None for the Code.
    Example:
    "None|NITAZOXANIDE|National Drug Code|ANY"

    Exclusions:
    Ignore disease explanations or general descriptions, such as:
    - "Intentional self-harm"
    - "Non-Cardiac Malformations"
    - "Clostridium difficile treatment dispensing within 7 days of encounter"
    Exclude the records under the column "Generic Name".
    DO NOT consider any texts or rows under the column name *'Generic Name'* and *'Brand Name'*.
    If the text has "Generic Name" or "Brand Name" exclude the records after it.

    Exclude any text enclosed in italic formatting.
    Remove all backslashes (`\\`) and newline characters (`\\n`), replacing them with a single space.

    **Final Output**:
    * A valid list of rows, where:
     - The first string is the column headers: "Code|Description|Code_Type|Code_Category"
     - Each subsequent row represents a disease record.

**NOTE**:
* Do not provide any python code.

'''


In [None]:

from src.utils import get_gemini_client
from dotenv import load_dotenv
import os
from langchain_google_genai import ChatGoogleGenerativeAI


load_dotenv()
api_key = os.getenv("API_KEY")
base_url = os.getenv("BASE_URL")
client = get_gemini_client(api_key,base_url)

def json_response(prompt):
    """Send ``prompt`` to the chat model in JSON-object mode and return the reply text.

    Uses the module-level ``client``.

    Returns:
        The ``content`` of the first choice's message.
    """
    user_message = {"role": "user", "content": prompt}
    completion = client.chat.completions.create(
        model="gemini-2.0-flash",
        messages=[user_message],
        response_format={'type': 'json_object'},
    )
    first_choice = completion.choices[0]
    return first_choice.message.content


def description_extract(pdf_path, chunk_size=5, max_words=5000, start_page=3):
    """Extract code/description records from the body pages of a PDF via the LLM.

    Debugging variant: prints the page range of every group and the number
    of batches built from it.

    Pages from ``start_page`` onwards are processed in groups of
    ``chunk_size``. Pages containing 'Glossary', 'Generic Name' or
    'Brand Name' are skipped. Inside a group, consecutive pages are packed in
    page order into batches of fewer than ``max_words`` whitespace-separated
    words (a single page above the limit forms its own batch). Each batch is
    sent through ``prompt_description`` and ``json_response``; the reply is
    expected to be a JSON list whose first element is a '|'-separated header
    and whose remaining elements are '|'-separated rows.

    Args:
        pdf_path: Anything ``pdfplumber.open`` accepts.
        chunk_size: Number of pages per group.
        max_words: Upper bound (exclusive) on words per LLM request.
        start_page: Zero-based index of the first page to read.

    Returns:
        list[dict]: One dict per row, keyed by the header columns.

    Raises:
        json.JSONDecodeError: If a reply is not valid JSON (for example when
            the reply was cut short).
    """
    description_dict = []
    with pdfplumber.open(pdf_path) as pdf:
        pages = len(pdf.pages)
        for chunk_start in range(start_page, pages, chunk_size):
            chunk = range(chunk_start, min(chunk_start + chunk_size, pages))
            print("Chunk:",chunk)
            batches = []
            current = ''
            for page_number in chunk:
                # Defensive: treat a missing text layer as an empty page.
                text = pdf.pages[page_number].extract_text() or ''
                if not text.strip():
                    continue
                if 'Glossary' in text or 'Generic Name' in text or 'Brand Name' in text:
                    continue
                # Join pages with a line break so the last row of one page
                # does not merge with the first row of the next.
                candidate = f"{current}\n{text}" if current else text
                if len(candidate.split()) < max_words:
                    current = candidate
                else:
                    # Flush what was collected so far and start a new batch;
                    # this keeps the batches in page order.
                    if current:
                        batches.append(current)
                    current = text
            if current:
                batches.append(current)
            print("length of extracted:",len(batches))

            for batch in batches:
                description = json_response(prompt_description(batch))
                rows = json.loads(description.replace("\n", ''))
                # A reply holding only the header (or nothing) has no records.
                if len(rows) > 1:
                    columns = rows[0].split("|")
                    description_dict += [
                        dict(zip(columns, row.split("|"))) for row in rows[1:]
                    ]
    return description_dict

In [57]:
desc = description_extract(pdf_path)

Chunk: range(3, 13)
length of extracted: 1


JSONDecodeError: Unterminated string starting at: line 1 column 29428 (char 29427)

In [55]:
desc

[{'Code': '733.1',
  'Description': 'Pathologic fracture',
  'Code_Type': 'ICD-9-CM',
  'Code_Category': 'Diagnosis'},
 {'Code': '800',
  'Description': 'Fracture of vault of skull',
  'Code_Type': 'ICD-9-CM',
  'Code_Category': 'Diagnosis'},
 {'Code': '800.0',
  'Description': 'Closed fracture of vault of skull without mention of intracranial injury',
  'Code_Type': 'ICD-9-CM',
  'Code_Category': 'Diagnosis'},
 {'Code': '800.00',
  'Description': 'Closed fracture of vault of skull without mention of intracranial injury, unspecified state of consciousness',
  'Code_Type': 'ICD-9-CM',
  'Code_Category': 'Diagnosis'},
 {'Code': '800.01',
  'Description': 'Closed fracture of vault of skull without mention of intracranial injury, no loss of consciousness',
  'Code_Type': 'ICD-9-CM',
  'Code_Category': 'Diagnosis'},
 {'Code': '800.02',
  'Description': 'Closed fracture of vault of skull without mention of intracranial injury, brief (less than one hour) loss of consciousness',
  'Code_Type':

In [46]:
from src.utils import get_gemini_client
from dotenv import load_dotenv
import os
from langchain_google_genai import ChatGoogleGenerativeAI


load_dotenv()
api_key = os.getenv("API_KEY")
base_url = os.getenv("BASE_URL")
client = get_gemini_client(api_key,base_url)

def json_response(prompt):
    """Send ``prompt`` to the chat model and return the complete response object.

    Uses the module-level ``client``; no response format is requested.
    """
    request = {
        "model": "gemini-2.0-flash",
        "messages": [{"role": "user", "content": prompt}],
        "response_format": None,
    }
    return client.chat.completions.create(**request)

In [47]:
data = json_response("What is the full form of LLM")

In [48]:
data.usage

CompletionUsage(completion_tokens=10, prompt_tokens=8, total_tokens=18, completion_tokens_details=None, prompt_tokens_details=None)

In [49]:
data.choices[0].message.content

'LLM stands for **Large Language Model**.\n'

In [36]:

def overview_extract(pdf_path):
    """Collect the overview pages of a PDF and send them to the LLM.

    Only the first four pages are inspected. A page is selected when it
    contains 'Overview', or when it contains ' Request Send Date' and does
    not contain 'Glossary'.

    Args:
        pdf_path: Anything ``pdfplumber.open`` accepts (a path or a binary
            stream).

    Returns:
        Whatever ``json_response`` returns for ``prompt_overview`` applied to
        the selected page text.
    """
    selected_pages = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages[:4]:
            # Defensive: treat a missing text layer as an empty page.
            text = page.extract_text() or ''
            is_overview = 'Overview' in text
            is_request_page = ' Request Send Date' in text and 'Glossary' not in text
            # A page that satisfies both rules is taken once; two independent
            # ``if`` statements previously appended it twice.
            if is_overview or is_request_page:
                selected_pages.append(text)
    # Keep a line break between pages so the last line of one page does not
    # fuse with the first line of the next one.
    overview_page = '\n'.join(selected_pages)
    return json_response(prompt_overview(overview_page))

In [38]:

ov = overview_extract(pdf_path)

In [40]:
ov.choices[0].message.content

'```json\n{\n  "phenotype": "Any fracture",\n  "Overview": {\n    "Title": "Fracture Algorithm Defined in Osteoporotic Fractures following Lupron Depot-PED Use: A Multiple Factor Matched Analysis",\n    "Request IDs": "cder_mpl2p_wp011_nsdp_v01",\n    "Description": "This report lists International Classification of Diseases, Ninth Revision, Clinical Modification (ICD-9- CM) and International Classification of Diseases, Tenth Revision, Clinical Modification (ICD-10-CM) diagnosis codes, and algorithms used to define the secondary outcome fractures in this request.",\n    "Outcome": "Any fracture",\n    "Algorithm to define outcome": "Evidence of an ICD-9-CM or ICD-10-CM diagnosis code used to define any type of fracture, in any care setting.",\n    "Query period": "January 1, 2000 - August 31, 2018",\n    "Request to send dates": "April 22, 2019"\n  }\n}\n```'