In [None]:
!pip install google-generativeai



In [None]:
# Relation name (a triplet's 'type') -> its definition; written by Canonicalization().
cache = {}
# Every triplet kept by Canonicalization(), accumulated across all processed chunks.
Triplets_collection = []

def get_text(response):
    """Parse the fenced JSON block in a model response.

    Only the first non-empty text part of the first candidate is inspected.
    The JSON payload is taken from between a "```json" opening fence and the
    next "```" closing fence; anything before or after the fenced block
    (introductory prose, trailing newline) is ignored.

    Args:
        response: object exposing ``candidates[0].content.parts``, where each
            part has a ``text`` attribute.

    Returns:
        The decoded JSON value, or ``None`` when there is no candidate, no
        text, no fenced JSON block, or the block is not valid JSON. A short
        diagnostic is printed in each of those cases.
    """
    # Imported locally so the function does not depend on cell execution order.
    import json

    candidates = response.candidates
    if not candidates:
        print("No candidates available.")
        return None

    # Keep only parts that actually carry text (skips None / empty strings).
    texts = [part.text for part in candidates[0].content.parts if part.text]
    if not texts:
        print("No text available to decode.")
        return None

    raw = texts[0]
    opening = "```json"
    start = raw.find(opening)
    # Locate the closing fence only after the opening one; an exact
    # startswith/endswith test would reject replies with surrounding whitespace.
    end = raw.find("```", start + len(opening)) if start != -1 else -1
    if start == -1 or end == -1:
        print("No JSON block found in the text.")
        return None

    json_text = raw[start + len(opening):end].strip()
    try:
        return json.loads(json_text)
    except json.JSONDecodeError as e:
        print("Failed to decode JSON:", e)
        return None


In [None]:
import os
import json
import google.generativeai as genai
from google.colab import userdata
import pandas as pd
import time
# Read the API key from Colab's user-data store (nothing is hardcoded) and configure the SDK with it.
key = userdata.get('GEMINI_API_KEY')
genai.configure(api_key=key)

# Generation settings handed to each GenerativeModel created in the cells below.
# The MIME type is plain text: get_text() expects the JSON wrapped in a ```json fence.
generation_config = {
  "temperature": 1,
  "top_p": 0.95,
  "top_k": 64,
  "max_output_tokens": 8192,
  "response_mime_type": "text/plain",
}

In [None]:
# Corpus table; the helpers below read its 'book', 'chapter' and 'text' columns.
df = pd.read_csv("harry_potter_books.csv")

def list_chapters(book):
    """Return the distinct 'chapter' values of the rows in ``df`` whose 'book' equals ``book``."""
    belongs_to_book = df['book'] == book
    return df.loc[belongs_to_book, 'chapter'].unique()

def get_chapter_text(book, chapter, max_length=1000):
    """Return one chapter's text split into chunks of at most ``max_length`` characters.

    The 'text' cells of all rows in ``df`` matching ``book`` and ``chapter``
    are joined with single spaces. The result is cut at the last space found
    before ``max_length``; when there is no such space the text is cut hard
    at ``max_length``. Leading whitespace is removed from every remainder.

    Args:
        book: value to match against the 'book' column.
        chapter: value to match against the 'chapter' column.
        max_length: maximum chunk size in characters; must be positive.

    Returns:
        list[str]: the non-empty chunks, in order (empty list if nothing matches).

    Raises:
        ValueError: if ``max_length`` is not positive, because the splitting
            loop could never make progress.
    """
    if max_length <= 0:
        raise ValueError("max_length must be a positive integer")

    chapter_data = df[(df['book'] == book) & (df['chapter'] == chapter)]
    # Skip missing cells and coerce to str so a NaN row cannot break the join.
    chapter_text = ' '.join(chapter_data['text'].dropna().astype(str))

    chunks = []
    while len(chapter_text) > max_length:
        cut_off = chapter_text.rfind(' ', 0, max_length)
        if cut_off == -1:
            # No space inside the window: fall back to a hard cut.
            cut_off = max_length
        piece = chapter_text[:cut_off]
        if piece:
            # cut_off == 0 (text starts with a space) would yield an empty chunk.
            chunks.append(piece)
        chapter_text = chapter_text[cut_off:].lstrip()

    if chapter_text:
        chunks.append(chapter_text)

    return chunks





In [None]:
def send_request_with_retries(chat_sessionC, request, max_retries=4, delay=2):
    """Send ``request`` through a chat session, retrying on any exception.

    Args:
        chat_sessionC: object with a ``send_message(request)`` method whose
            result exposes ``.text``.
        request: the message to send.
        max_retries: total number of attempts to make.
        delay: seconds to wait between two attempts.

    Returns:
        The ``.text`` of the first successful response, or ``None`` when every
        attempt raised (or ``max_retries`` is not positive).
    """
    for attempt in range(1, max_retries + 1):
        try:
            response = chat_sessionC.send_message(request)
            return response.text
        except Exception as e:
            print(f"Error on attempt {attempt}: {e}")
            # Wait only when another attempt follows; sleeping after the last
            # failure just delays returning None.
            if attempt < max_retries:
                time.sleep(delay)
    return None

In [None]:
# Extraction model and chat session used by Extraction().
# modelE/chat_sessionE used to be defined in two consecutive cells, the second
# silently replacing the first; only the definition that took effect is kept.
# NOTE(review): the last sentence of the system instruction looks like it is
# missing a verb ("should describe ..."?) — confirm the intended wording.
modelE = genai.GenerativeModel(
  model_name="gemini-1.5-flash",
  generation_config=generation_config,
  system_instruction = "You are an expert in Knowledge Graph. You will try your best to extract the entities and relationships from the text. Do not produce any triplets that is not in the text. And the triplets should two entities' interaction."
)

# Chat session started with an empty history.
chat_sessionE = modelE.start_chat(
  history=[
  ]
)


In [None]:
def Extraction(text):
  """Ask the extraction chat session for the relational triplets in ``text``.

  The prompt requests a JSON array of {"head", "type", "tail"} objects inside
  a ```json fence, because get_text() only accepts a fenced JSON block and
  Canonicalization() iterates the result and reads each item's 'type'.

  Args:
    text: the passage to extract triplets from.

  Returns:
    Whatever get_text() decodes from the reply, or None when it cannot decode one.

  NOTE(review): every call goes through the same long-lived chat_sessionE; if
  the SDK keeps earlier turns in the session history, each chunk is sent with
  all previous ones — consider a stateless call per chunk.
  """
  request = f"""
  Given a piece of text, extract relational triplets in the form of {{"head": Subject, "type": Relation, "tail": Object}} from it. Respond with a JSON array of such objects inside a ```json code block. Here are some examples.
  ### Example ###
  Text: The 17068.8 millimeter long ALCO RS-3 has a diesel-electric transmission.
  Triplets:
  ```json
  [{{"head": "ALCO RS-3", "type": "powerType", "tail": "Diesel-electric transmission"}}, {{"head": "ALCO RS-3", "type": "length", "tail": "17068.8 millimeters"}}]
  ```

  Now extract triplets from the following text:
  {text}
  """
  response = chat_sessionE.send_message(request)
  return get_text(response)

In [None]:
# Model used by Definition() to write a definition for each extracted relation.
modelD = genai.GenerativeModel(
  model_name="gemini-1.5-flash",
  generation_config=generation_config,
  system_instruction = "You are an expert in giving the definition of entities and relationships from the text."
)

# Chat session started with an empty history; Definition() sends its prompts through it.
chat_sessionD = modelD.start_chat(
  history=[
  ]
)



In [None]:
def Definition(text,free_extract):
  """Ask the definition chat session to define every relation in ``free_extract``.

  The prompt requests one JSON object mapping relation type -> definition
  inside a ```json fence: get_text() only accepts a fenced JSON block, and
  Canonicalization() looks definitions up by relation type.

  Args:
    text: the passage the triplets were extracted from.
    free_extract: the triplets returned by Extraction() for that passage.

  Returns:
    Whatever get_text() decodes from the reply, or None when it cannot decode one.
  """
  request = f"""
  Given a piece of text and a list of relational triplets extracted from it, write a definition for each relationship present. Respond with a single JSON object that maps each relation type to its definition, inside a ```json code block.
  ### Example ###
  Text: The 17068.8 millimeter long ALCO RS-3 has a diesel-electric transmission.
  Triplets: [{{"head": "ALCO RS-3", "type": "powerType", "tail": "Diesel-electric transmission"}}, {{"head": "ALCO RS-3", "type": "length", "tail": "17068.8 millimeters"}}]
  Definition:
  ```json
  {{"powerType": "The subject entity uses the type of power or energy source specified by the object entity", "length": "The measurement or extent of something from end to end; the greater of two or the greatest of three dimensions of an object."}}
  ```

  Now write the definitions for the following text and triplets:
  Text: {text}
  Triplets: {free_extract}
  """

  response = chat_sessionD.send_message(request)
  return get_text(response)

In [None]:
# Model used by Canonicalization() to match extracted relations against the ones already in `cache`.
modelC = genai.GenerativeModel(
  model_name="gemini-1.5-flash",
  generation_config=generation_config,
  system_instruction = "You are an expert in finding the most relevant words from file. Only if the two words are similar and you are confident about or you seem this word is a new category."
)

# Chat session started with an empty history; passed to send_request_with_retries() by Canonicalization().
chat_sessionC = modelC.start_chat(
  history=[
  ]
)

# Prompt template for the canonicalization step. It is a plain str.format
# template: Canonicalization() fills in text, triplets, definition and cache.
requestC = """
Given a piece of text, triplets, and their definitions, check if any synonym pairs in the definition have a similar meaning to entries in the file "cache". If a synonym is found, replace the corresponding word in the triplets. Respond with "no" if no synonymous word can be found in the cache. You just answer "no" if there are not corresponding words, or answer only the replaced triplets. I do not need any extract information.

Text: {text}
Triplets: {triplets}
Definition: {definition}
Cache: {cache}
"""



In [None]:
def _is_no_answer(response_text):
  """Return True when the reply's first word is "no" (case and punctuation ignored)."""
  words = response_text.strip().lower().split()
  return bool(words) and words[0].strip('.,;:!"\'`*') == 'no'


def _parse_replaced_triplet(response_text, fallback):
  """Best-effort decode of the triplet the model answered with; ``fallback`` if that fails."""
  import ast

  body = response_text.strip()
  if body.startswith("```"):
    # Remove a surrounding markdown fence and its optional "json" tag.
    body = body.strip("`").strip()
    if body.lower().startswith("json"):
      body = body[4:].strip()

  # The triplet is rendered into the prompt as a Python dict repr, so the
  # reply may be JSON or a Python literal; literal_eval never executes code.
  for parse in (json.loads, ast.literal_eval):
    try:
      parsed = parse(body)
    except (ValueError, SyntaxError, TypeError):
      continue
    if isinstance(parsed, list) and len(parsed) == 1:
      parsed = parsed[0]
    if isinstance(parsed, dict) and {'head', 'type', 'tail'} <= parsed.keys():
      return parsed
  return fallback


def Canonicalization(text,free_extract,definition):
  """Canonicalize each extracted triplet against the relations already in ``cache``.

  For every triplet the model is asked whether its relation matches a cached
  one. If the reply is "no", the relation is new: it is added to ``cache``
  with its definition and the triplet is stored unchanged. Otherwise the
  triplet returned by the model is stored; when that reply cannot be decoded
  the original triplet is stored instead. Triplets whose request failed on
  every retry are skipped.

  Args:
    text: the passage the triplets came from.
    free_extract: list of triplet dicts (a single dict is also accepted).
    definition: mapping relation type -> definition, or a list of such mappings.

  Side effects:
    Appends to ``Triplets_collection`` and adds entries to ``cache``.
  """
  if isinstance(free_extract, dict):
    free_extract = [free_extract]

  # Flatten a list of one-key mappings into a single lookup table.
  if isinstance(definition, dict):
    definitions = definition
  else:
    definitions = {}
    if isinstance(definition, (list, tuple)):
      for entry in definition:
        if isinstance(entry, dict):
          definitions.update(entry)

  for triplets in free_extract:
    if not isinstance(triplets, dict) or 'type' not in triplets:
      print('skip malformed triplet:', triplets)
      continue

    print('process request')
    request = requestC.format(text=text, triplets=triplets, definition=definition, cache=cache)
    print('finish process request')
    response_text = send_request_with_retries(chat_sessionC, request)
    print(response_text)
    if response_text is None:
      print('no response for triplet, skipped:', triplets)
      continue

    if _is_no_answer(response_text):
      # New relation: remember it (empty definition when none was provided).
      cache[triplets['type']] = definitions.get(triplets['type'], '')
      Triplets_collection.append(triplets)
    else:
      Triplets_collection.append(_parse_replaced_triplet(response_text, triplets))

In [None]:
def mainfunction(text):
  """Run the extraction -> definition -> canonicalization pipeline on one text chunk.

  Stops early, after printing a short message, when Extraction() or
  Definition() returns nothing usable. Results are accumulated in the
  notebook-level ``Triplets_collection`` and ``cache`` by Canonicalization().

  Args:
    text: the chunk to process.
  """
  print(text)
  free_extract = Extraction(text)
  if not free_extract:
    print('no extract from the chunk')
    return
  print('free_Extract:', free_extract)
  definition = Definition(text,free_extract)
  if not definition:
    print('no definition from the chunk')
    return
  print('definition:',definition)
  Canonicalization(text,free_extract,definition)
  # '\n' (not '/n') so the cache is printed on its own line.
  print('Triplets_collection:' ,Triplets_collection,'\nCache:',cache)

In [None]:
# Driver: run the whole pipeline over every chapter of one book.
book_name = "Book 1: Philosopher's Stone"
list_chatper = list_chapters(book_name)
for chapter in list_chatper:
  # Split the chapter into chunks of at most 3000 characters, then process each chunk.
  chunks_of_chapter = get_chapter_text(book_name, chapter, 3000)
  for chunk in chunks_of_chapter:
    mainfunction(chunk)

In [None]:
# Persist all collected triplets as JSON; the file is overwritten on each run.
with open('Triplets_collection.json', 'w') as json_file:
    json.dump(Triplets_collection, json_file)
