Groundwork for fusion implementation and assorted bug fixes.
Some notable changes:
- Support for loading fusion and/or repair raw datasets.
- Imported flights dataset for fusion testing.
- Restructured test cases to use the unittest framework.
- Removed unnecessary try/except blocks that silenced exceptions.
- Fixed a few bugs that had previously been masked by those silenced exceptions.
- Keep attributes/columns in their original case (instead of lowercasing
everything): Postgres queries now quote column references (see the sketch below).
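
A minimal sketch of the case-folding issue behind the last bullet (table and column names are hypothetical): Postgres folds unquoted identifiers to lowercase, so once columns keep their original mixed case, every reference to them has to be double-quoted.

```python
# A column created as "FlightNum" via pandas.DataFrame.to_sql keeps its case:
#   SELECT FlightNum FROM flights     -- fails: Postgres looks it up as "flightnum"
#   SELECT "FlightNum" FROM flights   -- works: quoting preserves the case
attr = 'FlightNum'
query = 'SELECT "{col}" FROM flights'.format(col=attr)
print(query)  # -> SELECT "FlightNum" FROM flights
```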
richardwu committed Nov 15, 2018
1 parent 94572ae commit 6746943
Showing 20 changed files with 76,696 additions and 5,450 deletions.
161 changes: 92 additions & 69 deletions dataset/dataset.py
@@ -1,6 +1,9 @@
import time
import logging
from enum import Enum
import os
import time
import pandas as pd

from .dbengine import DBengine
from .table import Table, Source

@@ -53,7 +56,7 @@ def __init__(self, name, env):
verbose=env['verbose'], timeout=env['timeout'])
# members to convert (tuple_id, attribute) to cell_id
self.attr_to_idx = {}
self.attr_number = 0
self.attr_count = 0
# dataset statistics
self.stats_ready = False
# Total tuples
@@ -63,47 +66,66 @@ def __init__(self, name, env):
# Domain stats for attribute pairs
self.pair_attr_stats = {}

# Fixed to load data from a CSV file at the moment.
def load_data(self, name, f_path, f_name, na_values=None):
# TODO(richardwu): load more than just CSV files
def load_data(self, name, fpath, na_values=None,
entity_col=None, src_col=None):
"""
load_data takes a CSV file of the initial data, adds tuple IDs (_tid_)
to each row to uniquely identify an 'entity', and generates unique
index numbers for each attribute/column.
Creates a table with the user supplied 'name' parameter (e.g. 'hospital').
"""
:param name: (str) name to initialize dataset with.
:param fpath: (str) filepath to CSV file.
:param na_values: (str) value that identifies a NULL value
:param entity_col: (str) column containing the unique
identifier/ID of an entity. For fusion tasks, rows with
the same ID will be fused together in the output.
If None, assumes every row is a unique entity.
:param src_col: (str) if not None, for fusion tasks
specifies the column containing the source for each "mention" of an
entity.
"""
tic = time.clock()
try:
# Load raw CSV file/data into the Postgres 'init_X' table.

self.raw_data = Table(name, Source.FILE, f_path, f_name, na_values)
# Add _tid_ column to dataset
df = self.raw_data.df
df.insert(0,'_tid_', range(0,len(df)))
df.fillna('_nan_',inplace=True)
# Call to store to database
self.raw_data.store_to_db(self.engine.engine)
status = 'DONE Loading '+f_name

# Generate indexes on attribute columns for faster queries

for attr in self.raw_data.get_attributes():
# Generate index on attribute
self.raw_data.create_db_index(self.engine,[attr])

# Create attr_to_idx dictionary (assign unique index for each attribute)
# and attr_number (total # of attributes)

tmp_attr_list = self.raw_data.get_attributes()
tmp_attr_list.remove('_tid_')
for idx, attr in enumerate(tmp_attr_list):
# Map attribute to index
self.attr_to_idx[attr] = idx
self.attr_number = len(self.raw_data.get_attributes())

except Exception as e:
status = ' '.join(['For table:', name, str(e)])
# Do not include TID and source column as trainable attributes
exclude_attr_cols = ['_tid_']
if src_col is not None:
exclude_attr_cols.append(src_col)

# Load raw CSV file/data into a Postgres table 'name' (param).
self.raw_data = Table(name, Source.FILE, na_values=na_values,
exclude_attr_cols=exclude_attr_cols, fpath=fpath)

df = self.raw_data.df
# Add _tid_ column to dataset that uniquely identifies an entity.
# If entity_col is not supplied, use auto-incrementing values.
# Otherwise we use the entity values directly as _tid_'s.
if entity_col is None:
# auto-increment
df.insert(0, '_tid_', range(0,len(df)))
else:
# use entity IDs as _tid_'s directly
df.rename({entity_col: '_tid_'}, axis='columns', inplace=True)

# Use '_nan_' to represent NULL values
df.fillna('_nan_', inplace=True)

# Call to store to database

self.raw_data.store_to_db(self.engine.engine)
status = 'DONE Loading {fname}'.format(fname=os.path.basename(fpath))

# Generate indexes on attribute columns for faster queries
for attr in self.raw_data.get_attributes():
# Generate index on attribute
self.raw_data.create_db_index(self.engine,[attr])

# Create attr_to_idx dictionary (assign unique index for each attribute)
# and attr_count (total # of attributes)

self.attr_to_idx = {attr: idx for idx, attr in enumerate(self.raw_data.get_attributes())}
self.attr_count = len(self.attr_to_idx)
toc = time.clock()
load_time = toc - tic
return status, load_time
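
A hedged usage sketch of the new load_data signature for a fusion-style dataset; the class name Dataset, the env contents, the file path, and the column names are assumptions, and a reachable Postgres instance is required:

```python
from dataset.dataset import Dataset

# Hypothetical environment; only 'verbose' and 'timeout' are visible in this diff.
env = {'verbose': True, 'timeout': 60000}
ds = Dataset('flights', env)

# Rows that share a value in 'flight' are mentions of one entity to be fused;
# 'src' names the source of each mention. Column names are illustrative.
status, load_time = ds.load_data('flights', '../testdata/flights.csv',
                                 na_values='empty',
                                 entity_col='flight',
                                 src_col='src')
print(status, load_time)
```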
@@ -121,56 +143,63 @@ def generate_aux_table(self, aux_table, df, store=False, index_attrs=False):
2. sets an index on the aux_table's internal Pandas DataFrame (index_attrs=[<columns>]), AND/OR
3. creates Postgres indexes for aux_table (store=True and index_attrs=[<columns>])
:param aux_table: (str) name of auxiliary table (see AuxTables)
:param aux_table: (AuxTable) auxiliary table to generate
:param df: (DataFrame) dataframe to memoize/store for this auxiliary table
:param store: (bool) if true, creates/replaces Postgres table for this auxiliary table
:param index_attrs: (list[str]) list of attributes to create indexes on. If store is true,
also creates indexes on Postgres table.
"""
try:
self.aux_table[aux_table] = Table(aux_table.name, Source.DF, df)
self.aux_table[aux_table] = Table(aux_table.name, Source.DF, df=df)
if store:
self.aux_table[aux_table].store_to_db(self.engine.engine)
if index_attrs:
self.aux_table[aux_table].create_df_index(index_attrs)
if store and index_attrs:
self.aux_table[aux_table].create_db_index(self.engine, index_attrs)
except Exception as e:
raise Exception(' '.join(['For table:',aux_table.name,str(e)]))
except Exception:
logging.error('generating auxiliary table "{table}" failed'.format(table=aux_table.name))
raise
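
A short sketch of the memoize/store/index combinations described in the docstring above; the dataframe contents are made up, and ds is assumed to be a Dataset instance backed by a reachable Postgres database (AuxTables.cell_domain is reused purely for illustration):

```python
import pandas as pd

# Hypothetical cell-domain frame with the columns the rest of this module expects.
domain_df = pd.DataFrame({'_vid_': [0, 1], '_tid_': [0, 0],
                          'attribute': ['city', 'state'],
                          'domain': ['a|||b', 'x|||y']})

# Memoize the frame, write it to Postgres, and index it (both the pandas
# DataFrame and the Postgres table) on _vid_.
ds.generate_aux_table(AuxTables.cell_domain, domain_df,
                      store=True, index_attrs=['_vid_'])
```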

def generate_aux_table_sql(self, aux_table, query, index_attrs=False):
"""
:param aux_table: (AuxTable) auxiliary table to generate
:param query: (str) SQL query whose result is used for generating the auxiliary table.
"""
try:
self.aux_table[aux_table] = Table(aux_table.name, Source.SQL, query, self.engine)
self.aux_table[aux_table] = Table(aux_table.name, Source.SQL, table_query=query, db_engine=self.engine)
if index_attrs:
self.aux_table[aux_table].create_df_index(index_attrs)
self.aux_table[aux_table].create_db_index(self.engine, index_attrs)
except Exception as e:
raise Exception(' '.join(['For table:',aux_table.name,str(e)]))
except Exception:
logging.error('generating auxiliary table via SQL "{table}" failed'.format(table=aux_table.name))
raise

def get_raw_data(self):
"""
Is this guaranteed sorted by TID?
get_raw_data returns a pandas.DataFrame containing the raw data as it was initially loaded.
"""
if self.raw_data:
return self.raw_data.df
else:
raise Exception('ERROR No dataset loaded')

def get_attributes(self):
if self.raw_data:
attrs = self.raw_data.get_attributes()
attrs.remove('_tid_')
return attrs
else:
"""
get_attributes returns the trainable/learnable attributes (i.e. excludes meta
columns like _tid_).
"""
if self.raw_data is None:
raise Exception('ERROR No dataset loaded')
return self.raw_data.get_attributes()

def get_cell_id(self, tuple_id, attr_name):
"""
get_cell_id returns cell ID: a unique ID for every cell.
Cell ID: _tid_ * (# of attributes) + attr_idx
"""
vid = tuple_id*self.attr_number + self.attr_to_idx[attr_name]
vid = tuple_id*self.attr_count + self.attr_to_idx[attr_name]

return vid
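
A quick worked example of the cell-ID arithmetic above (attribute names are hypothetical):

```python
# Three attributes indexed 0..2, as attr_to_idx would assign them.
attr_to_idx = {'city': 0, 'state': 1, 'zip': 2}
attr_count = len(attr_to_idx)   # 3

# Cell ID for (tuple 4, 'state'): 4 * 3 + 1 = 13. Because attr_idx < attr_count,
# every (tid, attribute) pair maps to a distinct integer.
cell_id = 4 * attr_count + attr_to_idx['state']
assert cell_id == 13
```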

@@ -241,32 +270,26 @@ def get_inferred_values(self):
"_vid_, init_value, string_to_array(regexp_replace(domain, \'[{\"\"}]\', \'\', \'gi\'), \'|||\') as domain " \
"FROM %s) as t1, %s as t2 " \
"WHERE t1._vid_ = t2._vid_"%(AuxTables.cell_domain.name, AuxTables.inf_values_idx.name)
try:
self.generate_aux_table_sql(AuxTables.inf_values_dom, query, index_attrs=['_tid_'])
self.aux_table[AuxTables.inf_values_dom].create_db_index(self.engine, ['attribute'])
status = "DONE colleting the inferred values."
except Exception as e:
status = "ERROR when colleting the inferred values: %s"%str(e)
self.generate_aux_table_sql(AuxTables.inf_values_dom, query, index_attrs=['_tid_'])
self.aux_table[AuxTables.inf_values_dom].create_db_index(self.engine, ['attribute'])
status = "DONE colleting the inferred values."
toc = time.clock()
total_time = toc - tic
return status, total_time

def get_repaired_dataset(self):
tic = time.clock()
try:
init_records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False)
t = self.aux_table[AuxTables.inf_values_dom]
repaired_vals = dictify(t.df.reset_index())
for tid in repaired_vals:
for attr in repaired_vals[tid]:
init_records[tid][attr] = repaired_vals[tid][attr]
repaired_df = pd.DataFrame.from_records(init_records)
name = self.raw_data.name+'_repaired'
self.repaired_data = Table(name, Source.DF, repaired_df)
self.repaired_data.store_to_db(self.engine.engine)
status = "DONE generating repaired dataset"
except Exception as e:
status = "ERROR when generating repaired dataset: %s"
init_records = self.raw_data.df.sort_values(['_tid_']).to_records(index=False)
t = self.aux_table[AuxTables.inf_values_dom]
repaired_vals = dictify(t.df.reset_index())
for tid in repaired_vals:
for attr in repaired_vals[tid]:
init_records[tid][attr] = repaired_vals[tid][attr]
repaired_df = pd.DataFrame.from_records(init_records)
name = self.raw_data.name+'_repaired'
self.repaired_data = Table(name, Source.DF, df=repaired_df)
self.repaired_data.store_to_db(self.engine.engine)
status = "DONE generating repaired dataset"
toc = time.clock()
total_time = toc - tic
return status, total_time
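
The repair loop above suggests dictify(...) yields a nested {tid: {attribute: inferred_value}} mapping; a minimal sketch of that assumed shape (values made up):

```python
# Assumed output shape of dictify(t.df.reset_index()); each inner value
# overwrites init_records[tid][attr] in the loop above.
repaired_vals = {
    0: {'City': 'toronto'},               # tuple 0: one repaired cell
    3: {'State': 'on', 'Zip': 'm5v2t6'},  # tuple 3: two repaired cells
}
```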
13 changes: 12 additions & 1 deletion dataset/dbengine.py
@@ -122,7 +122,18 @@ def create_db_table_from_query(self, name, query):
return True

def create_db_index(self, name, table, attr_list):
stmt = index_template.substitute(idx_title=name, table=table, attr=','.join(attr_list))
"""
create_db_index creates a (multi-column) index on the columns/attributes
specified in :param attr_list: with the given :param name: on
:param table:.
:param name: (str) name of index
:param table: (str) name of table
:param attr_list: (list[str]) list of attributes/columns to create the index on
"""
# We need to quote each attribute since Postgres auto-downcases unquoted column references
quoted_attrs = map(lambda attr: '"{}"'.format(attr), attr_list)
stmt = index_template.substitute(idx_title=name, table=table, attr=','.join(quoted_attrs))
tic = time.clock()
conn = self.engine.connect()
result = conn.execute(stmt)
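
For illustration, assuming index_template expands to an ordinary CREATE INDEX statement, the quoting step would yield SQL roughly like this (index, table, and column names are hypothetical):

```python
attr_list = ['FlightNum', 'Carrier']
quoted_attrs = map(lambda attr: '"{}"'.format(attr), attr_list)

# Roughly what the substituted template is assumed to produce:
stmt = 'CREATE INDEX flights_0 ON flights ({})'.format(','.join(quoted_attrs))
print(stmt)  # -> CREATE INDEX flights_0 ON flights ("FlightNum","Carrier")
```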
101 changes: 55 additions & 46 deletions dataset/table.py
@@ -12,67 +12,76 @@ class Table:
"""
A wrapper class for Dataset Tables.
"""
def __init__(self, name, src, *args):
def __init__(self, name, src, na_values=None, exclude_attr_cols=['_tid_'],
fpath=None, df=None, db_conn=None, table_query=None, db_engine=None):
"""
:param name: (str) name to assign to dataset.
:param na_values: (str or list[str]) values to interpret as NULL.
:param exclude_attr_cols: (list[str]) list of columns to NOT treat as
attributes during training/learning.
:param src: (Source) type of source to load from. Note additional
parameters MUST be provided for each specific source:
Source.FILE: :param fpath:, read from CSV file
Source.DF: :param df:, read from pandas DataFrame
Source.DB: :param db_conn:, read from database table with :param name:
Source.SQL: :param table_query: and :param db_engine:, use result
from :param table_query:
:param fpath: (str)
:param df: (pandas.DataFrame)
:param db_conn: (SQLAlchemy connectable, str)
:param table_query: (str)
:param db_engine: (DBEngine)
"""
self.name = name
self.index_count = 0
# Copy the list to memoize
self.exclude_attr_cols = list(exclude_attr_cols)
self.df = pd.DataFrame()

if src == Source.FILE:
if len(args) < 2:
raise Exception("ERROR while loading table. File path and file name expected.Please provide <file_path> and <file_name>.")
else:
file_path = args[0]
file_name = args[1]
if len(args) == 3:
na_values = args[2]
else:
na_values = None
# TODO(richardwu): use COPY FROM instead of loading this into memory
self.df = pd.read_csv(os.path.join(file_path,file_name), dtype=str, na_values=na_values)
# Normalize to lower strings and strip whitespaces.
# TODO: No support for numerical values. To be added.
for attr in self.df.columns.values:
if attr != '_tid_' and self.df[attr].dtype == str:
self.df[attr] = self.df[attr].apply(lambda x: x.lower())
self.df[attr] = self.df[attr].apply(lambda x: x.strip())
self.df.columns = map(str.lower, self.df.columns)
if fpath is None:
raise Exception("ERROR while loading table. File path for CSV file name expected. Please provide <df> param.")
# TODO(richardwu): use COPY FROM instead of loading this into memory
# TODO(richardwu): No support for numerical values. To be added.
self.df = pd.read_csv(fpath, dtype=str, na_values=na_values)
# Normalize to lower strings and strip whitespaces.
for attr in self.df.columns.values:
if attr not in exclude_attr_cols and self.df[attr].dtype == str:
self.df[attr] = self.df[attr].str.strip().str.lower()
elif src == Source.DF:
if len(args) != 1:
raise Exception("ERROR while loading table. Dataframe expected. Please provide <dataframe>.")
else:
self.df = args[0]
if df is None:
raise Exception("ERROR while loading table. Dataframe expected. Please provide <df> param.")
self.df = df
elif src == Source.DB:
if len(args) != 1:
if db_conn is None:
raise Exception("ERROR while loading table. DB connection expected. Please provide <db_conn>")
else:
db_conn = args[0]
self.df = pd.read_sql_table(name, db_conn)
self.df = pd.read_sql_table(name, db_conn)
elif src == Source.SQL:
if len(args) != 2:
raise Exception("ERROR while loading table. SQL Query and DB engine expected. Please provide <query> and <db_engine>.")
else:
tab_query = args[0]
dbengine = args[1]
dbengine.create_db_table_from_query(self.name, tab_query)
self.df = pd.read_sql_table(name, dbengine.conn)
if table_query is None or db_engine is None:
raise Exception("ERROR while loading table. SQL Query and DB connection expected. Please provide <table_query> and <db_engine>.")
db_engine.create_db_table_from_query(self.name, table_query)
self.df = pd.read_sql_table(name, db_engine.conn)
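
A hedged sketch of constructing Table from the different keyword-only sources; the file path, dataframe contents, and db_engine object are hypothetical:

```python
import pandas as pd
from dataset.table import Table, Source

# From a CSV file on disk:
raw = Table('flights', Source.FILE, na_values='empty',
            fpath='../testdata/flights.csv')

# From an in-memory DataFrame, keeping the source column out of the
# trainable attributes along with _tid_:
df = pd.DataFrame({'_tid_': [0, 1], 'src': ['s1', 's2'], 'city': ['a', 'b']})
mentions = Table('mentions', Source.DF, df=df,
                 exclude_attr_cols=['_tid_', 'src'])

# From a SQL query result (needs a live DBengine, so shown commented out):
# dom = Table('cell_domain', Source.SQL,
#             table_query='SELECT * FROM cell_domain', db_engine=db_engine)
```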

def store_to_db(self, con, if_exists='replace', index=False, index_label=None):
def store_to_db(self, db_conn, if_exists='replace', index=False, index_label=None):
# TODO: This version supports single session, single worker.
self.df.to_sql(self.name, con, if_exists=if_exists, index=index, index_label=index_label)
self.df.to_sql(self.name, db_conn, if_exists=if_exists, index=index, index_label=index_label)

def get_attributes(self):
if not self.df.empty:
return list(self.df.columns.values)
else:
raise Exception("Empty Dataframe associated with table "+self.name+". Cannot return attributes.")
"""
get_attributes returns the columns that are trainable/learnable attributes
(i.e. exclude meta-columns like _tid_).
"""
if self.df.empty:
raise Exception("Empty Dataframe associated with table {name}. Cannot return attributes.".format(
name=self.name))
return list(col for col in self.df.columns if col not in self.exclude_attr_cols)

def create_df_index(self, attr_list):
self.df.set_index(attr_list, inplace=True)

def create_db_index(self, dbengine, attr_list):
index_name = self.name+'_'+str(self.index_count)
try:
dbengine.create_db_index(index_name, self.name, attr_list)
self.index_count += 1
except:
raise Exception("ERROR while creating index for table %s on attributes %s"%(self.name, str(attr_list)))
index_name = '{name}_{idx}'.format(name=self.name, idx=self.index_count)
dbengine.create_db_index(index_name, self.name, attr_list)
self.index_count += 1
return
