Commit 3db2deb

Fixed Redis caching to use Parquet

saeedamen committed May 20, 2022
1 parent df8439a
Showing 16 changed files with 83 additions and 84 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -7,6 +7,7 @@ __pycache__/
 
 # Credentials files
 *cred.py
+*cred*.py
 *credpro.py
 *creduser.py
 *.env
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,8 @@
 # tcapy change log
 
 ## Coding log
+* 20 May 2022
+    * Fixed Redis caching (now uses Parquet)
 * 20 Oct 2021
     * Fixed docker-compose.xml (removed trailing /) for newer Docker versions
 * 29 Jun 2021
@@ -245,7 +245,7 @@ dependencies:
   - celery==5.0.5
   - chardet==3.0.4
   - chart-studio==1.1.0
-  - chartpy==0.1.8
+  - chartpy==0.1.11
   - click-didyoumean==0.0.3
   - click-plugins==1.1.1
   - click-repl==0.1.6
@@ -300,9 +300,11 @@ dependencies:
   - nest-asyncio==1.0.0
   - oauthlib==3.1.0
   - openpyxl==3.0.6
+  - pandas==1.0.5
   - pandas-datareader==0.9.0
   - plotly==4.9.0
   - protobuf==3.6.1
+  - pyarrow==6.0.1
   - pymongo==3.11.3
   - pyphen==0.10.0
   - pystore==0.1.22
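This environment pins pyarrow, which is what the Parquet-based Redis cache below relies on. As a quick sanity check (illustrative, not part of the commit), the pinned pandas/pyarrow stack should round-trip a DataFrame through an in-memory Parquet buffer, which is exactly the pattern the rewritten cache code uses:

    # Illustrative: round-trip a DataFrame through an in-memory Parquet
    # buffer, the pattern the rewritten Redis cache uses
    import io

    import pandas as pd

    df = pd.DataFrame({'mid': [1.1001, 1.1002]},
                      index=pd.date_range('2022-05-20', periods=2, freq='S'))

    buf = io.BytesIO()
    df.to_parquet(buf, compression='gzip')  # needs pyarrow (or fastparquet)
    buf.seek(0)

    assert pd.read_parquet(buf).equals(df)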
16 changes: 10 additions & 6 deletions batch_scripts/linux/installation/install_pip_python_packages.sh
@@ -15,9 +15,9 @@ echo 'Installing Python packages...'
 if [ $TCAPY_PYTHON_ENV_TYPE == "virtualenv" ]; then
 
     # Install everything by pip
-    pip install
+    pip install \
         setuptools-git==1.2 cython arctic==1.79.4 sqlalchemy==1.3.17 redis==3.3.7 \
-        pandas==1.2.3 numpy scipy statsmodels==0.11.1 blosc==1.8.3 pyarrow==2.0.0 \
+        pandas==1.0.5 numpy scipy statsmodels==0.11.1 blosc==1.8.3 pyarrow==2.0.0 \
        pathos==0.2.1 multiprocess==0.70.9 fastparquet==0.5.0 \
        flask-restplus==0.13.0 gunicorn==19.9.0 \
        beautifulsoup4==4.8.0 pdfkit==0.6.1 psutil==5.6.6 \
@@ -33,7 +33,7 @@ if [ $TCAPY_PYTHON_ENV_TYPE == "virtualenv" ]; then
        Flask-Session==0.3.1 \
        celery==5.0.5 pytest-tap kombu python-memcached==1.59 numba==0.48.0 vispy==0.6.4 jinja2==2.11.2 \
        jupyterlab jupyter_contrib_nbextensions jupyter_nbextensions_configurator RISE bqplot WeasyPrint==51 \
-       dask==2.14.0 distributed==2.14.0 cloudpickle==1.3.0 python-snappy==0.5.4 bokeh==2.0.1 msgpack==1.0.0 pystore==0.1.22 fsspec==0.3.3 eikon==1.1.2
+       dask==2.14.0 distributed==2.14.0 cloudpickle==1.3.0 python-snappy==0.5.4 bokeh==2.0.1 msgpack==1.0.0 pystore==0.1.22 fsspec==0.3.3 eikon==1.1.2 vaex
 
     # Can't install orca with pip (has to be done manually or via conda)
     sudo apt-get install nodejs npm
@@ -42,7 +42,7 @@ elif [ $TCAPY_PYTHON_ENV_TYPE == "conda" ] && [ $CONDA_FROM_YAML == 0 ]; then
 
     # Install conda forge packages
     conda install -c conda-forge \
-        setuptools-git cython sqlalchemy redis-py \
+        setuptools-git cython sqlalchemy redis-py python=3.7 \
        pandas=1.0.5 numpy scipy statsmodels python-blosc \
        pathos multiprocess fastparquet \
        flask-restplus gunicorn \
@@ -53,7 +53,7 @@ elif [ $TCAPY_PYTHON_ENV_TYPE == "conda" ] && [ $CONDA_FROM_YAML == 0 ]; then
        pytest pytest-cov \
        numba pyarrow=2.0.0 vispy jinja2 \
        jupyterlab jupyter_contrib_nbextensions jupyter_nbextensions_configurator nodejs rise bqplot \
-       dask distributed cloudpickle python-snappy bokeh msgpack-python --yes
+       dask distributed cloudpickle python-snappy bokeh msgpack-python vaex --yes
 
     # Install charting libraries
     # for flash recording of session variables
@@ -65,4 +65,8 @@ elif [ $TCAPY_PYTHON_ENV_TYPE == "conda" ] && [ $CONDA_FROM_YAML == 0 ]; then
        qpython==2.0.0 influxdb==5.2.3 \
        Flask-Session==0.3.1 \
        celery==5.0.5 pytest-tap kombu python-memcached==1.59 WeasyPrint==51 pystore==0.1.22 fsspec==0.3.3 eikon==1.1.2
-fi
+fi
+
+# Hack for vaex
+pip uninstall progressbar2
+pip install progressbar2
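Two details are worth noting here. pandas is pinned back to 1.0.5 (matching the conda list above), and DataFrame.to_msgpack was removed in pandas 1.0, so the cache's legacy 'msgpack' format cannot work on this stack; that is part of the motivation for the Parquet rewrite further down. The progressbar2 uninstall/reinstall works around vaex pulling in an incompatible version. A runtime guard along these lines (illustrative, not in the repo) would flag the msgpack limitation:

    # Illustrative guard: pandas removed DataFrame.to_msgpack in 1.0.0,
    # so the legacy 'msgpack' cache format needs an older pandas
    import pandas as pd

    if not hasattr(pd.DataFrame, 'to_msgpack'):
        print('pandas %s has no to_msgpack; use the arrow/Parquet cache format'
              % pd.__version__)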
@@ -25,7 +25,7 @@ sudo rm -f /etc/nginx/conf.d/000-default.conf
 sudo rm -f /etc/nginx/conf.d/default.conf
 
 # Allows reading of files outside of nginx folder (just for Red Hat/Centos
-if [ $DISTRO == "redhat" ]; then
+if [ $DISTRO == "redhat" ]; then
     sudo setenforce 0
 fi
 
4 changes: 2 additions & 2 deletions batch_scripts/linux/installation/set_tcapy_env_vars.sh
@@ -14,10 +14,10 @@ export TCAPY_CUEMACRO=/home/$USER/cuemacro/tcapy
 export TCAPY_PYTHON_ENV_TYPE="conda"
 # export TCAPY_PYTHON_ENV=/home/$USER/py37tca/ # virtualenv folder or conda name
 export TCAPY_PYTHON_ENV=py37tca # virtualenv folder or conda name
-export TCAPY_PYTHON_ENV_BIN=$TCAPY_PYTHON_ENV/bin/
+export TCAPY_PYTHON_ENV_BIN=/home/$USER/$TCAPY_PYTHON_ENV/bin/
 export TCAPY_PYTHON_VERSION=3 # Only Python 3 is now supported
 
-export CONDA_ACTIVATE=/home/tcapyuser/anaconda3/bin/activate
+export CONDA_ACTIVATE=/home/$USER/anaconda3/bin/activate
 export TCAPY_USER=$USER # which user to run tcapy
 
 export TCAPY_CELERY_WORKERS=14
8 changes: 4 additions & 4 deletions tcapy/conf/constants.py
@@ -81,8 +81,8 @@ class Constants(object):
     temp_large_data_folder = os.path.join(test_data_folder, 'large')
 
     # For backward compatibility with Python 2
-    pickle.DEFAULT_PROTOCOL = 2
-    pickle.HIGHEST_PROTOCOL = 2
+    pickle.DEFAULT_PROTOCOL = 4
+    pickle.HIGHEST_PROTOCOL = 4
 
     # Default format for writing small binary files to disk (eg. by DatabasePopulator, UtilFunc..)
     # 'parquet' (or 'hdf5', but that requires pytables tables Python package installed, which is not done by default)
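The comment above these lines is now stale: protocol 2 was the highest protocol Python 2 could read, while protocol 4 (available since Python 3.4) is Python 3 only, but is faster and supports objects larger than 4 GB. A minimal illustration of the trade-off:

    # Protocol 2 is the highest protocol Python 2 can still read;
    # protocol 4 (Python 3.4+) is faster and handles objects over 4 GB
    import pickle

    payload = {'ticker': 'EURUSD', 'venue': 'ebs'}

    blob_py2_readable = pickle.dumps(payload, protocol=2)
    blob_py3_only = pickle.dumps(payload, protocol=4)

    assert pickle.loads(blob_py3_only) == payload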
@@ -605,8 +605,8 @@ class Constants(object):
     # at current stage arrow is not fully tested
     volatile_cache_redis_format = 'arrow' # 'msgpack' or 'arrow'
 
-    volatile_cache_redis_compression = {'msgpack' : 'blosc',
-                                        'arrow' : 'snappy'} # 'lz4' or 'snappy'
+    volatile_cache_redis_compression = {'msgpack': 'blosc',
+                                        'arrow': 'gzip'} # 'lz4' or 'snappy'
 
     # Above this size we need to break apart our keys into different chunks before pushing into Redis
     # Redis has a maximum size of what we can store in a single value (512 is maximum, can tweak lower)
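The codec for the 'arrow' format moves from snappy to gzip (the trailing comment, "'lz4' or 'snappy'", predates the change). gzip generally produces smaller Parquet payloads at the cost of slower compression, which matters because a single Redis value is capped at 512 MB and oversized frames have to be chunked. A rough, illustrative comparison of the two codecs:

    # Illustrative codec comparison for a cached DataFrame: gzip is
    # usually smaller, snappy is usually faster
    import io

    import numpy as np
    import pandas as pd

    df = pd.DataFrame(np.random.randn(100000, 4), columns=list('abcd'))

    for codec in ('snappy', 'gzip'):
        buf = io.BytesIO()
        df.to_parquet(buf, compression=codec)
        print(codec, buf.getbuffer().nbytes, 'bytes')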
2 changes: 1 addition & 1 deletion tcapy/conf/mongo.conf
@@ -1,5 +1,5 @@
 dbpath = /data/db
-logpath = /home/tcapyuser/cuemacro/tcapy/log/mongo.log
+logpath = /home/ubuntu/cuemacro/tcapy/log/mongo.log
 # logpath = /tmp/mongo.log
 logappend = true
 bind_ip = 127.0.0.1
76 changes: 25 additions & 51 deletions tcapy/util/deltaizeserialize.py
@@ -11,7 +11,7 @@
 import math
 import json
 import pandas as pd
-import pyarrow as pa
+# import pyarrow as pa
 
 from plotly.utils import PlotlyJSONEncoder
 import plotly.graph_objs as go
@@ -21,7 +21,8 @@
 from tcapy.util.timeseries import TimeSeriesOps
 from tcapy.util.loggermanager import LoggerManager
 
-context = pa.default_serialization_context()
+# context = pa.default_serialization_context()
+import io
 
 constants = Constants()
 
@@ -70,38 +71,36 @@ def convert_python_to_binary(self, obj, key):
                                 constants.volatile_cache_redis_format])
 
             elif constants.volatile_cache_redis_format == 'arrow':
-                # Set the size of each compressed object, so can read back later
-                # eg. key might be xxxx_size_354534_size_345345_endsize etc.
-                # Ignore bit before first '_size_' and after '_endsize'
                 for i in range(0, len(obj_list)):
                     if obj_list[i] is not None:
-                        ser = context.serialize(obj_list[i]).to_buffer()
-
-                        obj_list[i] = pa.compress(ser,
-                                                  codec=constants.volatile_cache_redis_compression[
-                                                      constants.volatile_cache_redis_format],
-                                                  asbytes=True)
-
-                        key = key + '_size_' + str(len(ser))
-
-                key = key + '_endsizearrow_'
+                        ser = io.BytesIO()
+                        obj_list[i].to_parquet(ser,
+                                               compression=constants.volatile_cache_redis_compression[
+                                                   constants.volatile_cache_redis_format])
+                        ser.seek(0)
 
+                        obj_list[i] = ser.read()
             else:
-                raise Exception("Invalid volatile cache format specified.")
+                raise Exception("Invalid volatile cache format specified " + constants.volatile_cache_redis_format)
         elif '_comp' not in key:
             if constants.volatile_cache_redis_format == 'msgpack':
 
                 for i in range(0, len(obj_list)):
                     if obj_list[i] is not None:
                         obj_list[i] = obj_list[i].to_msgpack()
-            elif constants.volatile_cache_redis_format == 'arrow':
-                # context = pa.default_serialization_context()
-
+            elif constants.volatile_cache_redis_format == "arrow":
                 for i in range(0, len(obj_list)):
                     if obj_list[i] is not None:
-                        obj_list[i] = context.serialize(obj_list[i]).to_buffer().to_pybytes()
+                        ser = io.BytesIO()
+                        obj_list[i].to_parquet(ser,
+                                               compression=
+                                               constants.volatile_cache_redis_compression[
+                                                   constants.volatile_cache_redis_format])
+                        ser.seek(0)
+
+                        obj_list[i] = ser.read()
             else:
-                raise Exception("Invalid volatile cache format specified.")
+                raise Exception("Invalid volatile cache format specified " + constants.volatile_cache_redis_format)
 
         # For Plotly JSON style objects (assume these will fit in the cache, as they tend to used downsampled data)
         elif '_fig' in key:
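The rewritten 'arrow' branch drops pyarrow's serialization context (pa.serialize was deprecated in pyarrow 2.0 and later removed) in favour of plain Parquet buffers. Because a Parquet blob is self-describing, the old trick of appending '_size_.../_endsizearrow_' markers to the Redis key so the decompressed sizes could be recovered is no longer needed. A condensed sketch of the new write path (simplified, not the verbatim method):

    # Condensed sketch of the new write path: one self-describing Parquet
    # blob per DataFrame, no sizes smuggled through the Redis key
    import io

    import pandas as pd

    def dataframe_to_parquet_bytes(df, compression='gzip'):
        buf = io.BytesIO()
        df.to_parquet(buf, compression=compression)
        buf.seek(0)
        return buf.read()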
@@ -127,39 +126,14 @@ def convert_binary_to_python(self, obj, key):
                         obj[i] = pd.read_msgpack(obj[i])
 
             elif constants.volatile_cache_redis_format == 'arrow':
-
-                # If compressed we need to know the size, to decompress it
-                if '_comp' in key:
-                    # Get the size of each compressed object
-                    # eg. key might be xxxx_size_354534_size_345345_endsize etc.
-                    # Ignore bit before first '_size_' and after '_endsize'
-
-                    start = '_size_'
-                    end = '_endsizearrow_'
-
-                    if len(obj) > 0:
-                        key = self._util_func.find_sub_string_between(key, start, end)
-                        siz = self._util_func.keep_numbers_list(key.split('_size_'))
-
-                    for i in range(0, len(obj)):
-                        if obj[i] is not None:
-                            obj[i] = pa.decompress(obj[i],
-                                                   codec=constants.volatile_cache_redis_compression[
-                                                       constants.volatile_cache_redis_format],
-                                                   decompressed_size=siz[i])
-
-                            obj[i] = context.deserialize(obj[i])
-                else:
-                    for i in range(0, len(obj)):
-                        if obj[i] is not None:
-                            obj[i] = context.deserialize(obj[i])
-
-                # Need to copy because Arrow doesn't allow writing on a DataFrame
                 for i in range(0, len(obj)):
                     if obj[i] is not None:
-                        obj[i] = obj[i].copy()
+
+                        obj[i] = io.BytesIO(obj[i])
+                        obj[i] = pd.read_parquet(obj[i])#, compression=constants.volatile_cache_redis_compression[constants.volatile_cache_redis_format])
             else:
-                raise Exception("Invalid volatile cache format specified.")
+                raise Exception("Invalid volatile cache format specified "
+                                + str(constants.volatile_cache_redis_format))
 
         if len(obj) == 1:
             obj = obj[0]
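The read path is symmetric, and because pd.read_parquet always returns a fresh, writable DataFrame, the old defensive .copy() (Arrow-deserialized frames were read-only) disappears as well. An end-to-end sketch against a local Redis follows; the key name and localhost connection are assumptions for the example:

    # Illustrative round-trip through Redis with the new representation
    import io

    import pandas as pd
    import redis

    r = redis.Redis(host='localhost', port=6379)

    df = pd.DataFrame({'mid': [1.2, 1.3]},
                      index=pd.date_range('2022-05-20', periods=2, freq='T'))

    buf = io.BytesIO()
    df.to_parquet(buf, compression='gzip')
    r.set('market_EURUSD_comp', buf.getvalue())  # hypothetical key name

    df_cached = pd.read_parquet(io.BytesIO(r.get('market_EURUSD_comp')))
    assert df_cached.equals(df)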
38 changes: 27 additions & 11 deletions tcapy/vis/app.py
@@ -18,7 +18,7 @@ class to render the layout and SessionManager to keep track of each user session
 from tcapy.vis.app_imports import *
 
 # Create Flask object and create Dash instance on top of it
-server=Flask(__name__)
+server = Flask(__name__)
 
 debug_start_flask_server_directly = constants.debug_start_flask_server_directly
 
@@ -45,13 +45,23 @@ class to render the layout and SessionManager to keep track of each user session
 else:
     routes_pathname_prefix = constants.routes_pathname_prefix
 
+print(routes_pathname_prefix)
+
 for css in stylesheets:
     # app.css.append_css({"external_url": static_css_route + css})
     stylesheets_path.append(static_css_route + css)
 
-app = dash.Dash(name='tcapy', server=server, url_base_pathname=routes_pathname_prefix,
-                suppress_callback_exceptions=True, serve_locally=True, external_stylesheets=stylesheets_path,
+app = dash.Dash(name='tcapy', server=server, #url_base_pathname=routes_pathname_prefix,
+                suppress_callback_exceptions=True, serve_locally=True,
+                external_stylesheets=stylesheets_path,
                 meta_tags=[{"name": "viewport", "content": "width=1000px"}])
+
+app.config.update({
+    'routes_pathname_prefix': routes_pathname_prefix,
+    'requests_pathname_prefix': routes_pathname_prefix,
+})
+
+
 app.title = 'tcapy'
 app.server.secret_key = constants.secret_key
 
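Instead of handing url_base_pathname to the constructor, the prefix is now applied through app.config.update after construction (note the leftover debug print above it). For reference, Dash also accepts the two prefixes directly as constructor arguments, which is the more common pattern; a sketch, not the repo's code:

    # Illustrative alternative: Dash takes the prefixes as constructor
    # arguments; url_base_pathname is shorthand for setting both at once
    import dash
    from flask import Flask

    server = Flask(__name__)

    app = dash.Dash(name='tcapy', server=server,
                    routes_pathname_prefix='/tcapy/',
                    requests_pathname_prefix='/tcapy/')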
@@ -65,18 +75,20 @@ class to render the layout and SessionManager to keep track of each user session
 
 # This loads up a user specific version of the layout and TCA application
 if constants.tcapy_version == 'user':
-    from tcapyuser.layoutuser import *; layout = LayoutImplUser(app=app, constants=constants, url_prefix=url_prefix)
+    from tcapyuser.layoutuser import *; layout = LayoutImplUser(
+        app=app, constants=constants, url_prefix=url_prefix)
     from tcapyuser.tcacalleruser import TCACallerImplUser as TCACaller
 
 # this loads up a generic version of the layout and TCA application
 elif constants.tcapy_version == 'test_tcapy' or constants.tcapy_version == 'gen':
-    from tcapygen.layoutgen import *; layout = LayoutDashImplGen(app=app, constants=constants, url_prefix=url_prefix)
+    from tcapygen.layoutgen import *; layout = LayoutDashImplGen(
+        app=app, constants=constants, url_prefix=url_prefix)
     from tcapygen.tcacallergen import TCACallerImplGen as TCACaller
 
 # you can add your own additional layout versions here
 # app.config['SESSION_TYPE'] = 'memcached'
 
-########################################################################################################################
+###############################################################################
 
 logger.info("Root path = " + app.server.root_path)
 
@@ -89,12 +101,14 @@ class to render the layout and SessionManager to keep track of each user session
 # Create the HTML layout for the pages (note: this is in a separate file layoutdash.py)
 app.layout = layout.page_content
 
-plain_css = open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'tcapy.css'), 'r').read()
+plain_css = open(os.path.join(os.path.dirname(os.path.realpath(__file__)),
+                              'tcapy.css'), 'r').read()
 
 # Create objects for caching data for clients sessions (and also internally for the server)
 glob_volatile_cache = Mediator.get_volatile_cache()
 
-########################################################################################################################
+###############################################################################
 
+
 @app.server.route('/static/<path>')
 def static_file(path):
@@ -136,15 +150,17 @@ def serve_csv():
 def favicon():
     return flask.send_from_directory(app.server.root_path, 'favicon.ico')
 
-########################################################################################################################
+###############################################################################
 
 from tcapy.vis.displaylisteners import DisplayListeners
 
 # converts GUI output into TCARequest objects and this then calls TCAEngine (adds listeners for the processing buttons)
-tca_caller = TCACaller(app, session_manager, callback_manager, glob_volatile_cache, layout)
+tca_caller = TCACaller(app, session_manager, callback_manager,
+                       glob_volatile_cache, layout)
 
 # adds listeners/code for each GUI display components which show the dataframes generated by TCACaller/TCAEngine
-display_listeners = DisplayListeners(app, layout, session_manager, callback_manager, tca_caller, plain_css, url_prefix)
+display_listeners = DisplayListeners(app, layout, session_manager,
+                                     callback_manager, tca_caller, plain_css, url_prefix)
 
 if __name__ == '__main__':
     # need this for WINDOWS machines, to ensure multiprocessing stuff works properly
2 changes: 1 addition & 1 deletion tcapy/vis/app_board.py
@@ -83,7 +83,7 @@ class to render the layout and SessionManager to keep track of each user session
 elif constants.tcapy_version == 'test_tcapy' or constants.tcapy_version == 'gen':
     from tcapygen.layoutboardgen import *;
 
-    layout = LayoutDashImplBoardGen(url_prefix=url_prefix)
+    layout = LayoutDashImplBoardGen(app=app, url_prefix=url_prefix, constants=constants)
     from tcapygen.tcacallerboardgen import TCACallerImplBoardGen as TCACaller
 
 # you can add your own additional layout versions here
2 changes: 1 addition & 1 deletion tcapy_notebooks/a_10_minute_view_of_tcapy.ipynb
@@ -2818,7 +2818,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.7.9"
+   "version": "3.7.6"
   },
   "toc": {
    "base_numbering": 1,
2 changes: 1 addition & 1 deletion tcapy_notebooks/real_life_tcapy_case_study.ipynb
@@ -1973,7 +1973,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.7.9"
+   "version": "3.7.6"
   },
   "toc": {
    "base_numbering": 1,
2 changes: 1 addition & 1 deletion tcapy_scripts/gen/upload_market_parquet_csv_hdf5.py
@@ -43,7 +43,7 @@
 
 csv_folder = '/data/csv_dump/' + data_vendor + '/'
 
-if_exists_table = 'replace' # 'replace' or 'append' to database table
+if_exists_table = 'append' # 'replace' or 'append' to database table
 if_append_replace_ticker = 'replace' # 'replace' or 'append' to ticker
 
 file_extension = 'parquet' # 'parquet' (recommended) or 'csv' or 'h5' on disk
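Switching if_exists_table from 'replace' to 'append' means repeated runs of this upload script add rows to the existing market-data table rather than dropping and recreating it, while each individual ticker is still replaced. The flag mirrors the semantics of pandas' to_sql argument, illustrated here with a throwaway SQLite engine:

    # 'replace' drops and recreates the table on every run;
    # 'append' keeps existing rows and inserts the new ones
    import pandas as pd
    import sqlalchemy

    engine = sqlalchemy.create_engine('sqlite:///:memory:')

    df = pd.DataFrame({'ticker': ['EURUSD'], 'mid': [1.1]})
    df.to_sql('market', engine, if_exists='append', index=False)
    df.to_sql('market', engine, if_exists='append', index=False)

    print(pd.read_sql('SELECT COUNT(*) AS n FROM market', engine))  # n == 2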
