-
Notifications
You must be signed in to change notification settings - Fork 105
/
main.py
295 lines (259 loc) · 12.7 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
import robin_stocks.robinhood as r
import pandas as pd
import numpy as np
import ta as ta
from pandas.plotting import register_matplotlib_converters
from ta import *
from misc import *
from tradingstats import *
from config import *
#Log in to Robinhood
#Put your username and password in a config.py file in the same directory (see sample file)
login = r.login(rh_username,rh_password)
#Safe divide by zero division function
def safe_division(n, d):
return n / d if d else 0
def get_historicals(ticker, intervalArg, spanArg, boundsArg):
history = r.get_stock_historicals(ticker,interval=intervalArg,span=spanArg,bounds=boundsArg)
#If it's not a stock ticker, try it as a crypto ticker
if(history is None or None in history):
history = r.get_crypto_historicals(ticker,interval=intervalArg,span=spanArg,bounds=boundsArg)
return history
def get_watchlist_symbols():
"""
Returns: the symbol for each stock in your watchlist as a list of strings
"""
my_list_names = set()
symbols = set()
watchlistInfo = r.get_all_watchlists()
for watchlist in watchlistInfo['results']:
listName = watchlist['display_name']
my_list_names.add(listName)
for listName in my_list_names:
watchlist = r.get_watchlist_by_name(name=listName)
for item in watchlist['results']:
symbol = item['symbol']
symbols.add(symbol)
return symbols
def get_portfolio_symbols():
"""
Returns: the symbol for each stock in your portfolio as a list of strings
"""
symbols = []
holdings_data = r.get_open_stock_positions()
for item in holdings_data:
if not item:
continue
instrument_data = r.get_instrument_by_url(item.get('instrument'))
symbol = instrument_data['symbol']
symbols.append(symbol)
return symbols
def get_position_creation_date(symbol, holdings_data):
"""Returns the time at which we bought a certain stock in our portfolio
Args:
symbol(str): Symbol of the stock that we are trying to figure out when it was bought
holdings_data(dict): dict returned by r.get_open_stock_positions()
Returns:
A string containing the date and time the stock was bought, or "Not found" otherwise
"""
instrument = r.get_instruments_by_symbols(symbol)
url = instrument[0].get('url')
for dict in holdings_data:
if(dict.get('instrument') == url):
return dict.get('created_at')
return "Not found"
def get_modified_holdings():
""" Retrieves the same dictionary as r.build_holdings, but includes data about
when the stock was purchased, which is useful for the read_trade_history() method
in tradingstats.py
Returns:
the same dict from r.build_holdings, but with an extra key-value pair for each
position you have, which is 'bought_at': (the time the stock was purchased)
"""
holdings = r.build_holdings()
holdings_data = r.get_open_stock_positions()
for symbol, dict in holdings.items():
bought_at = get_position_creation_date(symbol, holdings_data)
bought_at = str(pd.to_datetime(bought_at))
holdings[symbol].update({'bought_at': bought_at})
return holdings
def get_last_crossing(df, days, symbol="", direction=""):
"""Searches for a crossing between two indicators for a given stock
Args:
df(pandas.core.frame.DataFrame): Pandas dataframe with columns containing the stock's prices, both indicators, and the dates
days(int): Specifies the maximum number of days that the cross can occur by
symbol(str): Symbol of the stock we're querying. Optional, used for printing purposes
direction(str): "above" if we are searching for an upwards cross, "below" if we are searching for a downwaords cross. Optional, used for printing purposes
Returns:
1 if the short-term indicator crosses above the long-term one
0 if there is no cross between the indicators
-1 if the short-term indicator crosses below the long-term one
"""
prices = df.loc[:,"Price"]
shortTerm = df.loc[:,"Indicator1"]
LongTerm = df.loc[:,"Indicator2"]
dates = df.loc[:,"Dates"]
lastIndex = prices.size - 1
index = lastIndex
found = index
recentDiff = (shortTerm.at[index] - LongTerm.at[index]) >= 0
if((direction == "above" and not recentDiff) or (direction == "below" and recentDiff)):
return 0
index -= 1
while(index >= 0 and found == lastIndex and not np.isnan(shortTerm.at[index]) and not np.isnan(LongTerm.at[index]) \
and ((pd.Timestamp("now", tz='UTC') - dates.at[index]) <= pd.Timedelta(str(days) + " days"))):
if(recentDiff):
if((shortTerm.at[index] - LongTerm.at[index]) < 0):
found = index
else:
if((shortTerm.at[index] - LongTerm.at[index]) > 0):
found = index
index -= 1
if(found != lastIndex):
if((direction == "above" and recentDiff) or (direction == "below" and not recentDiff)):
print(symbol + ": Short SMA crossed" + (" ABOVE " if recentDiff else " BELOW ") + "Long SMA at " + str(dates.at[found]) \
+", which was " + str(pd.Timestamp("now", tz='UTC') - dates.at[found]) + " ago", ", price at cross: " + str(prices.at[found]) \
+ ", current price: " + str(prices.at[lastIndex]))
return (1 if recentDiff else -1)
else:
return 0
def five_year_check(stockTicker):
"""Figure out if a stock has risen or been created within the last five years.
Args:
stockTicker(str): Symbol of the stock we're querying
Returns:
True if the stock's current price is higher than it was five years ago, or the stock IPO'd within the last five years
False otherwise
"""
instrument = r.get_instruments_by_symbols(stockTicker)
if(instrument is None or len(instrument) == 0):
return True
list_date = instrument[0].get("list_date")
if ((pd.Timestamp("now") - pd.to_datetime(list_date)) < pd.Timedelta("5 Y")):
return True
fiveyear = get_historicals(stockTicker, "day", "5year", "regular")
if (fiveyear is None or None in fiveyear):
return True
closingPrices = []
for item in fiveyear:
closingPrices.append(float(item['close_price']))
recent_price = closingPrices[len(closingPrices) - 1]
oldest_price = closingPrices[0]
return (recent_price > oldest_price)
def golden_cross(stockTicker, n1, n2, days, direction=""):
"""Determine if a golden/death cross has occured for a specified stock in the last X trading days
Args:
stockTicker(str): Symbol of the stock we're querying
n1(int): Specifies the short-term indicator as an X-day moving average.
n2(int): Specifies the long-term indicator as an X-day moving average.
(n1 should be smaller than n2 to produce meaningful results, e.g n1=50, n2=200)
days(int): Specifies the maximum number of days that the cross can occur by
direction(str): "above" if we are searching for an upwards cross, "below" if we are searching for a downwaords cross. Optional, used for printing purposes
Returns:
1 if the short-term indicator crosses above the long-term one
0 if there is no cross between the indicators
-1 if the short-term indicator crosses below the long-term one
False if direction == "above" and five_year_check(stockTicker) returns False, meaning that we're considering whether to
buy the stock but it hasn't risen overall in the last five years, suggesting it contains fundamental issues
"""
if(direction == "above" and not five_year_check(stockTicker)):
return False
history = get_historicals(stockTicker, "day", "year", "regular")
#Couldn't get pricing data
if(history is None or None in history):
return False
closingPrices = []
dates = []
for item in history:
closingPrices.append(float(item['close_price']))
dates.append(item['begins_at'])
price = pd.Series(closingPrices)
dates = pd.Series(dates)
dates = pd.to_datetime(dates)
sma1 = ta.volatility.bollinger_mavg(price, int(n1), False)
sma2 = ta.volatility.bollinger_mavg(price, int(n2), False)
series = [price.rename("Price"), sma1.rename("Indicator1"), sma2.rename("Indicator2"), dates.rename("Dates")]
df = pd.concat(series, axis=1)
cross = get_last_crossing(df, days, symbol=stockTicker, direction=direction)
if(cross) and plot:
show_plot(price, sma1, sma2, dates, symbol=stockTicker, label1=str(n1)+" day SMA", label2=str(n2)+" day SMA")
return cross
def sell_holdings(symbol, holdings_data):
""" Place an order to sell all holdings of a stock.
Args:
symbol(str): Symbol of the stock we want to sell
holdings_data(dict): dict obtained from get_modified_holdings() method
"""
shares_owned = int(float(holdings_data[symbol].get("quantity")))
if not debug:
r.order_sell_market(symbol, shares_owned)
print("####### Selling " + str(shares_owned) + " shares of " + symbol + " #######")
def buy_holdings(potential_buys, profile_data, holdings_data):
""" Places orders to buy holdings of stocks. This method will try to order
an appropriate amount of shares such that your holdings of the stock will
roughly match the average for the rest of your portfoilio. If the share
price is too high considering the rest of your holdings and the amount of
buying power in your account, it will not order any shares.
Args:
potential_buys(list): List of strings, the strings are the symbols of stocks we want to buy
symbol(str): Symbol of the stock we want to sell
holdings_data(dict): dict obtained from r.build_holdings() or get_modified_holdings() method
"""
cash = float(profile_data.get('cash'))
portfolio_value = float(profile_data.get('equity')) - cash
ideal_position_size = (safe_division(portfolio_value, len(holdings_data))+cash/len(potential_buys))/(2 * len(potential_buys))
prices = r.get_latest_price(potential_buys)
for i in range(0, len(potential_buys)):
stock_price = float(prices[i])
if(ideal_position_size < stock_price < ideal_position_size*1.5):
num_shares = int(ideal_position_size*1.5/stock_price)
elif (stock_price < ideal_position_size):
num_shares = int(ideal_position_size/stock_price)
else:
print("####### Tried buying shares of " + potential_buys[i] + ", but not enough buying power to do so#######")
break
print("####### Buying " + str(num_shares) + " shares of " + potential_buys[i] + " #######")
if not debug:
r.order_buy_market(potential_buys[i], num_shares)
def scan_stocks():
""" The main method. Sells stocks in your portfolio if their 50 day moving average crosses
below the 200 day, and buys stocks in your watchlist if the opposite happens.
###############################################################################################
WARNING: Comment out the sell_holdings and buy_holdings lines if you don't actually want to execute the trade.
###############################################################################################
If you sell a stock, this updates tradehistory.txt with information about the position,
how much you've earned/lost, etc.
"""
if debug:
print("----- DEBUG MODE -----\n")
print("----- Starting scan... -----\n")
register_matplotlib_converters()
watchlist_symbols = get_watchlist_symbols()
portfolio_symbols = get_portfolio_symbols()
holdings_data = get_modified_holdings()
potential_buys = []
sells = []
print("Current Portfolio: " + str(portfolio_symbols) + "\n")
print("Current Watchlist: " + str(watchlist_symbols) + "\n")
print("----- Scanning portfolio for stocks to sell -----\n")
for symbol in portfolio_symbols:
cross = golden_cross(symbol, n1=50, n2=200, days=30, direction="below")
if(cross == -1):
sell_holdings(symbol, holdings_data)
sells.append(symbol)
profile_data = r.build_user_profile()
print("\n----- Scanning watchlist for stocks to buy -----\n")
for symbol in watchlist_symbols:
if(symbol not in portfolio_symbols):
cross = golden_cross(symbol, n1=50, n2=200, days=10, direction="above")
if(cross == 1):
potential_buys.append(symbol)
if(len(potential_buys) > 0):
buy_holdings(potential_buys, profile_data, holdings_data)
if(len(sells) > 0):
update_trade_history(sells, holdings_data, "tradehistory.txt")
print("----- Scan over -----\n")
if debug:
print("----- DEBUG MODE -----\n")
#execute the scan
scan_stocks()