Skip to content

Commit

Permalink
Improve datamgr mapd
Browse files Browse the repository at this point in the history
  • Loading branch information
xmnlab committed May 2, 2018
1 parent ca95daa commit 95e3f8d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 104 deletions.
127 changes: 24 additions & 103 deletions ci/datamgr.py
Expand Up @@ -200,97 +200,11 @@ def sqlite(database, schema, tables, data_directory, **params):
@click.option('-d', '--data-directory', default=DATA_DIR)
def mapd(schema, tables, data_directory, **params):
import pymapd
import numpy as np

data_directory = Path(data_directory)

int_na = -9999

table_dtype = dict(
functional_alltypes=dict(
index=np.int64,
Unnamed_=np.int64,
id=np.int32,
bool_col=np.bool,
tinyint_col=np.int16,
smallint_col=np.int16,
int_col=np.int32,
bigint_col=np.int64,
float_col=np.float32,
double_col=np.float64,
date_string_col=str,
string_col=str,
# timestamp_col=pd.datetime,
year_=np.int32,
month_=np.int32
),
diamonds=dict(
carat=np.float32,
cut=str,
color=str,
clarity=str,
depth=np.float32,
table_=np.float32,
price=np.int64,
x=np.float32,
y=np.float32,
z=np.float32
),
batting=dict(
playerID=str,
yearID=np.int64,
stint=np.int64,
teamID=str,
lgID=str,
G=np.int64,
AB=np.int64,
R=np.int64,
H=np.int64,
X2B=np.int64,
X3B=np.int64,
HR=np.int64,
RBI=np.int64,
SB=np.int64,
CS=np.int64,
BB=np.int64,
SO=np.int64,
IBB=np.int64,
HBP=np.int64,
SH=np.int64,
SF=np.int64,
GIDP=np.int64
),
awards_players=dict(
playerID=str,
awardID=str,
yearID=np.int64,
lgID=str,
tie=str,
notes=str
)
)

table_import_args = dict(
functional_alltypes=dict(
parse_dates=['timestamp_col']
),
diamonds={},
batting={},
awards_players={}

)

table_rename = dict(
functional_alltypes={
'Unnamed_': 'Unnamed: 0'
},
diamonds={},
batting={},
awards_players={}
)
reserved_words = ['table', 'year', 'month']

# connection
print(params)
click.echo('Initializing MapD...')
if params['database'] != 'mapd':
conn = pymapd.connect(
Expand Down Expand Up @@ -332,23 +246,30 @@ def mapd(schema, tables, data_directory, **params):

# import data
click.echo('[MAPD|II] Loading data ...')
for table in tables:
src = data_directory / '{}.csv'.format(table)
click.echo('[MAPD|II] src: {}'.format(src))
df = pd.read_csv(src, delimiter=',', **table_import_args[table])

# prepare data frame data type
for column, dtype in table_dtype[table].items():
if column.endswith('_'):
if column in table_rename[table]:
df_col = table_rename[table][column]
else:
df_col = column[:-1]
df.rename(columns={df_col: column}, inplace=True)
if np.issubdtype(dtype, int):
df[column].fillna(int_na, inplace=True)
df[column] = df[column].astype(dtype)
for table, df in read_tables(tables, data_directory):
if table == 'batting':
# float nan problem
cols = df.select_dtypes([float]).columns
df[cols] = df[cols].fillna(0).astype(int)
# string None driver problem
cols = df.select_dtypes([object]).columns
df[cols] = df[cols].fillna('')
elif table == 'awards_players':
# string None driver problem
cols = df.select_dtypes([object]).columns
df[cols] = df[cols].fillna('')

# rename fields
for df_col in df.columns:
if ' ' in df_col or ':' in df_col:
column = df_col.replace(' ', '_').replace(':', '_')
elif df_col in reserved_words:
column = '{}_'.format(df_col)
else:
continue
df.rename(columns={df_col: column}, inplace=True)
conn.load_table_columnar(table, df)

conn.close()

click.echo('[MAPD|II] Done!')
Expand Down
2 changes: 1 addition & 1 deletion ci/schema/mapd.sql
Expand Up @@ -55,7 +55,7 @@ DROP TABLE IF EXISTS functional_alltypes;

CREATE TABLE functional_alltypes (
index BIGINT,
Unnamed_ BIGINT,
Unnamed__0 BIGINT,
id INTEGER,
bool_col BOOLEAN,
tinyint_col SMALLINT,
Expand Down

0 comments on commit 95e3f8d

Please sign in to comment.