# Mandatory Requirements

In [None]:
import pandas as pd 
import requests
from pymongo import MongoClient

def send_http_request(url: str, body=None, method="POST"):
    """Send an HTTP request with an optional JSON body and return the decoded reply.

    Args:
        url: Target URL.
        body: JSON-serializable payload, passed through ``requests``' ``json=``
            argument (``None`` sends no body).
        method: HTTP verb handed to ``requests.request``.

    Returns:
        * the parsed JSON document when the reply has status 200, a
          ``Content-Type`` header mentioning ``application/json`` and a body
          that actually decodes;
        * the raw response text for every other status-200 reply;
        * ``None`` for any other status code (a diagnostic line is printed);
        * the caught ``requests.RequestException`` instance when the request
          itself fails -- it is returned, not raised, so callers can print it.
    """
    try:
        response = requests.request(method, url, json=body)

        # Only a plain 200 counts as success; everything else is reported and dropped.
        if response.status_code != 200:
            print(f"Request exited with status code {response.status_code}: {response.reason}")
            return None

        content_type = response.headers.get('Content-Type')
        if content_type and 'application/json' in content_type:
            try:
                return response.json()
            except ValueError:
                # Every JSON decode error (json, simplejson and
                # requests.exceptions.JSONDecodeError) derives from ValueError.
                # Naming requests.exceptions.JSONDecodeError here would raise
                # AttributeError on requests releases that predate it.
                return response.text
        return response.text
    except requests.RequestException as error:
        return error

Constant mapping between exercise names and their root derivation trees:

In [None]:
# Exercise name -> ids of the root models (derivation-tree roots) of that exercise.
# Insertion order matters only for the order in which the setup requests are sent.
name_to_model_ids = {
    "Courses": ["JDKw8yJZF5fiP3jv3", "PSqwzYAfW9dFAa9im"],
    "ProductionLine_v2_v3": ["aTwuoJgesSd8hXXEP", "bNCCf9FMRZoxqobfX"],
    "Train": ["QxGnrFQnXPGh2Lh8C"],
    "SocialNetwork": ["dkZH6HJNQNLLDX6Aj"],
    "TrashFOL": ["sDLK7uBCbgZon3znd"],
    "ClassroomFOL": ["YH3ANm7Y5Qe5dSYem"],
    "TrashRL": ["PQAJE67kz8w5NWJuM"],
    "ClassroomRL": ["zRAn69AocpkmxXZnW"],
    "Graphs": ["gAeD3MTGCCv8YNTaK"],
    "LTS": ["zoEADeCW2b2suJB2k"],
    "ProductionLine_v1": ["jyS8Bmceejj9pLbTW"],
    "CV": ["JC8Tij8o8GZb99gEJ"],
    "TrashLTL": ["9jPK8KBWzjFmBx4Hb"],
}


# Initialize Alloy4fun System

## Populate Alloy4fun Database With Public Data

In [None]:
# Project-local helper imported from the `populate_database` module; per the
# section heading it loads the public data into the Alloy4fun database.
# NOTE(review): what exactly is inserted (and whether a re-run duplicates it)
# is not visible from this notebook -- check the module before running twice.
from populate_database import populate_database
populate_database()

## Setup Hint System

# Initialize Hint Systems 

In [None]:
# HiGenA graph endpoints (optional, disabled): the bare string literal below is
# never executed, it only parks the code for whoever wants to enable it.
"""
for (name,ids) in name_to_model_ids.items():
    print(send_http_request(url="http://localhost:8080/hint/higena-setup",  body=ids))
"""
# SpecAssistant graph endpoints: first call the debug-drop-db endpoint
# (presumably resets the hint database -- confirm on the API side), then issue
# one specassistant-setup request per exercise, printing each reply.
send_http_request(method="GET", url="http://localhost:8080/hint/debug-drop-db")

for name in name_to_model_ids:
    ids = name_to_model_ids[name]
    setup_url = f"http://localhost:8080/hint/specassistant-setup?prefix={name}"
    print(send_http_request(url=setup_url, body=ids, method="GET"))

# NOTE: Execution times are stored by the api application within its database

# Choose a policy

In this notebook we choose the same policy for every graph for convenience, but this does not have to be the case.

In [None]:
# Rule name forwarded to the API as the `rule` query parameter.
policy = "TED"

# Last expression of the cell, so the reply is rendered as the cell output.
send_http_request(
    method="POST",
    url=f"http://localhost:8080/hint/compute-all-policies-for-rule?rule={policy}",
)

# Database Study

In [3]:
# Database basic requirements
# NOTE(review): wildcard import -- the Data Gathering cells below call
# get_graph_id_dict_pipeline, get_graph_node_statistics,
# get_popular_nodes_pipeline and get_min_solutions_pipeline, which are
# presumably all defined in a4f_mongo_pipelines (confirm before switching to
# explicit imports).
from a4f_mongo_pipelines import *

# Connection string and database name used by the Data Gathering cells below.
mongo_uri = "mongodb://localhost:27017/"
database_name = "meteor"

## Data Gathering

### Get GraphId Maps

In [None]:
# Build the mapping consumed by the cells below: each aggregation result
# contributes its `_id` as key and its `graph_ids` as value.
# The client is a context manager, so the connection is closed even when the
# aggregation raises (the explicit close() was skipped in that case).
with MongoClient(mongo_uri) as client:
    db = client[database_name]
    graph_collection = db["Graph"]

    name_to_graph_ids = {
        doc["_id"]: doc["graph_ids"]
        for doc in graph_collection.aggregate(get_graph_id_dict_pipeline())
    }

name_to_graph_ids

### Get Graph Stats Data Frames

In [None]:
# Run the node-statistics aggregation on the "Node" collection and load the
# resulting documents into a DataFrame (one row per returned document).
# The client is a context manager, so the connection is closed even when the
# aggregation raises (the explicit close() was skipped in that case).
with MongoClient(mongo_uri) as client:
    db = client[database_name]
    node_collection = db["Node"]
    data = list(node_collection.aggregate(get_graph_node_statistics()))

graph_stats_df = pd.DataFrame(data)

graph_stats_df

### Get Popular Node Data Frames

WARNING: Requires GraphId Maps

In [None]:
# For every entry of name_to_graph_ids, run the popular-nodes aggregation on
# the "Node" collection and keep the first 30 returned documents as a DataFrame.
# The client is a context manager, so the connection is closed even when one of
# the aggregations raises (the explicit close() was skipped in that case).
with MongoClient(mongo_uri) as client:
    db = client[database_name]
    node_collection = db["Node"]

    name_to_pop_dfs = {}
    for name, graph_ids in name_to_graph_ids.items():
        popular_nodes = node_collection.aggregate(get_popular_nodes_pipeline(graph_ids))
        data = list(popular_nodes)[0:30]  # Limits output to first 30 entries
        name_to_pop_dfs[name] = pd.DataFrame(data)

name_to_pop_dfs

### Get Min Solutions Data Frames

WARNING: Requires GraphId Maps

In [None]:
# For every entry of name_to_graph_ids, run the min-solutions aggregation on
# the "Node" collection and keep all returned documents as a DataFrame.
# The client is a context manager, so the connection is closed even when one of
# the aggregations raises (the explicit close() was skipped in that case).
with MongoClient(mongo_uri) as client:
    db = client[database_name]
    node_collection = db["Node"]

    name_to_min_sol_dfs = {}
    for name, graph_ids in name_to_graph_ids.items():
        data = list(node_collection.aggregate(get_min_solutions_pipeline(graph_ids)))
        name_to_min_sol_dfs[name] = pd.DataFrame(data)

name_to_min_sol_dfs

## Data Frame Persistence

### Write As Multiple Csvs

General Statistics

In [None]:
# Export the general statistics as a ';'-separated file; mode 'w' overwrites
# any previous export and '%g' keeps floats compact.
graph_stats_df.to_csv(
    "graph_stats.csv",
    sep=';',
    index=False,
    mode='w',
    float_format='%g',
)

Popular Formulas

In [None]:
# One "<name>.popularity.csv" file per entry of name_to_pop_dfs.
for name in name_to_pop_dfs:
    df_ = name_to_pop_dfs[name]
    csv_path = name + ".popularity.csv"
    df_.to_csv(csv_path, sep=';', index=False, mode='w', float_format='%g')

Solution Formulas

In [None]:
# One "<name>.solution.csv" file per entry of name_to_min_sol_dfs.
for name in name_to_min_sol_dfs:
    df_ = name_to_min_sol_dfs[name]
    csv_path = name + ".solution.csv"
    df_.to_csv(csv_path, sep=';', index=False, mode='w', float_format='%g')

### Write as Sheets of a Single XLSX File

WARNING: Requires Every DataFrame

In [None]:
import xlsxwriter


def _write_titled_frame(writer, sheet, sheet_name, frame, title, start_row, title_format):
    """Write a bold title row followed by `frame` and return the next free row.

    The title is merged across exactly the columns the frame occupies. The
    previous code passed ``len(frame)`` -- the number of ROWS -- as the last
    column of the merge, so the title width depended on how many rows the
    frame had.

    Layout, relative to `start_row`: title, column header, the data rows, then
    one blank spacer row; the returned index is the row after the spacer.
    """
    last_col = frame.shape[1] - 1
    if last_col > 0:
        sheet.merge_range(start_row, 0, start_row, last_col, title, title_format)
    else:
        # merge_range refuses a single-cell range, so write the title directly
        # when the frame has zero or one column.
        sheet.write(start_row, 0, title, title_format)
    frame.to_excel(excel_writer=writer, sheet_name=sheet_name, startrow=start_row + 1, index=False)
    return start_row + 1 + frame.shape[0] + 2


with pd.ExcelWriter('db_study.xlsx', engine='xlsxwriter') as writer:
    workbook = writer.book
    text_wrap = workbook.add_format({'text_wrap': True, 'valign': 'top'})
    bold = workbook.add_format({'bold': True})

    # One sheet per exercise, in alphabetical order.
    for name in sorted(name_to_model_ids):
        sheet = workbook.add_worksheet(name=name)

        # Columns 1 and 3 get a wide, wrapped layout; the others stay narrow.
        sheet.set_column(0, 0, 15)
        sheet.set_column(1, 1, 100, text_wrap)
        sheet.set_column(2, 2, 15)
        sheet.set_column(3, 3, 100, text_wrap)
        sheet.set_column(4, 4, 27)

        row = _write_titled_frame(
            writer, sheet, name, name_to_pop_dfs[name],
            "The 30 most frequent formulas", 0, bold,
        )
        _write_titled_frame(
            writer, sheet, name, name_to_min_sol_dfs[name],
            "The valid formulas ordered by their frequency", row, bold,
        )

    graph_stats_df.to_excel(excel_writer=writer, sheet_name="General Statistics", index=False)
    stats_sheet = workbook.get_worksheet_by_name('General Statistics')
    stats_sheet.set_column(0, 0, 30)
    stats_sheet.set_column(1, 4, 20)

# Policy Study

Read the targeted formulas

In [None]:
exercise = "dkZH6HJNQNLLDX6Aj" # Social Network

# Read the CSV once (it used to be parsed twice, the first copy never being
# used) and order it by predicate. mergesort is a stable sort, so formulas that
# share a predicate keep their order from the file.
# reset_index() keeps the original row number as an "index" column.
input_data = (
    pd.read_csv('formulas.csv', delimiter=';')
    .sort_values(by='Predicate', kind='mergesort')
    .reset_index()
)

# Request payload: {predicate: [formula, ...]}, in the order of `input_data`.
body = {}
for predicate, formula in zip(input_data['Predicate'], input_data['Formula']):
    body.setdefault(predicate, []).append(formula)

input_data

Collect the result of testing each formula for each policy

In [None]:
# POST the {predicate: [formulas]} payload for the chosen exercise and keep the
# decoded JSON reply in `output`; `df` is its tabular view.
study_url = f"http://localhost:8080/study/test-all-policies-on-formulas?model_id={exercise}"
study_response = requests.request(method="POST", url=study_url, json=body)
output = study_response.json()

df = pd.DataFrame(output)
df

Group duplicate hints, each policy hit is aggregated in a matrix.

In [None]:
def _condense_hints(results):
    """Merge records that share (predicate, formula, nextFormula) into one entry.

    Each entry is a copy of the first record seen for that key without its
    'policy' field; every policy that produced the hint is then added as an
    extra ``{policy_name: True}`` field.

    A record without 'nextFormula' is keyed with ``None``. The previous code
    used ``.get`` when looking a key up but ``obj['nextFormula']`` when creating
    the entry, so such a record raised KeyError. Doing the work inside a
    function also stops the loop from overwriting the notebook-level `policy`
    variable chosen in the "Choose a policy" cell.
    """
    condensed = {}
    for obj in results:
        key = (obj['predicate'], obj['formula'], obj.get('nextFormula'))
        entry = condensed.get(key)
        if entry is None:
            entry = {field: value for field, value in obj.items() if field != 'policy'}
            condensed[key] = entry
        entry[obj['policy']] = True
    return condensed


condensed_output = _condense_hints(output)

df = pd.DataFrame(condensed_output.values())
df

Write the data to a file

In [None]:
# sort_values returns a new frame (it does not sort in place), so the sorted
# result must be the one written out; previously it was discarded and the CSV
# came out unsorted.
df.sort_values(by='predicate', ascending=True).to_csv("hints.csv", index=False, sep=";")


# Statistical evaluations

## TEST

### Hint system

In [None]:
# Sends the name -> graph ids mapping to the test-spec-assist endpoint; as the
# last expression of the cell, the reply is rendered as the cell output.
send_http_request(
    method="POST",
    url="http://localhost:8080/study/test-spec-assist",
    body=name_to_graph_ids,
)

### Tar
If you wish to test TAR, uncomment and run the following code. Be aware, however, that this process can take days to complete, as TAR is slow to process a significant number of the entries.

In [None]:
# WARNING: THIS WILL TAKE HOURS OR PERHAPS DAYS TO COMPLETE

# send_http_request(url="http://localhost:8080/study/test-TAR", method="POST")

TAR is not data-driven, so its accuracy does not depend on the training dataset. As a result, in our evaluations we tested TAR for every model and then limited the result view to the test sets we used. These results can be found in the file TAR_test_data.json. If you wish to use these results you can run the following block to import them; however, be advised that the specified execution times are tied to the machine specified in the paper.

In [None]:
import pymongo
import json

# TAR_test_data.json holds one JSON document per line. Parse the file before
# touching the database so a missing or malformed file never leaves a client
# open. Blank lines (e.g. a trailing newline) are skipped instead of crashing
# json.loads, and the encoding is pinned to UTF-8 rather than the platform default.
with open("TAR_test_data.json", mode="r", encoding="utf-8") as file:
    tar_documents = [json.loads(line) for line in file if line.strip()]

# The context manager closes the connection even when the insert raises.
# NOTE(review): running this cell twice inserts the documents twice (or fails
# on duplicate `_id`s if the file carries them) -- confirm before re-running.
with pymongo.MongoClient() as client:
    collection = client["meteor"]["Test"]
    if tar_documents:  # insert_many rejects an empty list
        collection.insert_many(tar_documents)

## Gather and Aggregate Test Data from Database

In [None]:
from extract_test_data import *

# Uses the MongoClient imported in the first cell: `pymongo` itself is only
# imported by the optional TAR import cell, so `pymongo.MongoClient()` failed
# with NameError whenever that cell was skipped.
# The context manager closes the connection even when the aggregation raises.
with MongoClient() as client:
    collection = client["meteor"]["Test"]
    data = list(collection.aggregate(extract_test_data_pipeline()))

df = pd.DataFrame(data)

df