## Preparing a Factor Ranking Model Using Zipline Pipelines

In [1]:
import os
import warnings

In [3]:
import numpy as np
import pandas as pd
from IPython.display import Markdown, display
from zipline.data import bundles
from zipline.data.bundles.core import load
from zipline.pipeline import Pipeline
from zipline.utils.run_algo import load_extensions
from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.factors import (
    VWAP,
    AnnualizedVolatility,
    AverageDollarVolume,
    BollingerBands,
    CustomFactor,
    DailyReturns,
    ExponentialWeightedMovingAverage,
    MaxDrawdown,
    PercentChange,
    Returns,
    SimpleMovingAverage,
    WeightedAverageValue,
    Latest,
    TrueRange,
)
from zipline.pipeline.loaders import USEquityPricingLoader
import exchange_calendars as xcals

In [4]:
warnings.filterwarnings("ignore")

### Option 1: Use the built-in bundle with free data

This option uses the built-in data bundle provided by Zipline. It then acquires free US equities data that extend through 2018.

In [4]:
#os.environ["QUANDL_API_KEY"] = "unvCDgkmsr-UjQkKkzs9"
#bundle = "quandl"
#bundles.ingest(bundle)

### Option 2: Use the custom bundle with premium data

This option uses the custom bundle with premium data. Follow the steps here: https://pyquantnews.com/ingest-premium-market-data-with-zipline-reloaded/ before using.

In [5]:
os.environ["DATALINK_API_KEY"] = "unvCDgkmsr-UjQkKkzs9"
bundle = "quotemedia"

#from zipline import load_extensions

load_extensions(
    default=True,            # Load default extensions
    extensions=[],           # List of additional extensions
    strict=True,             # Raise errors if extensions fail to load
    environ=os.environ,      # Environment variables
)


In [6]:
#bundles.ingest(bundle)

Ingest the bundle data from your selected bundle.

In [6]:
bundle_data = load(bundle, os.environ, None)

Create a USEquityPricingLoader

In [7]:
pipeline_loader = USEquityPricingLoader(
    bundle_data.equity_daily_bar_reader, bundle_data.adjustment_reader, fx_reader=None
)

Initialize a SimplePipelineEngine

In [8]:
engine = SimplePipelineEngine(
    get_loader=lambda col: pipeline_loader, asset_finder=bundle_data.asset_finder
)

Define a custom momentum factor

In [9]:
class MomentumFactor(CustomFactor):
    """Momentum factor"""

    inputs = [USEquityPricing.close, Returns(window_length=126)]
    window_length = 252

    def compute(self, today, assets, out, prices, returns):
        out[:] = (
            (prices[-21] - prices[-252]) / prices[-252]
            - (prices[-1] - prices[-21]) / prices[-21]
        ) / np.nanstd(returns, axis=0)

Define a function to create a pipeline

In [11]:
def make_pipeline():
    momentum = MomentumFactor()
    dollar_volume = AverageDollarVolume(window_length=30)
    close_price = Latest(inputs=[USEquityPricing.close])

    # Calculate the 21-day returns
    returns_21 = Returns(window_length=21)
    returns_09 = Returns(window_length=9)

    # Define the longs and shorts based on the direction of the 21-day returns
    longs = (returns_21 > 0) & momentum.top(50)
    shorts = (returns_09 < 0) & momentum.bottom(50)

    return Pipeline(
        columns={
            "factor": momentum,
            "longs": momentum.top(50),
            #"longs": longs,
            "shorts": momentum.bottom(50),
            #"shorts": shorts,
            "rank": momentum.rank(),
        },
        #screen=dollar_volume.top(3000),
        screen=(close_price > 5) & (dollar_volume.top(3000)),
    )

Run the pipeline

In [7]:
import calendar

#xnys = xcals.get_calendar("XNYS")
#today = pd.Timestamp.today().strftime("%Y-%m-%d")
#start_date = xnys.session_offset(today, count=-252).strftime("%Y-%m-%d")
#end_date = xnys.session_offset(today, count=-1).strftime("%Y-%m-%d")

xnys = xcals.get_calendar("XNYS")
#today = pd.Timestamp.today().strftime("%Y-%m-%d")
today = pd.Timestamp.today()

# Check if a specific date is a trading session
# date = pd.Timestamp("2024-12-28")

# Function to get the last Friday's date
def get_last_friday(date):
    while date.weekday() != calendar.FRIDAY:  # 4 represents Friday
        date -= pd.Timedelta(days=1)
    return date

# Check if today is a weekend
if today.weekday() >= calendar.SATURDAY:  # 5 and 6 represent Saturday and Sunday
    session_date = get_last_friday(today)
    print(f"Today is a weekend, using last Friday's date: {session_date.strftime('%Y-%m-%d')}")
else:
    session_date = today
    print(f"Today is a weekday, using today's date: {session_date.strftime('%Y-%m-%d')}")

session_date = session_date.strftime("%Y-%m-%d")
is_trading_day = xnys.is_session(session_date)
print(f"Is {session_date} a trading day? {is_trading_day}")

# Get all trading sessions within a date range
#trading_days = xnys.sessions_in_range(pd.Timestamp("2024-01-01"), pd.Timestamp("2024-12-31"))
#print(f"Trading days in 2024: {trading_days}")

# Get the trading session that is 5 sessions before a specific date
#offset_trading_day = xnys.session_offset(previous_trading_day, count=-5)  # Use previous_trading_day here
#print(f"Trading day 5 sessions before 2024-12-28: {offset_trading_day}")

start_date = xnys.session_offset(session_date, count=-252).strftime("%Y-%m-%d")
end_date = xnys.session_offset(session_date, count=-5).strftime("%Y-%m-%d")
thirty_days_ago = xnys.session_offset(session_date, count=-30).strftime("%Y-%m-%d")

print(f"Start date: {start_date}")
print(f"End date: {end_date}")



Today is a weekend, using last Friday's date: 2024-12-27
Is 2024-12-27 a trading day? True
Start date: 2023-12-27
End date: 2024-12-19


In [None]:
results = engine.run_pipeline(
#    make_pipeline(), pd.to_datetime("2012-01-04"), pd.to_datetime("2012-03-01")
    make_pipeline(), start_date, end_date
)

Clean and display the results

In [45]:
results.dropna(subset="factor", inplace=True)
results.index.names = ["date", "symbol"]
results.sort_values(by=["date", "factor"], inplace=True)

In [None]:
display(results)

In [None]:
#    longs = results.xs(today, level=0).query("longs == True")
longs = results.xs(end_date, level=0).query("longs == True")
#longs = results.xs(pd.to_datetime("2012-03-01"), level=0).query("longs == True")

    #    shorts = results.xs(today, level=0).query("shorts == True")   
shorts = results.xs(end_date, level=0).query("shorts == True")
#shorts = results.xs(pd.to_datetime("2012-03-01"), level=0).query("shorts == True")

    # Print the list of long symbols
print(f"Long symbols: on {end_date}", longs.index.get_level_values('symbol').tolist())
print(f"Short symbols: on {end_date}", shorts.index.get_level_values('symbol').tolist())

#    longs = results.xs(today, level=0).query("longs == True")
longs = results.xs(thirty_days_ago, level=0).query("longs == True")
#longs = results.xs(pd.to_datetime("2012-03-01"), level=0).query("longs == True")

    #    shorts = results.xs(today, level=0).query("shorts == True")   
shorts = results.xs(thirty_days_ago, level=0).query("shorts == True")
#shorts = results.xs(pd.to_datetime("2012-03-01"), level=0).query("shorts == True")

    # Print the list of long symbols
print(f"Long symbols: on {thirty_days_ago}", longs.index.get_level_values('symbol').tolist())
print(f"Short symbols: on {thirty_days_ago}", shorts.index.get_level_values('symbol').tolist())

The code below is generated by Gemini

In [17]:
class MomentumFactor(CustomFactor):
    inputs = [USEquityPricing.close, Returns(window_length=126)]
    window_length = 252

    def compute(self, today, assets, out, prices, returns):
        price_change = (prices[-21] - prices[-252]) / prices[-252] - (prices[-1] - prices[-21]) / prices[-21]
        recent_returns = returns[-42:].mean(axis=0)
        out[:] = (price_change / np.nanstd(returns, axis=0)) * np.sign(recent_returns) 


In [18]:
top_n = 10

def make_pipeline():
    momentum = MomentumFactor()
    volatility = AnnualizedVolatility(window_length=252)
    dollar_volume = AverageDollarVolume(window_length=30)
    close_price = Latest(inputs=[USEquityPricing.close])
    avg_close_price = SimpleMovingAverage(inputs=[USEquityPricing.close], window_length=42)

    return Pipeline(
        columns={
            "momentum": momentum,
            "volatility": volatility,
            "momentum_rank": momentum.rank(ascending=False),
            "volatility_rank": volatility.rank(),
            "dollar_volume": dollar_volume,
            "close_price": close_price,
            "longs": momentum.top(top_n),  
            "shorts": momentum.bottom(top_n),
            "avg_close_price": avg_close_price,
        },
        screen=(dollar_volume.top(200) & (close_price > 5) & (avg_close_price > 5)),
    )


In [19]:
pipeline_loader = USEquityPricingLoader(
        bundle_data.equity_daily_bar_reader,
        bundle_data.adjustment_reader,
        fx_reader=None,
    )

engine = SimplePipelineEngine(
        get_loader=lambda col: pipeline_loader, asset_finder=bundle_data.asset_finder
    )

results = engine.run_pipeline(make_pipeline(), start_date, end_date)
results.dropna(subset=["momentum", "volatility"], inplace=True)
results.index.names = ["date", "symbol"]

In [20]:
    # 1. Volatility Filtering
    #
    #    - Filters out stocks with volatility above a certain threshold (e.g., 75th percentile)
    #    - This helps avoid extremely volatile stocks, even if they have high momentum z-scores.
    #
volatility_threshold = results.xs(end_date, level=0)["volatility"].quantile(0.75)
filtered_results = results.xs(end_date, level=0)[
        results.xs(end_date, level=0)["volatility"] < volatility_threshold
    ]
longs_filtered = filtered_results.nlargest(top_n, "momentum_rank")
shorts_filtered = filtered_results.nsmallest(top_n, "momentum_rank")

In [21]:
    # 2. Volatility-Adjusted Ranking
    #
    #    - Creates a combined rank that penalizes stocks with high volatility.
    #    - Subtracts a multiple of the volatility rank from the momentum rank.
    #    - The `volatility_penalty_factor` controls the strength of the penalty.
    #
volatility_penalty_factor = 0.5  # Adjust this factor as needed
results["combined_rank_adjusted"] = (
        results["momentum_rank"] - volatility_penalty_factor * results["volatility_rank"]
    )
longs_adjusted = results.xs(end_date, level=0).nlargest(
        top_n, "combined_rank_adjusted"
    )
shorts_adjusted = results.xs(end_date, level=0).nsmallest(
        top_n, "combined_rank_adjusted"
    )



In [22]:
    # 3. Conditional Weighting
    #
    #    - Dynamically adjusts the weights assigned to momentum and volatility based on the volatility level.
    #    - Uses `np.where()` for efficient vectorized conditional logic.
    #    - If volatility is above the median, assigns higher weight to volatility rank (e.g., 0.4) and lower weight to momentum rank (e.g., 0.6).
    #    - If volatility is below the median, assigns lower weight to volatility rank (e.g., 0.2) and higher weight to momentum rank (e.g., 0.8).
    #
def conditional_weight(volatility, momentum_rank, volatility_rank, median_volatility):
        return np.where(
            volatility > median_volatility,
            0.6 * momentum_rank + 0.4 * volatility_rank,
            0.8 * momentum_rank + 0.2 * volatility_rank,
        )

median_volatility = results["volatility"].median()

results["combined_rank_conditional"] = conditional_weight(
        results["volatility"],
        results["momentum_rank"],
        results["volatility_rank"],
        median_volatility,
    )
longs_conditional = results.xs(end_date, level=0).nlargest(
        top_n, "combined_rank_conditional"
    )
shorts_conditional = results.xs(end_date, level=0).nsmallest(
        top_n, "combined_rank_conditional"
    )


In [23]:
 # --- Print Results ---
print("\n--- Longs ---")
print("Filtered:", longs_filtered.index.get_level_values("symbol").tolist())
print("Adjusted:", longs_adjusted.index.get_level_values("symbol").tolist())
print("Conditional:", longs_conditional.index.get_level_values("symbol").tolist())

print("\n--- Shorts ---")
print("Filtered:", shorts_filtered.index.get_level_values("symbol").tolist())
print("Adjusted:", shorts_adjusted.index.get_level_values("symbol").tolist())
print(
        "Conditional:", shorts_conditional.index.get_level_values("symbol").tolist()
    )


--- Longs ---
Filtered: [Equity(18536 [SGOV]), Equity(5095 [CVX]), Equity(12818 [LULU]), Equity(4034 [CI]), Equity(4554 [COP]), Equity(20188 [TGT]), Equity(988 [AMGN]), Equity(20356 [TLT]), Equity(16873 [QCOM]), Equity(7216 [F])]
Adjusted: [Equity(18536 [SGOV]), Equity(5095 [CVX]), Equity(20356 [TLT]), Equity(4554 [COP]), Equity(4034 [CI]), Equity(988 [AMGN]), Equity(10469 [IEF]), Equity(12818 [LULU]), Equity(5377 [DE]), Equity(20188 [TGT])]
Conditional: [Equity(4538 [CONL]), Equity(17497 [RIVN]), Equity(5635 [DJT]), Equity(5407 [DELL]), Equity(19114 [SOUN]), Equity(4265 [CLSK]), Equity(12953 [MARA]), Equity(19014 [SNOW]), Equity(20692 [TSLL]), Equity(14898 [NVDL])]

--- Shorts ---
Filtered: [Equity(2473 [BIL]), Equity(2938 [BRK_B]), Equity(2937 [BRK_A]), Equity(9228 [GS]), Equity(15984 [PGR]), Equity(19247 [SPOT]), Equity(22707 [XLF]), Equity(11096 [ISRG]), Equity(11576 [JPM]), Equity(1989 [BA])]
Adjusted: [Equity(13845 [MSTR]), Equity(1256 [APP]), Equity(21800 [VST]), Equity(5075 [C

Define a function with create a pipeline with a VWAP factor

In [17]:
def make_pipeline_vwap():
    vwap = VWAP(window_length=5)
    dollar_volume = AverageDollarVolume(window_length=30)

    return Pipeline(
        columns={
            "factor": vwap,
            "longs": vwap.top(50),
            "shorts": vwap.bottom(50),
            "rank": vwap.rank(),
        },
        screen=dollar_volume.top(100),
    )

Run the pipeline

In [None]:
results = engine.run_pipeline(
    make_pipeline_vwap(), pd.to_datetime("2012-01-04"), pd.to_datetime("2012-03-01")
)

Clean and display the results

In [11]:
results.dropna(subset="factor", inplace=True)
results.index.names = ["date", "symbol"]
results.sort_values(by=["date", "factor"], inplace=True)

In [None]:
display(results)

**Jason Strimpel** is the founder of <a href='https://pyquantnews.com/'>PyQuant News</a> and co-founder of <a href='https://www.tradeblotter.io/'>Trade Blotter</a>. His career in algorithmic trading spans 20+ years. He previously traded for a Chicago-based hedge fund, was a risk manager at JPMorgan, and managed production risk technology for an energy derivatives trading firm in London. In Singapore, he served as APAC CIO for an agricultural trading firm and built the data science team for a global metals trading firm. Jason holds degrees in Finance and Economics and a Master's in Quantitative Finance from the Illinois Institute of Technology. His career spans America, Europe, and Asia. He shares his expertise through the <a href='https://pyquantnews.com/subscribe-to-the-pyquant-newsletter/'>PyQuant Newsletter</a>, social media, and has taught over 1,000+ algorithmic trading with Python in his popular course **<a href='https://gettingstartedwithpythonforquantfinance.com/'>Getting Started With Python for Quant Finance</a>**. All code is for educational purposes only. Nothing provided here is financial advise. Use at your own risk.