Groundwork for fusion implementation and assorted bug fixes #18

Merged · 5 commits · Nov 20, 2018
8 changes: 4 additions & 4 deletions README.md
@@ -148,12 +148,12 @@ If you are on MacOS, you may need to install XCode developer tools using the com

## Usage

See the code in `tests/test_holoclean.py` for a documented usage of HoloClean.
See the code in `examples/holoclean_repair_example.py` for a documented usage of HoloClean.

In order to run the test script, run the following:
In order to run the example script, run the following:
```bash
$ cd tests
$ ./start_test.sh
$ cd examples
$ ./start_example.sh
```

The script sets up the python path environment for running holoclean.
89 changes: 59 additions & 30 deletions dataset/dataset.py
@@ -1,6 +1,8 @@
import logging
import time
from enum import Enum
import os
import time
import pandas as pd

from .dbengine import DBengine
@@ -55,7 +57,7 @@ def __init__(self, name, env):
timeout=env['timeout'])
# members to convert (tuple_id, attribute) to cell_id
self.attr_to_idx = {}
self.attr_number = 0
self.attr_count = 0
# dataset statistics
self.stats_ready = False
# Total tuples
@@ -65,44 +67,66 @@ def __init__(self, name, env):
# Domain stats for attribute pairs
self.pair_attr_stats = {}

# Fixed to load data from a CSV file at the moment.
def load_data(self, name, f_path, f_name, na_values=None):
# TODO(richardwu): load more than just CSV files
def load_data(self, name, fpath, na_values=None, entity_col=None, src_col=None):
"""
load_data takes a CSV file of the initial data, adds tuple IDs (_tid_)
to each row to uniquely identify an 'entity', and generates unique
index numbers for each attribute/column.

Creates a table with the user supplied 'name' parameter (e.g. 'hospital').
"""

:param name: (str) name to initialize dataset with.
:param fpath: (str) filepath to CSV file.
:param na_values: (str) value that identifies a NULL value
:param entity_col: (str) column containing the unique
identifier/ID of an entity. For fusion tasks, rows with
the same ID will be fused together in the output.
If None, assumes every row is a unique entity.
:param src_col: (str) if not None, for fusion tasks
specifies the column containing the source for each "mention" of an
entity.
"""
tic = time.clock()
try:
# Load raw CSV file/data into the Postgres 'init_X' table.
# Do not include TID and source column as trainable attributes
exclude_attr_cols = ['_tid_']
if src_col is not None:
exclude_attr_cols.append(src_col)

# Load raw CSV file/data into a Postgres table 'name' (param).
self.raw_data = Table(name, Source.FILE, na_values=na_values,
exclude_attr_cols=exclude_attr_cols, fpath=fpath)

self.raw_data = Table(name, Source.FILE, f_path, f_name, na_values)
# Add _tid_ column to dataset
df = self.raw_data.df
df.insert(0,'_tid_', range(0,len(df)))
df.fillna('_nan_',inplace=True)
# Add _tid_ column to dataset that uniquely identifies an entity.
# If entity_col is not supplied, use auto-incrementing values.
# Otherwise we use the entity values directly as _tid_'s.
if entity_col is None:
# auto-increment
df.insert(0, '_tid_', range(0,len(df)))
else:
# use entity IDs as _tid_'s directly
df.rename({entity_col: '_tid_'}, axis='columns', inplace=True)

# Use '_nan_' to represent NULL values
df.fillna('_nan_', inplace=True)

# Call to store to database

self.raw_data.store_to_db(self.engine.engine)
status = 'DONE Loading '+f_name
status = 'DONE Loading {fname}'.format(fname=os.path.basename(fpath))

# Generate indexes on attribute columns for faster queries

for attr in self.raw_data.get_attributes():
# Generate index on attribute
self.raw_data.create_db_index(self.engine,[attr])

# Create attr_to_idx dictionary (assign unique index for each attribute)
# and attr_number (total # of attributes)

tmp_attr_list = self.raw_data.get_attributes()
tmp_attr_list.remove('_tid_')
for idx, attr in enumerate(tmp_attr_list):
# Map attribute to index
self.attr_to_idx[attr] = idx
self.attr_number = len(self.raw_data.get_attributes())
# and attr_count (total # of attributes)

self.attr_to_idx = {attr: idx for idx, attr in enumerate(self.raw_data.get_attributes())}
self.attr_count = len(self.attr_to_idx)
except Exception:
logging.error('loading data for table %s', name)
raise
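
For context, a minimal sketch of how the new `load_data` signature might be called for a fusion task. The dataset name, file path, and column names are illustrative, `env` stands in for whatever engine configuration the caller already builds, and the import path is assumed from this diff:

```python
from dataset.dataset import Dataset   # import path assumed from this diff

# 'env' stands in for the engine/configuration dict built elsewhere.
ds = Dataset('sensor_readings', env)

# Rows sharing a 'device_id' are mentions of one entity to be fused together;
# 'src' names the source of each mention. Both columns are illustrative.
ds.load_data('sensor_readings',
             fpath='../testdata/sensor_readings.csv',
             na_values='?',
             entity_col='device_id',
             src_col='src')
```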
@@ -123,14 +147,14 @@ def generate_aux_table(self, aux_table, df, store=False, index_attrs=False):
2. sets an index on the aux_table's internal Pandas DataFrame (index_attrs=[<columns>]), AND/OR
3. creates Postgres indexes for aux_table (store=True and index_attrs=[<columns>])

:param aux_table: (str) name of auxiliary table (see AuxTables)
:param aux_table: (AuxTable) auxiliary table to generate
:param df: (DataFrame) dataframe to memoize/store for this auxiliary table
:param store: (bool) if true, creates/replaces Postgres table for this auxiliary table
:param index_attrs: (list[str]) list of attributes to create indexes on. If store is true,
also creates indexes on Postgres table.
"""
try:
self.aux_table[aux_table] = Table(aux_table.name, Source.DF, df)
self.aux_table[aux_table] = Table(aux_table.name, Source.DF, df=df)
if store:
self.aux_table[aux_table].store_to_db(self.engine.engine)
if index_attrs:
@@ -142,8 +166,12 @@ def generate_aux_table(self, aux_table, df, store=False, index_attrs=False):
raise

def generate_aux_table_sql(self, aux_table, query, index_attrs=False):
"""
:param aux_table: (AuxTable) auxiliary table to generate
:param query: (str) SQL query whose result is used for generating the auxiliary table.
"""
try:
self.aux_table[aux_table] = Table(aux_table.name, Source.SQL, query, self.engine)
self.aux_table[aux_table] = Table(aux_table.name, Source.SQL, table_query=query, db_engine=self.engine)
if index_attrs:
self.aux_table[aux_table].create_df_index(index_attrs)
self.aux_table[aux_table].create_db_index(self.engine, index_attrs)
@@ -153,28 +181,29 @@ def generate_aux_table_sql(self, aux_table, query, index_attrs=False):

def get_raw_data(self):
"""
get_raw_data returns a pandas.DataFrame containing the raw data as it was initially loaded.
"""
if self.raw_data:
return self.raw_data.df
else:
raise Exception('ERROR No dataset loaded')

def get_attributes(self):
if self.raw_data:
attrs = self.raw_data.get_attributes()
attrs.remove('_tid_')
return attrs
else:
"""
get_attributes returns the trainable/learnable attributes (i.e. excluding meta
columns like _tid_).
"""
if self.raw_data is None:
raise Exception('ERROR No dataset loaded')
return self.raw_data.get_attributes()

def get_cell_id(self, tuple_id, attr_name):
"""
get_cell_id returns cell ID: a unique ID for every cell.

Cell ID: _tid_ * (# of attributes) + attr_idx
"""
vid = tuple_id*self.attr_number + self.attr_to_idx[attr_name]
vid = tuple_id*self.attr_count + self.attr_to_idx[attr_name]

return vid
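
A quick worked example of the cell ID formula, with made-up numbers:

```python
# Worked example of the cell ID formula (illustrative numbers only).
attr_count = 5                  # dataset has 5 trainable attributes
attr_to_idx = {'city': 2}       # 'city' was assigned index 2
tuple_id = 3

cell_id = tuple_id * attr_count + attr_to_idx['city']
assert cell_id == 17            # 3 * 5 + 2
```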

@@ -262,7 +291,7 @@ def get_repaired_dataset(self):
init_records[tid][attr] = repaired_vals[tid][attr]
repaired_df = pd.DataFrame.from_records(init_records)
name = self.raw_data.name+'_repaired'
self.repaired_data = Table(name, Source.DF, repaired_df)
self.repaired_data = Table(name, Source.DF, df=repaired_df)
self.repaired_data.store_to_db(self.engine.engine)
status = "DONE generating repaired dataset"
toc = time.clock()
13 changes: 12 additions & 1 deletion dataset/dbengine.py
@@ -109,7 +109,18 @@ def create_db_table_from_query(self, name, query):
return True

def create_db_index(self, name, table, attr_list):
stmt = index_template.substitute(idx_title=name, table=table, attr=','.join(attr_list))
"""
create_db_index creates a (multi-column) index on the columns/attributes
specified in :param attr_list: with the given :param name: on
:param table:.

:param name: (str) name of index
:param table: (str) name of table
:param attr_list: (list[str]) list of attributes/columns to create the index on
"""
# We need to quote each attribute since Postgres auto-downcases unquoted column references
quoted_attrs = map(lambda attr: '"{}"'.format(attr), attr_list)
stmt = index_template.substitute(idx_title=name, table=table, attr=','.join(quoted_attrs))
tic = time.clock()
conn = self.engine.connect()
result = conn.execute(stmt)
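
To make the quoting change concrete, a small sketch of the attribute string the method now builds. The column names are hypothetical, and it assumes `index_template` expands to a plain `CREATE INDEX` statement:

```python
# Illustrative only: column names are hypothetical.
attr_list = ['ZipCode', 'City']
quoted_attrs = ','.join('"{}"'.format(attr) for attr in attr_list)
print(quoted_attrs)
# "ZipCode","City"  ->  e.g. CREATE INDEX hospital_0 ON hospital ("ZipCode","City")
# Without the quotes, Postgres folds the identifiers to zipcode/city and the
# index creation fails whenever the CSV header used mixed case.
```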
101 changes: 55 additions & 46 deletions dataset/table.py
@@ -12,67 +12,76 @@ class Table:
"""
A wrapper class for Dataset Tables.
"""
def __init__(self, name, src, *args):
def __init__(self, name, src, na_values=None, exclude_attr_cols=['_tid_'],
fpath=None, df=None, db_conn=None, table_query=None, db_engine=None):
"""
:param name: (str) name to assign to dataset.
:param na_values: (str or list[str]) values to interpret as NULL.
:param exclude_attr_cols: (list[str]) list of columns to NOT treat as
attributes during training/learning.
:param src: (Source) type of source to load from. Note additional
parameters MUST be provided for each specific source:
Source.FILE: :param fpath:, read from CSV file
Source.DF: :param df:, read from pandas DataFrame
Source.DB: :param db_conn:, read from database table with :param name:
Source.SQL: :param table_query: and :param db_engine:, use result
from :param table_query:

:param fpath: (str)
:param df: (pandas.DataFrame)
:param db_conn: (SQLAlchemy connectable, str)
:param table_query: (str)
:param db_engine: (DBEngine)
"""
self.name = name
self.index_count = 0
# Copy the list to memoize
self.exclude_attr_cols = list(exclude_attr_cols)
self.df = pd.DataFrame()

if src == Source.FILE:
if len(args) < 2:
raise Exception("ERROR while loading table. File path and file name expected.Please provide <file_path> and <file_name>.")
else:
file_path = args[0]
file_name = args[1]
if len(args) == 3:
na_values = args[2]
else:
na_values = None
# TODO(richardwu): use COPY FROM instead of loading this into memory
self.df = pd.read_csv(os.path.join(file_path,file_name), dtype=str, na_values=na_values)
# Normalize to lower strings and strip whitespaces.
# TODO: No support for numerical values. To be added.
for attr in self.df.columns.values:
if attr != '_tid_' and self.df[attr].dtype == str:
self.df[attr] = self.df[attr].apply(lambda x: x.lower())
self.df[attr] = self.df[attr].apply(lambda x: x.strip())
self.df.columns = map(str.lower, self.df.columns)
if fpath is None:
raise Exception("ERROR while loading table. File path for CSV file name expected. Please provide <fpath> param.")
# TODO(richardwu): use COPY FROM instead of loading this into memory
# TODO(richardwu): No support for numerical values. To be added.
self.df = pd.read_csv(fpath, dtype=str, na_values=na_values)
# Normalize to lower strings and strip whitespaces.
for attr in self.df.columns.values:
if attr not in exclude_attr_cols:
self.df[attr] = self.df[attr].str.strip().str.lower()
elif src == Source.DF:
if len(args) != 1:
raise Exception("ERROR while loading table. Dataframe expected. Please provide <dataframe>.")
else:
self.df = args[0]
if df is None:
raise Exception("ERROR while loading table. Dataframe expected. Please provide <df> param.")
self.df = df
elif src == Source.DB:
if len(args) != 1:
if db_conn is None:
raise Exception("ERROR while loading table. DB connection expected. Please provide <db_conn>")
else:
db_conn = args[0]
self.df = pd.read_sql_table(name, db_conn)
self.df = pd.read_sql_table(name, db_conn)
elif src == Source.SQL:
if len(args) != 2:
raise Exception("ERROR while loading table. SQL Query and DB engine expected. Please provide <query> and <db_engine>.")
else:
tab_query = args[0]
dbengine = args[1]
dbengine.create_db_table_from_query(self.name, tab_query)
self.df = pd.read_sql_table(name, dbengine.conn)
if table_query is None or db_engine is None:
raise Exception("ERROR while loading table. SQL Query and DB connection expected. Please provide <table_query> and <db_engine>.")
db_engine.create_db_table_from_query(self.name, table_query)
self.df = pd.read_sql_table(name, db_engine.conn)

def store_to_db(self, con, if_exists='replace', index=False, index_label=None):
def store_to_db(self, db_conn, if_exists='replace', index=False, index_label=None):
# TODO: This version supports single session, single worker.
self.df.to_sql(self.name, con, if_exists=if_exists, index=index, index_label=index_label)
self.df.to_sql(self.name, db_conn, if_exists=if_exists, index=index, index_label=index_label)

def get_attributes(self):
if not self.df.empty:
return list(self.df.columns.values)
else:
raise Exception("Empty Dataframe associated with table "+self.name+". Cannot return attributes.")
"""
get_attributes returns the columns that are trainable/learnable attributes
(i.e. excluding meta-columns like _tid_).
"""
if self.df.empty:
raise Exception("Empty Dataframe associated with table {name}. Cannot return attributes.".format(
name=self.name))
return list(col for col in self.df.columns if col not in self.exclude_attr_cols)

def create_df_index(self, attr_list):
self.df.set_index(attr_list, inplace=True)

def create_db_index(self, dbengine, attr_list):
index_name = self.name+'_'+str(self.index_count)
try:
dbengine.create_db_index(index_name, self.name, attr_list)
self.index_count += 1
except:
raise Exception("ERROR while creating index for table %s on attributes %s"%(self.name, str(attr_list)))
index_name = '{name}_{idx}'.format(name=self.name, idx=self.index_count)
dbengine.create_db_index(index_name, self.name, attr_list)
self.index_count += 1
return
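
A rough sketch of how the keyword-only loading paths line up after this change. Table names, paths, and the engine object are placeholders, and the import path is assumed from this diff:

```python
import pandas as pd
from dataset.table import Table, Source   # import path assumed from this diff

# CSV file on disk (path is illustrative).
t_csv = Table('hospital', Source.FILE, fpath='../testdata/hospital.csv', na_values='?')

# Already-materialized DataFrame.
t_df = Table('cell_domain', Source.DF, df=pd.DataFrame({'_tid_': [0], 'city': ['boston']}))

# Result of a SQL query, materialized through the project's DBengine wrapper
# ('db_engine' stands in for an already-constructed DBengine instance).
t_sql = Table('pos_values', Source.SQL, table_query='SELECT ...', db_engine=db_engine)
```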
23 changes: 14 additions & 9 deletions dcparser/constraint.py
@@ -71,7 +71,8 @@ class Predicate:

def __init__(self, predicate_string, tuple_names, schema):
"""
Constructing predicate object
Constructing predicate object by setting self.cnf_form to e.g. t1."Attribute" = t2."Attribute".

:param predicate_string: string shows the predicate
:param tuple_names: name of tuples in denial constraint
:param schema: list of attributes
Expand All @@ -91,7 +92,10 @@ def __init__(self, predicate_string, tuple_names, schema):
if isinstance(component, str):
self.cnf_form += component
else:
self.cnf_form += component[0] + "." + component[1]
# Need to wrap column names in quotations for Postgres
self.cnf_form += '{alias}."{attr}"'.format(
alias=component[0],
attr=component[1])
if i < len(self.components) - 1:
self.cnf_form += self.operation
logging.info("DONE parsing predicate: %s", predicate_string)
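
An illustration of the quoting change, using a made-up predicate over an attribute `City` (the operator and tuple aliases are also illustrative):

```python
# Before: the attribute reference was unquoted, e.g. t1.City<>t2.City, which
# Postgres folds to lower case. After: the attribute is double-quoted.
cnf_form = '{alias1}."{attr}"<>{alias2}."{attr}"'.format(
    alias1='t1', alias2='t2', attr='City')
print(cnf_form)   # t1."City"<>t2."City"
```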
@@ -156,13 +160,14 @@ def parse_components(self, predicate_string):
predicate_string[i + 1] == ')') and \
predicate_string[i] != "'":

if str.lower(str_so_far) in self.schema:
current_component.append(str_so_far)
str_so_far = ""
components.append(current_component)
current_component = []
else:
raise Exception('Attribute name ' + str_so_far + ' not in schema: {}'.format(",".join(self.schema)))
# Attribute specified in DC not found in schema
if str_so_far not in self.schema:
raise Exception('Attribute name {} not in schema: {}'.format(str_so_far, ",".join(self.schema)))

current_component.append(str_so_far)
str_so_far = ""
components.append(current_component)
current_component = []
elif str_so_far == ',' or str_so_far == '.':
str_so_far = ''
return components