FEW-SHOT PROMPT

In [None]:
from langchain.prompts import (
    ChatPromptTemplate,
    FewShotChatMessagePromptTemplate,
)

In [None]:
from langchain.prompts.few_shot import FewShotPromptTemplate
from langchain.prompts.prompt import PromptTemplate

In [None]:
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# Set OpenAI API key
os.environ['OPENAI_API_KEY'] = "sk-proj-SsGOtCoSAspE2vsTXEuST3BlbkFJdXj67Au7IZxs4GtwbH75"

# Define the prompt template
prompt_template = """
I want to generate an access control policy as a JSON file from a user request. The JSON file looks like this:
{{
    "trigger": {{
        "src": {{
            "user1": "U1",
            "platform": "SmartThings"
        }},
        "dst": {{
            "user2": "U2",
            "platform": "Alexa"
        }},
        "device_type": "{{device_type}}"
    }},
    "condition": {{
        "location": ["{{location}}"],
        "time": ["{{time}}", ["day", "night"]]
    }},
    "decision": {{
        "requester": "U2",
        "approver": "U1",
        "outcome": ["accept", "decline"]
    }},
    "actions": [
        {{
            "if": {{
                "decision": "accept",
                "equals": {{
                    "left": {{
                        "device": {{
                            "deviceId1": "device_id1",
                            "component": "main",
                            "capability": "{{capability}}",
                            "attribute": "{{attribute}}"
                        }}
                    }},
                    "right": {{
                        "string": "{{state}}"
                    }}
                }},
                "then": [
                    {{
                        "command": {{
                            "deviceId2": "device_id2",
                            "commands": [
                                {{
                                    "component": "main",
                                    "capability": "{{capability}}",
                                    "command": "{{command}}"
                                }}
                            ]
                        }}
                    }}
                ],
                "else": [
                    {{
                        "command": {{
                            "deviceId2": "device_id2",
                            "commands": [
                                {{
                                    "component": "main",
                                    "capability": "{{capability}}",
                                    "command": "off"
                                }}
                            ]
                        }}
                    }}
                ]
            }}
        }}
    ]
}}

Use this {request} to create the JSON file. No extra explanation needed. User needs to specify the time.
The device type is the device that is requested. 
Requester is the user in the dst, and approver is the user in the src. 
The capability, attribute, and command should be based on the functions User2 asks User1 for granting access control.
Do not change the decision value in the if statement and do not skip the action part. 
If there is missing information, keep asking the user to input the missing information. 
The outcome of the decision will be accept or decline.
"""

prompt = ChatPromptTemplate.from_template(prompt_template)

output_parser = StrOutputParser()
model = ChatOpenAI(model="gpt-4-turbo")

chain = (
    {"request": RunnablePassthrough()} 
    | prompt
    | model
    | output_parser
)




In [None]:
response = chain.invoke("U2 wants to turn on the switch in the living room in U1's account at 08:00, Day. U1 allows.")

print(response)

In [None]:
response = chain.invoke("I want the JSON file only")

print(response)

In [None]:
import json

# Function to safely parse JSON with error handling
def parse_json_response(response):
    try:
        # Filter out unwanted characters or content if necessary
        response = response.strip()  # Removing leading/trailing whitespace
        start_idx = response.find('{')
        end_idx = response.rfind('}') + 1
        if start_idx != -1 and end_idx != -1:
            response = response[start_idx:end_idx]
        return json.loads(response)
    except json.JSONDecodeError:
        print("Failed to decode JSON. Response was:", response)
        return None

# Parse the response
policy_json = parse_json_response(response)


In [None]:
import csv

def save_policy_to_csv(policy, filename="access_control_policy.csv"):
    if policy is None:
        print("No valid policy to save.")
        return
    
    # Flatten the JSON policy to save in CSV format
    flattened_policy = []
    for action in policy['actions']:
        if_action = action['if']
        flattened_policy.append({
            "trigger_src_user1": policy['trigger']['src']['user1'],
            "trigger_src_platform": policy['trigger']['src']['platform'],
            "trigger_dst_user2": policy['trigger']['dst']['user2'],
            "trigger_dst_platform": policy['trigger']['dst']['platform'],
            "trigger_device_type": policy['trigger']['device_type'],
            "condition_location": policy['condition']['location'][0],
            "condition_time": policy['condition']['time'][0],
            "decision_requester": policy['decision']['requester'],
            "decision_approver": policy['decision']['approver'],
            "decision_outcome": policy['decision']['outcome'][0],  # assuming outcome is always accept or decline
            "if_decision": if_action['decision'],
            "if_equals_left_deviceId1": if_action['equals']['left']['device']['deviceId1'],
            "if_equals_right_string": if_action['equals']['right']['string'],
            "then_command_deviceId2": if_action['then'][0]['command']['deviceId2'],
            "then_command": if_action['then'][0]['command']['commands'][0]['command'],
            "else_command_deviceId2": if_action['else'][0]['command']['deviceId2'],
            "else_command": if_action['else'][0]['command']['commands'][0]['command'],
        })
    
    # Write the CSV file
    keys = flattened_policy[0].keys()
    with open(filename, 'w', newline='') as output_file:
        dict_writer = csv.DictWriter(output_file, keys)
        dict_writer.writeheader()
        dict_writer.writerows(flattened_policy)

# Save the policy to CSV
save_policy_to_csv(policy_json)


In [None]:
import json
import csv

def parse_json_response(response):
    try:
        response = response.strip()  # Removing leading/trailing whitespace
        start_idx = response.find('{')
        end_idx = response.rfind('}') + 1
        if start_idx != -1 and end_idx != -1:
            response = response[start_idx:end_idx]
        return json.loads(response)
    except json.JSONDecodeError:
        print("Failed to decode JSON. Response was:", response)
        return None

def generate_policy_json(request):
    response = chain.invoke({"request": request})
    print("Response from model:", response)  # Debugging: Print the response
    return parse_json_response(response)

def is_function_allowed(request, filename="access_control_policy.csv"):
    # Generate the policy JSON from the request
    request_policy = generate_policy_json(request)
    if request_policy is None:
        return False

    # Flatten the request policy for comparison
    request_flattened = {
        "trigger_src_user1": request_policy['trigger']['src']['user1'],
        "trigger_src_platform": request_policy['trigger']['src']['platform'],
        "trigger_dst_user2": request_policy['trigger']['dst']['user2'],
        "trigger_dst_platform": request_policy['trigger']['dst']['platform'],
        "trigger_device_type": request_policy['trigger']['device_type'],
        "condition_location": request_policy['condition']['location'][0],
        "condition_time": request_policy['condition']['time'][0],
        "decision_requester": request_policy['decision']['requester'],
        "decision_approver": request_policy['decision']['approver'],
        "decision_outcome": request_policy['decision']['outcome'][0],  # assuming outcome is always accept or decline
        "if_decision": request_policy['actions'][0]['if']['decision'],
        "if_equals_left_deviceId1": request_policy['actions'][0]['if']['equals']['left']['device']['deviceId1'],
        "if_equals_right_string": request_policy['actions'][0]['if']['equals']['right']['string'],
        "then_command_deviceId2": request_policy['actions'][0]['if']['then'][0]['command']['deviceId2'],
        "then_command": request_policy['actions'][0]['if']['then'][0]['command']['commands'][0]['command'],
        "else_command_deviceId2": request_policy['actions'][0]['if']['else'][0]['command']['deviceId2'],
        "else_command": request_policy['actions'][0]['if']['else'][0]['command']['commands'][0]['command'],
    }

    print("Flattened request policy for comparison:", request_flattened)  # Debugging: Print the flattened policy

    # Check against the existing policies in the CSV
    with open(filename, mode='r') as file:
        csv_reader = csv.DictReader(file)
        for row in csv_reader:
            print("Checking against stored policy:", row)  # Debugging: Print each stored policy
            # Detailed comparison debug
            for key, value in request_flattened.items():
                if row[key] != str(value):
                    print(f"Mismatch found: CSV {key} = {row[key]} != Request {key} = {value}")
            if all(row[key] == str(value) for key, value in request_flattened.items()):
                return True
    return False

# Example usage to check if a function is allowed
user_request = "U2 wants to turn on the switch in the living room in U1's account at 08:00"
is_allowed = is_function_allowed(user_request)
print(f"Is the request '{user_request}' allowed: {is_allowed}")


NEW CODE

In [11]:
import os
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

os.environ['OPENAI_API_KEY'] = "sk-proj-SsGOtCoSAspE2vsTXEuST3BlbkFJdXj67Au7IZxs4GtwbH75"


prompt_template = """
I want to generate an access control policy as a JSON file from a user request. The JSON file looks like this:
{{
    "trigger": {{
        "src": {{
            "user1": "U1",
            "platform": "SmartThings"
        }},
        "dst": {{
            "user2": "U2",
            "platform": "Alexa"
        }},
        "device_type": "{{device_type}}"
    }},
    "condition": {{
        "location": ["{{location}}"],
        "time": ["{{time}}", ["day", "night"]]
    }},
    "decision": {{
        "requester": "U2",
        "approver": "U1",
        "outcome": ["accept", "decline"]
    }},
    "actions": [
        {{
            "if": {{
                "decision": "accept",
                "equals": {{
                    "left": {{
                        "device": {{
                            "deviceId1": "st_dv1",
                            "component": "main",
                            "capability": "{{capability}}",
                            "attribute": "{{attribute}}"
                        }}
                    }},
                    "right": {{
                        "string": "{{state}}"
                    }}
                }},
                "then": [
                    {{
                        "command": {{
                            "deviceId2": "st_dv2",
                            "commands": [
                                {{
                                    "component": "main",
                                    "capability": "{{capability}}",
                                    "command": "{{command}}"
                                }}
                            ]
                        }}
                    }}
                ],
                "else": [
                    {{
                        "command": {{
                            "deviceId2": "st_dv2",
                            "commands": [
                                {{
                                    "component": "main",
                                    "capability": "{{capability}}",
                                    "command": "off"
                                }}
                            ]
                        }}
                    }}
                ]
            }}
        }}
    ]
}}

Use this {request} to create the JSON file. No extra explanation needed. User needs to specify the time.
The device type is the device that is requested. 
Requester is the user in the dst, and approver is the user in the src. 
The capability, attribute, and command should be based on the functions User2 asks User1 for granting access control.
Do not change the decision value in the if statement and do not skip the action part. 
If there is missing information, keep asking the user to input the missing information. 
The outcome of the decision will be accept or decline.
"""

prompt = ChatPromptTemplate.from_template(prompt_template)

output_parser = StrOutputParser()
model = ChatOpenAI(model="gpt-4-turbo")

chain = (
    {"request": RunnablePassthrough()} 
    | prompt
    | model
    | output_parser
)


In [14]:
import json

response = chain.invoke({"request": "U2 wants to turn on the switch in the living room in U1's account at 10:00, Day. U1 allows."})

print(response)


def parse_json_response(response):
    try:
        response = response.strip()  
        start_idx = response.find('{')
        end_idx = response.rfind('}') + 1
        if start_idx != -1 and end_idx != -1:
            response = response[start_idx:end_idx]
        return json.loads(response)
    except json.JSONDecodeError:
        print("Failed to decode JSON. Response was:", response)
        return None

# Parse the response
policy_json = parse_json_response(response)


Based on the request provided, here is the JSON file with the specific details filled in:

```json
{
    "trigger": {
        "src": {
            "user1": "U1",
            "platform": "SmartThings"
        },
        "dst": {
            "user2": "U2",
            "platform": "Alexa"
        },
        "device_type": "switch"
    },
    "condition": {
        "location": ["living room"],
        "time": ["10:00", ["day", "night"]]
    },
    "decision": {
        "requester": "U2",
        "approver": "U1",
        "outcome": ["accept", "decline"]
    },
    "actions": [
        {
            "if": {
                "decision": "accept",
                "equals": {
                    "left": {
                        "device": {
                            "deviceId1": "st_dv1",
                            "component": "main",
                            "capability": "switch",
                            "attribute": "switch"
                        }
                    },
       

In [10]:
response = chain.invoke({"Device ID 1 is st_device1, and device id 2 is st_device2"})
print(response)

To generate the JSON file accurately, I need the following information from you:
1. The device type being requested.
2. The capability, attribute, and command that User2 is asking User1 to grant.
3. The state that should be compared in the "if" condition.
4. The time and location conditions for the access.

Please provide the required details.


In [15]:
import csv

def save_policy_to_csv(policy, filename="access_control_policy.csv"):
    if policy is None:
        print("No valid policy to save.")
        return
    
    # Save in CSV format
    flattened_policy = []
    for action in policy['actions']:
        if_action = action['if']
        flattened_policy.append({
            "trigger_src_user1": policy['trigger']['src']['user1'],
            "trigger_src_platform": policy['trigger']['src']['platform'],
            "trigger_dst_user2": policy['trigger']['dst']['user2'],
            "trigger_dst_platform": policy['trigger']['dst']['platform'],
            "trigger_device_type": policy['trigger']['device_type'],
            "condition_location": policy['condition']['location'][0],
            "condition_time": policy['condition']['time'][0],
            "decision_requester": policy['decision']['requester'],
            "decision_approver": policy['decision']['approver'],
            "decision_outcome": policy['decision']['outcome'][0], 
            "if_decision": if_action['decision'],
            "if_equals_left_deviceId1": if_action['equals']['left']['device']['deviceId1'],
            "if_equals_right_string": if_action['equals']['right']['string'],
            "then_command_deviceId2": if_action['then'][0]['command']['deviceId2'],
            "then_command": if_action['then'][0]['command']['commands'][0]['command'],
            "else_command_deviceId2": if_action['else'][0]['command']['deviceId2'],
            "else_command": if_action['else'][0]['command']['commands'][0]['command'],
        })
    
    # Check if the CSV file exists
    file_exists = os.path.isfile(filename)

    # Write the CSV file
    keys = flattened_policy[0].keys()
    with open(filename, 'a', newline='') as output_file:
        dict_writer = csv.DictWriter(output_file, keys)
        if not file_exists:
            dict_writer.writeheader()
        dict_writer.writerows(flattened_policy)

# Save the policy to CSV
save_policy_to_csv(policy_json)


In [18]:
def parse_json_response(response):
    try:
        response = response.strip()  
        start_idx = response.find('{')
        end_idx = response.rfind('}') + 1
        if start_idx != -1 and end_idx != -1:
            response = response[start_idx:end_idx]
        return json.loads(response)
    except json.JSONDecodeError:
        print("Failed to decode JSON. Response was:", response)
        return None

def generate_policy_json(request):
    response = chain.invoke({"request": request})
    print("Response from model:", response)
    return parse_json_response(response)

def is_function_allowed(request, filename="access_control_policy.csv"):
    # Generate the policy JSON from the request
    request_policy = generate_policy_json(request)
    if request_policy is None:
        return False

    request_flattened = {
        "trigger_src_user1": request_policy['trigger']['src']['user1'],
        "trigger_src_platform": request_policy['trigger']['src']['platform'],
        "trigger_dst_user2": request_policy['trigger']['dst']['user2'],
        "trigger_dst_platform": request_policy['trigger']['dst']['platform'],
        "trigger_device_type": request_policy['trigger']['device_type'],
        "condition_location": request_policy['condition']['location'][0],
        "condition_time": request_policy['condition']['time'][0],
        "decision_requester": request_policy['decision']['requester'],
        "decision_approver": request_policy['decision']['approver'],
        "decision_outcome": request_policy['decision']['outcome'][0], 
        "if_decision": request_policy['actions'][0]['if']['decision'],
        "if_equals_left_deviceId1": request_policy['actions'][0]['if']['equals']['left']['device']['deviceId1'],
        "if_equals_right_string": request_policy['actions'][0]['if']['equals']['right']['string'],
        "then_command_deviceId2": request_policy['actions'][0]['if']['then'][0]['command']['deviceId2'],
        "then_command": request_policy['actions'][0]['if']['then'][0]['command']['commands'][0]['command'],
        "else_command_deviceId2": request_policy['actions'][0]['if']['else'][0]['command']['deviceId2'],
        "else_command": request_policy['actions'][0]['if']['else'][0]['command']['commands'][0]['command'],
    }

    print("Flattened request policy for comparison:", request_flattened)

    # Check against the existing policies in the CSV
    with open(filename, mode='r') as file:
        csv_reader = csv.DictReader(file)
        for row in csv_reader:
            print("Checking against stored policy:", row)
            match = True
            for key, value in request_flattened.items():
                if row[key] != str(value):
                    print(f"Mismatch found: CSV {key} = {row[key]} != Request {key} = {value}")
                    match = False
            if match:
                return True
    return False

user_request = "U2 wants to turn on the switch in the living room in U1's account at 10:00"
is_allowed = is_function_allowed(user_request)
print(f"Is the request '{user_request}' allowed: {is_allowed}")


Response from model: {
    "trigger": {
        "src": {
            "user1": "U1",
            "platform": "SmartThings"
        },
        "dst": {
            "user2": "U2",
            "platform": "Alexa"
        },
        "device_type": "switch"
    },
    "condition": {
        "location": ["living room"],
        "time": ["10:00", ["day", "night"]]
    },
    "decision": {
        "requester": "U2",
        "approver": "U1",
        "outcome": ["accept", "decline"]
    },
    "actions": [
        {
            "if": {
                "decision": "accept",
                "equals": {
                    "left": {
                        "device": {
                            "deviceId1": "st_dv1",
                            "component": "main",
                            "capability": "switch",
                            "attribute": "switch"
                        }
                    },
                    "right": {
                        "string": "on"
               