Skip to content

Commit

Permalink
[wip] refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
xmnlab committed May 1, 2018
1 parent 5559fd8 commit a44f4bf
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 32 deletions.
22 changes: 13 additions & 9 deletions ci/datamgr.py
Expand Up @@ -193,7 +193,7 @@ def sqlite(database, schema, tables, data_directory, **params):
@click.option('-P', '--port', default=9091, type=int)
@click.option('-u', '--user', default='mapd')
@click.option('-p', '--password', default='HyperInteractive')
@click.option('-D', '--database', default='mapd')
@click.option('-D', '--database', default='ibis_testing')
@click.option('-S', '--schema', type=click.File('rt'),
default=str(SCRIPT_DIR / 'schema' / 'mapd.sql'))
@click.option('-t', '--tables', multiple=True, default=TEST_TABLES)
Expand Down Expand Up @@ -290,6 +290,7 @@ def mapd(schema, tables, data_directory, **params):
)

# connection
print(params)
click.echo('Initializing MapD...')
if params['database'] != 'mapd':
conn = pymapd.connect(
Expand All @@ -302,7 +303,7 @@ def mapd(schema, tables, data_directory, **params):
try:
conn.execute('CREATE DATABASE {}'.format(params['database']))
except Exception as e:
click.echo(e)
click.echo('[MAPD|WW]{}'.format(e))
conn.close()

conn = pymapd.connect(
Expand All @@ -316,21 +317,24 @@ def mapd(schema, tables, data_directory, **params):
try:
conn.execute('DROP TABLE {}'.format(table))
except Exception as e:
click.echo('[WW] {}'.format(str(e)))
click.echo('[II] Dropping tables ... OK')
click.echo('[MAPD|WW] {}'.format(str(e)))
click.echo('[MAPD|II] Dropping tables ... OK')

# create tables
for stmt in schema.read().split(';'):
stmt = stmt.strip()
if len(stmt):
conn.execute(stmt)
click.echo('[II] Creating tables ... OK')
try:
conn.execute(stmt)
except Exception as e:
click.echo('[MAPD|WW] {}'.format(str(e)))
click.echo('[MAPD|II] Creating tables ... OK')

# import data
click.echo('[II] Loading data ...')
click.echo('[MAPD|II] Loading data ...')
for table in tables:
src = data_directory / '{}.csv'.format(table)
click.echo('[II] src: {}'.format(src))
click.echo('[MAPD|II] src: {}'.format(src))
df = pd.read_csv(src, delimiter=',', **table_import_args[table])

# prepare data frame data type
Expand All @@ -347,7 +351,7 @@ def mapd(schema, tables, data_directory, **params):
conn.load_table_columnar(table, df)
conn.close()

click.echo('[II] Done!')
click.echo('[MAPD|II] Done!')


@cli.command()
Expand Down
8 changes: 4 additions & 4 deletions ci/docker-compose.yml
Expand Up @@ -53,12 +53,11 @@ services:
image: mapd/mapd-ce-cpu
ports:
- 9090:9090
- 9091:909
- 9092:9092
- 9091:9091
environment:
- MAPD_HOST=mapd
- MAPD_PORT=9091
- MAPD_DATABASE=mapd
- MAPD_DATABASE=ibis_testing
- MAPD_USER=mapd

waiter:
Expand All @@ -69,6 +68,7 @@ services:
-wait tcp://impala:21050
-wait tcp://impala:50070
-wait tcp://clickhouse:9000
-wait tcp://mapd:9091
-wait-retry-interval 5s
-timeout 5m
Expand Down Expand Up @@ -98,7 +98,7 @@ services:
- IBIS_TEST_CLICKHOUSE_DATABASE=ibis_testing
- IBIS_TEST_MAPD_HOST=mapd
- IBIS_TEST_MAPD_PORT=9091
- IBIS_TEST_MAPD_DATABASE=mapd
- IBIS_TEST_MAPD_DATABASE=ibis_testing
- IBIS_TEST_MAPD_USER=mapd
- IBIS_TEST_MAPD_PASSWORD=HyperInteractive
- GOOGLE_BIGQUERY_PROJECT_ID=ibis-gbq
Expand Down
24 changes: 24 additions & 0 deletions ibis/mapd/compiler.py
Expand Up @@ -2,9 +2,11 @@
from . import operations as mapd_ops
from .identifiers import quote_identifier # noqa: F401
from .operations import _type_to_sql_string # noqa: F401
from ibis.expr.api import _add_methods, _unary_op, _binop_expr

import ibis.common as com
import ibis.util as util
import ibis.expr.types as ir
import ibis.expr.operations as ops
import ibis.sql.compiler as compiles

Expand Down Expand Up @@ -194,3 +196,25 @@ class MapDDialect(compiles.Dialect):
rewrites = MapDExprTranslator.rewrites

compiles(ops.Distance, mapd_ops.distance)

mapd_reg = mapd_ops._operation_registry

_add_methods(
ir.NumericValue, dict(
conv_4326_900913_x=_unary_op(
'conv_4326_900913_x', mapd_ops.Conv_4326_900913_X
),
conv_4326_900913_y=_unary_op(
'conv_4326_900913_y', mapd_ops.Conv_4326_900913_Y
),
truncate=_binop_expr(
'truncate', mapd_ops.NumericTruncate
)
)
)


@rewrites(ops.HLLCardinality)
def _approx_count_distinct(expr):
left, right = expr.op().args
return left.div(right).floor()
23 changes: 5 additions & 18 deletions ibis/mapd/operations.py
Expand Up @@ -480,8 +480,6 @@ class ApproxCountDistinct(ops.Reduction):
where = ops.Arg(rlz.boolean, default=None)

def output_type(self):
# Impala 2.0 and higher returns a DOUBLE
# return ir.DoubleScalar
return ops.partial(ir.IntegerScalar, dtype=ops.dt.int64)


Expand Down Expand Up @@ -580,17 +578,20 @@ class ByteLength(ops.StringLength):
ops.Tan: unary('tan')
}

# GEOMETRIC
_geometric_ops = {
Conv_4326_900913_X: unary('conv_4326_900913_x'),
Conv_4326_900913_Y: unary('conv_4326_900913_y')
}

# STRING
_string_ops = {
ops.StringLength: _length(),
ByteLength: _length('byte_length', 'LENGTH'),
ops.StringSQLILike: binary_infix_op('ilike'),
}

# DATE
_date_ops = {
ops.Date: unary('toDate'),
ops.DateTruncate: _timestamp_truncate,
Expand All @@ -613,12 +614,13 @@ class ByteLength(ops.StringLength):
ops.TimestampSub: _timestamp_op('TIMESTAMPADD', '-'),
}


# AGGREGATION/REDUCTION
_agg_ops = {
ApproxCountDistinct: approx_count_distinct,
ops.DistinctColumn: unary_prefix_op('distinct'),
}

# GENERAL
_general_ops = {
ops.Literal: literal,
ops.ValueList: _value_list,
Expand All @@ -640,18 +642,3 @@ class ByteLength(ops.StringLength):
_operation_registry.update(_string_ops)
_operation_registry.update(_date_ops)
_operation_registry.update(_agg_ops)


# numeric operations
_add_methods(ir.NumericValue, _trigonometric_ops, forced=True)
_add_methods(ir.NumericValue, _math_ops, forced=True)
_add_methods(ir.NumericValue, _geometric_ops, forced=True)
_add_methods(ir.NumericValue, _stats_ops, forced=False)
_add_methods(ir.ColumnExpr, _agg_ops, forced=True)

# string operations
_add_methods(ir.StringValue, _string_ops, forced=True)

# date/time/timestamp operations
_add_methods(ir.TimestampColumn, _date_ops, forced=True)
_add_methods(ir.DateColumn, _date_ops, forced=True)
2 changes: 1 addition & 1 deletion ibis/mapd/tests/conftest.py
Expand Up @@ -7,7 +7,7 @@
MAPD_PORT = int(os.environ.get('IBIS_TEST_MAPD_PORT', 9091))
MAPD_USER = os.environ.get('IBIS_TEST_MAPD_USER', 'mapd')
MAPD_PASS = os.environ.get('IBIS_TEST_MAPD_PASSWORD', 'HyperInteractive')
MAPD_DB = os.environ.get('IBIS_TEST_DATA_DB', 'mapd')
MAPD_DB = os.environ.get('IBIS_TEST_DATA_DB', 'ibis_testing')


@pytest.fixture(scope='module')
Expand Down

0 comments on commit a44f4bf

Please sign in to comment.