Blaze Data with Blaze Server is much slower than compute(expr, Client) #1240

Closed
cpcloud opened this Issue Sep 10, 2015 · 8 comments

3 participants
@cpcloud
Member

cpcloud commented Sep 10, 2015

No description provided.

@cpcloud cpcloud added the bug label Sep 10, 2015

@cpcloud cpcloud self-assigned this Sep 10, 2015

@cpcloud cpcloud added this to the 0.8.3 milestone Sep 10, 2015

@llllllllll

Member

llllllllll commented Sep 10, 2015

Can you post some timings? I will see if I can replicate with my setup

@cpcloud

Member

cpcloud commented Sep 10, 2015

@kwmsmith Can you post some timings here, or the expressions that are triggering the extra calls to /compute?

@kwmsmith

Member

kwmsmith commented Sep 10, 2015

@llllllllll @cpcloud: I tried to strip this down to a minimal example that shows the server being hit many times to compute a single expression; the best I can do so far is the following:

from __future__ import print_function

import blaze as bz
from datashape import dshape

weight_dshape = dshape('''var * {
                          Symbol: string,
                          Weight: float64
                         }''')

trades_dshape = dshape("""var * {
                          TradeTime: ?datetime,
                          Symbol: ?string,
                          Price: ?float64,
                          Shares: int64,
                          Exchange: ?string,
                          Sales: ?string,
                          Sequence: int64,
                          Bid: ?float64,
                          Ask: ?float64,
                          BSize: int64,
                          ASize: int64
                          }""")

total_dshape = dshape('''{{sp500: {weight_dshape},
                           trades: {trades_dshape}}}'''.format(weight_dshape=weight_dshape,
                                                               trades_dshape=trades_dshape))

def compute_interactive_inproc():
    weights = bz.Data(bz.CSV('./demo/sp500.csv'),
                      dshape=weight_dshape)
    trades = bz.Data(bz.CSV('./demo/trades.csv.gz', dshape=trades_dshape))
    market_hours_sel = (((trades.TradeTime.hour == 9) & (trades.TradeTime.minute >= 30)) |
                        ((trades.TradeTime.hour >= 10) & (trades.TradeTime.hour < 16))) 
    market_trades = trades[market_hours_sel]
    minute_bars = bz.by(bz.merge(market_trades.Symbol,
                                 market_trades.TradeTime.date,
                                 market_trades.TradeTime.hour,
                                 market_trades.TradeTime.minute),
                        avg=market_trades.Price.mean(),
                        low=market_trades.Price.min(),
                        high=market_trades.Price.max(),
                        vol=market_trades.Shares.sum())
    join_bars = bz.join(minute_bars, weights, 'Symbol')
    weight_bars = bz.transform(join_bars, wt_price=join_bars.avg * join_bars.Weight)
    return bz.compute(weight_bars.count())

def compute_interactive_server(server_uri):
    ss = bz.Data(server_uri, dshape=total_dshape)
    weights = ss['sp500']
    trades = ss['trades']
    market_hours_sel = (((trades.TradeTime.hour == 9) & (trades.TradeTime.minute >= 30)) |
                        ((trades.TradeTime.hour >= 10) & (trades.TradeTime.hour < 16))) 
    market_trades = trades[market_hours_sel]
    minute_bars = bz.by(bz.merge(market_trades.Symbol,
                                 market_trades.TradeTime.date,
                                 market_trades.TradeTime.hour,
                                 market_trades.TradeTime.minute),
                        avg=market_trades.Price.mean(),
                        low=market_trades.Price.min(),
                        high=market_trades.Price.max(),
                        vol=market_trades.Shares.sum())
    join_bars = bz.join(minute_bars, weights, 'Symbol')
    weight_bars = bz.transform(join_bars, wt_price=join_bars.avg * join_bars.Weight)
    return bz.compute(weight_bars.count())

I can work on stripping this down further.

My server code:

from __future__ import print_function
import blaze as bz
from blaze.server import Server

def main():
    sp500 = bz.CSV('./demo/sp500.csv')
    trades = bz.CSV('./demo/trades.csv.gz')
    return Server({'sp500': sp500,
                   'trades': trades})

if __name__ == '__main__':
    server = main()
    server.run()

In [118]: %time compute_interactive_inproc()
CPU times: user 1min 14s, sys: 8.67 s, total: 1min 23s
Wall time: 1min 23s
Out[118]: 70410

In [119]: %time compute_interactive_server('blaze://localhost:6363')
CPU times: user 1.09 s, sys: 32.8 ms, total: 1.13 s
Wall time: 1min 25s
Out[119]: 70410

So the overall runtime is the same for in-process versus server; I was mistaken there.

The server version generates 17 POSTs, all routed through /compute.

If instead I do bz.compute(join_bars.count()) in compute_interactive_server(), just one POST to /compute is generated, and it takes about 8s total runtime.
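
For anyone wanting to reproduce the request count without a debugger, one option is to wrap the function that sends the expression and record every call. The sketch below is only an illustration: `post_compute` is a hypothetical stand-in for whatever client function issues the POST to /compute, and `record_calls` is a helper name made up here, not part of blaze.

```python
import functools


def record_calls(func, log):
    """Return a wrapper that appends each call's positional args to log."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        log.append(args)
        return func(*args, **kwargs)
    return wrapper


# Hypothetical stand-in for the client function that POSTs to /compute.
def post_compute(expr):
    return len(expr)


calls = []
post_compute = record_calls(post_compute, calls)

# Simulate two preview requests followed by the real computation.
for expr in ["a.head(11)", "b.head(11)", "total.count()"]:
    post_compute(expr)

print(len(calls))
```

Inspecting `calls` afterwards shows which expressions were sent, in order, which is the same information the pdb session gives.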

@kwmsmith

Member

kwmsmith commented Sep 14, 2015

@cpcloud @llllllllll It looks like, with the above, the following 17 expressions are being evaluated, either in-process or sent to the server. The first 16 expressions aren't necessary, AFAICT:

1: market_trades.Symbol.head(11)
2: market_trades.TradeTime.date.head(11)
3: market_trades.TradeTime.hour.head(11)
4: market_trades.TradeTime.minute.head(11)

5: market_trades.Symbol.head(11)
6: market_trades.TradeTime.date.head(11)
7: market_trades.TradeTime.hour.head(11)
8: market_trades.TradeTime.minute.head(11)

9: market_trades.Symbol.head(11)
10: market_trades.TradeTime.date.head(11)
11: market_trades.TradeTime.hour.head(11)
12: market_trades.TradeTime.minute.head(11)

13: market_trades.Symbol.head(11)
14: market_trades.TradeTime.date.head(11)
15: market_trades.TradeTime.hour.head(11)
16: market_trades.TradeTime.minute.head(11)

17: weight_bars.count()

Full output is below:

(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Symbol.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.date.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.hour.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.minute.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Symbol.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.date.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.hour.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.minute.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Symbol.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.date.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.hour.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.minute.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Symbol.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.date.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.hour.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.minute.head(11)
(Pdb) c
> /Users/ksmith/work/blaze/blaze-repo/blaze/server/client.py(149)compute_down()
-> tree = to_tree(expr)
(Pdb) p expr
count(Merge(_child=Join(lhs=by(Merge(_child=_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))], children=(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Symbol,
 _1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.date,
 _1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.hour,
 _1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.minute)), avg=mean(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Price), high=max(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Price), low=min(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Price), vol=sum(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Shares)), rhs=_1.sp500, _on_left='Symbol', _on_right='Symbol', how='inner', suffixes=('_left', '_right')), children=(Join(lhs=by(Merge(_child=_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))], children=(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Symbol,
 _1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.date,
 _1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.hour,
 _1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.minute)), avg=mean(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Price), high=max(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Price), low=min(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Price), vol=sum(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Shares)), rhs=_1.sp500, _on_left='Symbol', _on_right='Symbol', how='inner', suffixes=('_left', '_right')),
 label((Join(lhs=by(Merge(_child=_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))], children=(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Symbol,
 _1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.date,
 _1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.hour,
 _1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.minute)), avg=mean(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Price), high=max(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Price), low=min(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Price), vol=sum(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Shares)), rhs=_1.sp500, _on_left='Symbol', _on_right='Symbol', how='inner', suffixes=('_left', '_right')).avg) * (Join(lhs=by(Merge(_child=_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))], children=(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Symbol,
 _1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.date,
 _1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.hour,
 _1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].TradeTime.minute)), avg=mean(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Price), high=max(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Price), low=min(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Price), vol=sum(_1.trades[((_1.trades.TradeTime.hour == 9) & (_1.trades.TradeTime.minute >= 30)) | ((_1.trades.TradeTime.hour >= 10) & (_1.trades.TradeTime.hour < 16))].Shares)), rhs=_1.sp500, _on_left='Symbol', _on_right='Symbol', how='inner', suffixes=('_left', '_right')).Weight), 'wt_price'))))
@cpcloud

Member

cpcloud commented Sep 14, 2015

This is due to the way common_subexpression works. Looking into it now.

@kwmsmith

Member

kwmsmith commented Sep 14, 2015

I'm able to fix it using the following. Does this look right to you @cpcloud ?

diff --git a/blaze/expr/core.py b/blaze/expr/core.py
index b3b6836..0b7d90b 100644
--- a/blaze/expr/core.py
+++ b/blaze/expr/core.py
@@ -290,6 +290,8 @@ def _str(s):
         return get_callable_name(s)
     elif isinstance(s, Node):
         return str(s)
+    elif isinstance(s, (list, tuple)):
+        return tuple(_str(x) for x in s)
     else:
         stream = StringIO()
         pprint(s, stream=stream)

kwmsmith added a commit that referenced this issue Sep 14, 2015

Fix for issue #1240.
Calling _str() on a tuple of expressions triggers a __repr__ call on
each element, resulting in unnecessary computation.

This fix detects when stringifying a tuple of exprs, and calls _str()
recursively in that case.
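
The mechanism described in the commit message can be sketched without blaze. This is a simplified illustration under stated assumptions, not the actual `blaze/expr/core.py` code: `FakeExpr` is a made-up class whose `__repr__` stands in for the expensive interactive repr (the `.head(11)` requests seen above suggest that is where the computation happens), and `str_before` / `str_after` are hypothetical reductions of `_str()` before and after the patch.

```python
from io import StringIO
from pprint import pprint


class FakeExpr:
    """Stand-in for an expression with a cheap str and a costly repr."""
    repr_calls = 0

    def __init__(self, name):
        self.name = name

    def __str__(self):
        return self.name

    def __repr__(self):
        # In the real case this is where data would be computed for display.
        FakeExpr.repr_calls += 1
        return "<computed %s>" % self.name


def str_before(s):
    """Pre-patch behaviour: a tuple falls through to pprint."""
    if isinstance(s, FakeExpr):
        return str(s)
    stream = StringIO()
    pprint(s, stream=stream)  # pprint calls repr() on each tuple element
    return stream.getvalue().rstrip()


def str_after(s):
    """Post-patch behaviour: recurse into lists and tuples first."""
    if isinstance(s, FakeExpr):
        return str(s)
    elif isinstance(s, (list, tuple)):
        return tuple(str_after(x) for x in s)
    stream = StringIO()
    pprint(s, stream=stream)
    return stream.getvalue().rstrip()


children = tuple(FakeExpr(n) for n in ("Symbol", "date", "hour", "minute"))

FakeExpr.repr_calls = 0
str_before(children)
calls_before = FakeExpr.repr_calls  # repr ran for every element

FakeExpr.repr_calls = 0
result_after = str_after(children)
calls_after = FakeExpr.repr_calls   # repr was never called
```

With four children in the tuple, each pass through the old path costs at least four repr calls, which lines up with the groups of four `.head(11)` requests listed earlier.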

kwmsmith added a commit that referenced this issue Sep 14, 2015

cpcloud added a commit that referenced this issue Sep 15, 2015

@cpcloud cpcloud modified the milestones: 0.8.3, 0.9.0 Sep 15, 2015

@cpcloud

Member

cpcloud commented Dec 4, 2015

@kwmsmith can this be closed?

@kwmsmith

Member

kwmsmith commented Dec 4, 2015

Yep, closing.

@kwmsmith kwmsmith closed this Dec 4, 2015
