-
Notifications
You must be signed in to change notification settings - Fork 13
/
half_life.py
192 lines (156 loc) · 6.04 KB
/
half_life.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# -*- coding: utf-8 -*-
"""
Created on Mon Jul 23 20:32:54 2018
@author: antonio constandinou
"""
# CALCULATE HALF LIFE FOR HURST MEAN REVERTING STOCKS
import datetime
import numpy as np
import pandas as pd
import os
import psycopg2
import statsmodels.api as sm
import math
import matplotlib.pyplot as plt
def load_db_tickers_start_date(start_date, conn):
    """
    Fetch every stock ticker that has a daily_data row on the given date.

    args:
        start_date: datetime.date used to filter the daily_data table
        conn: an open Postgres DB connection object
    returns:
        list of 1-tuples, one per ticker
    """
    cursor = conn.cursor()
    # tickers whose stock_id appears in daily_data for this exact date
    query = """
            SELECT ticker FROM symbol
            WHERE id IN
            (SELECT DISTINCT(stock_id)
            FROM daily_data
            WHERE date_price = %s)
            """
    # parameterized query; the date is passed as an ISO-formatted string
    cursor.execute(query, (start_date.strftime("%Y-%m-%d"),))
    return cursor.fetchall()
def load_db_credential_info(f_name_path):
    """
    Load database credential info and the database name from a text file.

    The file is expected to contain a header line followed by one CSV line:
    host,user,password,database_name

    args:
        f_name_path: file path relative to the current working directory,
                     beginning with a path separator (e.g. "\\db.txt"),
                     type string
    returns:
        list of 4 strings: [host, user, password, database name]
    """
    cur_path = os.getcwd()
    # 'with' guarantees the handle is closed (the original leaked it)
    with open(cur_path + f_name_path, 'r') as f:
        # skip the header; credentials are on the first data line
        lines = f.readlines()[1:]
    # strip the trailing newline before splitting -- otherwise the last
    # field (database name) carries '\n' into psycopg2.connect
    return lines[0].strip().split(',')
def load_txt_file_array(f_name):
    """
    Return the lines of a text file as a list of strings (newlines stripped).

    args:
        f_name: file name relative to the current working directory,
                beginning with a path separator, type string
    returns:
        list of strings, one per line
    """
    cur_path = os.getcwd() + f_name
    # context manager closes the file; the original left the handle open
    with open(cur_path) as f:
        return f.read().splitlines()
def write_results_text_file(f_name, sub_array):
    """
    Write each element of an array to a text file, one element per line.

    args:
        f_name: name of the file to be written, with extension (.txt),
                type string
        sub_array: array of our data
    returns:
        None
    """
    # 'with' flushes and closes the file; the original never closed it,
    # which can lose buffered output
    with open(f_name, 'w') as file_to_write:
        for ele in sub_array:
            file_to_write.write("%s\n" % ele)
def main():
    """
    Compute the mean-reversion half life for every ticker that passed the
    Hurst-exponent screen, bucket tickers by a half-life threshold, write
    the pass/fail lists to disk, and plot a histogram of the half lives.
    """
    # credentials file path fragment expected by load_db_credential_info
    # (leading separator + file name)
    db_credential_info_p = os.sep + "database_info.txt"
    db_host, db_user, db_password, db_name = load_db_credential_info(db_credential_info_p)
    conn = psycopg2.connect(host=db_host, database=db_name, user=db_user, password=db_password)
    cur = conn.cursor()

    # test-sample window used to mask the price history
    start_date = datetime.date(2004, 12, 30)
    end_date = datetime.date(2010, 12, 30)

    # Hurst-exponent output: stocks whose exponent came in below 0.5
    list_of_stocks = load_txt_file_array(os.sep + "he_stock_list_2010_12_30.txt")

    halflife_value_arr = []   # every positive half life, for summary stats
    failed_tickers = []       # half life above the acceptance threshold
    passed_tickers = []       # half life at or below the threshold
    min_hf_var = 50.0         # half-life acceptance threshold

    # SQL is loop-invariant, so build it once.  LIKE with no wildcard in
    # the bound value behaves as an exact ticker match here.
    SQL = """
          SELECT date_price, adj_close_price
          FROM daily_data
          INNER JOIN symbol ON symbol.id = daily_data.stock_id
          WHERE symbol.ticker LIKE %s
          """
    for ticker in list_of_stocks:
        cur.execute(SQL, (ticker,))
        # list of (date, adj_close) tuples -> ordered DataFrame
        results = cur.fetchall()
        stock_data = pd.DataFrame(results, columns=['Date', 'Adj_Close'])
        stock_data = stock_data.sort_values(by=['Date'], ascending=True)
        stock_data['Adj_Close'] = stock_data['Adj_Close'].astype(float)
        # restrict to the sample window
        mask = (stock_data['Date'] > start_date) & (stock_data['Date'] <= end_date)
        stock_data = stock_data.loc[mask]
        try:
            # replace() is NOT in place -- the original discarded this
            # result, so infs were never actually cleaned
            stock_data = stock_data.replace([np.inf, -np.inf], np.nan)
            stock_data.fillna(0, inplace=True)
            stock_data = stock_data.reset_index(drop=True)
            # one-day lag and first difference; seed row 0 so the OLS
            # inputs contain no NaN
            stock_lag = stock_data['Adj_Close'].shift(1)
            stock_lag.at[0] = 1.0
            stock_returns = stock_data['Adj_Close'] - stock_lag
            stock_returns.at[0] = 1.0
            # OLS of returns on lagged price (with intercept); the slope
            # is the speed of mean reversion
            stock_lag2 = sm.add_constant(stock_lag)
            model = sm.OLS(stock_returns, stock_lag2)
            res = model.fit()
            # half life = -ln(2) / slope.  The original computed
            # round(-np.log(2)) / slope == -1 / slope because the closing
            # parenthesis was misplaced.  .iloc avoids deprecated
            # positional Series indexing.
            halflife = -np.log(2) / res.params.iloc[1]
            print("{0} Halflife: {1}".format(ticker, halflife))
            if halflife > 0.0:
                halflife_value_arr.append(halflife)
                if halflife > min_hf_var:
                    failed_tickers.append(ticker)
                else:
                    passed_tickers.append(ticker)
        except Exception as exc:
            # keep the best-effort loop but surface the failure reason
            # instead of the original bare except
            print("Failed at {}".format(ticker))
            print(exc)

    # output our results of those above or below our threshold
    write_results_text_file('halfL_failed_tickers.txt', failed_tickers)
    write_results_text_file('halfL_passed_tickers.txt', passed_tickers)

    # guard: np.mean/np.median on an empty array warns and returns nan
    if halflife_value_arr:
        print("Total: {}".format(len(halflife_value_arr)))
        print("Median: {}".format(np.median(halflife_value_arr)))
        print("Mean: {}".format(np.mean(halflife_value_arr)))
        print("Std. Dev: {}".format(np.std(halflife_value_arr)))
        plt.hist(halflife_value_arr, bins=20, edgecolor='black')
        # original never called show(), so the figure was never displayed
        plt.show()
    else:
        print("No positive half lives computed.")

    # release DB resources (original never closed them)
    cur.close()
    conn.close()
if __name__ == "__main__":
main()