We are going to create a custom bundle for Zipline using Stooq data; see the download instructions first.

We will take the following steps:

1. Create several data files containing information on tickers, prices, and adjustments.
2. Code up a Zipline ingest function that handles the data processing and storage.
3. Define a Zipline extension that registers the new bundle.
4. Place the files in the ZIPLINE_ROOT directory to ensure the Zipline ingest command finds them.

## Setup
Zipline permits the creation of custom bundles containing open, high, low, close, and volume (OHLCV) information, as well as adjustments like stock splits and dividend payments.

By default, it stores the data in a .zipline directory in the user's home directory, ~/.zipline. However, you can change the target location by setting the ZIPLINE_ROOT environment variable, as we do for the Docker images provided with this book.
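
The lookup order described above can be sketched in a few lines. The helper name `resolve_zipline_root` is hypothetical and not part of Zipline; it only illustrates the fallback from the ZIPLINE_ROOT environment variable to ~/.zipline.

```python
import os
from pathlib import Path


def resolve_zipline_root(environ=None):
    """Return ZIPLINE_ROOT if set, else ~/.zipline (hypothetical helper)."""
    environ = os.environ if environ is None else environ
    root = environ.get('ZIPLINE_ROOT')
    return Path(root) if root else Path.home() / '.zipline'


# An explicit mapping is passed here so the example does not depend on your shell.
print(resolve_zipline_root({'ZIPLINE_ROOT': '/tmp/zipline'}))
print(resolve_zipline_root({}))
```

Calling `resolve_zipline_root()` without arguments reads the real environment.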

### Data preprocessing
To prepare the data, we create three kinds of data tables in HDF5 format:

- equities: contains a unique sid, the ticker, and a name for each security.
- price tables with OHLCV data for each of the assets, stored under the key jp/<sid>.
- splits: contains split factors and is required; our data is already adjusted, so we just add a single row with a factor of 1.0 for one sid.

The file stooq_preprocessing implements these steps and produces the tables in the HDF5 file stooq.h5.
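
To make the three table kinds concrete, here is a minimal in-memory sketch. All tickers, names, and prices are made up, and the lowercase OHLCV column names are an assumption; the actual tables come from the Stooq data and are written to stooq.h5.

```python
import pandas as pd

# Toy stand-ins for the three table kinds; tickers, names, and prices are made up.
equities = pd.DataFrame({'sid': [0, 1],
                         'ticker': ['AAA.JP', 'BBB.JP'],
                         'name': ['Example A', 'Example B']})

# One price table per asset, indexed by trading date.
dates = pd.date_range('2019-01-07', periods=3, freq='B', tz='UTC')
prices_sid_0 = pd.DataFrame({'open': [100.0, 101.0, 102.0],
                             'high': [101.0, 102.0, 103.0],
                             'low': [99.0, 100.0, 101.0],
                             'close': [100.5, 101.5, 102.5],
                             'volume': [1000, 1100, 1200]},
                            index=dates)

# A single neutral split (ratio 1.0) because the prices are already adjusted.
splits = pd.DataFrame({'sid': [1],
                       'effective_date': [pd.Timestamp('2010-01-01')],
                       'ratio': [1.0]})

print(equities, prices_sid_0, splits, sep='\n\n')
```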

### Zipline ingest function
The file stooq_jp_stocks.py defines a function stooq_jp_to_bundle(interval='1d') that returns the ingest function required by Zipline to produce a custom bundle (see the docs). It needs to have the following signature:

```python
ingest(environ,
       asset_db_writer,
       minute_bar_writer,
       daily_bar_writer,
       adjustment_writer,
       calendar,
       start_session,
       end_session,
       cache,
       show_progress,
       output_dir)
```
This function loads the information we created in the previous step during the ingest process. It relies on a data_generator() that loads the sid, ticker, and name of each security as needed and produces the corresponding OHLCV data in the correct format. It also adds information about the exchange, so Zipline can associate the right calendar, and about the range of trading dates.

It also loads the adjustment data, which in this case does not play an active role.
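
The following sketch shows the general shape of a factory that returns an ingest function with the signature listed above. `RecordingWriter` and `make_ingest` are hypothetical names for illustration; the stub merely records what it receives and does not reproduce the behavior of Zipline's actual writers.

```python
class RecordingWriter:
    """Hypothetical stand-in for a Zipline writer; it only records what it receives."""

    def __init__(self):
        self.calls = []

    def write(self, *args, **kwargs):
        # Materialize iterators so the recorded payload can be inspected later.
        args = tuple(list(a) if hasattr(a, '__next__') else a for a in args)
        self.calls.append((args, kwargs))


def make_ingest(bars, equities, splits):
    """Return an ingest function that accepts the eleven required arguments."""

    def ingest(environ, asset_db_writer, minute_bar_writer, daily_bar_writer,
               adjustment_writer, calendar, start_session, end_session,
               cache, show_progress, output_dir):
        # Daily bars are handed over as an iterable of (sid, data) pairs.
        daily_bar_writer.write(iter(bars), show_progress=show_progress)
        asset_db_writer.write(equities=equities)
        adjustment_writer.write(splits=splits)

    return ingest


daily, assets, adjustments = RecordingWriter(), RecordingWriter(), RecordingWriter()
ingest = make_ingest(bars=[(0, 'bars for sid 0')],
                     equities='equities table',
                     splits='splits table')
ingest({}, assets, None, daily, adjustments, None, None, None, {}, False, '')
print(daily.calls)
```

Returning the function from a factory keeps options such as the interval out of the fixed signature that Zipline calls.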

### Bundle registration
Zipline needs to know that the bundle exists and how to create the ingest function we just defined. To this end, we create an extension.py file that communicates the bundle's name, where to find the function that returns the ingest function (namely stooq_jp_to_bundle() in stooq_jp_stocks.py), and the trading calendar to use (XTKS for the Tokyo Stock Exchange).

### File locations
Finally, we need to put these files in the right locations so that Zipline finds them. We can use symbolic links while keeping the actual files in this directory.

More specifically, we'll create symbolic links to

- stooq_jp_stocks.py and extension.py in the ZIPLINE_ROOT directory, and
- stooq.h5 in ZIPLINE_ROOT/custom_data.

On Linux or macOS, this means opening a shell and running the following commands (where PROJECT_DIR refers to the absolute path to the root folder of this repository on your machine):

```bash
cd $ZIPLINE_ROOT
ln -s PROJECT_DIR/11_decision_trees_random_forests/00_custom_bundle/stooq_jp_stocks.py .
ln -s PROJECT_DIR/11_decision_trees_random_forests/00_custom_bundle/extension.py .
mkdir custom_data
ln -s PROJECT_DIR/11_decision_trees_random_forests/00_custom_bundle/stooq.h5 custom_data/.
```
As a result, the ZIPLINE_ROOT directory should contain extension.py, stooq_jp_stocks.py, and custom_data/stooq.h5 (some of these files will be symbolic links).
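
If you prefer to create the links from Python, the same layout can be sketched with pathlib. The helper `link_bundle_files` is hypothetical, and the demonstration runs in throwaway directories rather than in a real ZIPLINE_ROOT.

```python
import tempfile
from pathlib import Path


def link_bundle_files(project_dir, zipline_root):
    """Symlink the bundle files into zipline_root (hypothetical helper)."""
    project_dir, zipline_root = Path(project_dir), Path(zipline_root)
    custom_data = zipline_root / 'custom_data'
    custom_data.mkdir(parents=True, exist_ok=True)
    targets = {'stooq_jp_stocks.py': zipline_root,
               'extension.py': zipline_root,
               'stooq.h5': custom_data}
    for name, directory in targets.items():
        link = directory / name
        # Skip links that already exist, including dangling ones.
        if not link.exists() and not link.is_symlink():
            link.symlink_to(project_dir / name)
    return sorted(p.relative_to(zipline_root).as_posix()
                  for p in zipline_root.rglob('*') if p.is_symlink())


# Demonstration in temporary directories with empty placeholder files.
with tempfile.TemporaryDirectory() as tmp:
    source, root = Path(tmp, 'project'), Path(tmp, 'zipline_root')
    source.mkdir()
    for name in ('stooq_jp_stocks.py', 'extension.py', 'stooq.h5'):
        (source / name).touch()
    linked = link_bundle_files(source, root)
    print(linked)
```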

The extension.py file then registers the bundle:

```python
import sys
from pathlib import Path

# Make stooq_jp_stocks.py importable; this assumes the default ~/.zipline location.
sys.path.append(Path('~', '.zipline').expanduser().as_posix())

from zipline.data.bundles import register
from stooq_jp_stocks import stooq_jp_to_bundle

register('stooq',
         stooq_jp_to_bundle(),
         calendar_name='XTKS',
         )
```

#### cleaning up

```python
from pathlib import Path
import os

import numpy as np
import pandas as pd

pd.set_option('display.expand_frame_repr', False)
np.random.seed(42)

try:
    zipline_root = os.environ['ZIPLINE_ROOT']
except KeyError:
    print('Please ensure a ZIPLINE_ROOT environment variable is defined and accessible '
          '(or alter the script and manually set the path)')
    raise SystemExit(1)

custom_data_path = Path(zipline_root, 'custom_data')

# custom_data_path = Path('~/.zipline/custom_data').expanduser()


def load_equities():
    return pd.read_hdf(custom_data_path / 'stooq.h5', 'jp/equities')


def ticker_generator():
    """
    Lazily return the rows of the equities table as (sid, ticker, name)
    """
    return (v for v in load_equities().values)


def data_generator():
    for sid, symbol, asset_name in ticker_generator():
        # OHLCV table for this asset, indexed by trading date
        df = pd.read_hdf(custom_data_path / 'stooq.h5', 'jp/{}'.format(sid))

        start_date = df.index[0]
        end_date = df.index[-1]

        first_traded = start_date.date()
        auto_close_date = end_date + pd.Timedelta(days=1)
        exchange = 'XTKS'

        yield (sid, df), symbol, asset_name, start_date, end_date, first_traded, auto_close_date, exchange


def metadata_frame():
    # One empty row per security; the rows are filled while the bars are written
    dtype = [
        ('symbol', 'object'),
        ('asset_name', 'object'),
        ('start_date', 'datetime64[ns]'),
        ('end_date', 'datetime64[ns]'),
        ('first_traded', 'datetime64[ns]'),
        ('auto_close_date', 'datetime64[ns]'),
        ('exchange', 'object'), ]
    return pd.DataFrame(np.empty(len(load_equities()), dtype=dtype))


def stooq_jp_to_bundle(interval='1d'):
    def ingest(environ,
               asset_db_writer,
               minute_bar_writer,
               daily_bar_writer,
               adjustment_writer,
               calendar,
               start_session,
               end_session,
               cache,
               show_progress,
               output_dir
               ):
        metadata = metadata_frame()

        def daily_data_generator():
            # Yield (sid, df) for the bar writer and store the remaining
            # fields as the metadata row for that sid along the way
            for (sid, df), *row in data_generator():
                metadata.iloc[sid] = row
                yield sid, df

        daily_bar_writer.write(daily_data_generator(), show_progress=True)

        # The metadata is complete only after the bars have been written
        metadata.dropna(inplace=True)
        asset_db_writer.write(equities=metadata)
        # splits table with a single placeholder row (ratio 1.0)
        adjustment_writer.write(splits=pd.read_hdf(custom_data_path / 'stooq.h5', 'jp/splits'))

    return ingest
```
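
The way the ingest function separates bar data from metadata rests on nested and starred tuple unpacking, which can be tried in isolation. In this sketch, `toy_data_generator` and the plain dict are made-up stand-ins for data_generator() and the metadata DataFrame, and the tuples are shortened for readability.

```python
def toy_data_generator():
    """Yield tuples shaped like those of data_generator(), with made-up values."""
    yield (0, 'bars for sid 0'), 'AAA', 'Example A', '2014-01-06', '2019-12-30'
    yield (1, 'bars for sid 1'), 'BBB', 'Example B', '2014-01-06', '2019-12-30'


metadata = {}  # a plain dict standing in for the metadata DataFrame


def bars_only():
    # The first element is the (sid, data) pair; the starred name collects the rest.
    for (sid, bars), *row in toy_data_generator():
        metadata[sid] = row
        yield sid, bars


bars = list(bars_only())
print(bars)
print(metadata[1])
```

Because the metadata is filled as a side effect of iterating, it is complete only once the generator has been exhausted, which is why the bars are written before the equities metadata.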

#### preprocessing data 

```python
from pathlib import Path
import warnings
import pandas as pd

warnings.filterwarnings('ignore')

DATA_DIR = Path('..', '..', 'data')
idx = pd.IndexSlice


def create_split_table():
    # Prices are already adjusted, so a single row with a neutral ratio suffices
    with pd.HDFStore('stooq.h5') as store:
        store.put('jp/splits', pd.DataFrame(columns=['sid', 'effective_date', 'ratio'],
                                            data=[[1, pd.to_datetime('2010-01-01'), 1.0]]), format='t')


def load_prices():
    df = pd.read_hdf(DATA_DIR / 'assets.h5', 'stooq/jp/tse/stocks/prices')

    # Keep 2014-2019, fill gaps of up to five days, and drop tickers
    # that still have missing values
    return (df.loc[idx[:, '2014': '2019'], :]
            .unstack('ticker')
            .sort_index()
            .tz_localize('UTC')
            .ffill(limit=5)
            .dropna(axis=1)
            .stack('ticker')
            .swaplevel())


def load_symbols(tickers):
    df = pd.read_hdf(DATA_DIR / 'assets.h5', 'stooq/jp/tse/stocks/tickers')
    # Number the remaining tickers consecutively to obtain the sid
    return (df[df.ticker.isin(tickers)]
            .reset_index(drop=True)
            .reset_index()
            .rename(columns={'index': 'sid'}))


if __name__ == '__main__':
    prices = load_prices()
    # info() prints directly; show_counts replaces null_counts in pandas >= 1.2
    prices.info(show_counts=True)
    tickers = prices.index.unique('ticker')

    symbols = load_symbols(tickers)
    symbols.info(show_counts=True)
    symbols.to_hdf('stooq.h5', 'jp/equities', format='t')

    dates = prices.index.unique('date')
    start_date = dates.min()
    end_date = dates.max()

    # The tickers table is filtered on its ticker column above, so use it here too
    for sid, symbol in symbols.set_index('sid').ticker.items():
        p = prices.loc[symbol]
        p.to_hdf('stooq.h5', 'jp/{}'.format(sid), format='t')

    with pd.HDFStore('stooq.h5') as store:
        print(store.info())

    create_split_table()
```
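
The two pandas idioms that do most of the work in the preprocessing script, limited forward-filling followed by dropping incomplete tickers and the double reset_index that creates the sid, can be checked on a toy frame. All data below is made up, and the sketch uses limit=2 instead of the script's limit=5 to keep the frame small.

```python
import numpy as np
import pandas as pd

# Made-up closing prices in wide format: one column per ticker.
dates = pd.date_range('2019-01-07', periods=5, freq='B')
wide = pd.DataFrame({'AAA': [1.0, np.nan, 3.0, 4.0, 5.0],         # one-day gap
                     'BBB': [1.0, np.nan, np.nan, np.nan, 5.0],   # three-day gap
                     'CCC': [1.0, 2.0, 3.0, 4.0, 5.0]},           # complete
                    index=dates)

# Fill at most two consecutive missing values, then drop every ticker
# that still has gaps.
cleaned = wide.ffill(limit=2).dropna(axis=1)
print(cleaned.columns.tolist())

# Consecutive integer sids come from resetting the index twice.
tickers = pd.DataFrame({'ticker': ['AAA', 'BBB', 'CCC'],
                        'name': ['Example A', 'Example B', 'Example C']},
                       index=[10, 20, 30])
symbols = (tickers[tickers.ticker.isin(cleaned.columns)]
           .reset_index(drop=True)
           .reset_index()
           .rename(columns={'index': 'sid'}))
print(symbols)
```

The first reset_index discards the original index, and the second turns the fresh 0..n-1 index into a column that is then renamed to sid.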