# Stock Data Pipeline


## 0. Install Zipline

You will need to restart your runtime after installing since zipline uses an older version of pandas.

In [None]:
!pip3 install zipline==1.3.0

Collecting zipline==1.3.0
[?25l  Downloading https://files.pythonhosted.org/packages/be/59/8c5802a7897c1095fdc409fb557f04df8f75c37174e80d2ba58c8d8a6488/zipline-1.3.0.tar.gz (2.5MB)
[K     |████████████████████████████████| 2.5MB 5.8MB/s 
Collecting Logbook>=0.12.5
[?25l  Downloading https://files.pythonhosted.org/packages/2f/d9/16ac346f7c0102835814cc9e5b684aaadea101560bb932a2403bd26b2320/Logbook-1.5.3.tar.gz (85kB)
[K     |████████████████████████████████| 92kB 7.8MB/s 
Collecting requests-file>=1.4.1
  Downloading https://files.pythonhosted.org/packages/77/86/cdb5e8eaed90796aa83a6d9f75cfbd37af553c47a291cd47bc410ef9bdb2/requests_file-1.5.1-py2.py3-none-any.whl
Collecting pandas<=0.22,>=0.18.1
[?25l  Downloading https://files.pythonhosted.org/packages/da/c6/0936bc5814b429fddb5d6252566fe73a3e40372e6ceaf87de3dec1326f28/pandas-0.22.0-cp36-cp36m-manylinux1_x86_64.whl (26.2MB)
[K     |████████████████████████████████| 26.3MB 1.3MB/s 
Collecting cyordereddict>=0.2.2
[?25l  Downloading 

## 1. Load libraries and data directories

In [None]:
import zipline

from collections import OrderedDict
import numpy as np

import pandas as pd
from os import listdir
import pickle
import sys

import os

from zipline.data import bundles
from zipline.pipeline import Pipeline
from zipline.utils.calendars import get_calendar
from zipline.pipeline.engine import SimplePipelineEngine
from zipline.pipeline.factors import CustomFactor, DailyReturns, AverageDollarVolume, Returns


from zipline.pipeline.data import USEquityPricing
from zipline.pipeline.loaders import USEquityPricingLoader
from zipline.assets._assets import Equity
from zipline.api import symbol

In [None]:
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Zipline root directory
zipline_dir = '/content/drive/MyDrive/abnormal-distribution-project-data/zipline'

# Exported through the environment: the bundle helpers in this notebook pass
# os.environ to bundles.load(), and the packed .npy files are written under
# zipline_dir + '/data/'.
os.environ['ZIPLINE_ROOT'] = zipline_dir

## 2. Ingestion and raw data processing 

### 2.1 Ingestion and Data Processing functions

In [None]:
# Portions of this code adapted from https://github.com/pbharrin/alpha-compiler


# Column names, in order, for the per-ticker metadata tuples that the ingest
# function collects and hands to asset_db_writer.write(equities=...).
METADATA_HEADERS = ['start_date', 'end_date', 'auto_close_date',
                    'symbol', 'exchange', 'asset_name']


def check_for_abnormal_returns(df, thresh=3.0):
    """Report on stderr any day whose close-to-close return exceeds ``thresh``.

    :param df: price rows for a single ticker; must contain the columns
        'close' and 'ticker'
    :param thresh: fractional return above which a day is reported
        (the default 3.0 means a gain of more than 300%)
    :return: None; the ticker and the offending returns are written to stderr
    """
    returns = df['close'].pct_change()
    abnormal_rets = returns[returns > thresh]
    if abnormal_rets.shape[0] > 0:
        # Positional lookup of the first row. `.iloc` replaces the deprecated
        # `.ix` indexer, which no longer exists in newer pandas releases.
        sys.stderr.write('Abnormal returns for: {}\n'.format(df.iloc[0]['ticker']))
        sys.stderr.write('{}\n'.format(str(abnormal_rets)))


def from_sep_dump(file_name, start=None, end=None):
    """
    Function that reads full Sharadar stock price csv file
    sharadar_sep.csv, and returns an ingest function for Zipline

    :param file_name: path of the price csv; it is only opened when the
        returned ``ingest`` function is actually called
    :param start: bound as the default ``start`` of ``ingest`` (not otherwise used)
    :param end: bound as the default ``end`` of ``ingest`` (not otherwise used)
    :return: the ``ingest`` callable, suitable for ``bundles.register``
    """
    us_calendar = get_calendar("NYSE").all_sessions
    # filled while writing prices, then reused to attach sids to dividends
    ticker2sid_map = {}

    def ingest(environ,
               asset_db_writer,
               minute_bar_writer,  # unused
               daily_bar_writer,
               adjustment_writer,
               calendar,
               cache,
               show_progress,
               output_dir,
               # pass these as defaults to make them 'nonlocal' in py2
               start=start,
               end=end):
        """Write daily bars, equity metadata and dividends for every ticker
        found in ``file_name`` using the writers supplied by Zipline."""

        print("starting ingesting data from: {}".format(file_name))

        # read in the whole dump (will require ~7GB of RAM)
        raw = pd.read_csv(file_name, index_col='date',
                          parse_dates=['date'], na_values=['NA'])

        # Take the dividend rows from this single read instead of parsing the
        # whole file a second time at the end: keep rows where
        # dividends != 0.0, then drop rows containing any NaN.
        dfd = raw[raw["dividends"] != 0.0].dropna().copy()

        # drop unused columns (dividends were captured above)
        df = raw.drop(['lastupdated', 'dividends', 'closeunadj'], axis=1)
        del raw  # release the un-trimmed frame as early as possible

        # counter of valid securities, this will be our primary key
        sec_counter = 0
        data_list = []  # list to send to daily_bar_writer
        metadata_list = []  # list to send to asset_db_writer (metadata)

        # iterate over all the unique securities and pack data, and metadata
        # for writing
        for tkr, df_tkr in df.groupby('ticker'):
            df_tkr = df_tkr.sort_index()

            # first row by position; `.iloc` replaces the deprecated `.ix`
            row0 = df_tkr.iloc[0]  # get metadata from row

            print(" preparing {}".format(row0["ticker"]))
            check_for_abnormal_returns(df_tkr)

            # check to see if there are missing dates in the middle
            this_cal = us_calendar[(us_calendar >= df_tkr.index[0]) & (us_calendar <= df_tkr.index[-1])]
            if len(this_cal) != df_tkr.shape[0]:
                print('MISSING interstitial dates for: %s using forward fill' % row0["ticker"])
                print('number of dates missing: {}'.format(len(this_cal) - df_tkr.shape[0]))
                # re-index onto every calendar session in range and carry the
                # previous row forward into the gaps
                df_desired = pd.DataFrame(index=this_cal.tz_localize(None))
                df_desired = df_desired.join(df_tkr)
                df_tkr = df_desired.ffill()

            # update metadata; 'start_date', 'end_date', 'auto_close_date',
            # 'symbol', 'exchange', 'asset_name'
            metadata_list.append((df_tkr.index[0],
                                  df_tkr.index[-1],
                                  df_tkr.index[-1] + pd.Timedelta(days=1),
                                  row0["ticker"],
                                  "SEP",  # all have exchange = SEP
                                  row0["ticker"]  # TODO: can we delete this?
                                  )
                                 )

            # drop metadata columns
            df_tkr = df_tkr.drop(['ticker'], axis=1)

            # pack data to be written by daily_bar_writer
            data_list.append((sec_counter, df_tkr))
            ticker2sid_map[tkr] = sec_counter  # record the sid for use later
            sec_counter += 1

        print("writing data for {} securities".format(len(metadata_list)))
        daily_bar_writer.write(data_list, show_progress=False)

        # write metadata
        asset_db_writer.write(equities=pd.DataFrame(metadata_list,
                                                    columns=METADATA_HEADERS))
        print("a total of {} securities were loaded into this bundle".format(
            sec_counter))

        # Dividend history: every date column is set to the row's own date,
        # because the dump carries a single date per dividend row.
        dfd.loc[:, 'ex_date'] = dfd.loc[:, 'record_date'] = dfd.index
        dfd.loc[:, 'declared_date'] = dfd.loc[:, 'pay_date'] = dfd.index
        dfd.loc[:, 'sid'] = dfd.loc[:, 'ticker'].apply(lambda x: ticker2sid_map[x])
        dfd = dfd.rename(columns={'dividends': 'amount'})
        dfd = dfd.drop(['open', 'high', 'low', 'close', 'volume', 'lastupdated', 'ticker', 'closeunadj'], axis=1)

        # dfd now holds sid, amount and the four date columns
        adjustment_writer.write(dividends=dfd)

    return ingest

def get_tickers_from_bundle(bundle_name):
    """Gets a list of (ticker, sid) tuples for every asset in a given bundle

    :param bundle_name: name of a registered Zipline bundle
    :return: list of ``(symbol, sid)`` tuples
    """
    bundle_data = bundles.load(bundle_name, os.environ, None)

    # get a list of all sids
    lifetimes = bundle_data.asset_finder._compute_asset_lifetimes()
    all_sids = lifetimes.sid

    # retrieve all assets in the bundle
    all_assets = bundle_data.asset_finder.retrieve_all(all_sids)

    # Return a real list, as the docstring promises. On Python 3 `map` is a
    # one-shot iterator that would be empty the second time it is consumed.
    return [(asset.symbol, asset.sid) for asset in all_assets]


def get_all_assets_for_bundle(bundle_name):
    """For a given bundle get a list of all assets

    :param bundle_name: name of a registered Zipline bundle
    :return: whatever ``asset_finder.retrieve_all`` returns for all sids
    """
    # `load` was never imported as a bare name; go through the `bundles`
    # module like the other helpers do.
    bundle_data = bundles.load(bundle_name, os.environ, None)

    # get a list of all sids
    lifetimes = bundle_data.asset_finder._compute_asset_lifetimes()
    all_sids = lifetimes.sid

    print('all_sids: ', all_sids)

    # retrieve all assets in the bundle
    return bundle_data.asset_finder.retrieve_all(sids=all_sids)


def get_ticker_sid_dict_from_bundle(bundle_name):
    """Build a {ticker: sid} lookup from the bundle's (ticker, sid) pairs."""
    return {ticker: sid for ticker, sid in get_tickers_from_bundle(bundle_name)}

def pack_sparse_data(N, rawpath, fields, filename):
    """Pack per-sid csv files into one np.recarray and dump it to ``filename``.

    The resulting file is what SparseDataFactor loads.

    :param N: number of rows (sid slots) in the packed array
    :param rawpath: directory holding one "<sid>.csv" per security, each with
        a "Date" column plus the columns named in ``fields``
    :param fields: column names to pack; each becomes a float field
    :param filename: destination of the dumped array
    """
    # one slot per sid; slots without a csv stay None
    frames_by_sid = [None for _ in range(N)]
    longest = -1

    print("Packing sids")
    for entry in listdir(rawpath):
        if entry.endswith(".csv"):
            frame = pd.read_csv(os.path.join(rawpath, entry),
                                index_col="Date", parse_dates=True).sort_index()
            # the file stem is the sid
            frames_by_sid[int(entry.split('.')[0])] = frame
            # width is max number of rows in any file
            longest = max(longest, frame.shape[0])
    print("Finished packing sids")

    # temp workaround for `Array Index Out of Bound` bug:
    # keep one spare, never-written column at the end of every row
    longest += 1

    # record layout: 'date' followed by one float field per requested column
    record_dtype = [('date', '<f8')] + [(name, '<f8') for name in fields]
    backing = np.full((len(fields) + 1, N, longest), np.nan)
    packed = np.recarray(shape=(N, longest), buf=backing, dtype=record_dtype)

    # copy each loaded frame into its sid row; unused cells remain NaN
    for sid, frame in enumerate(frames_by_sid):
        if frame is not None:
            n_rows = frame.index.shape[0]
            packed.date[sid, :n_rows] = frame.index
            for name in fields:
                packed[name][sid, :n_rows] = frame[name]

    packed.dump(filename)  # can be read back with np.load()


def load_sf1(sf1_dir, 
             fields, 
             bund ='sep', 
             npy_name='SF1', 
             stocks_dir = '/content/drive/MyDrive/abnormal-distribution-project-data/stocks/dummy',
             dimensions=None):
    """
    Loads SF1 data into a npy compressed file SF1.npy
    :param sf1_dir: Sharadar SF1 bulk file
    :param fields: fields to load
    :param bund: name of the price bundle whose ticker -> sid mapping is used
    :param npy_name: stem of the output file written under zipline_dir + '/data/'
    :param stocks_dir: scratch directory that receives one "<sid>.csv" per ticker
    :param dimensions: dimensions to load. One-to-one with fields. If None, assume ARQ if data available,
    ART if not
    :raises ValueError: if fewer dimensions than fields are supplied
    """
    # Fail before the expensive work rather than with an IndexError mid-run.
    if dimensions is not None and len(dimensions) < len(fields):
        raise ValueError('dimensions must provide one entry per field')

    # The ingest function is a placeholder: the bundle is only loaded here.
    bundles.register(bund, from_sep_dump('.', '.'), )

    # Load the ticker -> sid mapping once and reuse it for both the count
    # and the loop below.
    tickers = get_ticker_sid_dict_from_bundle(bund)
    num_tickers = len(tickers)
    print('number of tickers: ', num_tickers)

    data = pd.read_csv(sf1_dir)

    # Group once instead of scanning the whole frame for every ticker.
    rows_by_ticker = data.groupby('ticker')
    no_rows = data.iloc[0:0]  # stand-in for tickers absent from the SF1 file

    # Make sure the scratch directory exists before writing into it.
    os.makedirs(stocks_dir, exist_ok=True)

    counter = 0
    for ticker, sid in tickers.items():
        counter += 1
        if counter % 100 == 0:
            print("Working on {}-th file".format(counter))

        if ticker in rows_by_ticker.groups:
            df = rows_by_ticker.get_group(ticker)
        else:
            df = no_rows
        df = df.rename(columns={'datekey': 'Date'}).set_index('Date')
        df.index = df.index.rename('Date')

        # per-dimension slices are the same for every field; build them once
        arq = df[df.dimension == 'ARQ']
        art = df[df.dimension == 'ART']

        series = []
        for i, field in enumerate(fields):
            if dimensions is None:
                # use ART only when ARQ has no non-null value for this field
                # (this also covers the case of no ARQ rows at all)
                if arq[field].isna().all():
                    s = art[field]
                else:
                    s = arq[field]
            else:
                s = df[df.dimension == dimensions[i]][field]
            series.append(s)

        df = pd.concat(series, axis=1)
        df = df.sort_index()
        df.index = df.index.rename('Date')
        df.to_csv(os.path.join(stocks_dir, "{}.csv".format(sid)))
    
    pack_sparse_data(num_tickers + 1,  # number of tickers in bundle + 1
                     stocks_dir,
                     fields,
                     zipline_dir + '/data/' + npy_name +  '.npy')  # write directly to the zipline data dir

# Integer code stored for each value of the 'sector' column by load_static.
SECTOR_CODING = {'Technology': 0,
                 'Industrials': 1,
                 'Energy': 2,
                 'Utilities': 3,
                 'Consumer Cyclical': 4,
                 'Healthcare': 5,
                 'Financial Services': 6,
                 'Basic Materials': 7,
                 'Consumer Defensive': 8,
                 'Real Estate': 9,
                 'Communication Services': 10,
                 np.nan: -1}  # a few tickers are missing sectors, these should be ignored

# Integer code stored for each value of the 'exchange' column by load_static.
EXCHANGE_CODING = {'NYSE': 0,
                   'NASDAQ': 1,
                   'NYSEMKT': 2,  # previously AMEX
                   'OTC': 3,
                   'NYSEARCA': 4,
                   'BATS': 5}


def load_static(filepath):
    """Stores static items to a persisted np array.
    The following static fields are currently persisted (row index in the
    saved array in parentheses); -1 marks a missing or unmapped value.
    -sector (0): coded with SECTOR_CODING
    -exchange (1): coded with EXCHANGE_CODING
    -industry (2): the integer 'siccode' column

    :param filepath: csv readable by pandas with a 'ticker' column plus
        'exchange', 'table', 'sector' and 'siccode'
    """
    # `int` is only a placeholder ingest callable: the bundle is loaded, never ingested, here
    bundles.register('sep', int, )

    df = pd.read_csv(filepath, index_col="ticker")
    df = df[df.exchange != 'None']
    df = df[df.exchange != 'INDEX']
    df = df[df.table == 'SEP']

    # A label missing from the coding dict maps to NaN, which cannot be
    # assigned into the int64 array below; normalise those to -1 up front.
    coded_sectors_for_ticker = df['sector'].map(SECTOR_CODING).fillna(-1).astype('int64')
    coded_exchange_for_ticker = df['exchange'].map(EXCHANGE_CODING).fillna(-1).astype('int64')
    coded_industry_for_ticker = df['siccode'].fillna(-1).astype('int')

    ae_d = get_ticker_sid_dict_from_bundle('sep')
    N = max(ae_d.values()) + 1

    # create 2-D array to hold data where index = SID
    static_data = np.full((3, N), -1, np.dtype('int64'))

    # iterate over Assets in the bundle, and fill in static fields
    print('Creating static data')
    for ticker, sid in ae_d.items():
        # tickers absent from the csv keep the -1 default
        static_data[0, sid] = coded_sectors_for_ticker.get(ticker, -1)
        static_data[1, sid] = coded_exchange_for_ticker.get(ticker, -1)
        static_data[2, sid] = coded_industry_for_ticker.get(ticker, -1)
    print('Finished creating static data')

    # finally save the file to disk
    np.save(zipline_dir + '/data/' + "SHARDAR_static.npy", static_data)


class SparseDataFactor(CustomFactor):
    """Abstract Base Class to be used for computing sparse data.
    The data is packed and persisted into a NumPy binary data file
    in a previous step.
    This class must be subclassed with class variable 'outputs' set.  The fields
    in 'outputs' should match those persisted.

    Subclasses must also set ``self.N`` (number of sid rows) and
    ``self.data_path``; ``cold_start`` and ``update_time_index`` read them but
    this base class does not define ``N``.
    """
    inputs = []
    window_length = 1

    def __init__(self, *args, **kwargs):
        """Reset the lazily-populated state; the data file is not read here."""
        self.time_index = None  # per-sid column index into self.data, built by cold_start
        self.curr_date = None # date for which time_index is accurate
        self.data = None  # packed array, loaded on first compute()
        self.data_path = "please_specify_.npy_file"

    def bs(self, arr):
        """Binary Search

        Recursively halves ``arr`` and returns how many leading entries are
        not greater than ``self.curr_date``.
        NOTE(review): assumes ``arr`` is non-empty and sorted ascending -
        confirm against how the data file is packed.
        """
        if len(arr) == 1:
            if self.curr_date < arr[0]:
                return 0
            else: return 1

        mid = int(len(arr) / 2)
        if self.curr_date < arr[mid]:
            return self.bs(arr[:mid])
        else:
            return mid + self.bs(arr[mid:])

    def bs_sparse_time(self, sid):
        """For each security find the best range in the sparse data.

        Returns 0 when the sid's first date is NaN; otherwise the result of
        ``bs`` over the non-NaN dates minus one, which is -1 when
        ``self.curr_date`` precedes every date.
        """
        dates_for_sid = self.data.date[sid]
        if np.isnan(dates_for_sid[0]):
            return 0

        # do a binary search of the dates array finding the index
        # where self.curr_date will lie.
        non_nan_dates = dates_for_sid[~np.isnan(dates_for_sid)]
        return self.bs(non_nan_dates) - 1

    def cold_start(self, today, assets):
        """Load the data file if needed and build ``time_index`` from scratch
        for the sids in ``assets``; every other sid is left at -1."""
        if self.data is None:
            self.data = np.load(self.data_path, allow_pickle=True)

        # number of date columns per sid
        self.M = self.data.date.shape[1]

        # for each sid, do binary search of date array to find current index
        # the results can be shared across all factors that inherit from SparseDataFactor
        # this sets an array of ints: time_index
        self.time_index = np.full(self.N, -1, np.dtype('int64'))
        self.curr_date = today.value
        for asset in assets:  # asset is numpy.int64
            self.time_index[asset] = self.bs_sparse_time(asset)

    def update_time_index(self, today, assets):
        """Ratchet update.
        for each asset check if today >= dates[self.time_index]
        if so then increment self.time_index[asset.sid] += 1

        Each call advances an index by at most one step, and the update is
        applied to all ``self.N`` sids; ``assets`` is not used here.
        """

        # candidate "next" index per sid: one past the current index, except
        # for sids already sitting on the last column
        ind_p1 = self.time_index.copy()
        np.add.at(ind_p1, ind_p1 != (self.M - 1), 1)
        # True where today has reached the date stored at the candidate index
        sids_to_increment = today.value >= self.data.date[np.arange(self.N), ind_p1]
        sids_not_max = self.time_index != (self.M - 1)   # create mask of non-maxed
        self.time_index[sids_to_increment & sids_not_max] += 1

        self.curr_date = today.value

    def compute(self, today, assets, out, *arrays):
        """Fill every field named in the subclass's ``outputs`` with the value
        stored at each asset's current time index."""
        # for each asset in assets determine index from date (today)
        if self.time_index is None:
            self.cold_start(today, assets)
        else:
            self.update_time_index(today, assets)

        # NOTE(review): an index of -1 selects the last column of a row -
        # presumably the spare NaN column added when the file was packed; confirm.
        ti_used_today = self.time_index[assets]

        for field in self.__class__.outputs:
            out[field][:] = self.data[field][assets, ti_used_today]


class Fundamentals(SparseDataFactor):
    """Sparse fundamentals for the "sep" bundle, read from SF1.npy."""
    outputs = [
        'marketcap', 'assets', 'liabilities',
        'pe', 'currentratio', 'netmargin',
        'capex', 'fcf', 'roic',
    ]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # number of tickers in the bundle, plus one
        ticker_to_sid = get_ticker_sid_dict_from_bundle("sep")
        self.N = len(ticker_to_sid) + 1
        self.data_path = zipline_dir + '/data/' + 'SF1.npy'

class FundamentalsSP500(SparseDataFactor):
    """Sparse fundamentals for the "sp500" bundle, read from SF1_SP500.npy."""
    outputs = [
        'marketcap', 'assets', 'liabilities',
        'pe', 'currentratio', 'netmargin',
        'capex', 'fcf', 'roic',
    ]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # number of tickers in the bundle, plus one
        ticker_to_sid = get_ticker_sid_dict_from_bundle("sp500")
        self.N = len(ticker_to_sid) + 1
        self.data_path = zipline_dir + '/data/' + 'SF1_SP500.npy'

class FundamentalsSP1500(SparseDataFactor):
    """Sparse fundamentals for the "sp1500" bundle, read from SF1.npy."""
    outputs = [
        'marketcap', 'assets', 'liabilities',
        'pe', 'currentratio', 'netmargin',
        'capex', 'fcf', 'roic',
    ]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # number of tickers in the bundle, plus one
        ticker_to_sid = get_ticker_sid_dict_from_bundle("sp1500")
        self.N = len(ticker_to_sid) + 1
        # NOTE(review): this is the same file the "sep" factor reads, whereas
        # the sp500 factor has its own SF1_SP500.npy - confirm that SF1.npy
        # was packed with sp1500 sids.
        self.data_path = zipline_dir + '/data/' + 'SF1.npy'

class StaticData(CustomFactor):
    """Time-invariant attributes of an SID: sector, exchange and industry.

    Values come from the array saved as SHARDAR_static.npy, whose rows 0, 1
    and 2 hold sector, exchange and industry and whose columns are sids."""
    inputs = []
    window_length = 1
    outputs = ['sector', 'exchange', 'industry']

    def __init__(self, *args, **kwargs):
        static_path = zipline_dir + '/data/' + 'SHARDAR_static.npy'
        self.data = np.load(static_path, allow_pickle=True)

    def compute(self, today, assets, out):
        # row i of the stored array feeds the i-th output column
        for row, column in enumerate(('sector', 'exchange', 'industry')):
            out[column][:] = self.data[row, assets]

### 2.2 Ingest stock and fundamental data into Zipline

Warning: Running this code will overwrite the existing database and can take almost an hour!

In [None]:
# Stock price data ingestion

#!zipline ingest -b 'sep'
#!zipline ingest -b 'sp500'
#!zipline ingest -b 'sp1500'

In [None]:
# Fundamental data ingestion

# Choose which fundamental fields you want to be able to process through the pipeline.
# A list of all available fields can be found below.

# Fundamental data directory
#sf1_dir = '/content/drive/MyDrive/abnormal-distribution-project-data/stocks/SHARADAR_SF1.csv'
#fields = ['marketcap', 'assets', 'liabilities', 'pe', 'currentratio', 'netmargin', 'capex', 'fcf', 'roic']
#load_sf1(sf1_dir, fields, dimensions=None)


In [None]:
# Static data ingestion
#static_file = '/content/drive/MyDrive/abnormal-distribution-project-data/stocks/SHARADAR_TICKERS.zip'
#load_static(static_file)



Creating static data
Finished creating static data


An explanation of the database-related fields can be found at:

  https://www.quandl.com/databases/SF1/documentation?anchor=dimensions

A summary of each fundamental field can be found in:

https://docs-1-8--quantrocket.netlify.app/docs/data/fundamental/sharadar/
        
        ['ticker', 'dimension', 'calendardate', 'datekey', 'reportperiod',
       'lastupdated', 'accoci', 'assets', 'assetsavg', 'assetsc', 'assetsnc',
       'assetturnover', 'bvps', 'capex', 'cashneq', 'cashnequsd', 'cor',
       'consolinc', 'currentratio', 'de', 'debt', 'debtc', 'debtnc', 'debtusd',
       'deferredrev', 'depamor', 'deposits', 'divyield', 'dps', 'ebit',
       'ebitda', 'ebitdamargin', 'ebitdausd', 'ebitusd', 'ebt', 'eps',
       'epsdil', 'epsusd', 'equity', 'equityavg', 'equityusd', 'ev', 'evebit',
       'evebitda', 'fcf', 'fcfps', 'fxusd', 'gp', 'grossmargin']

       ['intexp', 'invcap', 'invcapavg', 'inventory', 'investments',
       'investmentsc', 'investmentsnc', 'liabilities', 'liabilitiesc',
       'liabilitiesnc', 'marketcap', 'ncf', 'ncfbus', 'ncfcommon', 'ncfdebt',
       'ncfdiv', 'ncff', 'ncfi', 'ncfinv', 'ncfo', 'ncfx', 'netinc',
       'netinccmn', 'netinccmnusd', 'netincdis', 'netincnci', 'netmargin',
       'opex', 'opinc', 'payables', 'payoutratio', 'pb', 'pe', 'pe1',
       'ppnenet', 'prefdivis', 'price', 'ps', 'ps1', 'receivables', 'retearn',
       'revenue', 'revenueusd', 'rnd', 'roa', 'roe', 'roic', 'ros', 'sbcomp',
       'sgna', 'sharefactor', 'sharesbas', 'shareswa', 'shareswadil', 'sps',
       'tangibles', 'taxassets', 'taxexp', 'taxliabilities', 'tbvps',
       'workingcapital']

## 3. Equity factors

### 3.1 Load helper functions

In [None]:
# Portions of this code are adapted from https://www.udacity.com/course/ai-for-trading--nd880 


def register_data(start_date, end_date, bundle_name, address):
    """Register a csvdir-based daily equities bundle on the NYSE calendar.

    :param start_date: first session; anything ``pd.Timestamp`` accepts
    :param end_date: last session; anything ``pd.Timestamp`` accepts
    :param bundle_name: name to register the bundle under
    :param address: location of the csv data, passed to ``csvdir_equities``
    """
    # `register` and `csvdir_equities` were never imported as bare names,
    # so the original call raised NameError. Import locally so the function
    # works on its own.
    from zipline.data.bundles.csvdir import csvdir_equities

    start_session = pd.Timestamp(start_date, tz='utc')
    end_session = pd.Timestamp(end_date, tz='utc')

    bundles.register(bundle_name,
                     csvdir_equities(['daily'], address),
                     calendar_name='NYSE',
                     start_session=start_session,
                     end_session=end_session)


class PricingLoader(object):
    """Hands out one shared USEquityPricingLoader for USEquityPricing columns."""

    def __init__(self, bundle_data):
        daily_bars = bundle_data.equity_daily_bar_reader
        adjustments = bundle_data.adjustment_reader
        self.loader = USEquityPricingLoader(daily_bars, adjustments)

    def get_loader(self, column):
        """Return the shared loader, or raise for a non-USEquityPricing column."""
        if column in USEquityPricing.columns:
            return self.loader
        raise Exception('Column not in USEquityPricing')

def build_pipeline_engine(bundle_data, trading_calendar):
    """Create a SimplePipelineEngine backed by the bundle's pricing data,
    running on all sessions of ``trading_calendar``."""
    loader_provider = PricingLoader(bundle_data)
    return SimplePipelineEngine(
        get_loader=loader_provider.get_loader,
        calendar=trading_calendar.all_sessions,
        asset_finder=bundle_data.asset_finder)

# Loading stock list from file
def stock_list(file_name):
    """Read one entry per line from ``file_name``.

    :param file_name: path of a text file with one ticker per line
    :return: list of the lines with their line terminator removed
        (blank lines are kept as empty strings)
    """
    with open(file_name, 'r') as f:
        # Strip only the line terminator. Slicing off the last character
        # would truncate the final entry of a file with no trailing newline.
        return [line.rstrip('\r\n') for line in f]

def get_universe_tickers(engine, universe, end_date):
    """Run a screen-only pipeline for the single day ``end_date`` (UTC) and
    return the values of index level 1 of the result as a list."""
    as_of = pd.Timestamp(end_date, tz='UTC')
    screened = engine.run_pipeline(Pipeline(screen=universe), as_of, as_of)
    return screened.index.get_level_values(1).values.tolist()


def run_pipeline(engine, pipeline, start_date, end_date):
    """Run ``pipeline`` on ``engine`` between two calendar dates.

    :param engine: object exposing ``run_pipeline(pipeline, start, end)``
    :param pipeline: the pipeline to run
    :param start_date: first date; a string, date/datetime or Timestamp
    :param end_date: last date; a string, date/datetime or Timestamp
    :return: whatever ``engine.run_pipeline`` returns
    """

    def _utc_midnight(value):
        # Go through pd.Timestamp first so plain strings work as well as
        # objects with .strftime, then keep only the calendar date and
        # re-anchor it at midnight UTC.
        return pd.Timestamp(pd.Timestamp(value).strftime('%Y-%m-%d'), tz='UTC')

    # TODO: adjust for trading days
    end_dt = _utc_midnight(end_date)
    start_dt = _utc_midnight(start_date)
    return engine.run_pipeline(pipeline, start_dt, end_dt)


def get_pipeline_tickers(factors):
    """Return the values of the second index level of ``factors`` as a list.

    This reads the level's stored values, so the result covers every value
    the index level knows about, not only those present in the rows."""
    ticker_level = factors.index.levels[1]
    return ticker_level.values.tolist()


def make_pipeline(factors, universe):
    """Build a Pipeline screened by ``universe`` with one column per entry of
    ``factors`` (a name -> factor mapping), keeping the mapping's order."""
    ordered_columns = OrderedDict((name, factor) for name, factor in factors.items())
    return Pipeline(screen=universe, columns=ordered_columns)

#pd.DataFrame(sorted([asset.security_name for asset in all_assets]), 
#             columns =['ticker']).to_csv('sp1500.csv', index=False)

#data = pd.read_csv('/content/drive/MyDrive/abnormal-distribution-project-data/stocks/SHARADAR_SEP.csv')
#sp500_tickers = pd.read_csv('/content/drive/MyDrive/abnormal-distribution-project-data/components/sp500.csv')
#data_sp500 = data[data.ticker.isin(sp500_tickers.ticker)]
#data_sp500.reset_index(inplace=True, drop=True)
#data_sp500.to_csv('/content/drive/MyDrive/abnormal-distribution-project-data/stocks/SHARADAR_SP500.csv', index=False)

### 3.2 Custom Factors


In [None]:

def ForwardReturns(long_window, short_window, mask, asset=None):
    """Combine two ``Returns`` factors as (1 + R_long) / (1 + R_short) - 1.

    :param long_window: ``window_length`` of the numerator ``Returns``
    :param short_window: ``window_length`` of the denominator ``Returns``
    :param mask: mask passed to both ``Returns`` factors
    :param asset: optional input column; when given it is passed as
        ``inputs=[asset]``, otherwise ``Returns`` uses its default inputs
    :return: the resulting factor expression
    """
    # Identity test, not `== None`: pipeline terms may overload comparison
    # operators, so `==` is not a reliable way to detect a missing argument.
    extra = {} if asset is None else {'inputs': [asset]}

    long_ret = Returns(window_length=long_window, mask=mask, **extra)
    short_ret = Returns(window_length=short_window, mask=mask, **extra)
    return ((1 + long_ret) / (1 + short_ret) - 1)


def make_returns():
    """Return a dict of return factors keyed '<horizon>_ret_<open|close>'.

    Horizons are 1D, 1W, 1M, 3M, 6M and 1Y; the open-price factors are
    inserted first, followed by the default-input (close) ones."""
    # (label, window_length) for every horizon longer than one day
    horizons = [('1W', 5), ('1M', 21), ('3M', 63), ('6M', 126), ('1Y', 252)]

    all_factors = {}

    all_factors['1D_ret_open'] = DailyReturns(inputs=[USEquityPricing.open])
    for label, window in horizons:
        all_factors[label + '_ret_open'] = Returns(inputs=[USEquityPricing.open],
                                                   window_length=window)

    all_factors['1D_ret_close'] = DailyReturns()
    for label, window in horizons:
        all_factors[label + '_ret_close'] = Returns(window_length=window)

    return all_factors

### 3.3 Run pipeline


In [None]:
# Obtain data, choose trading calendar and build pipeline engine

trading_calendar = get_calendar('NYSE') 
# '.' is a placeholder file name: from_sep_dump only opens the file when its
# ingest function is called, and this cell only registers and loads the bundle.
# NOTE(review): presumably the 'sp1500' bundle was ingested beforehand - confirm.
bundles.register('sp1500', from_sep_dump('.'))
bundle_data = bundles.load('sp1500')
engine = build_pipeline_engine(bundle_data, trading_calendar)

In [None]:
# Set date range
start_date = '1999-12-30'
end_date = '2020-11-19'
universe_start_date = pd.Timestamp(start_date, tz='UTC')
universe_end_date = pd.Timestamp(end_date, tz='UTC')

# Select universe of stocks
# (screen: the 1500 largest values of the 'marketcap' output)
universe = FundamentalsSP1500().marketcap.top(1500) 

# One pipeline column per return factor, restricted to the universe screen
pipeline = make_pipeline(make_returns(), universe)

# Define pipeline
#pipeline = Pipeline(screen=universe)
#pipeline.add(StaticData().sector, 'sector')
#pipeline.add(DailyReturns(window_length=252), '1Y_return')


In [None]:
# Run pipeline 
all_factors = run_pipeline(engine, pipeline, universe_start_date, universe_end_date)
# Convert tickers from zipline class to string
# (from_sep_dump stores the ticker in the asset_name metadata column)
all_factors.index = all_factors.index.set_levels(all_factors.index.levels[1].map(lambda x: x.asset_name), level=1)
# Get all tickers for the stocks we're looking at
all_assets = get_pipeline_tickers(all_factors)



In [None]:
# Persist the factor table. The context manager guarantees the file handle is
# flushed and closed even if pickling fails.
with open('/content/drive/MyDrive/abnormal-distribution-project-data/components/returns_sp1500.p', 'wb') as returns_file:
    pickle.dump(all_factors, returns_file)



In [None]:
import pandas as pd
import pickle
# Load a previously saved factor table; the context manager closes the file.
# NOTE(review): this reads returns_sp500.p, while the dump cell earlier in the
# notebook writes returns_sp1500.p - confirm which file is intended.
with open('/content/drive/MyDrive/abnormal-distribution-project-data/components/returns_sp500.p', 'rb') as returns_file:
    A = pickle.load(returns_file)

In [None]:
A.head()

Unnamed: 0,Unnamed: 1,1D_ret_close,1D_ret_open,1M_ret_close,1M_ret_open,1W_ret_close,1W_ret_open,1Y_ret_close,1Y_ret_open,3M_ret_close,3M_ret_open,6M_ret_close,6M_ret_open
1999-12-30 00:00:00+00:00,AAPL,0.025086,-0.022599,0.028604,-0.011429,0.007848,-0.057734,1.511173,1.416201,0.59115,0.62594,1.219753,1.094431
1999-12-30 00:00:00+00:00,ACF,-0.009973,-0.027027,0.091941,0.062699,0.042104,0.028571,0.455922,0.364877,0.24267,0.252174,0.151194,0.112003
1999-12-30 00:00:00+00:00,ACGL,0.014894,0.0,-0.004868,-0.085734,0.073518,-0.025585,-0.390545,-0.432283,-0.175691,-0.134977,-0.111732,-0.143316
1999-12-30 00:00:00+00:00,ACIIQ,0.09376,0.012821,0.135371,-0.000559,0.165249,0.11049,-0.334591,-0.403606,-0.100932,-0.167132,-0.199177,-0.273716
1999-12-30 00:00:00+00:00,ADBE,-0.002843,0.011614,-0.039662,-0.077913,-0.013701,0.027788,1.873291,1.808691,0.162314,0.153583,0.548259,0.584315


In [None]:
# Flatten the two index levels into 'date' and 'ticker' columns and keep only
# the 1-day and 1-month close-to-close returns.
A = A.reset_index().rename(columns={'level_0':'date','level_1':'ticker'})[['date','ticker','1D_ret_close', '1M_ret_close']]

In [None]:
# Write the flattened table to csv without the positional index.
A.to_csv('/content/drive/MyDrive/abnormal-distribution-project-data/components/returns_sp500.csv',index=False)

In [None]:
# Read the csv back; note this rebinds A to the csv contents.
A = pd.read_csv('/content/drive/MyDrive/abnormal-distribution-project-data/components/returns_sp500.csv')

In [None]:
A.head()

Unnamed: 0,date,ticker,1D_ret_close
0,1999-12-30 00:00:00+00:00,A,0.170732
1,1999-12-30 00:00:00+00:00,AAPL,0.025086
2,1999-12-30 00:00:00+00:00,ABC,-0.029342
3,1999-12-30 00:00:00+00:00,ABMD,0.014085
4,1999-12-30 00:00:00+00:00,ABS,-0.028287
