In [399]:
import draco as drc
import pandas as pd
from vega_datasets import data as vega_data
import altair as alt
from IPython.display import display, Markdown
import json
import numpy as np
from draco.renderer import AltairRenderer
# alt.renderers.enable("png")
import pdb
from draco import Draco
# Module-level Draco instance; none of the functions visible in this notebook reference it
# (start_draco builds its own instance).
dr_check=Draco()


# Handles serialization of common numpy datatypes
class NpEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays into plain Python values."""

    def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        NumPy arrays become (nested) lists, NumPy integers become ``int`` and
        NumPy floats become ``float``. Anything else is delegated to the base
        encoder, which raises ``TypeError``.
        """
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        return super().default(obj)


def md(markdown: str):
    """Render ``markdown`` as rich Markdown output in the notebook."""
    rendered = Markdown(markdown)
    display(rendered)


def pprint(obj):
    """Show ``obj`` as an indented JSON code block rendered through ``md``."""
    as_json = json.dumps(obj, indent=2, cls=NpEncoder)
    md(f"```json\n{as_json}\n```")

def localpprint(obj):
    """Print ``obj`` to stdout as indented JSON, encoding NumPy values via ``NpEncoder``."""
    as_json = json.dumps(obj, indent=2, cls=NpEncoder)
    print(as_json)

def recommend_charts(
    spec: list[str], draco: drc.Draco, df: pd.DataFrame, num: int = 1, labeler=lambda i: f"CHART {i+1}"
) -> dict[str, str]:
    """Complete a partial Draco spec and return the rendered charts as JSON strings.

    :param spec: Partial specification as a list of ASP facts/constraints.
    :param draco: Draco instance whose ``complete_spec`` produces the models.
    :param df: Data handed to the Altair renderer for every chart.
    :param num: Forwarded to ``draco.complete_spec`` as the model count.
    :param labeler: Maps the 0-based model index to the key used in the result.
    :return: Mapping from chart label to that chart's ``to_json()`` string.
    """
    renderer = AltairRenderer()
    chart_vega_specs = {}
    for i, model in enumerate(draco.complete_spec(spec, num)):
        chart_name = labeler(i)
        # Use a separate name so the ``spec`` argument is not shadowed inside the loop.
        completed_spec = drc.answer_set_to_dict(model.answer_set)
        chart = renderer.render(spec=completed_spec, data=df)
        # Adjust column-faceted chart size. This must happen before serialization,
        # otherwise the view configuration never reaches the returned JSON.
        if (
            isinstance(chart, alt.FacetChart)
            and chart.facet.column is not alt.Undefined
        ):
            chart = chart.configure_view(continuousWidth=130, continuousHeight=130)
        chart_vega_specs[chart_name] = chart.to_json()

    return chart_vega_specs



In [449]:
def rec_from_generated_spec(
    marks: list[str],
    fields: list[str],
    encoding_channels: list[str],
    draco: drc.Draco,
    input_spec_base: list[str],
    data: pd.DataFrame,
    num: int = 3, config=None
) -> dict[str, str]:
    """Build partial specs and collect the chart recommendations for them.

    With ``config=None`` one partial spec is generated that attaches one
    encoding per entry of ``fields`` (in random order) to mark ``m0``.
    Otherwise the partial spec is derived from ``config`` by ``validate_chart``.

    :param marks: Accepted for interface compatibility; not used here.
    :param fields: Field names to force into the recommendation. The caller's
        list is left untouched.
    :param encoding_channels: Accepted for interface compatibility; not used here.
    :param draco: Draco instance forwarded to ``recommend_charts``.
    :param input_spec_base: Base facts every generated spec starts from.
    :param data: Data forwarded to ``recommend_charts`` for rendering.
    :param num: Number of recommendations requested per partial spec.
    :param config: Optional chart configuration list handed to ``validate_chart``.
    :return: Mapping from chart label to whatever ``recommend_charts`` returns
        for it.
    """
    if config is None:
        # np.random.shuffle reorders its argument in place; shuffle a copy so the
        # caller's list keeps its order.
        shuffled_fields = list(fields)
        np.random.shuffle(shuffled_fields)
        num_encodings = len(shuffled_fields)

        # For fields [a, b] this yields:
        #   entity(encoding,m0,e0).  attribute((encoding,field),e0,a).
        #   entity(encoding,m0,e1).  attribute((encoding,field),e1,b).
        force_attributes = []
        for index, item in enumerate(shuffled_fields):
            force_attributes.append(f'entity(encoding,m0,e{index}).')
            force_attributes.append(f'attribute((encoding,field),e{index},{item}).')

        input_specs = [
            (
                # NOTE(review): this is a plain string, not a tuple, so ``cfg[0]``
                # below is its first character ('r'). Kept as is so that the
                # returned keys ("CHART 1, r", ...) do not change.
                "random_stuff_for_cfg_some",
                input_spec_base
                + force_attributes
                + [
                    # exactly one mark entity
                    ":- {entity(mark,_,_)} != 1.",
                    # at least ``num_encodings`` encoding fields
                    ":- {attribute((encoding,field),_,_)} <" + str(num_encodings) + ".",
                ],
            )
        ]
    else:
        input_specs = validate_chart(config, input_spec_base)

    recs = {}
    for cfg, spec in input_specs:
        # The labeler is consumed inside this iteration, so closing over ``cfg`` is safe.
        labeler = lambda i: f"CHART {i + 1}, {cfg[0]}"
        recs.update(recommend_charts(spec=spec, draco=draco, df=data, num=num, labeler=labeler))

    return recs






In [450]:
def validate_chart(config, input_spec_base):
    """Translate the first chart configuration into a partial Draco spec.

    :param config: List of chart configurations; only ``config[0]`` is used.
        It must provide ``'mark'`` (a mark name, or a Vega-Lite style dict with
        a ``'type'`` key) and ``'encoding'`` (mapping channel -> dict that may
        carry ``'field'`` and/or ``'aggregate'``).
    :param input_spec_base: Base facts the generated facts are appended to.
        The list itself is not modified.
    :return: ``[]`` when ``config`` is empty, otherwise a one-element list
        ``[[label, facts]]`` where ``label`` is a tuple starting with the mark
        and ``facts`` is the list of ASP facts/constraints.
    """
    if not config:  # If config is empty, return an empty list
        return []

    con = config[0]  # Use the first configuration in the list
    mark = con['mark']
    # Vega-Lite allows the mark either as a bare string or as {"type": ...}.
    if isinstance(mark, dict) and 'type' in mark:
        mark = mark['type']
    encoding = con['encoding']

    # Concatenation creates a new list, so input_spec_base stays untouched.
    facts = input_spec_base + [f"attribute((mark,type),m0,{mark})."]
    # Label used when there are no encodings at all; replaced by the first encoding.
    label = (mark,)

    for i, (channel, attr_info) in enumerate(encoding.items()):
        field = attr_info.get('field')
        aggregate = attr_info.get('aggregate')

        if i == 0:
            label = (mark, field, channel) if field is not None else (mark, channel)

        # Every encoding belongs to the single mark m0: only one mark is allowed
        # by the constraint appended below.
        facts.append(f"entity(encoding,m0,e{i}).")
        facts.append(f"attribute((encoding,channel),e{i},{channel}).")
        if field is not None:
            facts.append(f"attribute((encoding,field),e{i},{field}).")
        if aggregate is not None:
            facts.append(f"attribute((encoding,aggregate),e{i},{aggregate}).")

    # exclude multi-layer designs
    facts.append(":- {entity(mark,_,_)} != 1.")

    return [[label, facts]]






In [451]:
def start_draco(fields,datasetname='birdstrikes',config=None):
    """Load a dataset, derive its Draco schema facts and request recommendations.

    :param fields: Field names forwarded to ``rec_from_generated_spec``.
    :param datasetname: ``'movies'``, ``'seattle'`` or ``'performance'``; any
        other value loads a 500-row sample of birdstrikes.
    :param config: Optional chart configuration forwarded unchanged.
    :return: The result of ``rec_from_generated_spec``.
    """
    d = drc.Draco()

    def load_frame():
        # Pick the data to be explored; unknown names fall through to birdstrikes.
        if datasetname == 'movies':
            return vega_data.movies()
        if datasetname == 'seattle':
            return vega_data.seattle_weather()
        if datasetname == 'performance':
            return pd.read_csv('distribution_map.csv')
        return vega_data.birdstrikes().sample(n=500, random_state=1)

    df = load_frame()

    # Normalize column names: collapse double underscores, lowercase, replace '$' with 'a'.
    df.columns = [
        name.replace('__', '_').lower().replace('$', 'a') for name in df.columns
    ]

    schema_facts = drc.dict_to_facts(drc.schema_from_dataframe(df))

    # Every partial spec starts from the schema plus one root view holding one mark.
    base_facts = schema_facts + [
        "entity(view,root,v0).",
        "entity(mark,v0,m0).",
    ]

    return rec_from_generated_spec(
        marks=['bar', 'point', 'circle', 'line', 'tick'],
        fields=fields,
        encoding_channels=["x", "y", "color", "shape", "size"],
        draco=d,
        input_spec_base=base_facts,
        data=df,
        config=config,
    )






In [452]:
def get_draco_recommendations(attributes,datasetname='birdstrikes',config=None):
    """Normalize ``attributes`` the same way column names are normalized, then run ``start_draco``.

    Each name has ``'__'`` collapsed to ``'_'``, is lowercased, and has ``'$'``
    replaced by ``'a'`` before being passed on as ``fields``.
    """
    normalized = [
        name.replace('__', '_').lower().replace('$', 'a') for name in attributes
    ]
    return start_draco(fields=normalized, datasetname=datasetname, config=config)

# Demo run: request recommendations for three birdstrikes fields and list their labels.
if __name__ == '__main__':
    fields_birdstrikes = ['wildlife_species', 'wildlife_size', 'flight_date']
    # Inputs for the other datasets start_draco understands:
    # fields_seattle=["weather", "temp_min", "date"]
    # fields_movies = ["major_genre", "us_gross", "source"]
    # fields_performance = ['Fields', 'Probability']
    recommendations=start_draco(fields=fields_birdstrikes, datasetname='birdstrikes')
    # recommendations=start_draco(fields=fields_movies, datasetname='movies')
    # recommendations=start_draco(fields=fields_performance, datasetname='performance')
    # recommendations=start_draco(fields=fields_seattle, datasetname='seattle')

    # Iterate over the keys only. Binding the unused value to `_` would shadow
    # IPython's last-output variable for the rest of the session.
    for chart_key in recommendations:
        print(f"Recommendation for {chart_key}:")

Recommendation for CHART 1, r:
Recommendation for CHART 2, r:
Recommendation for CHART 3, r:


In [453]:
# Decode every recommendation's JSON string and show its encoding and mark.
for key in recommendations:
    chart = json.loads(recommendations[key])
    # Three values separated by newlines, then print's own newline:
    # same text as three separate print calls.
    print(chart.get('encoding', {}), chart.get('mark', {}), "\n", sep="\n")


{'color': {'field': 'wildlife_size', 'type': 'ordinal'}, 'size': {'aggregate': 'count', 'scale': {'type': 'linear', 'zero': True}}, 'x': {'field': 'wildlife_species', 'type': 'ordinal'}, 'y': {'field': 'flight_date', 'type': 'ordinal'}}
{'type': 'point'}


{'color': {'field': 'wildlife_species', 'type': 'ordinal'}, 'size': {'aggregate': 'count', 'scale': {'type': 'linear', 'zero': True}}, 'x': {'field': 'wildlife_size', 'type': 'ordinal'}, 'y': {'field': 'flight_date', 'type': 'ordinal'}}
{'type': 'point'}


{'color': {'field': 'wildlife_size', 'type': 'ordinal'}, 'size': {'aggregate': 'count', 'scale': {'type': 'linear', 'zero': True}}, 'x': {'field': 'wildlife_species', 'type': 'ordinal'}, 'y': {'field': 'flight_date', 'type': 'ordinal'}}
{'type': 'point'}




In [120]:
attribute((field,name),7,wildlife_size)


'attribute((encoding,field),e0,wildlife_species).', 'attribute((encoding,field),e1,cost_repair).', 'attribute((encoding,field),e2,wildlife_size)

wildlife_Speciess=8
cost_repair=11

"encoding": {
    "x": {
      "aggregate": "count",
      "scale": {
        "type": "linear",
        "zero": true
      }
    }
  },
  "mark": {
    "type": "bar"
  }
}

SyntaxError: unterminated string literal (detected at line 4) (1584224773.py, line 4)

In [280]:
from draco import dict_to_facts, answer_set_to_dict, run_clingo
from pprint import pprint

In [281]:
# Minimal bar-chart spec with an x and a y encoding, converted to Draco facts.
bar_spec = {
    "mark": "bar",
    "encoding": [{"channel": "x"}, {"channel": "y"}],
}
facts = dict_to_facts(bar_spec)
facts

['attribute(mark,root,bar).',
 'entity(encoding,root,0).',
 'attribute((encoding,channel),0,x).',
 'entity(encoding,root,1).',
 'attribute((encoding,channel),1,y).']

NameError: name 'specs' is not defined