# Pipeline

This notebook is from the official Quantopian Guide on Pipelines. See the Quantopian documentation for more resources.

Many trading algorithms have the following structure:

1. For each asset in a known (large) set, compute N scalar values for the asset based on a trailing window of data.
2. Select a smaller tradeable set of assets based on the values computed in (1).
3. Calculate desired portfolio weights on the set of assets selected in (2).
4. Place orders to move the algorithm’s current portfolio allocations to the desired weights computed in (3).
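
The four steps above can be sketched in plain Python. This is only an illustration of the control flow, not Quantopian code: the price data, the momentum-style value, the positive-value selection rule, and the equal weighting are all assumptions chosen for brevity.

```python
from statistics import mean

# Hypothetical trailing closing prices per asset (oldest first); not real data.
PRICES = {
    "AAA": [10.0, 10.5, 11.0, 11.5],
    "BBB": [20.0, 19.0, 18.0, 17.0],
    "CCC": [5.0, 5.0, 5.5, 6.0],
}

def compute_values(prices, window):
    """Step 1: one scalar per asset, computed from a trailing window."""
    return {asset: series[-1] / mean(series[-window:]) - 1.0
            for asset, series in prices.items()}

def select_assets(values):
    """Step 2: keep only assets whose value is positive (assumed rule)."""
    return [asset for asset, value in values.items() if value > 0]

def target_weights(selected):
    """Step 3: equal-weight the selected assets (assumed rule)."""
    return {asset: 1.0 / len(selected) for asset in selected} if selected else {}

def orders(current, target):
    """Step 4: weight changes needed to move from current to target."""
    assets = sorted(set(current) | set(target))
    return {a: target.get(a, 0.0) - current.get(a, 0.0) for a in assets}

values = compute_values(PRICES, window=4)
selected = select_assets(values)
weights = target_weights(selected)
deltas = orders({"BBB": 1.0}, weights)  # the portfolio currently holds only BBB
print(selected, weights, deltas)
```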

There are several technical challenges with doing this robustly. These include:

* efficiently querying large sets of assets
* performing computations on large sets of assets
* handling adjustments (splits and dividends)
* handling asset delistings

Pipeline exists to solve these challenges by providing a uniform API for expressing computations on a diverse collection of datasets.
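
To make the adjustments challenge concrete, here is a small sketch of why a trailing-window computation needs split-adjusted prices. The prices, the 2-for-1 split, and its position in the window are hypothetical; this is not how Pipeline implements adjustments internally.

```python
from statistics import mean

# Hypothetical raw closes; a 2-for-1 split takes effect before the third bar.
raw_closes = [100.0, 102.0, 51.0, 52.0]
split_index = 2    # first bar quoted on the post-split basis
split_ratio = 2.0  # 2-for-1

# Restate pre-split prices on the post-split basis.
adjusted = [price / split_ratio if i < split_index else price
            for i, price in enumerate(raw_closes)]

raw_mean = mean(raw_closes)     # mixes two price bases, so it is misleading
adjusted_mean = mean(adjusted)  # every price is on the same basis
print(raw_mean, adjusted_mean)
```

The raw mean sits far above every post-split price, while the adjusted mean stays in the range the security actually trades in.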

## Factors
A factor is a function from an asset and a moment in time to a numerical value.

A simple example of a factor is the most recent price of a security. Given a security and a specific point in time, the most recent price is a number. Another example is the 10-day average trading volume of a security. Factors are most commonly used to assign values to securities which can then be used in a number of ways. A factor can be used in each of the following procedures:
* computing target weights
* generating alpha signals
* constructing other, more complex factors
* constructing filters
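
The definition can be read literally as a Python function of an asset and a point in time. The sketch below uses plain functions and made-up closes to mirror the two examples in the text; the names `latest_price` and `moving_average` are illustrative and not part of the Quantopian API.

```python
from statistics import mean

# Hypothetical daily closes for one security, oldest first; not real data.
CLOSES = {"XYZ": [10.0, 11.0, 12.0, 13.0, 14.0]}

def latest_price(asset, day):
    """Factor: the most recent close known at day index `day`."""
    return CLOSES[asset][day]

def moving_average(asset, day, window):
    """Factor: mean close over the `window` days ending at `day`."""
    start = day - window + 1
    if start < 0:
        return float("nan")  # not enough history for a full window
    return mean(CLOSES[asset][start:day + 1])

print(latest_price("XYZ", 4))
print(moving_average("XYZ", 4, 3))
```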

## Filters
A filter is a function from an asset and a moment in time to a boolean.
An example of a filter is a function indicating whether a security's price is below $10. Given a security and a point in time, this evaluates to either True or False. Filters are most commonly used for describing sets of assets to include or exclude for some particular purpose.
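
As a rough illustration, the price-below-$10 filter can be written as a plain Python predicate of an asset and a day. The data and the helper `universe` are hypothetical; the point is that the same asset can pass on one day and fail on another.

```python
# Hypothetical closes by day index; not real data.
CLOSES = {
    "AAA": [9.50, 10.20],
    "BBB": [115.00, 115.84],
    "CCC": [9.60, 9.69],
}

def price_below_10(asset, day):
    """Filter: True if the asset's close on `day` is below $10."""
    return CLOSES[asset][day] < 10

def universe(day):
    """The set of assets the filter includes on `day`."""
    return sorted(asset for asset in CLOSES if price_below_10(asset, day))

day0 = universe(0)
day1 = universe(1)
print(day0, day1)
```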

## Classifiers
A classifier is a function from an asset and a moment in time to a categorical output.
More specifically, a classifier produces a string or an int that doesn't represent a numerical value (e.g. an integer label such as a sector code). Classifiers are most commonly used for grouping assets for complex transformations on Factor outputs. An example of a classifier is the exchange on which an asset is currently being traded.
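
One way to picture "grouping assets for complex transformations on Factor outputs" is to subtract each group's mean from a factor. The sketch below does this in plain Python with made-up exchange labels and factor values; it illustrates the idea only and does not use the Quantopian API.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical classifier output (exchange label) and factor values.
EXCHANGE = {"AAA": "NYS", "BBB": "NYS", "CCC": "NAS", "DDD": "NAS"}
FACTOR = {"AAA": 1.0, "BBB": 3.0, "CCC": 10.0, "DDD": 20.0}

# Group assets by their label.
groups = defaultdict(list)
for asset, label in EXCHANGE.items():
    groups[label].append(asset)

# Subtract each group's mean from its members' factor values.
demeaned = {}
for label, members in groups.items():
    group_mean = mean(FACTOR[a] for a in members)
    for a in members:
        demeaned[a] = FACTOR[a] - group_mean

print(demeaned)
```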

In [1]:
from quantopian.pipeline import Pipeline

In [22]:
def make_pipeline():
    return Pipeline()

In [23]:
pipe = make_pipeline()

In [24]:
from quantopian.research import run_pipeline

In [25]:
# 2017-01-01 is not a trading day, so the output is indexed by the next session, 2017-01-03.
result = run_pipeline(pipe, '2017-01-01', '2017-01-01')

In [26]:
result.head(10)

Date,Asset
2017-01-03 00:00:00+00:00,Equity(2 [ARNC])
2017-01-03 00:00:00+00:00,Equity(21 [AAME])
2017-01-03 00:00:00+00:00,Equity(24 [AAPL])
2017-01-03 00:00:00+00:00,Equity(25 [ARNC_PR])
2017-01-03 00:00:00+00:00,Equity(31 [ABAX])
2017-01-03 00:00:00+00:00,Equity(39 [DDC])
2017-01-03 00:00:00+00:00,Equity(41 [ARCB])
2017-01-03 00:00:00+00:00,Equity(52 [ABM])
2017-01-03 00:00:00+00:00,Equity(53 [ABMD])
2017-01-03 00:00:00+00:00,Equity(62 [ABT])


In [27]:
result.info()

<class 'pandas.core.frame.DataFrame'>
MultiIndex: 8343 entries, (2017-01-03 00:00:00+00:00, Equity(2 [ARNC])) to (2017-01-03 00:00:00+00:00, Equity(50569 [OUSM]))
Empty DataFrame

# Data

In [36]:
from quantopian.pipeline.data.builtin import USEquityPricing

## Factors

Remember, Factors take in an asset and a timestamp and return some numerical value.

In [38]:
from quantopian.pipeline.factors import BollingerBands,SimpleMovingAverage,EWMA

In [40]:
SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)

SimpleMovingAverage((USEquityPricing.close::float64,), window_length=30)

In [41]:
def make_pipeline():
    
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
    
    return Pipeline(columns={
        '30 Day Mean Close':mean_close_30
    })

In [46]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')

In [48]:
results.head(10)

Date,Asset,30 Day Mean Close
2017-01-03 00:00:00+00:00,Equity(2 [ARNC]),20.1105
2017-01-03 00:00:00+00:00,Equity(21 [AAME]),3.899241
2017-01-03 00:00:00+00:00,Equity(24 [AAPL]),113.368433
2017-01-03 00:00:00+00:00,Equity(25 [ARNC_PR]),86.796111
2017-01-03 00:00:00+00:00,Equity(31 [ABAX]),52.498394
2017-01-03 00:00:00+00:00,Equity(39 [DDC]),9.523
2017-01-03 00:00:00+00:00,Equity(41 [ARCB]),29.969167
2017-01-03 00:00:00+00:00,Equity(52 [ABM]),42.138239
2017-01-03 00:00:00+00:00,Equity(53 [ABMD]),114.030167
2017-01-03 00:00:00+00:00,Equity(62 [ABT]),38.664333


In [49]:
def make_pipeline():
    
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
    latest_close = USEquityPricing.close.latest
    
    return Pipeline(columns={
        '30 Day Mean Close':mean_close_30,
        'Latest Close':latest_close
    })

In [50]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')

In [52]:
results.head(10)

Date,Asset,30 Day Mean Close,Latest Close
2017-01-03 00:00:00+00:00,Equity(2 [ARNC]),20.1105,18.55
2017-01-03 00:00:00+00:00,Equity(21 [AAME]),3.899241,4.1
2017-01-03 00:00:00+00:00,Equity(24 [AAPL]),113.368433,115.84
2017-01-03 00:00:00+00:00,Equity(25 [ARNC_PR]),86.796111,
2017-01-03 00:00:00+00:00,Equity(31 [ABAX]),52.498394,52.74
2017-01-03 00:00:00+00:00,Equity(39 [DDC]),9.523,9.69
2017-01-03 00:00:00+00:00,Equity(41 [ARCB]),29.969167,27.75
2017-01-03 00:00:00+00:00,Equity(52 [ABM]),42.138239,40.68
2017-01-03 00:00:00+00:00,Equity(53 [ABMD]),114.030167,112.7
2017-01-03 00:00:00+00:00,Equity(62 [ABT]),38.664333,38.42


## Combining Factors
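
Arithmetic on factors is applied asset by asset and yields a new factor. The plain-Python sketch below mimics the percent-difference calculation used in the next cell, with made-up 10-day and 30-day means standing in for the `SimpleMovingAverage` outputs.

```python
# Hypothetical per-asset factor values; not real data.
mean_close_10 = {"AAA": 11.0, "BBB": 19.0}
mean_close_30 = {"AAA": 10.0, "BBB": 20.0}

# Combine the two factors element-wise into a third one.
percent_difference = {
    asset: (mean_close_10[asset] - mean_close_30[asset]) / mean_close_30[asset]
    for asset in mean_close_30
}
print(percent_difference)
```

A positive value means the short-window average is above the long-window average.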

In [54]:
def make_pipeline():
    
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
    latest_close = USEquityPricing.close.latest
    
    percent_difference = (mean_close_10-mean_close_30) / mean_close_30
    
    return Pipeline(columns={
        'Percent Difference':percent_difference,
        '30 Day Mean Close':mean_close_30,
        'Latest Close':latest_close
    })

In [55]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')

In [56]:
results.head()

Date,Asset,30 Day Mean Close,Latest Close,Percent Difference
2017-01-03 00:00:00+00:00,Equity(2 [ARNC]),20.1105,18.55,-0.022749
2017-01-03 00:00:00+00:00,Equity(21 [AAME]),3.899241,4.1,-0.005499
2017-01-03 00:00:00+00:00,Equity(24 [AAPL]),113.368433,115.84,0.028481
2017-01-03 00:00:00+00:00,Equity(25 [ARNC_PR]),86.796111,,-0.000474
2017-01-03 00:00:00+00:00,Equity(31 [ABAX]),52.498394,52.74,-0.007665


# Filters and Screens

Filters take in an asset and a timestamp and return a boolean.

In [57]:
last_close_price = USEquityPricing.close.latest
close_price_filter = last_close_price > 20

In [58]:
close_price_filter

NumExprFilter(expr='x_0 > (20.0)', bindings={'x_0': Latest((USEquityPricing.close::float64,), window_length=1)})

In [59]:
def make_pipeline():
    
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
    latest_close = USEquityPricing.close.latest
    
    percent_difference = (mean_close_10-mean_close_30) / mean_close_30
    
    perc_diff_check = percent_difference > 0 
    
    return Pipeline(columns={
        'Percent Difference':percent_difference,
        '30 Day Mean Close':mean_close_30,
        'Latest Close':latest_close,
        'Positive Percent Diff': perc_diff_check
    })

In [60]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()

Date,Asset,30 Day Mean Close,Latest Close,Percent Difference,Positive Percent Diff
2017-01-03 00:00:00+00:00,Equity(2 [ARNC]),20.1105,18.55,-0.022749,False
2017-01-03 00:00:00+00:00,Equity(21 [AAME]),3.899241,4.1,-0.005499,False
2017-01-03 00:00:00+00:00,Equity(24 [AAPL]),113.368433,115.84,0.028481,True
2017-01-03 00:00:00+00:00,Equity(25 [ARNC_PR]),86.796111,,-0.000474,False
2017-01-03 00:00:00+00:00,Equity(31 [ABAX]),52.498394,52.74,-0.007665,False


## Screens
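
A screen uses a filter to decide which rows appear in the pipeline output: rows where the filter is False are dropped. The sketch below imitates that behavior on a small dictionary of made-up rows; `apply_screen` is a hypothetical helper, not a Quantopian function.

```python
# Hypothetical pipeline rows keyed by asset; not real data.
ROWS = {
    "AAA": {"Percent Difference": 0.028, "Latest Close": 115.84},
    "BBB": {"Percent Difference": -0.022, "Latest Close": 18.55},
    "CCC": {"Percent Difference": 0.004, "Latest Close": 23.45},
}

def positive_percent_diff(columns):
    """Filter evaluated on one row's column values."""
    return columns["Percent Difference"] > 0

def apply_screen(rows, screen):
    """Keep only the rows for which the screen filter is True."""
    return {asset: cols for asset, cols in rows.items() if screen(cols)}

screened = apply_screen(ROWS, positive_percent_diff)
print(list(screened))
```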

In [61]:
def make_pipeline():
    
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
    latest_close = USEquityPricing.close.latest
    
    percent_difference = (mean_close_10-mean_close_30) / mean_close_30
    
    perc_diff_check = percent_difference > 0 
    
    return Pipeline(columns={
                            'Percent Difference':percent_difference,
                            '30 Day Mean Close':mean_close_30,
                            'Latest Close':latest_close,
                            'Positive Percent Diff': perc_diff_check},
                    screen=perc_diff_check)

In [62]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()

Date,Asset,30 Day Mean Close,Latest Close,Percent Difference,Positive Percent Diff
2017-01-03 00:00:00+00:00,Equity(24 [AAPL]),113.368433,115.84,0.028481,True
2017-01-03 00:00:00+00:00,Equity(66 [AB]),23.119167,23.45,0.004578,True
2017-01-03 00:00:00+00:00,Equity(69 [ACAT]),15.8395,15.02,0.009375,True
2017-01-03 00:00:00+00:00,Equity(70 [VBF]),18.20848,18.49,0.011814,True
2017-01-03 00:00:00+00:00,Equity(84 [ACET]),20.722753,21.97,0.03963,True


### Reverse a screen

In [63]:
def make_pipeline():
    
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
    latest_close = USEquityPricing.close.latest
    
    percent_difference = (mean_close_10-mean_close_30) / mean_close_30
    
    perc_diff_check = percent_difference > 0 
    
    return Pipeline(columns={
                            'Percent Difference':percent_difference,
                            '30 Day Mean Close':mean_close_30,
                            'Latest Close':latest_close,
                            'Positive Percent Diff': perc_diff_check},
                    screen=~perc_diff_check)

In [64]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()

Date,Asset,30 Day Mean Close,Latest Close,Percent Difference,Positive Percent Diff
2017-01-03 00:00:00+00:00,Equity(2 [ARNC]),20.1105,18.55,-0.022749,False
2017-01-03 00:00:00+00:00,Equity(21 [AAME]),3.899241,4.1,-0.005499,False
2017-01-03 00:00:00+00:00,Equity(25 [ARNC_PR]),86.796111,,-0.000474,False
2017-01-03 00:00:00+00:00,Equity(31 [ABAX]),52.498394,52.74,-0.007665,False
2017-01-03 00:00:00+00:00,Equity(39 [DDC]),9.523,9.69,-0.015436,False


## Combine Filters
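
The `&` and `~` operators used in this notebook act on filters asset by asset, like logical "and" and "not". The sketch below reproduces that logic on plain dictionaries of booleans, using made-up values.

```python
# Hypothetical per-asset values; not real data.
percent_difference = {"AAA": 0.013, "BBB": 0.028, "CCC": -0.015}
latest_close = {"AAA": 3.40, "BBB": 115.84, "CCC": 9.69}

perc_diff_check = {a: v > 0 for a, v in percent_difference.items()}
small_price = {a: p < 5 for a, p in latest_close.items()}

# Equivalent of `perc_diff_check & small_price`.
both = {a: perc_diff_check[a] and small_price[a] for a in perc_diff_check}

# Equivalent of `~perc_diff_check`.
negated = {a: not perc_diff_check[a] for a in perc_diff_check}

print(both)
print(negated)
```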

In [65]:
def make_pipeline():
    
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30)
    latest_close = USEquityPricing.close.latest
    
    percent_difference = (mean_close_10-mean_close_30) / mean_close_30
    
    perc_diff_check = percent_difference > 0 
    small_price = latest_close < 5
    
    final_filter = perc_diff_check & small_price
    
    return Pipeline(columns={
                            'Percent Difference':percent_difference,
                            '30 Day Mean Close':mean_close_30,
                            'Latest Close':latest_close,
                            'Positive Percent Diff': perc_diff_check},
                    screen=final_filter)

In [66]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()

Date,Asset,30 Day Mean Close,Latest Close,Percent Difference,Positive Percent Diff
2017-01-03 00:00:00+00:00,Equity(535 [ARTW]),3.097778,3.4,0.013271,True
2017-01-03 00:00:00+00:00,Equity(677 [AXAS]),2.265333,2.56,0.145527,True
2017-01-03 00:00:00+00:00,Equity(1144 [BTX]),3.531167,3.62,0.065795,True
2017-01-03 00:00:00+00:00,Equity(1323 [CAW]),2.541333,2.6,0.016002,True
2017-01-03 00:00:00+00:00,Equity(1546 [CIF]),2.50037,2.57,0.015579,True


# Masking

Sometimes we want to ignore certain assets when computing pipeline expressions. There are two common cases where ignoring assets is useful:
* We want to compute an expression that's computationally expensive, and we know we only care about results for certain assets.
* We want to compute an expression that performs comparisons between assets, but we only want those comparisons to be performed against a subset of all assets. 
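
Both cases can be sketched in plain Python. In this illustration the data, the `compute` and `rank` helpers, and the use of NaN as the placeholder for masked-out assets are assumptions made for the example, not a description of Pipeline internals.

```python
# Hypothetical latest prices; not real data.
LATEST = {"AAA": 3.0, "BBB": 50.0, "CCC": 4.5, "DDD": 120.0, "EEE": 2.0}
small_price = {asset: price < 5 for asset, price in LATEST.items()}

calls = []  # records which assets the "expensive" factor was evaluated for

def expensive_factor(asset):
    calls.append(asset)
    return LATEST[asset] * 2  # stand-in for a costly computation

def compute(factor, mask=None):
    """Case 1: evaluate the factor only where the mask is True."""
    return {asset: factor(asset) if mask is None or mask[asset] else float("nan")
            for asset in LATEST}

def rank(values, mask=None):
    """Case 2: rank (1 = highest) only against assets that pass the mask."""
    eligible = [a for a in values if mask is None or mask[a]]
    ordered = sorted(eligible, key=lambda a: values[a], reverse=True)
    return {a: position + 1 for position, a in enumerate(ordered)}

masked = compute(expensive_factor, mask=small_price)
rank_all = rank(LATEST)
rank_small = rank(LATEST, mask=small_price)
print(calls, rank_all, rank_small)
```

Note how `CCC` ranks third among all assets but first within the masked subset.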

In [83]:
def make_pipeline():
    
    # Create Filters for Masks First
    latest_close = USEquityPricing.close.latest
    small_price = latest_close < 5
    
    # Pass in the mask
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10,mask=small_price)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30,mask=small_price)
    
    
    percent_difference = (mean_close_10-mean_close_30) / mean_close_30
    
    perc_diff_check = percent_difference > 0 
    
    
    # Masked-out assets have a NaN percent_difference, so this check is False for them.
    final_filter = perc_diff_check
    
    return Pipeline(columns={
                            'Percent Difference':percent_difference,
                            '30 Day Mean Close':mean_close_30,
                            'Latest Close':latest_close,
                            'Positive Percent Diff': perc_diff_check},
                    screen=final_filter)

In [84]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()

Date,Asset,30 Day Mean Close,Latest Close,Percent Difference,Positive Percent Diff
2017-01-03 00:00:00+00:00,Equity(535 [ARTW]),3.097778,3.4,0.013271,True
2017-01-03 00:00:00+00:00,Equity(677 [AXAS]),2.265333,2.56,0.145527,True
2017-01-03 00:00:00+00:00,Equity(1144 [BTX]),3.531167,3.62,0.065795,True
2017-01-03 00:00:00+00:00,Equity(1323 [CAW]),2.541333,2.6,0.016002,True
2017-01-03 00:00:00+00:00,Equity(1546 [CIF]),2.50037,2.57,0.015579,True


In [85]:
len(results)

391

# Classifiers

A classifier is a function from an asset and a moment in time to a categorical output such as a string or integer label.

In [74]:
from quantopian.pipeline.data import morningstar
from quantopian.pipeline.classifiers.morningstar import Sector

In [75]:
morningstar_sector = Sector()

In [76]:
exchange = morningstar.share_class_reference.exchange_id.latest

In [77]:
exchange

Latest((share_class_reference.exchange_id::object,), window_length=1)

### Classifier Methods

* eq (equals)
* isnull
* startswith
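
Each of these methods turns a classifier into a filter. The plain-Python sketch below shows the intended semantics on made-up exchange labels. Representing a missing label as `None`, and returning False from `startswith` for missing labels, are assumptions of this example.

```python
# Hypothetical exchange labels; None stands for a missing label.
EXCHANGE = {"AAA": "NYS", "BBB": "NAS", "CCC": None, "DDD": "NYS"}

def eq(labels, value):
    """True where the label equals `value`."""
    return {asset: label == value for asset, label in labels.items()}

def isnull(labels):
    """True where the label is missing."""
    return {asset: label is None for asset, label in labels.items()}

def startswith(labels, prefix):
    """True where the label is present and starts with `prefix`."""
    return {asset: label is not None and label.startswith(prefix)
            for asset, label in labels.items()}

nyse = eq(EXCHANGE, "NYS")
missing = isnull(EXCHANGE)
n_prefixed = startswith(EXCHANGE, "N")
print(nyse, missing, n_prefixed)
```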

In [79]:
nyse_filter = exchange.eq('NYS')

In [80]:
def make_pipeline():
    
    # Create Filters for Masks First
    latest_close = USEquityPricing.close.latest
    small_price = latest_close < 5
    
    # Filter built from the exchange classifier
    nyse_filter = exchange.eq('NYS')
    
    # Pass in the mask
    mean_close_10 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=10,mask=small_price)
    mean_close_30 = SimpleMovingAverage(inputs=[USEquityPricing.close],window_length=30,mask=small_price)
    
    
    percent_difference = (mean_close_10-mean_close_30) / mean_close_30
    
    perc_diff_check = percent_difference > 0 
    
    
    final_filter = perc_diff_check & nyse_filter
    
    return Pipeline(columns={
                            'Percent Difference':percent_difference,
                            '30 Day Mean Close':mean_close_30,
                            'Latest Close':latest_close,
                            'Positive Percent Diff': perc_diff_check},
                    screen=final_filter)

In [81]:
results = run_pipeline(make_pipeline(),'2017-01-01','2017-01-01')
results.head()

Date,Asset,30 Day Mean Close,Latest Close,Percent Difference,Positive Percent Diff
2017-01-03 00:00:00+00:00,Equity(2586 [EQS]),1.960533,2.02,0.02212,True
2017-01-03 00:00:00+00:00,Equity(3265 [GLF]),1.576367,1.725,0.16242,True
2017-01-03 00:00:00+00:00,Equity(3645 [HOV]),2.406667,2.735,0.176939,True
2017-01-03 00:00:00+00:00,Equity(4577 [LUB]),4.292333,4.27,0.004116,True
2017-01-03 00:00:00+00:00,Equity(4971 [RT]),3.244,3.24,0.009094,True


In [82]:
len(results)

67

# Pipelines in Quantopian IDE

In [None]:
from quantopian.pipeline import Pipeline
from quantopian.algorithm import attach_pipeline, pipeline_output

def initialize(context):
    my_pipe = make_pipeline()
    attach_pipeline(my_pipe, 'my_pipeline')

def make_pipeline():
    return Pipeline()

def before_trading_start(context, data):
    # Store our pipeline output DataFrame in context.
    context.output = pipeline_output('my_pipeline')