## Mapping Values To Database

In [125]:
from dotenv import load_dotenv
import os

load_dotenv()

True

### Load Chat Model

In [126]:
from langchain_anthropic import ChatAnthropic

# Chat model shared by the entity-extraction, Cypher-generation, and answer chains built below.
model = ChatAnthropic(model='claude-3-opus-20240229')

In [142]:
from pydantic import BaseModel, Field, field_validator
from typing import List, Union
from langchain.prompts import ChatPromptTemplate

# NOTE: the class docstring and the Field descriptions are runtime text — they are part
# of the schema handed to the model by `with_structured_output` — so keep them stable.
class PaysokoEntities(BaseModel):
    """Identifying information about Paysoko entities."""
    office_locations: Union[List[str], None] = Field(
        default_factory=list,
        description="Python List object all office locations mentioned in the text (e.g. ['LOC001', 'Paysoko CBD'])"
    )
    services: Union[List[str], None] = Field(
        default_factory=list,
        description="Python List object all payment services mentioned in the text (e.g. ['PS001', 'Money Transfer'])"
    )
    appointments: Union[List[str], None] = Field(
        default_factory=list,
        description="Python List object of all appointments mentioned in the text (e.g. ['APT001'])"
    )
    office_hours: Union[List[str], None] = Field(
        default_factory=list,
        description="Python List object of office hours mentioned (e.g. ['opening_time', 'closing_time', 'monday'])"
    )

    # mode='before' is required: with the default mode='after' the validator only ever
    # sees values that already passed List[str]/None validation, so a stringified list
    # would be rejected before the string-handling branch below could run.
    @field_validator('office_locations', 'services', 'appointments', 'office_hours', mode='before')
    @classmethod
    def validate_list_fields(cls, v, info):
        """Normalise raw field input before Pydantic validates it as a list of strings.

        Args:
            v: Raw value supplied for the field.
            info: Pydantic validation info (unused).

        Returns:
            ``[]`` for ``None``; a parsed list for a string such as
            ``"['LOC001', 'Paysoko CBD']"``; any other value unchanged, so Pydantic's
            own validation accepts or rejects it.
        """
        if v is None:
            return []
        if isinstance(v, str):
            # Handle string representation of a list: drop the brackets, split on commas
            # (with or without a following space) and strip surrounding quotes.
            parts = v.strip().strip('[]').split(',')
            cleaned = (part.strip().strip('\'"').strip() for part in parts)
            return [item for item in cleaned if item]
        return v

    # Dict form of the Pydantic v2 model configuration; replaces the deprecated
    # nested `class Config`.
    model_config = {"arbitrary_types_allowed": True}

# System turn: states what to extract and how the output must be shaped.
extraction_system_message = (
    "system",
    "You are extracting office locations, services, appointments, and operating hours from Paysoko text queries. Do always return full Python objects that are synthatically correct.",
)

# Human turn: carries the user's question into the extraction request.
extraction_human_message = (
    "human",
    "Use the given format to extract information from the following input: {question}",
)

prompt = ChatPromptTemplate.from_messages([extraction_system_message, extraction_human_message])

# Entity extraction chain: prompt piped into the model bound to the PaysokoEntities schema.
structured_model = model.with_structured_output(PaysokoEntities)
entity_chain = prompt | structured_model

In [143]:
# Smoke-test the extraction chain on a single question and display the parsed entities.
entities = entity_chain.invoke({"question": "What are your working hours"})
entities

PaysokoEntities(office_locations=[], services=[], appointments=[], office_hours=['opening_time', 'closing_time'])

In [144]:
from langchain_community.graphs import Neo4jGraph

In [145]:
# No connection arguments are passed here — presumably Neo4jGraph reads its URI and
# credentials from the environment populated by load_dotenv(); TODO confirm.
graph = Neo4jGraph()

In [179]:
from typing import Optional

In [146]:
def map_to_database(entities: "PaysokoEntities") -> Optional[str]:
   """Look up every extracted entity in Neo4j and summarise the best match for each.

   Office locations, services and appointments are resolved through the
   ``locationIndex``, ``serviceIndex`` and ``appointmentIndex`` full-text indexes;
   office-hour terms are compared for equality against ``OfficeHour`` properties.

   Args:
       entities: Object exposing ``office_locations``, ``services``,
           ``appointments`` and ``office_hours``; a field that is ``None`` is
           treated as empty.

   Returns:
       One newline-terminated line per entity ("<term> maps to ..." or
       "No match found for <term>"), concatenated in lookup order, or ``None``
       when no line was produced. A lookup that raises is reported with
       ``print`` and contributes no line.
   """
   fulltext_query = """
   CALL db.index.fulltext.queryNodes($indexName, $value) 
   YIELD node, score
   WITH node, score, labels(node)[0] AS type
   RETURN 
       CASE type
           WHEN 'OfficeLocation' THEN node.location_name
           WHEN 'Services' THEN node.service_name
           WHEN 'Appointment' THEN node.appointment_id
       END AS result,
       type,
       score
   ORDER BY score DESC
   LIMIT 1
   """

   # Separate query for office hours
   hours_query = """
   MATCH (h:OfficeHour)
   WHERE h.day_of_week = $time OR h.opening_time = $time OR h.closing_time = $time
   RETURN 
       h.day_of_week + ' ' + h.opening_time + '-' + h.closing_time as result,
       'OfficeHour' as type,
       1.0 as score
   LIMIT 1
   """

   def describe_match(term, query, params, error_prefix):
       # Run one lookup and return its summary line, or None if the lookup failed.
       try:
           rows = graph.query(query, params)
           if rows:
               best = rows[0]
               return (f"{term} maps to {best['result']} "
                       f"({best['type']}) with score "
                       f"{best['score']:.2f}\n")
           return f"No match found for {term}\n"
       except Exception as e:
           # Best-effort: one failing lookup must not abort the remaining ones.
           print(f"{error_prefix} {term}: {e}")
           return None

   # Collect every lookup first: full-text searches, then office hours.
   lookups = []
   for index_name, terms in [
       ("locationIndex", entities.office_locations),
       ("serviceIndex", entities.services),
       ("appointmentIndex", entities.appointments),
   ]:
       for term in terms or []:
           lookups.append(
               (term, fulltext_query, {"indexName": index_name, "value": term}, "Error mapping entity")
           )
   for term in entities.office_hours or []:
       lookups.append((term, hours_query, {"time": term}, "Error mapping office hour"))

   lines = [describe_match(*lookup) for lookup in lookups]
   result = "".join(line for line in lines if line is not None)
   return result if result else None

In [147]:
result = map_to_database(entities)
print(result)

No match found for opening_time
No match found for closing_time



In [148]:
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# Generate Cypher statement based on natural language input for Paysoko system
# Human-turn template for Cypher generation; {schema}, {entities_list} and {question}
# are filled in by the chain at run time.
cypher_template = """Based on the Paysoko Neo4j graph schema below, write a Cypher query that would answer the user's question:

{schema}

The entities mentioned in the question map to these database values:
{entities_list}

User Question: {question}

Write a Cypher query to answer this question.
Note: Focus only on Appointments, Services, OfficeLocations and Office Hours relationships.

Cypher query:"""

# System + human messages for the Cypher-writing model call.
cypher_prompt = ChatPromptTemplate.from_messages([
    ("system", "Generate a Cypher query to get information from the Paysoko database. Return only the query without explanation."),
    ("human", cypher_template),
])


def _describe_entities(inputs):
    """Map the extracted entities in ``inputs["entities"]`` to database values."""
    return map_to_database(inputs["entities"])


def _current_schema(_inputs):
    """Return ``graph.get_schema`` (read as an attribute, not called); the chain input is ignored."""
    return graph.get_schema


# Pipeline: extract entities -> add mapped values and schema -> prompt -> model -> plain string.
cypher_response = (
    RunnablePassthrough.assign(entities=entity_chain)
    | RunnablePassthrough.assign(entities_list=_describe_entities, schema=_current_schema)
    | cypher_prompt
    | model.bind(stop=["\nResult:"])
    | StrOutputParser()
)

In [149]:
cypher = cypher_response.invoke({"question": "What are your working hours"})
cypher

'MATCH (o:OfficeLocation)-[:WORKING_HOURS]->(h:OfficeHour)\nRETURN o.location_name AS office, \n       h.day_of_week AS day,\n       h.opening_time AS opens, \n       h.closing_time AS closes'

### Natural Language Response

In [150]:
from langchain.chains.graph_qa.cypher_utils import CypherQueryCorrector, Schema

In [166]:
from langchain.chains.graph_qa.cypher_utils import CypherQueryCorrector, Schema

# Relationship triples (start label, relationship type, end label) taken from the graph
# schema; an absent "relationships" entry yields an empty list instead of a TypeError.
corrector_schema = [
    Schema(el["start"], el["type"], el["end"])
    for el in graph.structured_schema.get("relationships", [])
]
cypher_validation = CypherQueryCorrector(corrector_schema)


def run_validated_cypher(query):
    """Correct ``query`` against the schema and execute it.

    The corrector returns an empty string when it cannot produce a valid query;
    in that case nothing is sent to Neo4j and an empty result list is returned.
    """
    corrected = cypher_validation(query)
    if not corrected:
        return []
    return graph.query(corrected)


# Template for generating natural language responses
response_template = """
Based on the question, Cypher query, and database response, provide a natural language answer in markdown format:

Question: {question}
Cypher query: {query} 
Database Response: {response}

Response should focus on:
- Office locations and working hours
- Available services and costs
- Appointment details and scheduling
"""

response_prompt = ChatPromptTemplate.from_messages([
    (
        "system",
        "You are a customer service assistant for Paysoko. Provide clear, direct answers based on the query results."
    ),
    ("human", response_template),
])

# Full pipeline: generate Cypher -> validate and run it -> phrase the answer.
chain = (
    RunnablePassthrough.assign(query=cypher_response) |
    RunnablePassthrough.assign(
        response=lambda x: run_validated_cypher(x["query"])
    ) |
    response_prompt |
    model |
    StrOutputParser()
)

In [167]:
response = chain.invoke({"question": "What are your working hours"})

In [169]:
print(response)

Here are the working hours for our different Paysoko office locations:

## Paysoko CBD
- Monday - Friday: 08:00 - 17:00
- Saturday: 09:00 - 15:00 
- Sunday: Closed

## Paysoko Westlands  
- Monday - Friday: 08:30 - 18:00
- Saturday: 09:00 - 16:00
- Sunday: Closed

## Paysoko Eastleigh
- Monday - Friday: 08:00 - 17:30  
- Saturday: 09:00 - 15:00
- Sunday: Closed

## Paysoko Karen
- Monday - Friday: 09:00 - 17:00
- Saturday: 10:00 - 15:00
- Sunday: Closed 

## Paysoko Kasarani
- Monday - Friday: 08:30 - 17:30
- Saturday: 09:00 - 15:00 
- Sunday: Closed

Our offices are open on weekdays and Saturdays with slightly reduced hours. All locations are closed on Sundays.

Please let me know if you need any other information about our services, costs or scheduling an appointment at one of our branches. I'd be happy to assist further.


In [172]:
from IPython.display import Markdown

In [171]:
Markdown(response)

Here are the working hours for our different Paysoko office locations:

## Paysoko CBD
- Monday - Friday: 08:00 - 17:00
- Saturday: 09:00 - 15:00 
- Sunday: Closed

## Paysoko Westlands  
- Monday - Friday: 08:30 - 18:00
- Saturday: 09:00 - 16:00
- Sunday: Closed

## Paysoko Eastleigh
- Monday - Friday: 08:00 - 17:30  
- Saturday: 09:00 - 15:00
- Sunday: Closed

## Paysoko Karen
- Monday - Friday: 09:00 - 17:00
- Saturday: 10:00 - 15:00
- Sunday: Closed 

## Paysoko Kasarani
- Monday - Friday: 08:30 - 17:30
- Saturday: 09:00 - 15:00 
- Sunday: Closed

Our offices are open on weekdays and Saturdays with slightly reduced hours. All locations are closed on Sundays.

Please let me know if you need any other information about our services, costs or scheduling an appointment at one of our branches. I'd be happy to assist further.

### Testing

In [173]:
# Sample questions used to exercise the full chain end to end.
questions = [
    "What are your working hours",
    "Can I a book an appointment for 2024-12-19,11:00",
    "What kind of services do you offer",
    "Tell me more about your Merchant Services",
]

In [174]:
# Run every sample question through the full chain, collecting answers in input order.
# NOTE: `question` and `response` stay bound to the last iteration's values after the loop.
question_responses = []
for question in questions:
    response = chain.invoke({'question': question})
    question_responses.append(response)
    

### Responses

In [175]:
Markdown(question_responses[0])

Here are the working hours for our Paysoko office locations:

**Paysoko CBD:**
- Monday - Friday: 8:00 AM - 5:00 PM
- Saturday: 9:00 AM - 3:00 PM 
- Sunday: Closed

**Paysoko Westlands:**
- Monday - Friday: 8:30 AM - 6:00 PM
- Saturday: 9:00 AM - 4:00 PM
- Sunday: Closed

**Paysoko Eastleigh:** 
- Monday - Friday: 8:00 AM - 5:30 PM
- Saturday: 9:00 AM - 3:00 PM
- Sunday: Closed

**Paysoko Karen:**
- Monday - Friday: 9:00 AM - 5:00 PM 
- Saturday: 10:00 AM - 3:00 PM
- Sunday: Closed

**Paysoko Kasarani:**
- Monday - Friday: 8:30 AM - 5:30 PM
- Saturday: 9:00 AM - 3:00 PM 
- Sunday: Closed

All of our offices are open Monday through Saturday, but closed on Sundays. Hours vary slightly by location. Please reach out if you need any additional details on our services, costs or to schedule an appointment. We'll be happy to assist you further.

In [176]:
Markdown(question_responses[1])

Yes, there are office locations available for appointments on Thursday, December 19, 2024 at 11:00. Our offices are open during that time and have appointment slots available.

To schedule an appointment for that date and time, please let me know:
- Which office location you'd like to visit
- What service(s) you need (e.g. notary, money order, check cashing, etc.)
- The full name, phone number and email address to book the appointment under

I'll be happy to check appointment availability, provide the service fees, and get you scheduled. Let me know if you need any other information!

In [177]:
Markdown(question_responses[2])

Here is a summary of the services offered by Paysoko:

## Financial Services
- **Money Transfer:** Domestic transfers between bank accounts and mobile wallets. Cost: KES 150. Duration: 15 min.  
- **Bill Payment:** Pay utility bills including electricity, water and internet. Cost: KES 100. Duration: 10 min.
- **International Remittance:** Send money to various countries worldwide. Cost: KES 500. Duration: 20 min.
- **Business Payments:** B2B payment solutions and bulk payment processing. Cost: KES 300. Duration: 25 min.
- **Mobile Top-up:** Purchase airtime and data bundles for all networks. Cost: KES 50. Duration: 5 min.
- **Government Payments:** Process government fees, licenses and permits. Cost: KES 200. Duration: 30 min. 
- **School Fees:** Make payments to educational institutions. Cost: KES 150. Duration: 15 min.

## Merchant Services
- POS terminals and payment gateway solutions for businesses 
- Cost: KES 1000
- Duration: 45 min setup time

Please note that costs and durations are estimates and may vary. Contact us for more details on pricing, to make an appointment, or check office locations and hours.

In [178]:
Markdown(question_responses[3])

Here are the key details about our Merchant Services:

## Service Details
- **Service**: Merchant Services
- **Description**: POS and payment gateway solutions for businesses 
- **Cost**: KSH 1,000
- **Duration**: 45 minutes

## Availability
To set up our Merchant Services, please visit one of our office locations during regular business hours:
- Monday to Friday: 9:00 AM to 5:00 PM
- Saturday: 9:00 AM to 12:00 PM
- Sunday: Closed

## Appointment Scheduling
Appointments are required to get started with Merchant Services. To schedule an appointment:
1. Call our Merchant Services team at 0700-123-456
2. Provide your business details and preferred date and time
3. Our team will confirm your appointment slot within 1 business day

During your 45 minute appointment, we'll discuss your business needs, set up your POS system and payment gateway, and provide training on how to use the service.

Please let me know if you have any other questions! We look forward to working with you to enable digital payments for your business.

## Utility Class

In [184]:
from dotenv import load_dotenv
import os
from langchain_anthropic import ChatAnthropic
from pydantic import BaseModel, Field, field_validator
from typing import List, Union, Optional
from langchain.prompts import ChatPromptTemplate
from langchain_community.graphs import Neo4jGraph
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain.chains.graph_qa.cypher_utils import CypherQueryCorrector, Schema

# NOTE: the class docstring and the Field descriptions are runtime text — they are part
# of the schema handed to the model by `with_structured_output` — so keep them stable.
class PaysokoEntities(BaseModel):
   """Identifying information about Paysoko entities."""
   office_locations: Union[List[str], None] = Field(
       default_factory=list,
       description="Python List object all office locations mentioned in the text (e.g. ['LOC001', 'Paysoko CBD'])"
   )
   services: Union[List[str], None] = Field(
       default_factory=list,
       description="Python List object all payment services mentioned in the text (e.g. ['PS001', 'Money Transfer'])"
   )
   appointments: Union[List[str], None] = Field(
       default_factory=list,
       description="Python List object of all appointments mentioned in the text (e.g. ['APT001'])"
   )
   office_hours: Union[List[str], None] = Field(
       default_factory=list,
       description="Python List object of office hours mentioned (e.g. ['opening_time', 'closing_time', 'monday'])"
   )

   # mode='before' is required: with the default mode='after' the validator only ever
   # sees values that already passed List[str]/None validation, so a stringified list
   # would be rejected before the string-handling branch below could run.
   @field_validator('office_locations', 'services', 'appointments', 'office_hours', mode='before')
   @classmethod
   def validate_list_fields(cls, v, info):
       """Normalise raw field input before Pydantic validates it as a list of strings.

       Args:
           v: Raw value supplied for the field.
           info: Pydantic validation info (unused).

       Returns:
           ``[]`` for ``None``; a parsed list for a string such as
           ``"['LOC001', 'Paysoko CBD']"``; any other value unchanged, so Pydantic's
           own validation accepts or rejects it.
       """
       if v is None:
           return []
       if isinstance(v, str):
           # Drop the brackets, split on commas (with or without a following space)
           # and strip surrounding quotes from each item.
           parts = v.strip().strip('[]').split(',')
           cleaned = (part.strip().strip('\'"').strip() for part in parts)
           return [item for item in cleaned if item]
       return v

   # Dict form of the Pydantic v2 model configuration; replaces the deprecated
   # nested `class Config`.
   model_config = {"arbitrary_types_allowed": True}

class PaysokoQA:
    """Question-answering pipeline over the Paysoko Neo4j graph.

    A question passes through three model-backed stages: entity extraction,
    Cypher generation (given the graph schema and the mapped entities), and a
    natural-language answer written from the query results.
    """

    def __init__(self):
       """Load environment variables, create the model and graph clients, and build the chains."""
       load_dotenv()
       self.model = ChatAnthropic(model='claude-3-opus-20240229')
       self.graph = Neo4jGraph()
       self.setup_chains()

    def setup_chains(self):
       """Build ``entity_chain``, ``cypher_response``, ``cypher_validation`` and ``chain``."""
       # Entity extraction chain
       prompt = ChatPromptTemplate.from_messages([
           (
               "system", 
               "You are extracting office locations, services, appointments, and operating hours from Paysoko text queries. Do always return full Python objects that are synthatically correct."
           ),
           (
               "human", 
               "Use the given format to extract information from the following input: {question}"
           ),
       ])
       self.entity_chain = prompt | self.model.with_structured_output(PaysokoEntities)

       # Cypher generation chain
       cypher_template = """Based on the Paysoko Neo4j graph schema below, write a Cypher query that would answer the user's question:

       {schema}

       The entities mentioned in the question map to these database values:
       {entities_list}

       User Question: {question}

       Write a Cypher query to answer this question.
       Note: Focus only on Appointments, Services, OfficeLocations and Office Hours relationships.

       Cypher query:"""

       cypher_prompt = ChatPromptTemplate.from_messages([
           (
               "system", 
               "Generate a Cypher query to get information from the Paysoko database. Return only the query without explanation."
           ),
           ("human", cypher_template),
       ])

       # Pipeline: extract entities -> add mapped values and schema -> prompt -> model -> string.
       self.cypher_response = (
           RunnablePassthrough.assign(entities=self.entity_chain) |
           RunnablePassthrough.assign(
               entities_list=lambda x: self.map_to_database(x["entities"]),
               schema=lambda _: self.graph.get_schema,
           ) |
           cypher_prompt |
           self.model.bind(stop=["\nResult:"]) |
           StrOutputParser()
       )

       # Schema validation: relationship triples from the graph schema; an absent
       # "relationships" entry yields an empty list instead of a TypeError.
       corrector_schema = [
           Schema(el["start"], el["type"], el["end"])
           for el in self.graph.structured_schema.get("relationships", [])
       ]
       self.cypher_validation = CypherQueryCorrector(corrector_schema)

       # Response generation chain
       response_template = """
       Based on the question, Cypher query, and database response, provide a natural language answer in markdown format:

       Question: {question}
       Cypher query: {query} 
       Database Response: {response}

       Response should focus on:
       - Office locations and working hours
       - Available services and costs
       - Appointment details and scheduling
       
       The tone of voice you should use in your final response:
       {tone_of_voice}
       """

       response_prompt = ChatPromptTemplate.from_messages([
           (
               "system",
               "You are a customer service assistant for Paysoko. Provide clear, direct answers based on the query results."
           ),
           ("human", response_template),
       ])

       # Full pipeline: generate Cypher -> validate and run it -> phrase the answer.
       self.chain = (
           RunnablePassthrough.assign(query=self.cypher_response) |
           RunnablePassthrough.assign(
               response=lambda x: self._run_cypher(x["query"])
           ) |
           response_prompt | 
           self.model | 
           StrOutputParser()
       )

    def _run_cypher(self, query):
       """Correct ``query`` against the schema and execute it.

       The corrector returns an empty string when it cannot produce a valid
       query; in that case nothing is sent to Neo4j and ``[]`` is returned.
       """
       corrected = self.cypher_validation(query)
       if not corrected:
           return []
       return self.graph.query(corrected)

    def _describe_match(self, term, query, params, error_prefix):
       """Run one lookup and return its summary line, or ``None`` if the lookup failed."""
       try:
           rows = self.graph.query(query, params)
           if rows:
               best = rows[0]
               return (f"{term} maps to {best['result']} "
                       f"({best['type']}) with score "
                       f"{best['score']:.2f}\n")
           return f"No match found for {term}\n"
       except Exception as e:
           # Best-effort: one failing lookup must not abort the remaining ones.
           print(f"{error_prefix} {term}: {e}")
           return None

    def map_to_database(self, entities: "PaysokoEntities") -> Optional[str]:
       """Look up every extracted entity in Neo4j and summarise the best match for each.

       Office locations, services and appointments are resolved through the
       ``locationIndex``, ``serviceIndex`` and ``appointmentIndex`` full-text
       indexes; office-hour terms are compared for equality against
       ``OfficeHour`` properties.

       Args:
           entities: Object exposing ``office_locations``, ``services``,
               ``appointments`` and ``office_hours``; a field that is ``None``
               is treated as empty.

       Returns:
           One newline-terminated line per entity, concatenated in lookup
           order, or ``None`` when no line was produced. A lookup that raises
           is reported with ``print`` and contributes no line.
       """
       fulltext_query = """
       CALL db.index.fulltext.queryNodes($indexName, $value) 
       YIELD node, score
       WITH node, score, labels(node)[0] AS type
       RETURN 
           CASE type
               WHEN 'OfficeLocation' THEN node.location_name
               WHEN 'Services' THEN node.service_name
               WHEN 'Appointment' THEN node.appointment_id
           END AS result,
           type,
           score
       ORDER BY score DESC
       LIMIT 1
       """

       hours_query = """
       MATCH (h:OfficeHour)
       WHERE h.day_of_week = $time OR h.opening_time = $time OR h.closing_time = $time
       RETURN 
           h.day_of_week + ' ' + h.opening_time + '-' + h.closing_time as result,
           'OfficeHour' as type,
           1.0 as score
       LIMIT 1
       """

       # Collect every lookup first: full-text searches, then office hours.
       lookups = []
       for index_name, terms in [
           ("locationIndex", entities.office_locations),
           ("serviceIndex", entities.services),
           ("appointmentIndex", entities.appointments),
       ]:
           for term in terms or []:
               lookups.append(
                   (term, fulltext_query, {"indexName": index_name, "value": term}, "Error mapping entity")
               )
       for term in entities.office_hours or []:
           lookups.append((term, hours_query, {"time": term}, "Error mapping office hour"))

       lines = [self._describe_match(*lookup) for lookup in lookups]
       result = "".join(line for line in lines if line is not None)
       return result if result else None

    def ask(self, question: str, tone_of_voice: str) -> str:
       """Main method to ask questions"""
       return self.chain.invoke({"question": question, "tone_of_voice": tone_of_voice})

    async def a_ask(self, question: str, tone_of_voice: str) -> str:
       """Main method to ask questions asynchronously"""
       return await self.chain.ainvoke({"question": question, "tone_of_voice": tone_of_voice})

In [189]:
# Initialize the QA system
# Initialize the QA system (the constructor loads .env, creates the model and graph
# clients, and builds the chains).
qa = PaysokoQA()

# Ask a question
question="What are your working hours?"
answer = qa.ask(question=question, tone_of_voice="Be friendly and warm.")
# Render the answer as markdown in the cell output.
Markdown(answer)

Here are the working hours for our Paysoko office locations:

**Paysoko CBD**
- Monday - Friday: 8:00 AM - 5:00 PM
- Saturday: 9:00 AM - 3:00 PM 
- Sunday: Closed

**Paysoko Westlands**
- Monday - Friday: 8:30 AM - 6:00 PM
- Saturday: 9:00 AM - 4:00 PM
- Sunday: Closed

**Paysoko Eastleigh**
- Monday - Friday: 8:00 AM - 5:30 PM 
- Saturday: 9:00 AM - 3:00 PM
- Sunday: Closed

**Paysoko Karen**
- Monday - Friday: 9:00 AM - 5:00 PM
- Saturday: 10:00 AM - 3:00 PM 
- Sunday: Closed

**Paysoko Kasarani**
- Monday - Friday: 8:30 AM - 5:30 PM
- Saturday: 9:00 AM - 3:00 PM
- Sunday: Closed

We're open Monday through Saturday at all locations to serve you. Feel free to visit us during those hours for any of your financial needs. Let us know if you have any other questions!

### Log User Responses Into a CSV File

In [190]:
import csv
from datetime import datetime

class QALogger:
   """Append question/answer pairs to a CSV file, one timestamped row per pair.

   The file is always read and written as UTF-8 so non-ASCII text is logged the
   same way regardless of the platform's default encoding.
   """

   def __init__(self, filename="qa_logs.csv"):
       """Remember the target path and make sure the file starts with a header row."""
       self.filename = filename
       self.setup_csv()

   def setup_csv(self):
       """Create CSV file with headers if it doesn't exist or exists but is empty."""
       needs_header = (
           not os.path.exists(self.filename)
           or os.path.getsize(self.filename) == 0
       )
       if needs_header:
           with open(self.filename, 'w', newline='', encoding='utf-8') as file:
               csv.writer(file).writerow(['timestamp', 'question', 'response'])

   def log_qa(self, question: str, response: str):
       """Log a question-answer pair to CSV, stamped with the current local time."""
       timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
       with open(self.filename, 'a', newline='', encoding='utf-8') as file:
           csv.writer(file).writerow([timestamp, question, response])

In [191]:
QA_logger = QALogger() 

In [192]:
# Log the question together with its own answer; `response` still holds the last
# answer from the testing loop, which belongs to a different question.
QA_logger.log_qa(question, answer)