# Zipline-Broker Pipeline and Fundamentals Examples Notebook
### Examples of using zipline-broker pipeline with fundamentals, as an asset screener and a factor constructor

#### Date: 2021-05-20

In [1]:
# Modified from :
# https://zipline-trader.readthedocs.io/en/1.5.0/notebooks/SimplePipeline.html
import os
import pandas as pd

# Analysis window for this notebook; edit these two dates to change the period.
start_date = pd.Timestamp('2019-01-02', tz='utc')
end_date = pd.Timestamp('2021-05-19', tz='utc')

# Upper-case aliases of the same Timestamp objects; later cells reset
# start_date / end_date from these before running a pipeline.
START_DATE = start_date
END_DATE = end_date

# Optional override (disabled): point ZIPLINE_ROOT at a notebook-local .zipline directory.
#os.environ['ZIPLINE_ROOT'] = os.path.join(os.getcwd(), '.zipline')
# Lists the ZIPLINE_ROOT directory. The listing is not the cell's last expression,
# so it is not displayed; the lookup raises KeyError when ZIPLINE_ROOT is unset.
# NOTE(review): presumably a fail-fast sanity check on the environment -- confirm.
os.listdir(os.environ['ZIPLINE_ROOT'])
import zipline
from zipline.data import bundles

# Name of the zipline data bundle to load, and the loaded bundle whose readers
# and asset finder are used by the loaders, data portal and engine below.
bundle_name = 'sharadar-prices'
bundle_data = bundles.load(bundle_name)
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.utils.calendars import get_calendar
from zipline.pipeline.data import USEquityPricing
from zipline.data.data_portal import DataPortal

# Set the dataloader
# Built from the bundle's daily bar reader and its adjustment reader; this is the
# loader returned by the choose_loader defined just below.
pricing_loader = USEquityPricingLoader(bundle_data.equity_daily_bar_reader, bundle_data.adjustment_reader)
# Alternative constructor kept (disabled) by the author, marked "new":
#new pricing_loader = USEquityPricingLoader.without_fx(bundle_data.equity_daily_bar_reader, bundle_data.adjustment_reader)


# Define the function for the get_loader parameter
def choose_loader(column):
    """Return the pipeline loader responsible for ``column``.

    This function can only hand back ``pricing_loader``, so only
    ``USEquityPricing`` columns are accepted.

    Parameters
    ----------
    column : pipeline column requested by the engine.

    Returns
    -------
    The module-level ``pricing_loader``.

    Raises
    ------
    ValueError
        If ``column`` is not one of ``USEquityPricing.columns``.
        (``ValueError`` is an ``Exception`` subclass, so callers that caught
        the previously raised bare ``Exception`` keep working.)
    """
    # The earlier test, `column not in (USEquityPricing.columns or
    # Fundamentals.columns)`, never looked at the second operand whenever the
    # first was non-empty, because `or` yields its first truthy operand.
    # The membership test is now spelled out for the one dataset this loader
    # can actually serve.
    if column not in USEquityPricing.columns:
        raise ValueError('Column not in USEquityPricing: {!r}'.format(column))
    return pricing_loader

# Set the trading calendar
trading_calendar = get_calendar('NYSE')

# Create a data portal
# Wires the bundle's asset finder, daily bar reader and adjustment reader to the
# calendar above; first_trading_day is the notebook's start_date.
data_portal = DataPortal(bundle_data.asset_finder,
                         trading_calendar = trading_calendar,
                         first_trading_day = start_date,
                         equity_daily_reader = bundle_data.equity_daily_bar_reader,
                         adjustment_reader = bundle_data.adjustment_reader)

  from ._conv import register_converters as _register_converters
You can access NaTType as type(pandas.NaT)
  @convert.register((pd.Timestamp, pd.Timedelta), (pd.tslib.NaTType, type(None)))


In [2]:
from zipline.pipeline.loaders.blaze import BlazeLoader, from_blaze
from zipline.utils.run_algo import load_extensions
import alphatools.fundamentals as fundies
from alphatools.fundamentals import Fundamentals

# Fundamentals instance used by later cells as fd.<field>.latest
# (e.g. fd.category.latest, fd.debtnc.latest) when building pipeline terms.
fd=Fundamentals()


['2021-06-12T02;18;33']
cur_folder= /home/hca-ws2004/hca/alphatools/alphatools/fundamentals
enac_fundamentals_pkl= /home/hca-ws2004/zipline-broker/data/fundem-sharadar-sf1/2021-06-12T02;18;33/quandal_sharadar_sf1.pkl
Non-sid Fund Count=False    103151
True       3963
Name: sid, dtype: int64
Non-sid Fund TotalCount=3963

SF1 Table needs to extend sessions from:max datekey:2021-06-11 tp  current date:2021-06-13 02:59:30.407135 ExtendRange:DatetimeIndex(['2021-06-11'], dtype='datetime64[ns, UTC]', freq='C')

Adding fundamental:fcf
Adding fundamental:de
Adding fundamental:marketcap
Adding fundamental:debtnc
Adding fundamental:equityusd
Adding tickers:category
Adding tickers:exchange
Adding tickers:isdelisted


In [None]:
#fundies.fundamentals.df_loaders

In [3]:

# Load extensions.py; this allows you access to custom bundles
load_extensions(
    default=True,
    extensions=[],
    strict=True,
    environ=os.environ,
)

# Set-Up Pricing Data Access
# NOTE: trading_calendar and bundle_data were already created in the first cell;
# they are re-created here with the same calendar name and bundle name.
trading_calendar = get_calendar('NYSE')
bundle = 'sharadar-prices' #'quandl'
bundle_data = bundles.load(bundle)

# Column -> loader lookup exposed by alphatools; indexed by my_dispatcher below.
loaders = fundies.fundamentals.df_loaders

# create an empty BlazeLoader
# Used by choose_loader as the fallback when no other loader matches.
blaze_loader = BlazeLoader()

def my_dispatcher(column):
    """Look up the loader registered for ``column`` in the ``loaders`` table.

    Any error raised by the lookup itself (for example for an unregistered
    column) propagates to the caller.
    """
    registered_loader = loaders[column]
    return registered_loader

# Pricing loader over the (re)loaded bundle's daily bars and adjustments;
# choose_loader returns it for every USEquityPricing column.
pipeline_loader = USEquityPricingLoader(bundle_data.equity_daily_bar_reader, bundle_data.adjustment_reader)

def choose_loader(column):
    """Pick the pipeline loader for ``column``.

    Resolution order:
      1. ``USEquityPricing`` columns -> ``pipeline_loader``.
      2. Columns registered in the ``loaders`` table (via ``my_dispatcher``).
      3. Anything else -> the empty ``blaze_loader`` fallback.

    Parameters
    ----------
    column : pipeline column requested by the engine.

    Returns
    -------
    The loader object selected by the rules above.
    """
    if column in USEquityPricing.columns:
        return pipeline_loader
    try:
        return my_dispatcher(column)
    except KeyError:
        # Only a missing registration falls through to the Blaze fallback.
        # A bare `except:` here used to hide every other failure as well
        # (including KeyboardInterrupt and genuine bugs in the lookup).
        # Assumes `loaders` is a dict-like mapping that raises KeyError for
        # unknown columns -- TODO confirm against alphatools' df_loaders.
        return blaze_loader


extension: hca_root_path = /home/hca-ws2004/hca
extension:TODAY_STR = 2021-06-12
extension:TWO_YR_AGO_STR = 2019-06-12
extension:  start_date=2019-06-12 end_date = 2021-06-12


In [4]:
from zipline.utils.calendars import get_calendar
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.engine import SimplePipelineEngine

# Build the pipeline engine: loaders are resolved per column by choose_loader,
# assets come from the bundle, and the calendar is every NYSE session.
engine = SimplePipelineEngine(
    get_loader=choose_loader,
    calendar=trading_calendar.all_sessions,
    asset_finder=bundle_data.asset_finder,
)

In [5]:
#from zipline.pipeline.domain import US_EQUITIES
from zipline.pipeline.factors import AverageDollarVolume, SimpleMovingAverage
from zipline.pipeline import Pipeline
# Category labels that define the "domestic common stock" universe.
# Only the two uncommented labels are active; the commented-out entries are the
# other category values the author listed and chose not to include.
DomComStk_lst= [
        'Domestic Common Stock',
         #'ADR Common Stock',
        'Domestic Common Stock Primary Class',
         #'Canadian Common Stock',
         #'ADR Common Stock Primary Class',
         #'Canadian Common Stock Primary Class',
         #'Domestic Common Stock Secondary Class', 'Domestic Stock Warrant',
         #'Domestic Preferred Stock', 'ADR Stock Warrant',
         #'ADR Preferred Stock', 'ADR Common Stock Secondary Class',
         #'Canadian Stock Warrant', 'Canadian Preferred Stock', nan, 'ETF',
         #'CEF', 'ETN', 'ETD', 'IDX'
  ]  
# Create a screen for our Pipeline

# Latest reported category / exchange / delisting flag per asset.
cat_d = fd.category.latest

exchange_d = fd.exchange.latest

isdelisted_d = fd.isdelisted.latest

#adv5000 = AverageDollarVolume(window_length = 30).top(100)
#mcap3000 = fd.marketcap.latest.top(100)
#universe = adv5000 & mcap3000

# Keep only assets whose category is exactly one of the labels in DomComStk_lst.
# The previous prefix test (startswith) also admitted every label that merely
# begins with 'Domestic Common Stock' -- e.g. 'Domestic Common Stock Secondary
# Class', which is deliberately commented out of the list -- and it made the
# second startswith() redundant, since the second label starts with the first.
universe = cat_d.element_of(DomComStk_lst)

# Create an empty Pipeline with the given screen
pipeline = Pipeline(screen = universe)
#new pipeline = Pipeline(screen = universe, domain=US_EQUITIES)
#pipeline.add(AverageDollarVolume(window_length = 5), "DV")
#pipeline.add(fd.marketcap.latest, "MC")

# Further fundamentals columns the author left disabled:
#pipeline.add(fd.liabilities.latest, "liabilities")
#pipeline.add(fd.liabilitiesnc.latest, "liabilitiesnc")
#pipeline.add(fd.assets.latest, "assets")
#pipeline.add(fd.equity.latest, "equity")
#pipeline.add(fd.ev.latest, "ev")
#pipeline.add(fd.ebt.latest, "ebt")
#pipeline.add(fd.ebit.latest, "ebit")
#pipeline.add(fd.cashneq.latest, "cashneq")

# Output columns: two numeric fundamentals plus the three descriptive fields
# (category, exchange, delisting flag), each as its latest value.
pipeline.add(fd.debtnc.latest, "debtnc")
pipeline.add(fd.equityusd.latest, "equityusd")
pipeline.add(fd.category.latest, "cat")
pipeline.add(fd.exchange.latest, "exchange")
pipeline.add(fd.isdelisted.latest, "isdelisted")

In [None]:
#def make_pipeline(): 
#    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10) 
#    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30) 
#    
#    latest_close = USEquityPricing.close.latest  
#    
#    perc_diff = (mean_close_10 - mean_close_30) / mean_close_30  
#    
#    return Pipeline(columns={ 'Percent Difference':perc_diff, '30 Day Mean Close':mean_close_30, 'Latest Close':latest_close })

In [6]:
# Set the start and end dates
# Reset from the notebook-level constants so this cell does not depend on
# whatever start_date / end_date were last assigned elsewhere.
start_date = START_DATE
end_date = END_DATE

# Run our pipeline for the given start and end dates
pipeline_output = engine.run_pipeline(pipeline, start_date, end_date)
#pipeline_output = engine.run_pipeline(make_pipeline(), start_date, end_date)

# Last expression of the cell: displays the resulting frame.
pipeline_output


FrameLoad: Dates:01-02-2019 --> 05-19-2021 columns:[Fundamentals.debtnc::float64] CountNonNan:2336981
FrameLoad: Dates:01-02-2019 --> 05-19-2021 columns:[Fundamentals.equityusd::float64] CountNonNan:2909584
FrameLoad: Dates:01-02-2019 --> 05-19-2021 columns:[Fundamentals.isdelisted::object] TotalCount:5626200
FrameLoad: Dates:01-02-2019 --> 05-19-2021 columns:[Fundamentals.category::object] TotalCount:5626200
FrameLoad: Dates:01-02-2019 --> 05-19-2021 columns:[Fundamentals.exchange::object] TotalCount:5626200


Unnamed: 0,Unnamed: 1,cat,debtnc,equityusd,exchange,isdelisted
2019-01-02 00:00:00+00:00,Equity(0 [A]),Domestic Common Stock Primary Class,1.799000e+09,4.567000e+09,NYSE,N
2019-01-02 00:00:00+00:00,Equity(1 [AA]),Domestic Common Stock Primary Class,1.820000e+09,5.216000e+09,NYSE,N
2019-01-02 00:00:00+00:00,Equity(7 [AACH]),Domestic Common Stock Primary Class,3.216020e+08,1.450660e+08,NYSE,N
2019-01-02 00:00:00+00:00,Equity(12 [AAIC]),Domestic Common Stock Primary Class,,3.106900e+08,NYSE,N
2019-01-02 00:00:00+00:00,Equity(17 [AAL]),Domestic Common Stock Primary Class,2.227400e+10,-5.680000e+08,NYSE,N
2019-01-02 00:00:00+00:00,Equity(18 [AAMC]),Domestic Common Stock Primary Class,0.000000e+00,-2.002780e+08,NYSE,N
2019-01-02 00:00:00+00:00,Equity(19 [AAME]),Domestic Common Stock Primary Class,,1.021180e+08,NYSE,N
2019-01-02 00:00:00+00:00,Equity(21 [AAOI]),Domestic Common Stock Primary Class,6.289600e+07,3.354850e+08,NYSE,N
2019-01-02 00:00:00+00:00,Equity(22 [AAON]),Domestic Common Stock Primary Class,0.000000e+00,2.496290e+08,NYSE,N
2019-01-02 00:00:00+00:00,Equity(23 [AAP]),Domestic Common Stock Primary Class,1.045398e+09,3.659636e+09,NYSE,N


In [9]:
from alphatools.fundamentals import Fundamentals
from zipline.pipeline.data import USEquityPricing as USEP
from zipline.pipeline.factors import AverageDollarVolume, SimpleMovingAverage, CustomFactor
from zipline.pipeline import Pipeline

import numpy as np

# How many assets per day make_pipeline() keeps via ltd_to_eq_rank.top(...).
NUM_TOP_INDEBTED = 20

# Average Dollar Volume without nanmean, so that recent IPOs are truly removed
class ADV_adj(CustomFactor):
    """Average dollar volume over the window, computed with a plain mean.

    Missing (NaN) closes are counted as 0 rather than skipped, so sessions
    without a close price pull the average down instead of being ignored.
    The author's intent is that recent IPOs are thereby filtered out by a
    threshold on this factor.
    """
    inputs = [USEP.close, USEP.volume]
    window_length = 252

    def compute(self, today, assets, out, close, volume):
        """Write mean(close * volume) along axis 0 into ``out``.

        ``close`` and ``volume`` are the input windows for the two ``inputs``
        columns; NaN closes are treated as 0.
        """
        # Work on a NaN-filled copy instead of assigning into `close`: the
        # input window belongs to the caller and should not be modified here.
        close_filled = np.where(np.isnan(close), 0, close)
        out[:] = np.mean(close_filled * volume, 0)


def universe_filters():
    """Build the base universe filter used by ``make_pipeline``.

    An asset passes when all of the following hold:
      * 252-session ``AverageDollarVolume`` is above 750000,
      * ``ADV_adj`` (dollar volume with missing closes counted as 0) is
        above 25000,
      * its latest category is exactly one of the labels in ``DomComStk_lst``,
      * its latest exchange string starts with ``'NAS'``.

    Returns
    -------
    A pipeline Filter combining the four conditions with ``&``.
    """
    # Average *dollar* volume (not share volume) over the past 252 sessions.
    high_volume = AverageDollarVolume(window_length=252) > 750000

    # Exact category membership. A prefix test on 'Domestic Common Stock'
    # would also admit 'Domestic Common Stock Secondary Class', which is
    # deliberately commented out of DomComStk_lst.
    is_domestic_common = fd.category.latest.element_of(DomComStk_lst)

    # Exchange label begins with 'NAS' (the local used to be named
    # exch_nyse_d, which did not match the value being tested).
    on_nas_exchange = fd.exchange.latest.startswith('NAS')

    # Highly liquid assets only; ADV_adj does not skip sessions with a
    # missing close, which is meant to drop IPOs from the past 12 months.
    liq_f = ADV_adj() > 25000

    universe_filter = (high_volume & liq_f & is_domestic_common) & on_nas_exchange
    return universe_filter

def make_pipeline():
    """Build the pipeline that screens for the most indebted liquid names.

    The universe starts from ``universe_filters()`` and is narrowed to assets
    that are in the 80th-100th percentile of both 30-session average dollar
    volume and market cap, and that also satisfy fixed thresholds on free
    cash flow, market cap, close, volume and the debt/equity ratio.
    Within that universe the ``NUM_TOP_INDEBTED`` assets with the highest
    ``debtnc / equityusd`` ratio are kept each day.

    Returns
    -------
    Pipeline
        Columns: category, exchange, isdelisted, close, volm, ltd_to_eq_rank,
        de, dnc, eusd, fcf, adv, mcap; screened by the top-indebted filter.
        ``adv`` and ``mcap`` are the boolean percentile filters themselves.
    """
    # Latest pricing values.
    price_close = USEP.close.latest
    price_volm = USEP.volume.latest

    # Latest fundamentals values.
    mc_f = fd.marketcap.latest
    de_f = fd.de.latest
    dnc_f = fd.debtnc.latest
    eusd_f = fd.equityusd.latest
    fcf_f = fd.fcf.latest

    # Latest descriptive fields.
    cat_val = fd.category.latest
    exch_val = fd.exchange.latest
    isdelisted_val = fd.isdelisted.latest

    # Non-current debt over equity. Despite the historical name this is the
    # raw ratio, not a rank; the name is kept because it is an output column.
    # Uses the factor `/` operator directly instead of routing through
    # np.divide, which only worked by falling back to that same operator.
    ltd_to_eq_rank = dnc_f / eusd_f

    # Top-quintile filters on liquidity and size.
    adv_top_quintile = AverageDollarVolume(window_length=30).percentile_between(80, 100)
    mcap_top_quintile = mc_f.percentile_between(80, 100)

    # Each condition is applied once (the percentile filters used to be
    # and-ed into the universe twice, which had no additional effect).
    universe = (
        universe_filters()
        & adv_top_quintile
        & mcap_top_quintile
        & (fcf_f > 1.5e8)
        & (mc_f > 25e6)
        & (price_close > 10.0)
        & (price_volm > 1500000)
        & (ltd_to_eq_rank < 32.0)
    )

    # Highest debt/equity ratios within the universe.
    indebted = ltd_to_eq_rank.top(NUM_TOP_INDEBTED, mask=universe)

    pipe = Pipeline(
        columns={
            'category': cat_val,
            'exchange': exch_val,
            'isdelisted': isdelisted_val,
            'close': price_close,
            'volm': price_volm,
            'ltd_to_eq_rank': ltd_to_eq_rank,
            'de': de_f,
            'dnc': dnc_f,
            'eusd': eusd_f,
            'fcf': fcf_f,
            'adv': adv_top_quintile,
            'mcap': mcap_top_quintile,
        },
        screen=indebted,
    )
    return pipe


In [10]:
# Set the start and end dates
# Reset from the notebook-level constants, as in the earlier run cell.
start_date = START_DATE
end_date   = END_DATE

# Run our pipeline for the given start and end dates
#pipeline_output = engine.run_pipeline(pipeline, start_date, end_date)
# Runs the pipeline built by make_pipeline() on the same engine and date range.
pipe2_output = engine.run_pipeline(make_pipeline(), start_date, end_date)

# Display only the first 20 rows of the result.
pipe2_output.head(20)


FrameLoad: Dates:01-02-2019 --> 05-19-2021 columns:[Fundamentals.exchange::object] TotalCount:5896200
FrameLoad: Dates:01-02-2019 --> 05-19-2021 columns:[Fundamentals.equityusd::float64] CountNonNan:2909584
FrameLoad: Dates:01-02-2019 --> 05-19-2021 columns:[Fundamentals.fcf::float64] CountNonNan:2836432


  return (lower_bounds <= data) & (data <= upper_bounds)


FrameLoad: Dates:01-02-2019 --> 05-19-2021 columns:[Fundamentals.debtnc::float64] CountNonNan:2336981
FrameLoad: Dates:01-02-2019 --> 05-19-2021 columns:[Fundamentals.isdelisted::object] TotalCount:5896200
FrameLoad: Dates:01-02-2019 --> 05-19-2021 columns:[Fundamentals.de::float64] CountNonNan:2909506
FrameLoad: Dates:01-02-2019 --> 05-19-2021 columns:[Fundamentals.marketcap::float64] CountNonNan:2909584


  overwrite_input, interpolation)


FrameLoad: Dates:01-02-2019 --> 05-19-2021 columns:[Fundamentals.category::object] TotalCount:5896200


Unnamed: 0,Unnamed: 1,adv,category,close,de,dnc,eusd,exchange,fcf,isdelisted,ltd_to_eq_rank,mcap,volm
2019-01-03 00:00:00+00:00,Equity(6757 [NCLH]),True,Domestic Common Stock,42.38,1.471,5875252000.0,6198090000.0,NASDAQ,356056000.0,N,0.947913,True,1917427.0
2019-01-03 00:00:00+00:00,Equity(6907 [NLSN]),True,Domestic Common Stock,21.991,3.092,8304000000.0,4017000000.0,NASDAQ,266000000.0,N,2.067214,True,3462090.0
2019-01-03 00:00:00+00:00,Equity(7140 [NWL]),True,Domestic Common Stock,16.364,2.732,9296800000.0,5993300000.0,NASDAQ,499400000.0,N,1.551199,True,5207242.0
2019-01-03 00:00:00+00:00,Equity(7281 [OMC]),True,Domestic Common Stock,66.026,8.205,4357800000.0,2381700000.0,NASDAQ,307900000.0,N,1.829701,True,2175563.0
2019-01-03 00:00:00+00:00,Equity(7532 [PCAR]),True,Domestic Common Stock,53.528,1.737,9586900000.0,9171000000.0,NASDAQ,234400000.0,N,1.045349,True,1802959.0
2019-01-03 00:00:00+00:00,Equity(7616 [PEP]),True,Domestic Common Stock,101.547,6.148,30643000000.0,10286000000.0,NASDAQ,3088000000.0,N,2.979098,True,4835357.0
2019-01-03 00:00:00+00:00,Equity(8140 [QRTEA]),True,Domestic Common Stock,13.895,2.114,5885000000.0,5636000000.0,NASDAQ,211000000.0,N,1.04418,True,2206105.0
2019-01-03 00:00:00+00:00,Equity(8147 [QSR]),True,Domestic Common Stock,47.641,7.592,12007400000.0,2185400000.0,NASDAQ,354000000.0,N,5.494372,True,1898121.0
2019-01-03 00:00:00+00:00,Equity(8526 [RTX]),True,Domestic Common Stock,59.638,2.509,38275000000.0,32106000000.0,NASDAQ,1349000000.0,N,1.192145,True,5885580.0
2019-01-03 00:00:00+00:00,Equity(8654 [SBUX]),True,Domestic Common Stock,61.358,19.65,9090200000.0,1169500000.0,NASDAQ,7864800000.0,N,7.772723,True,10093978.0


In [11]:
# Count of rows whose boolean 'adv' column is True.
print(pipe2_output['adv'].sum())

11680


In [12]:
# DataFrame.info() prints its summary itself and returns None, so wrapping it
# in print() only appended a stray "None" line to the output.
pipe2_output.info()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 11680 entries, (2019-01-03 00:00:00+00:00, Equity(6757 [NCLH])) to (2021-05-19 00:00:00+00:00, Equity(10124 [UPS]))
Data columns (total 12 columns):
adv               11680 non-null bool
category          11680 non-null category
close             11680 non-null float64
de                11680 non-null float64
dnc               11680 non-null float64
eusd              11680 non-null float64
exchange          11680 non-null category
fcf               11680 non-null float64
isdelisted        11680 non-null category
ltd_to_eq_rank    11680 non-null float64
mcap              11680 non-null bool
volm              11680 non-null float64
dtypes: bool(2), category(3), float64(7)
memory usage: 749.9+ KB
None


In [None]:
#pipe2_output.index.values[0:-1]

In [18]:
# Number of non-null values in each (date, asset) row, counted across columns.
pipe2_output.count(axis=1)

2019-01-03 00:00:00+00:00  Equity(6757 [NCLH])     12
                           Equity(6907 [NLSN])     12
                           Equity(7140 [NWL])      12
                           Equity(7281 [OMC])      12
                           Equity(7532 [PCAR])     12
                           Equity(7616 [PEP])      12
                           Equity(8140 [QRTEA])    12
                           Equity(8147 [QSR])      12
                           Equity(8526 [RTX])      12
                           Equity(8654 [SBUX])     12
                           Equity(8664 [SCCO])     12
                           Equity(8864 [SHW])      12
                           Equity(9066 [SO])       12
                           Equity(9248 [SSNC])     12
                           Equity(9361 [STX])      12
                           Equity(9467 [SYY])      12
                           Equity(9469 [T])        12
                           Equity(9609 [TEVA])     12
                           E

In [21]:
# Distinct values of the first index level (the dates) present in the result.
pipe2_output.index.get_level_values(0).unique()

DatetimeIndex(['2019-01-03', '2019-01-04', '2019-01-07', '2019-01-08',
               '2019-01-09', '2019-01-10', '2019-01-11', '2019-01-14',
               '2019-01-15', '2019-01-17',
               ...
               '2021-05-06', '2021-05-07', '2021-05-10', '2021-05-11',
               '2021-05-12', '2021-05-13', '2021-05-14', '2021-05-17',
               '2021-05-18', '2021-05-19'],
              dtype='datetime64[ns, UTC]', length=589, freq=None)

In [17]:
# Exploratory introspection: lists the attributes of the result's index object.
# NOTE(review): looks like leftover debugging with a very long output -- consider removing.
dir(pipe2_output.index)

['T',
 '_MultiIndex__bounds',
 '_MultiIndex__set_labels',
 '_MultiIndex__set_levels',
 '__abs__',
 '__add__',
 '__and__',
 '__array__',
 '__array_priority__',
 '__array_wrap__',
 '__bool__',
 '__bytes__',
 '__class__',
 '__contains__',
 '__copy__',
 '__deepcopy__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__floordiv__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getitem__',
 '__getslice__',
 '__gt__',
 '__hash__',
 '__iadd__',
 '__init__',
 '__init_subclass__',
 '__inv__',
 '__iter__',
 '__le__',
 '__len__',
 '__lt__',
 '__module__',
 '__mul__',
 '__ne__',
 '__neg__',
 '__new__',
 '__nonzero__',
 '__or__',
 '__pos__',
 '__pow__',
 '__radd__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__rfloordiv__',
 '__rmul__',
 '__rpow__',
 '__rtruediv__',
 '__setattr__',
 '__setitem__',
 '__setstate__',
 '__sizeof__',
 '__str__',
 '__sub__',
 '__subclasshook__',
 '__truediv__',
 '__unicode__',
 '__weakref__',
 '__xor__',
 '_accessors',
 '_add_comparison_methods