In [1]:
import pandas as pd
from transformers import AutoTokenizer
from transformers import RobertaTokenizer, EncoderDecoderModel
import gradio as gr
import string

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
metadata = "dataset/metadata.csv"

In [3]:
def get_function_mapping(df_metadata, split_ref):
    df_cp = df_metadata[['function', 'desc', 'split']].copy()
    df_cp.drop_duplicates(inplace=True)
    return {function:desc for function, desc, split in df_cp.values if split==split_ref}

def get_channel_mapping(df_metadata):
    df_cp = df_metadata[['channel', 'channel_desc']].copy()
    df_cp.drop_duplicates(inplace=True)
    return {channel:desc for channel, desc in df_cp.values}

def get_valid_field(df_metadata):
    df_cp = df_metadata[["field"]].copy()
    df_cp.drop_duplicates(inplace=True)
    field_list = df_cp.field.tolist()
    field_list_out=[]
    for temp_field in field_list:
        temp_field = temp_field.split(" ### ")
        temp_field = [x.strip().lower() for x in temp_field if x.strip() != '']
        temp_field = [x.translate(str.maketrans(' ', '_', string.punctuation)) for x in temp_field]
        for item in temp_field:
            if item not in field_list_out:
                field_list_out.append(item)
    return field_list_out

def get_metadata(path):
    df_metadata = pd.read_csv(path)
    df_metadata = df_metadata.fillna('')
    function_dict_trigger = get_function_mapping(df_metadata=df_metadata, split_ref='trigger')
    function_dict_action = get_function_mapping(df_metadata=df_metadata, split_ref='action')
    channel_dict = get_channel_mapping(df_metadata=df_metadata)
    valid_field = get_valid_field(df_metadata=df_metadata)
    return channel_dict, function_dict_trigger, function_dict_action, valid_field

In [4]:
channel_dict, function_dict_trigger, function_dict_action, valid_field = get_metadata(path=metadata)

In [5]:
ckpt = "imamnurby/rob2rand_merged_w_prefix_c_fc_interactive"
model_interactive = EncoderDecoderModel.from_pretrained(ckpt)
tokenizer_interactive = RobertaTokenizer.from_pretrained(ckpt)

In [6]:
def append_prefix(desc, prefix):
    return prefix + desc

def append_suffix(desc, suffix):
    return desc + suffix

def generate_preds_tc(input_desc, num_beams_interactive):
    desc = input_desc.lower()
    desc = append_prefix(desc=desc, 
                         prefix= "GENERATE TRIGGER CHANNEL <pf> ")
    
    input_ids = tokenizer_interactive.encode(desc, return_tensors='pt')
    
    preds = model_interactive.generate(input_ids,
                          max_length=200,
                          num_beams=num_beams_interactive,
                          num_return_sequences=num_beams_interactive,
                          early_stopping=True)
    
    decoded_preds=[]
    descriptions=[]
    count_arr = []
    count = 0 
    
    for item in preds:
        temp_pred = (tokenizer_interactive.decode(item, skip_special_tokens=True))
        if temp_pred in channel_dict.keys():
            count = count + 1
            count_arr.append(count)
            decoded_preds.append(temp_pred)
            temp_desc = channel_dict.get(temp_pred, "null")
            descriptions.append(temp_desc)
    
    df = {'No.':count_arr,
          'Trigger Channel': decoded_preds,
          'Description': descriptions}
    return pd.DataFrame(df)

def generate_preds_tf(input_desc, num_beams_interactive, selected_tc):
    desc = input_desc.lower()
    desc = append_prefix(desc=desc, 
                         prefix="GENERATE TRIGGER FUNCTION <pf> ")
    
    desc = append_suffix(desc=desc,
                        suffix=f" <out> {selected_tc}")
    
    input_ids = tokenizer_interactive.encode(desc, return_tensors='pt')
    
    preds = model_interactive.generate(input_ids,
                          max_length=200,
                          num_beams=num_beams_interactive,
                          num_return_sequences=num_beams_interactive,
                          early_stopping=True)
    
    decoded_preds=[]
    descriptions=[]
    count_arr = []
    count = 0 
    
    for item in preds:
        temp_pred = (tokenizer_interactive.decode(item, skip_special_tokens=True))
        if temp_pred in function_dict_trigger.keys():
            temp_desc = function_dict_trigger.get(temp_pred, "null")
            if selected_tc in temp_pred:
                count = count + 1
                count_arr.append(count)
                decoded_preds.append(temp_pred)
                descriptions.append(temp_desc)
    
    df = {'No.': count_arr,
          'Trigger Function': decoded_preds,
          'Description': descriptions}
    return pd.DataFrame(df)

def generate_preds_ac(input_desc, num_beams_interactive, selected_tc, selected_tf):
    desc = input_desc.lower()
    desc = append_prefix(desc=desc, 
                         prefix= "GENERATE ACTION CHANNEL <pf> ")
    
    desc = append_suffix(desc=desc,
                        suffix=f" <out> {selected_tc} {selected_tf}")
    
    input_ids = tokenizer_interactive.encode(desc, return_tensors='pt')
    
    preds = model_interactive.generate(input_ids,
                          max_length=200,
                          num_beams=num_beams_interactive,
                          num_return_sequences=num_beams_interactive,
                          early_stopping=True)
    
    decoded_preds=[]
    descriptions=[]
    count_arr = []
    count = 0 
    
    for item in preds:
        temp_pred = (tokenizer_interactive.decode(item, skip_special_tokens=True))
        if temp_pred in channel_dict.keys():
            count = count + 1
            count_arr.append(count)
            decoded_preds.append(temp_pred)
            temp_desc = channel_dict.get(temp_pred, "null")
            descriptions.append(temp_desc)
    
    df = {'No.':count_arr,
          'Action Channel': decoded_preds,
          'Description': descriptions}
    return pd.DataFrame(df)

def generate_preds_af(input_desc, num_beams_interactive, selected_tc, selected_tf, selected_ac):
    desc = input_desc.lower()
    desc = append_prefix(desc=desc, 
                         prefix="GENERATE TRIGGER FUNCTION <pf> ")
    
    desc = append_suffix(desc=desc,
                        suffix=f" <out> {selected_tc} {selected_tf} {selected_ac}")
    
    input_ids = tokenizer_interactive.encode(desc, return_tensors='pt')
    
    preds = model_interactive.generate(input_ids,
                          max_length=200,
                          num_beams=num_beams_interactive,
                          num_return_sequences=num_beams_interactive,
                          early_stopping=True)
    
    decoded_preds=[]
    descriptions=[]
    count_arr = []
    count = 0 
    
    for item in preds:
        temp_pred = (tokenizer_interactive.decode(item, skip_special_tokens=True))
        if temp_pred in function_dict_action.keys():
            temp_desc = function_dict_action.get(temp_pred, "null")
            
            if selected_ac in temp_pred:
                count = count + 1
                count_arr.append(count)
                decoded_preds.append(temp_pred)
                descriptions.append(temp_desc)
    
    df = {'No.':count_arr,
          'Action Function': decoded_preds,
         'Description': descriptions}
    df = pd.DataFrame(df)
    df.index.names = ['Ranking']
    return df

In [7]:
# desc = "GENERATE TRIGGER CHANNEL <pf> Upload a new photo when motion_detected"
# generate_preds_tc(input_desc=desc, num_beams_interactive=50)

In [8]:
# generate_preds_tf(input_desc=desc, num_beams_interactive=50, selected_tc="SpotCam_HD")

In [9]:
# generate_preds_ac(input_desc=desc, 
#                   num_beams_interactive=50,
#                   selected_tc="SpotCam_HD",
#                   selected_tf="SpotCam_HD.Motion_event_detected")

In [10]:
# generate_preds_af(input_desc=desc, 
#                   num_beams_interactive=70,
#                   selected_tc="SpotCam_HD",
#                   selected_tf="SpotCam_HD.Motion_event_detected",
#                   selected_ac="Flickr")

In [11]:
ckpt = "imamnurby/rob2rand_merged_w_prefix_c_fc_field"
model_oneshot = EncoderDecoderModel.from_pretrained(ckpt)
tokenizer_oneshot = RobertaTokenizer.from_pretrained(ckpt)

def generate_preds_tap(input_desc, gen_mode, num_beams_oneshot):
    desc = input_desc.lower()    
    if gen_mode=="Channel":
        prefix="GENERATE CHANNEL ONLY WITHOUT FUNCTION <pf> "
        
    elif gen_mode=="Channel+Function":
        prefix="GENERATE CHANNEL AND FUNCTION FOR BOTH TRIGGER AND ACTION <pf> "
        
    elif gen_mode=="Channel+Function+Field":
        prefix="GENERATE ON THE FIELD-LEVEL GRANULARITY <pf> "
    
    desc = append_prefix(desc=desc, prefix=prefix)
    
    input_ids = tokenizer_oneshot.encode(desc, return_tensors='pt')
    
    # activate beam search and early_stopping
    preds = model_oneshot.generate(input_ids,
                          max_length=200,
                          num_beams=num_beams_oneshot,
                          num_return_sequences=num_beams_oneshot,
                          early_stopping=True)
    
    decoded_preds = []
    for item in preds:
        decoded_preds.append(tokenizer_oneshot.decode(item, skip_special_tokens=True))
    
    trigger = []
    trigger_desc = []
    action = []
    action_desc = []
    if gen_mode=="Channel":
        for item in decoded_preds:
            channels = item.split("<sep>")
            channels = [ch.strip() for ch in channels]
            if len(channels)==2:
                if channels[0] in channel_dict.keys() and channels[1] in channel_dict.keys():
                    temp_tc_desc = channel_dict.get(channels[0])
                    trigger_desc.append(temp_tc_desc)
                    trigger.append(channels[0])
                    
                    temp_ac_desc = channel_dict.get(channels[1])
                    action_desc.append(temp_ac_desc)
                    action.append(channels[1])
                    
    elif gen_mode=="Channel+Function":
        for item in decoded_preds:
            functions = item.split("<sep>")
            functions = [fc.strip() for fc in functions]
            if len(functions)==4:
                if functions[1] in function_dict_trigger.keys() and functions[3] in function_dict_action.keys():
                    temp_tf_desc = function_dict_trigger.get(functions[1])
                    trigger_desc.append(temp_tf_desc)
                    trigger.append(functions[1])

                    temp_af_desc = function_dict_action.get(functions[3])
                    action_desc.append(temp_af_desc)
                    action.append(functions[3])
        
    elif gen_mode=="Channel+Function+Field":
        for item in decoded_preds:
            invalid_field = False
            splitted_items = item.split("<sep>")
            processed = []
            if len(splitted_items)==6:
                for idx, subitem in enumerate(splitted_items):
                    if idx!=2 or idx!=4:
                        subitem = subitem.strip()
                    processed.append(subitem)
                assert(len(processed)==6)
                temp_tf = processed[1]
                temp_af = processed[4]
                
                temp_tf_field = processed[2].split(" ### ")
                temp_tf_field = [x.strip().lower() for x in temp_tf_field if x.strip() != '']
                temp_tf_field = [x.translate(str.maketrans(' ', '_', string.punctuation)) for x in temp_tf_field]
                
                for field in temp_tf_field:
                    if field not in valid_field:
                        invalid_field = True
                        break
                
                if invalid_field:
                    continue
                
                temp_tf_field = "(" + ", ".join(temp_tf_field) + ")"
                
                temp_af_field = processed[-1].split(" ### ")
                temp_af_field = [x.strip().lower() for x in temp_af_field if x.strip() != '']
                temp_af_field = [x.translate(str.maketrans(' ', '_', string.punctuation)) for x in temp_af_field]
                
                for field in temp_af_field:
                    if field not in valid_field:
                        invalid_field = True
                        break
                
                if invalid_field:
                    continue
                
                temp_af_field = "(" + ", ".join(temp_af_field) + ")"
                
                if temp_tf in function_dict_trigger.keys() and temp_af in function_dict_action.keys():
                    temp_tf_desc = function_dict_trigger.get(temp_tf)
                    temp_af_desc = function_dict_action.get(temp_af)
                    
                    temp_tf = temp_tf + temp_tf_field
                    temp_af = temp_af + temp_af_field

                    trigger.append(temp_tf)
                    trigger_desc.append(temp_tf_desc)
                
                    action.append(temp_af)
                    action_desc.append(temp_af_desc)
                    
    # df_trigger = {"Trigger": trigger,
    #               "Trigger Description": trigger_desc}
    # df_action = {"Action": action,
    #             "Action Description": action_desc}
    # return pd.DataFrame(df_trigger), pd.DataFrame(df_action)
    df = {"Trigger": trigger,
        "Trigger Description": trigger_desc,
        "Action": action,
        "Action Description": action_desc}
    df_len = len(df)
    return df

In [12]:
# desc = "If motion is detected, then log to spreadsheet"
# generate_preds_tap(input_desc=desc, gen_mode="Channel+Function+Field", num_beams_oneshot=50)

In [13]:
def return_same(input_desc):
    return input_desc

def update_dropdown(df_result):
    list_result = []
    for ind in df_result["No."]:
        list_result.append(str(ind))
    return gr.Dropdown.update(choices=list_result)

def set_selected_channel(df_result, index_chosen,chosen):
    index_chosen = int(index_chosen)
    chosen.append(
        {
            "trigger_channel": df_result.iloc[index_chosen-1]["Trigger Channel"]
        }
    )
    return chosen

In [14]:
desc = "Log to my spreadsheet if motion is detected in the living room"
answer2 = generate_preds_tc(input_desc=desc, num_beams_interactive=20)
answer3 = set_selected_channel(answer2, 5, [])
answer4 = update_dropdown(answer2)
print(answer2)

    No. Trigger Channel                                        Description
0     1     WeMo_Motion  Belkin’s WeMo system is home automation made e...
1     2     SmartThings  SmartThings unlocks a new world of possibiliti...
2     3       Manything  Manything is a smart app that turns your iPhon...
3     4        Nest_Cam  Nest takes the unloved products of your home a...
4     5          ecobee  ecobee is the smarter wi-fi thermostat with re...
5     6   Withings_Home  Withings Home, Wi-Fi security camera with air ...
6     7            Ring  Ring is a dual powered (wired or battery-opera...
7     8    XY_Findables  XY Findables allows you to control your world ...
8     9      Oco_Camera  Oco Smart Camera - weather and vandal proof sm...
9    10            MESH  MESH makes the world around you come to life. ...
10   11   LG_Smartphone  LG smartphones are designed to keep all aspect...
11   12            Arlo  Arlo HD security cameras have everything you n...
12   13           Camio  

In [15]:
demo = gr.Blocks()
with demo:
    gr.Markdown("<h1><center>RecipeGen: Automated Trigger Action Programs (TAPs) Generation Tool</center></h1>")
    # gr.Markdown("This demo allows you to generate TAPs using functionality description described in English. You can learn the working detail of our tool from our paper")
    gr.Markdown("<h3>What is TAP?</h3>")
    gr.Markdown("""
        TAPs or Trigger Action Programs are event-driven rules used to automate smart devices and/or internet services. TAPs are written in the form of "IF a **{trigger}** is
        satisfied then execute an **{action}**, where the **{trigger}** and the **{action}** correspond to API calls. TAPs have been used in various use cases, ranging from home monitoring 
        system to business workflow automation.
        """)
    gr.Markdown("<h3>What is RecipeGen?</h3>")
    gr.Markdown("""
        RecipeGen is a deep learning-based tool that can assist end-users to generate TAPs using natural language description. End-users can describe the functionality of the intended TAP, then RecipeGen
        will generate the TAP candidates based on the given description.
    """)
    with gr.Tabs():
        with gr.TabItem("Interactive Mode"):
            gr.Markdown("<h3><center>Instructions for Interactive Mode</center></h3>")
            gr.Markdown("""1. There are 4 generation steps, i.e., generating trigger channel, trigger function, action channel, and action function.
                2. **[STEP 1]** Describe the functionality in the `Functionality Description` text box. Click `Generate Trigger Channel` button. The channel candidates and its descriptions will show up in the `Trigger Channel Results` table.
                3. **[STEP 2]** Select one trigger channel from the `Trigger Channel Results` table and enter it to the `Selected Trigger Channel` text box. Click `Generate Trigger Function` button. The function candidates and its descriptions will show up in the `Trigger Function Results` table.
                4. **[STEP 3]** Select one trigger function from the `Trigger Function Results` table and enter it to the `Selected Trigger Function` text box. Click `Generate Action Channel` button. The channel candidates and its descriptions will show up in the `Action Channel Results` table.
                5. **[STEP 4]** Select one action channel from the `Action Channel Results` table and enter it to the `Selected Action Channel` text box. Click `Generate Action Function` button. The function candidates and its descriptions will show up in the `Action Function Results` table.
                 """)
            gr.Markdown(""" NOTE: You can control how many sequences are returned by tuning the `Beam Width` slider.
            """)
            chosen = []
            with gr.Box():
                with gr.Column():
                    try_function = gr.Dropdown(type ="value",choices = ["Log to my spreadsheet if motion is detected in the living room","Notify me when someone open the front door", "Turn on my Philips lamp every sunset","Update my picture in Twitter when I change my profile picture in Facebook","Send and append to my note  when I create a new bookmark"], label = "Here are some sample functionality descriptions that you can try")
                    try_function_button = gr.Button("Try this")
                    
                    
            with gr.Box():
                with gr.Column():
                    gr.Markdown("<h4><center>Step 1: Generate Trigger Channel</center></h4>")
                    input_desc_textbox = gr.Textbox(label="Functionality Description", placeholder="Describe the functionality here")
                    num_beams_interactive = gr.Slider(minimum=2, maximum=100, value=20, step=1, label="Beam Width")
                    # num_returned_seqs_interactive = gr.Slider(minimum=2, maximum=100, value=50, step=1, label="#Returned Sequences")
                    generate_tc_button = gr.Button("Generate Trigger Channel")
                    gr.Markdown("<br>")
                    gr.Markdown("<h4><center>Trigger Channel Results</center></h4>")
                    results_tc = gr.Dataframe(headers=["No.","Trigger Channel", "Description"])
            
            with gr.Box():
                with gr.Column():
                    gr.Markdown("<h4><center>Step 2: Generate Trigger Function</center></h4>")
                    selected_tc_dropdown = gr.Dropdown(label="Select the Trigger Channel from above",type="value", choices=[0])
                    
                    generate_tf_button = gr.Button("Generate Trigger Function")
                    gr.Markdown("<br>")
                    gr.Markdown("<h4><center>Trigger Function Results</center></h4>")
                    results_tf = gr.Dataframe(headers=["No.","Trigger Function", "Description"])
                
            with gr.Box():
                with gr.Column():
                    gr.Markdown("<h4><center>Step 3: Generate Action Channel</center></h4>")
                    selected_tf_textbox = gr.Textbox(label="Selected Trigger Function", placeholder="Select trigger function from the results above")
                    generate_ac_button = gr.Button("Generate Action Channel")
                    gr.Markdown("<br>")
                    gr.Markdown("<h4><center>Action Channel Results</center></h4>")
                    results_ac = gr.Dataframe(headers=["No.","Action Channel", "Description"])
            
            with gr.Box():
                with gr.Column():
                    gr.Markdown("<h4><center>Step 4: Generate Action Function</center></h4>")
                    selected_ac_textbox = gr.Textbox(label="Selected Action Channel", placeholder="Select action channel from the results above")
                    generate_af_button = gr.Button("Generate Action Function")
                    gr.Markdown("<br>")
                    gr.Markdown("<h4><center>Action Function Results</center></h4>")
                    results_af = gr.Dataframe(headers=["No.","Action Function", "Description"])
        
        try_function_button.click(return_same, inputs=[try_function], outputs=[input_desc_textbox])
        generate_tc_button.click(generate_preds_tc, inputs=[input_desc_textbox, num_beams_interactive], outputs=[results_tc])
        
        results_tc.change(fn=update_dropdown, inputs=[results_tc], outputs=[selected_tc_dropdown])
        selected_tc_dropdown.change(fn=set_selected_channel, inputs=[results_tc,selected_tc_dropdown,chosen], outputs=[chosen])
        generate_tf_button.click(generate_preds_tf, inputs=[input_desc_textbox, num_beams_interactive, selected_tc_textbox], outputs=[results_tf])
        
        generate_ac_button.click(generate_preds_ac, inputs=[input_desc_textbox, num_beams_interactive, selected_tc_textbox, selected_tf_textbox], outputs=[results_ac])
        generate_af_button.click(generate_preds_af, inputs=[input_desc_textbox, num_beams_interactive, selected_tc_textbox, selected_tf_textbox, selected_ac_textbox], outputs=[results_af])
        
        with gr.TabItem("One-shot Mode"):
            gr.Markdown("<h3><center>Instructions for One-shot Mode</center></h3>")
            gr.Markdown("""
                1. Select the generation granularity (i.e., Channel, Function, or Field).
                2. Describe the functionality in the `Functionality Description` text box.
                3. Click `Generate TAP` button. The trigger candidates will show up in the `TAP Results` table. The table consists of 4 columns: Trigger, Trigger Description, Action, and Action Description.
                """)
            gr.Markdown("NOTE: You can control how many sequences are returned by tuning the `Beam Width` slider.")
            gr.Markdown("You can try some description samples below:")
           
            with gr.Box():
                with gr.Column():
                    try_fn = gr.Dropdown(type ="value",choices = ["Log to my spreadsheet if motion is detected in the living room","Notify me when someone open the front door", "Turn on my Philips lamp every sunset","Update my picture in Twitter when I change my profile picture in Facebook","Send and append to my note  when I create a new bookmark"], label = "Here are some sample functionality descriptions that you can try")
                    try_fn_button = gr.Button("Try this")
                    
            with gr.Box():
                with gr.Column():
                    gen_mode = gr.Radio(label="Granularity", choices=["Channel", "Channel+Function", "Channel+Function+Field"])
                    input_desc_textbox_oneshot = gr.Textbox(label="Functionality Description", placeholder="Describe the functionality here")
                    num_beams_oneshot = gr.Slider(minimum=2, maximum=100, value=20, step=1, label="Beam Width")
                    generate_tap_button = gr.Button("Generate TAPs")
                    gr.Markdown("<br>")
                    #count_result = 0
                    gr.Markdown("<h4><center>TAP Results</center></h4>")
                    #gr.Markdown("<h4><center>TAP Results" + str(count_result) +"</center></h4>")
                    results_trigger = gr.Dataframe(headers=["Trigger", "Trigger Description", "Action", "Action Description"], overflow_row_behaviour="paginate", max_cols=10, max_rows=10)
                    # gr.Markdown("<br>")
                    # gr.Markdown("<h4><center>Action Results</center></h4>")
                    # results_action = gr.Dataframe(headers=["Action", "Action Description"])
                    
        try_fn_button.click(return_same, inputs=[try_fn], outputs=[input_desc_textbox_oneshot])            
        generate_tap_button.click(generate_preds_tap, inputs=[input_desc_textbox_oneshot, gen_mode, num_beams_oneshot], outputs=[results_trigger])
        
demo.launch(server_port=8333, server_name="0.0.0.0")

AttributeError: 'list' object has no attribute '_id'