Skip to content

Commit

Permalink
Added DDL and data storage operations.
Browse files Browse the repository at this point in the history
  • Loading branch information
xmnlab committed Apr 25, 2018
1 parent 90a41f1 commit 02737c9
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 124 deletions.
145 changes: 77 additions & 68 deletions ibis/mapd/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,29 +208,6 @@ def drop(self):
def truncate(self):
    """Ask the underlying client to truncate this table.

    Passes ``self._qualified_name`` to the client's ``truncate_table``.
    """
    client = self._client
    client.truncate_table(self._qualified_name)

def insert(self, obj=None, validate=True):
    """
    Insert the rows of a pandas DataFrame into this MapD table.

    Parameters
    ----------
    obj : pandas DataFrame
        Data to insert. Any other kind of input is rejected.
    validate : boolean, default True
        Accepted for interface compatibility; not used by this method.

    Returns
    -------
    Whatever ``self._execute`` returns for the generated insert statement.

    Raises
    ------
    NotImplementedError
        If `obj` is not a pandas DataFrame.

    Examples
    --------
    >>> t.insert(df)  # doctest: +SKIP
    """
    if not isinstance(obj, pd.DataFrame):
        raise NotImplementedError('{} input not implemented'.format(obj))

    statement = ddl.InsertPandas(self._unqualified_name, obj)
    return self._execute(statement)

def load_data(self, df):
"""
Wraps the LOAD DATA DDL statement. Loads data into a MapD table from
Expand Down Expand Up @@ -364,7 +341,7 @@ def __init__(
)

def __del__(self):
self.con.close()
self.close()

def log(self, msg):
log(msg)
Expand Down Expand Up @@ -528,8 +505,37 @@ def drop_user(self, name):
statement = ddl.DropUser(name)
self._execute(statement)

def create_view(self, name, expr, database=None):
    """
    Define a MapD view from an ibis table expression.

    Parameters
    ----------
    name : string
        Name given to the new view
    expr : ibis TableExpr
        Expression compiled to produce the view's select query
    database : string, default None
    """
    context = MapDDialect.make_context()
    compiled = self._build_ast(expr, context)
    # Only the first compiled query is used as the view's select.
    first_query = compiled.queries[0]
    self._execute(
        ddl.CreateView(name, first_query, database=database)
    )

def drop_view(self, name, database=None):
    """
    Remove a MapD view.

    Parameters
    ----------
    name : string
        Name of the view to drop
    database : string, default None
    """
    # NOTE(review): another ``drop_view`` accepting ``force`` is defined
    # further down this file; if both live in the same class the later
    # definition replaces this one -- confirm which is intended.
    drop_stmt = ddl.DropView(name, database=database)
    self._execute(drop_stmt)

def create_table(
self, table_name, obj=None, schema=None, database=None
self, table_name, obj=None, schema=None, database=None,
fragment_size=None, max_rows=None, page_size=None, partitions=None,
shard_count=None
):
"""
Create a new table in MapD using an Ibis table expression.
Expand All @@ -543,6 +549,24 @@ def create_table(
Mutually exclusive with expr, creates an empty table with a
particular schema
database : string, default None (optional)
fragment_size : int, Default None
Number of rows per fragment that is a unit of the table for query
processing. Default = 32 million rows, which is not expected to be
changed.
max_rows : int, Default None
Set the maximum number of rows allowed in a table to create a capped
collection. When this limit is reached, the oldest fragment is
removed. Default = 2^62.
page_size : int, Default None
Number of I/O page bytes. Default = 1MB, which does not need to be
changed.
partitions : string, Default None
Partition strategy option:
SHARDED: Partition table using sharding.
REPLICATED: Partition table using replication.
shard_count : int, Default None
Number of shards to create, typically equal to the number of GPUs
across which the data table is distributed.
Examples
--------
Expand All @@ -553,8 +577,9 @@ def create_table(

if obj is not None:
if isinstance(obj, pd.DataFrame):
from ibis.mapd.pandas_interop import write_temp_dataframe
writer, to_insert = write_temp_dataframe(self._client, obj)
raise NotImplementedError(
'Pandas Data Frame input not implemented.'
)
else:
to_insert = obj
ast = self._build_ast(to_insert, MapDDialect.make_context())
Expand All @@ -567,7 +592,12 @@ def create_table(
elif schema is not None:
statement = ddl.CreateTableWithSchema(
table_name, schema,
database=database
database=database,
fragment_size=fragment_size,
max_rows=max_rows,
page_size=page_size,
partitions=partitions,
shard_count=shard_count
)
else:
raise com.IbisError('Must pass expr or schema')
Expand Down Expand Up @@ -603,6 +633,18 @@ def drop_table(self, table_name, database=None, force=False):
self._execute(statement, False)
self.set_database(_database)

def truncate_table(self, table_name, database=None):
    """
    Empty an existing table of all its rows without dropping the table.

    Parameters
    ----------
    table_name : string
    database : string, default None (optional)
    """
    truncate_stmt = ddl.TruncateTable(table_name, database=database)
    self._execute(truncate_stmt, False)

def drop_view(self, name, database=None, force=False):
"""
Drop a MapD view
Expand All @@ -619,18 +661,6 @@ def drop_view(self, name, database=None, force=False):
)
return self._execute(statement, False)

def truncate_table(self, table_name, database=None):
    """
    Empty an existing table of all its rows without dropping the table.

    Parameters
    ----------
    table_name : string
    database : string, default None (optional)
    """
    truncate_stmt = ddl.TruncateTable(table_name, database=database)
    self._execute(truncate_stmt, False)

def drop_table_or_view(self, name, database=None, force=False):
"""
Attempt to drop a relation that may be a view or table
Expand Down Expand Up @@ -673,50 +703,29 @@ def database(self, name=None):
)
return self.database_class(name, new_client)

def insert(
    self, table_name, obj=None, database=None, validate=True
):
    """
    Insert into an existing table by delegating to the table's ``insert``.

    See MapDTable.insert for other parameters.

    Parameters
    ----------
    table_name : string
        Name of the table looked up through ``self.table``
    database : string, default None
        Passed through to ``self.table``

    Examples
    --------
    >>> table = 'my_table'
    >>> con.insert(table, table_expr)  # doctest: +SKIP
    """
    target = self.table(table_name, database=database)
    outcome = target.insert(obj=obj, validate=validate)
    return outcome

def load_data(self, table_name, df, database=None):
def load_data(self, table_name, obj, database=None):
"""
Wraps the LOAD DATA DDL statement. Loads data into a MapD table by
physically moving data files.
Parameters
----------
table_name : string
df: pandas.DataFrame
obj: pandas.DataFrame or pyarrow.Table
database : string, default None (optional)
"""
table = self.table(table_name, database=database)
return table.load_data(df)
_database = self.db_name
self.set_database(database)
self.con.load_table_arrow(table_name, obj)
self.set_database(_database)

@property
def current_database(self):
    """Return the database name stored on this client (``self.db_name``)."""
    name = self.db_name
    return name

def set_database(self, name):
if self.db_name != name and name is not None:
self.con.close()
self.con = pymapd.connect(
uri=self.uri, user=self.user, password=self.password,
host=self.host, port=self.port, dbname=name,
Expand Down
Loading

0 comments on commit 02737c9

Please sign in to comment.