In [2]:
from pandas import DataFrame
from NNTrade.common.candle_col_name import CLOSE, HIGH, INDEX, LOW, OPEN, VOLUME
from pyalgotrade.bar import Frequency
from pyalgotrade.barfeed.csvfeed import BarFeed, GenericRowParser, bar
from pyalgotrade.barfeed import BaseBarFeed
from typing import Dict

class CustomBarFeed(BaseBarFeed):
    """Bar feed that replays bars stored in in-memory pandas DataFrames.

    Timestamps of all instruments are merged and replayed in ascending order. At each
    timestamp one ``bar.Bars`` is emitted, holding a bar for every instrument that has
    a row at that timestamp.

    Args:
        stock_data: instrument -> price DataFrame. Each frame is expected to be indexed
            by datetime (unique labels) and to contain the OPEN/HIGH/LOW/CLOSE/VOLUME
            columns.
        custom_data: instrument -> DataFrame of extra columns, indexed like the matching
            price frame. The row at a bar's timestamp is attached to that bar as its
            extra columns. Instruments without an entry (or timestamps missing from it)
            get no extra columns. ``None`` is treated as "no custom data".
        frequency: frequency stamped on every emitted bar.
        maxLen: optional maximum length of the feed's data series.
    """

    def __init__(self, stock_data: Dict[str, DataFrame], custom_data: Dict[str, DataFrame], frequency: bar.Frequency, maxLen=None):
        super().__init__(frequency, maxLen)
        self.__stock_data = stock_data
        self.__custom_data = custom_data if custom_data is not None else {}
        for instrument in stock_data:
            # Register each instrument's data series with the feed up front.
            self.registerInstrument(instrument)
        # Union of every instrument's timestamps, replayed in ascending order.
        self.__dates = sorted(set().union(*(df.index for df in stock_data.values())))
        self.__next_idx = 0
        self.__current_datetime = None

    def reset(self):
        """Rewind the feed to its first timestamp so it can be replayed."""
        self.__next_idx = 0
        self.__current_datetime = None
        super().reset()

    def getNextBars(self):
        """Return a ``bar.Bars`` for the next timestamp, or None once all data is consumed."""
        if self.eof():
            return None
        date_time = self.__dates[self.__next_idx]
        self.__next_idx += 1
        bar_dict = {}
        for instrument in self.__stock_data:
            stock_bar = self.__getNextStockBar(instrument, date_time)
            if stock_bar is not None:
                bar_dict[instrument] = stock_bar
        self.__current_datetime = date_time
        # bar_dict is never empty: date_time was taken from at least one instrument's index.
        return bar.Bars(bar_dict)

    def __getNextStockBar(self, instrument, date_time):
        """Build the BasicBar of ``instrument`` at ``date_time``; None if it has no row there."""
        stock_df = self.__stock_data[instrument]
        if date_time not in stock_df.index:
            return None
        row = stock_df.loc[date_time]
        return bar.BasicBar(
            date_time, row[OPEN], row[HIGH], row[LOW], row[CLOSE], row[VOLUME], None,
            self.getFrequency(), self.__getNextCustomBar(instrument, date_time)
        )

    def __getNextCustomBar(self, instrument, date_time):
        """Return the custom columns of ``instrument`` at ``date_time`` as a dict ({} if none)."""
        custom_df = self.__custom_data.get(instrument)
        if custom_df is None or date_time not in custom_df.index:
            return {}
        return custom_df.loc[date_time].to_dict()

    def barsHaveAdjClose(self):
        # Bars are built with adjClose=None.
        return False

    def eof(self):
        """True once every merged timestamp has been emitted."""
        return self.__next_idx >= len(self.__dates)

    def getCurrentDateTime(self):
        """Datetime of the most recently emitted bars (None before the first one)."""
        return self.__current_datetime

    def join(self):
        # Nothing to wait for: all data is already in memory.
        pass

    def peekDateTime(self):
        """Datetime of the next bars without advancing the feed (None when exhausted)."""
        if self.eof():
            return None
        return self.__dates[self.__next_idx]

    def start(self):
        # No setup needed: data is loaded in __init__.
        pass

    def stop(self):
        # No resources to release.
        pass

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  from pandas import DataFrame


In [3]:
# Locations of the cached daily-candle CSVs for the two test instruments.
test_data_1_path, test_data_2_path = (
    "./tmp/test_barfeed.1.csv",
    "./tmp/test_barfeed.2.csv",
)

In [4]:
import yfinance as yf
import pandas as pd
from NNTrade.common.candle_col_name import INDEX, OPEN,CLOSE, HIGH, LOW, VOLUME

In [5]:
import os


def cache_daily_candles(ticker: str, path: str, start: str = "2020-01-01", end: str = "2020-01-10",
                        volume: int = 10000000) -> None:
    """Download daily candles for ``ticker`` from Yahoo Finance and save them as CSV.

    Does nothing when ``path`` already exists, so re-running the notebook reuses the
    cached file instead of downloading again.

    The saved frame has a date index named ``INDEX`` and its OHLCV columns renamed to
    the project's ``OPEN``/``HIGH``/``LOW``/``CLOSE``/``VOLUME`` names.

    Args:
        ticker: Yahoo Finance symbol, e.g. "EURUSD=X".
        path: destination CSV path; missing parent directories are created.
        start, end: date range passed to ``yf.Ticker(...).history``.
        volume: constant value written to the volume column.

    Raises:
        ValueError: if Yahoo returns no rows, so an empty CSV is never cached.
    """
    if os.path.exists(path):
        return
    candles: pd.DataFrame = yf.Ticker(ticker).history(start=start, end=end, interval="1d")
    if candles.empty:
        raise ValueError(f"No candles returned for {ticker!r} between {start} and {end}")
    candles.index = candles.index.date
    candles.index.name = INDEX
    # Force High/Low to bound every price of the candle (computed from the raw values),
    # so pyalgotrade's BasicBar high/low sanity checks accept each bar.
    ohlc = candles[["Open", "High", "Low", "Close"]]
    candles["High"] = ohlc.max(axis=1)
    candles["Low"] = ohlc.min(axis=1)
    # Constant placeholder volume. NOTE(review): presumably because Yahoo FX data has no
    # usable volume — confirm.
    candles["Volume"] = volume
    candles = candles.rename(columns={"Open": OPEN, "High": HIGH, "Low": LOW, "Close": CLOSE, "Volume": VOLUME})
    # to_csv does not create missing directories (e.g. ./tmp on a fresh checkout).
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    candles.to_csv(path)


cache_daily_candles("EURUSD=X", test_data_1_path)
cache_daily_candles("GBPUSD=X", test_data_2_path)

In [6]:
# Load the cached EURUSD candles and turn the date-string index into a DatetimeIndex.
data_1_df: pd.DataFrame = pd.read_csv(test_data_1_path, index_col=INDEX).pipe(
    lambda frame: frame.set_axis(pd.to_datetime(frame.index), axis=0)
)
data_1_df

Unnamed: 0_level_0,open,high,low,close,volume,Dividends,Stock Splits
start_date_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2020-01-01,1.122083,1.122838,1.115947,1.122083,10000000,0.0,0.0
2020-01-02,1.121894,1.122712,1.116682,1.122083,10000000,0.0,0.0
2020-01-03,1.117081,1.118068,1.11257,1.117144,10000000,0.0,0.0
2020-01-06,1.116246,1.120825,1.11581,1.116196,10000000,0.0,0.0
2020-01-07,1.119583,1.119946,1.113487,1.119799,10000000,0.0,0.0
2020-01-08,1.115573,1.11652,1.111086,1.115474,10000000,0.0,0.0
2020-01-09,1.111444,1.112223,1.109509,1.111321,10000000,0.0,0.0


In [7]:
# Load the cached GBPUSD candles and turn the date-string index into a DatetimeIndex.
data_2_df: pd.DataFrame = pd.read_csv(test_data_2_path, index_col=INDEX).pipe(
    lambda frame: frame.set_axis(pd.to_datetime(frame.index), axis=0)
)
data_2_df

Unnamed: 0_level_0,open,high,low,close,volume,Dividends,Stock Splits
start_date_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2020-01-01,1.3254,1.3271,1.321877,1.32626,10000000,0.0,0.0
2020-01-02,1.325047,1.326788,1.312508,1.32503,10000000,0.0,0.0
2020-01-03,1.314389,1.316119,1.305483,1.31527,10000000,0.0,0.0
2020-01-06,1.30813,1.3174,1.30654,1.30801,10000000,0.0,0.0
2020-01-07,1.317193,1.321213,1.309638,1.317003,10000000,0.0,0.0
2020-01-08,1.311131,1.316951,1.308267,1.311372,10000000,0.0,0.0
2020-01-09,1.310582,1.312456,1.301507,1.310513,10000000,0.0,0.0


In [8]:
# Two synthetic "custom" columns derived from the close price (x10 and x100),
# used to check that extra columns travel with each bar.
data_1_custom_df = DataFrame(
    {
        "Custom1": data_1_df[CLOSE].mul(10),
        "Custom2": data_1_df[CLOSE].mul(100),
    }
)
data_1_custom_df

Unnamed: 0_level_0,Custom1,Custom2
start_date_time,Unnamed: 1_level_1,Unnamed: 2_level_1
2020-01-01,11.220826,112.208259
2020-01-02,11.220826,112.208259
2020-01-03,11.171438,111.714375
2020-01-06,11.16196,111.619604
2020-01-07,11.197995,111.979949
2020-01-08,11.154739,111.547387
2020-01-09,11.113211,111.132109


In [9]:
# Two synthetic "custom" columns derived from the close price (x10 and x100),
# used to check that extra columns travel with each bar.
data_2_custom_df = DataFrame(
    {
        "Custom1": data_2_df[CLOSE].mul(10),
        "Custom2": data_2_df[CLOSE].mul(100),
    }
)
data_2_custom_df

Unnamed: 0_level_0,Custom1,Custom2
start_date_time,Unnamed: 1_level_1,Unnamed: 2_level_1
2020-01-01,13.2626,132.625997
2020-01-02,13.250299,132.502985
2020-01-03,13.152703,131.52703
2020-01-06,13.080102,130.801022
2020-01-07,13.170025,131.700253
2020-01-08,13.113722,131.137216
2020-01-09,13.10513,131.051302


In [10]:
# Attach the custom columns to the price frame (left join on the shared date index).
data_1 = data_1_df.join(other=data_1_custom_df, how="left")
data_1

Unnamed: 0_level_0,open,high,low,close,volume,Dividends,Stock Splits,Custom1,Custom2
start_date_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2020-01-01,1.122083,1.122838,1.115947,1.122083,10000000,0.0,0.0,11.220826,112.208259
2020-01-02,1.121894,1.122712,1.116682,1.122083,10000000,0.0,0.0,11.220826,112.208259
2020-01-03,1.117081,1.118068,1.11257,1.117144,10000000,0.0,0.0,11.171438,111.714375
2020-01-06,1.116246,1.120825,1.11581,1.116196,10000000,0.0,0.0,11.16196,111.619604
2020-01-07,1.119583,1.119946,1.113487,1.119799,10000000,0.0,0.0,11.197995,111.979949
2020-01-08,1.115573,1.11652,1.111086,1.115474,10000000,0.0,0.0,11.154739,111.547387
2020-01-09,1.111444,1.112223,1.109509,1.111321,10000000,0.0,0.0,11.113211,111.132109


In [11]:
# Attach the custom columns to the price frame (left join on the shared date index).
data_2 = data_2_df.join(other=data_2_custom_df, how="left")
data_2

Unnamed: 0_level_0,open,high,low,close,volume,Dividends,Stock Splits,Custom1,Custom2
start_date_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2020-01-01,1.3254,1.3271,1.321877,1.32626,10000000,0.0,0.0,13.2626,132.625997
2020-01-02,1.325047,1.326788,1.312508,1.32503,10000000,0.0,0.0,13.250299,132.502985
2020-01-03,1.314389,1.316119,1.305483,1.31527,10000000,0.0,0.0,13.152703,131.52703
2020-01-06,1.30813,1.3174,1.30654,1.30801,10000000,0.0,0.0,13.080102,130.801022
2020-01-07,1.317193,1.321213,1.309638,1.317003,10000000,0.0,0.0,13.170025,131.700253
2020-01-08,1.311131,1.316951,1.308267,1.311372,10000000,0.0,0.0,13.113722,131.137216
2020-01-09,1.310582,1.312456,1.301507,1.310513,10000000,0.0,0.0,13.10513,131.051302


In [12]:
from pyalgotrade.barfeed.membf import BarFeed
class InMemBarFeed(BarFeed):
    """In-memory bar feed (subclass of ``pyalgotrade.barfeed.membf.BarFeed``).

    Its only addition is ``barsHaveAdjClose``. NOTE(review): presumably needed because
    the membf base class does not implement it — confirm.
    """

    def __init__(self, frequency, maxLen=None):
        # Straight pass-through to the base constructor.
        super(InMemBarFeed, self).__init__(frequency, maxLen)

    def barsHaveAdjClose(self):
        """Report that bars carry no adjusted close price."""
        return False

In [13]:
from pyalgotrade.barfeed import yahoofeed
from pyalgotrade.bar import Frequency
from pyalgotrade.bar import BasicBar
def frame_to_bars(candles: pd.DataFrame, frequency=Frequency.DAY) -> list:
    """Convert a candle DataFrame into pyalgotrade ``BasicBar`` objects, one per row.

    OPEN/HIGH/LOW/CLOSE/VOLUME fill the bar's price and volume fields (no adjusted
    close). Every other column (e.g. "Dividends", "Custom1") is attached as a bar
    extra column, readable later via ``BarDataSeries.getExtraDataSeries(name)``.

    Args:
        candles: frame indexed by bar datetime, containing the OHLCV columns.
        frequency: bar frequency stamped on every bar.

    Returns:
        list of BasicBar in the frame's row order.
    """
    extra_mask = ~candles.columns.isin([OPEN, HIGH, CLOSE, LOW, VOLUME])
    return [
        BasicBar(
            ts,
            candles.loc[ts, OPEN],
            candles.loc[ts, HIGH],
            candles.loc[ts, LOW],
            candles.loc[ts, CLOSE],
            candles.loc[ts, VOLUME],
            None,
            frequency,
            candles.loc[ts][extra_mask].to_dict(),
        )
        for ts in candles.index
    ]


feed = InMemBarFeed(Frequency.DAY)
feed.addBarsFromSequence('Inst1', frame_to_bars(data_1))
feed.addBarsFromSequence('Inst2', frame_to_bars(data_2))
# data_2's non-OHLCV column mask, kept as a notebook-level name for later cells that read `mask`.
mask = ~data_2.columns.isin([OPEN, HIGH, CLOSE, LOW, VOLUME])

In [15]:
# Columns of data_2 that are not OHLCV, i.e. the ones attached to each bar as extra columns.
# Computed here rather than reusing the leaked `mask` global from an earlier cell.
for col in data_2.columns[~data_2.columns.isin([OPEN, HIGH, CLOSE, LOW, VOLUME])]:
    print(col)

Dividends
Stock Splits
Custom1
Custom2


In [13]:
from pyalgotrade.dataseries.bards import BarDataSeries
# Inspect the extra-column series the feed holds for Inst2's "Custom1" values.
bf: BarDataSeries = feed.getDataSeries("Inst2")
bf.getExtraDataSeries("Custom1")

<pyalgotrade.dataseries.SequenceDataSeries at 0x7fa1342f17d0>

In [14]:
from pyalgotrade import strategy
from pyalgotrade.bar import BasicBar
from pyalgotrade.technical import ma
from pyalgotrade.dataseries.bards import BarDataSeries

class MyStrategy(strategy.BacktestingStrategy):
    """Demo strategy that prints each bar's price and its custom extra columns.

    For both instruments it prints the current price plus the current ("-0") and
    previous ("-1") values of the "Custom1" and "Custom2" extra columns. This verifies
    that extra DataFrame columns reach the strategy through the bar feed.
    """

    def __init__(self, feed, instrument1, instrument2):
        super(MyStrategy, self).__init__(feed)
        self.__instrument1 = instrument1
        self.__instrument2 = instrument2
        bds1: BarDataSeries = feed[instrument1]
        bds2: BarDataSeries = feed[instrument2]
        # Read the extra-column series directly. A period-1 SMA was previously used as a
        # pass-through, but its incremental update may introduce floating-point drift.
        # Each entry is (print label, series).
        self.__inst1_custom = (
            ("custom1", bds1.getExtraDataSeries("Custom1")),
            ("custom2", bds1.getExtraDataSeries("Custom2")),
        )
        self.__inst2_custom = (
            ("custom1", bds2.getExtraDataSeries("Custom1")),
            ("custom2", bds2.getExtraDataSeries("Custom2")),
        )

    def onBars(self, bars):
        bar1: BasicBar = bars[self.__instrument1]
        bar2: BasicBar = bars[self.__instrument2]
        print(f"{bar1.getDateTime()}:")
        self.__printInstrument(self.__instrument1, "Price1", bar1, self.__inst1_custom)
        self.__printInstrument(self.__instrument2, "Price2", bar2, self.__inst2_custom)

    @staticmethod
    def __printInstrument(instrument, price_label, bar_, custom_series):
        """Print one instrument's price and the current/previous value of each custom series."""
        print(f"\t{instrument}:")
        print(f"\t\t{price_label} {bar_.getPrice()}")
        for label, series in custom_series:
            print(f"\t\t{label}-0 {series[-1]}")
            # The previous value exists only from the second bar onwards.
            if len(series) > 1:
                print(f"\t\t{label}-1 {series[-2]}")

In [15]:
# Run the demo strategy over both instruments; it prints prices and custom columns per bar.
# NOTE(review): a completed run presumably leaves `feed` exhausted — rebuild the feed
# before re-running this cell.
MyStrategy(feed, instrument1="Inst1", instrument2="Inst2").run()

2020-01-01 00:00:00:
	Inst1:
		Price1 1.1220825910568235
		custom1-0 11.220825910568236
		custom2-0 112.20825910568234
	Inst2:
		Price2 1.326259970664978
		custom1-0 13.26259970664978
		custom2-0 132.6259970664978
2020-01-02 00:00:00:
	Inst1:
		Price1 1.1220825910568235
		custom1-0 11.220825910568236
		custom1-1 11.220825910568236
		custom2-0 112.20825910568234
		custom2-1 112.20825910568234
	Inst2:
		Price2 1.3250298500061035
		custom1-0 13.250298500061035
		custom1-1 13.26259970664978
		custom2-0 132.50298500061035
		custom2-1 132.6259970664978
2020-01-03 00:00:00:
	Inst1:
		Price1 1.1171437501907349
		custom1-0 11.17143750190735
		custom1-1 11.220825910568236
		custom2-0 111.71437501907349
		custom2-1 112.20825910568234
	Inst2:
		Price2 1.3152703046798706
		custom1-0 13.152703046798706
		custom1-1 13.250298500061035
		custom2-0 131.52703046798706
		custom2-1 132.50298500061035
2020-01-06 00:00:00:
	Inst1:
		Price1 1.1161960363388062
		custom1-0 11.161960363388062
		custom1-1 11.1714