In [None]:
import pandas as pd
import numpy as np
import plotly.graph_objects as go

from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split as ttsplit
from sklearn.metrics import mean_squared_error as mse

import xgboost as xgb
from xgboost import XGBClassifier
from xgboost import plot_importance
from xgboost import plot_tree

from pyastrotrader import calculate_chart, calculate_aspects, calculate_transits
from pyastrotrader.utils import create_input_json
from pyastrotrader.constants import *

In [None]:
SWING_TRADE_DURATION = 5
SWING_EXPECTED_VOLATILITY = 3.8
STAGNATION_THRESHOLD = 3

PETR4_NATAL_DATE = '1953-10-03T19:05:00-03:00'
NATAL_DATE = PETR4_NATAL_DATE
DEFAULT_PARAMETERS = './config/default_parameters.json'
DEFAULT_CONFIG = './config/default_config.json'

SOURCE_FILE = "./input/PETR4_Daily"

ETA = 0.3
DEPTH = 9
NUM_TREES = 100
MAX_INTERACTIONS = 20

In [None]:
StockPrices = pd.read_csv("{}.csv".format(SOURCE_FILE))
StockPrices['Counter'] = np.arange(len(StockPrices))

In [None]:
def correct_date(x):
    date_str = str(x['Date'])
    return date_str[0:4] + '-' + date_str[4:6] + '-' + date_str[6:8]    

def change_sign(x,y):
    return  not ((x > 0 and y > 0) or (x < 0 and y < 0))

In [None]:
def get_previous_stock_price(df, x):
    if x['Counter'] > (SWING_TRADE_DURATION - 1):
        return float(df[df['Counter'] == (x['Counter'] - SWING_TRADE_DURATION)]['Price'])
    else:
        return 0

def get_future_stock_price(df, x, max):
    if x['Counter'] < (max - SWING_TRADE_DURATION):
        return float(df[df['Counter'] == (x['Counter'] + SWING_TRADE_DURATION)]['Price'])
    else:
        return 0    
    
def calculate_current_trend(x):
    if x['PreviousStartPrice'] > 0:
        return ((float(x['Price']) / float(x['PreviousStartPrice'])) - 1) * 100
    else:
        return 0;

def get_previous_stock_date(df, x):
    if x['Counter'] > (SWING_TRADE_DURATION - 1):
        return float(df[df['Counter'] == (x['Counter'] - SWING_TRADE_DURATION)]['Date'])
    else:
        return 0

def get_future_stock_date(df, x, max):
    if x['Counter'] < (max - SWING_TRADE_DURATION):
        return float(df[df['Counter'] == (x['Counter'] + SWING_TRADE_DURATION)]['Date'])
    else:
        return 0    

def calculate_future_trend(x):
    if x['FutureFinalPrice'] > 0:
        return ((float(x['FutureFinalPrice']) / float(x['Price'])) - 1) * 100
    else:
        return 0;

def calculate_intraday_volatility(df, x):
    current_range = df[df['Counter'] >= (x['Counter'] - (SWING_TRADE_DURATION * 2))]
    current_range = current_range[current_range['Counter'] < x['Counter']]
    return (((current_range['High'] / current_range['Low']) - 1).sum() / (SWING_TRADE_DURATION * 2)) * 100

In [None]:
def calculate_swing_strenght(x):
    if float(x['CurrentTrend']) != 0 and float(x['FutureTrend'] != 0):
        return abs(float(x['CurrentTrend']) - float(x['FutureTrend']))
    else:
        return 0

def is_swing_trade(x):
    return 1 if change_sign(x['FutureTrend'], x['CurrentTrend']) and \
                x['SwingStrength'] > (x['IntradayVolatility'] * SWING_EXPECTED_VOLATILITY) and \
                abs(x['FutureTrend']) >= abs(x['CurrentTrend']) else 0

def clean_swing_trade(df, x):
    if x['IsSwing'] == 1:
        current_range = df[df['Counter'] <= (x['Counter'] + SWING_TRADE_DURATION)]
        current_range = current_range[current_range['Counter'] > x['Counter']]
        if current_range['IsSwing'].sum() > 0:
            return 0
        else:
            return 1
    else:
        return 0
    
def detect_price_increase(x):
    return 1 if abs(x['FutureTrend']) >= (x['IntradayVolatility'] * STAGNATION_THRESHOLD) and x['FutureTrend'] > 0 else 0

def detect_price_decrease(x):
    return 1 if abs(x['FutureTrend']) >= (x['IntradayVolatility'] * STAGNATION_THRESHOLD) and x['FutureTrend'] < 0 else 0

def detect_price_stagnated(x):
    return 0 if abs(x['FutureTrend']) >= (x['IntradayVolatility'] * STAGNATION_THRESHOLD) else 1

In [None]:
max_counter = StockPrices['Counter'].max()

StockPrices['CorrectedDate'] = StockPrices.apply( lambda x :  correct_date(x), axis =1 )
StockPrices['PreviousStartPrice'] = StockPrices.apply( lambda x :  get_previous_stock_price(StockPrices, x), axis =1 )
StockPrices['FutureFinalPrice'] = StockPrices.apply( lambda x :  get_future_stock_price(StockPrices, x, max_counter), axis =1 )
StockPrices['PreviousStartDate'] = StockPrices.apply( lambda x :  get_previous_stock_date(StockPrices, x), axis =1 )
StockPrices['FutureFinalDate'] = StockPrices.apply( lambda x :  get_future_stock_date(StockPrices, x, max_counter), axis =1 )

StockPrices['CurrentTrend'] = StockPrices.apply(lambda x : calculate_current_trend(x), axis = 1)

StockPrices['FutureTrend'] = StockPrices.apply(lambda x : calculate_future_trend(x), axis = 1)
StockPrices['SwingStrength'] = StockPrices.apply(lambda x: calculate_swing_strenght(x), axis =1)
StockPrices['IntradayVolatility'] = StockPrices.apply(lambda x: calculate_intraday_volatility(StockPrices, x), axis =1)

StockPrices['IsSwing'] = StockPrices.apply(lambda x: is_swing_trade(x), axis =1)
StockPrices['IsSwing'] = StockPrices.apply(lambda x:clean_swing_trade(StockPrices, x), axis =1)

StockPrices['StockIncreasedPrice'] = StockPrices.apply(lambda x:detect_price_increase(x), axis =1)
StockPrices['StockDecreasedPrice'] = StockPrices.apply(lambda x:detect_price_decrease(x), axis =1)
StockPrices['StockStagnated'] = StockPrices.apply(lambda x:detect_price_stagnated(x), axis =1)

StockPrices['TARGET_IS_SWING'] = StockPrices['IsSwing']

In [None]:
#StockPrices[StockPrices['IsSwing'] == 1].sort_values(by=['Date'], ascending=[0]).head(20)

In [None]:
#StockPrices[StockPrices['StockIncreasedPrice'] == 1].sort_values(by=['Date'], ascending=[0]).head(20)

In [None]:
#StockPrices[StockPrices['StockDecreasedPrice'] == 1].sort_values(by=['Date'], ascending=[0]).head(20)

In [None]:
swing_to_chart = []
for index, current_swing in StockPrices[StockPrices['IsSwing'] == 1].iterrows():
    swing_to_chart.append(dict(
        x0=current_swing['CorrectedDate'], 
        x1=current_swing['CorrectedDate'], 
        y0=0, 
        y1=1, 
        xref='x', 
        yref='paper',
        line_width=2))


In [None]:
fig = go.Figure(data=[go.Candlestick(
                x=StockPrices['CorrectedDate'],
                open=StockPrices['Open'],
                high=StockPrices['High'],
                low=StockPrices['Low'],
                close=StockPrices['Price'])])
fig.update_layout(
    title="PETR4 Detected Swing Trade Opportunities",
    width=1000,
    height=500,
    xaxis_rangeslider_visible=False,
    shapes=swing_to_chart,
    margin=go.layout.Margin(
        l=0,
        r=0,
        b=0,
        t=30,
        pad=4
    ),    
)
fig.show()

In [None]:
petr4_natal_chart_input = create_input_json(NATAL_DATE, 
                                            DEFAULT_PARAMETERS, 
                                            DEFAULT_CONFIG)

petr4_natal_chart = calculate_chart(petr4_natal_chart_input)
dates_to_generate = list(StockPrices['CorrectedDate'])

In [None]:
planets = [SUN,MOON,SATURN, JUPITER, VENUS]
aspects = [CONJUNCTION, SQUARE, TRINE, OPPOSITION, SEMISQUARE]

charts = {}
transits = {}

for current_date in dates_to_generate:
    chart_input = create_input_json(current_date + 'T10:00:00-03:00', 
                                      DEFAULT_PARAMETERS, 
                                      DEFAULT_CONFIG)
    charts[current_date] = calculate_chart(chart_input)
    transits[current_date] = calculate_transits(petr4_natal_chart, charts[current_date], planets, aspects, 4)    

In [None]:
def is_aspected(row, first_planet, second_planet, aspect):
    c_transits = transits[row['CorrectedDate']]
    found_params = [x for x in c_transits if (x['c_planet'] == first_planet and x['n_planet'] == second_planet and x['c_aspect'] == aspect) or \
                                           (x['n_planet'] == first_planet and x['c_planet'] == second_planet and x['c_aspect'] == aspect) ]
    return 1 if len(found_params) > 0 else 0

In [None]:
astro_columns = []

for first_planet in planets:
    for second_planet in planets:
        for aspect in aspects:
            column_name="ASTRO_{}_{}_{}".format(PLANETS[first_planet],ASPECT_NAME[aspect],PLANETS[second_planet]).upper()
            astro_columns.append(column_name)
            StockPrices['StockIncreasedPrice'] = StockPrices.apply(lambda x:detect_price_increase(x), axis =1)
            StockPrices[column_name] = StockPrices.apply(lambda x:is_aspected(x, first_planet, second_planet, aspect), axis =1)

In [None]:
def is_retrograde(row, planet):
    if planet in [SUN,MOON]:
        return 0
    c_chart = charts[row['CorrectedDate']]
    return 1 if c_chart['planets']['planets_retrograde'][planet] else 0   
    

In [None]:
for first_planet in planets:
    column_name="ASTRO_{}_RETROGADE".format(PLANETS[first_planet]).upper()
    astro_columns.append(column_name)
    StockPrices[column_name] = StockPrices.apply(lambda x:is_retrograde(x, first_planet), axis =1)

In [None]:
param = {}
param['booster'] = 'gbtree'
param['objective'] = 'binary:logistic'
param['eval_metric'] = 'auc'
param['tree_method'] = 'auto'
param['silent'] = 0
param['subsample'] = 0.5



def create_booster_swing_trade(eta,depth,num_trees, train_x, train_y, test_x, test_y, columns, trained_model):
    param['max_depth'] = depth
    param['eta'] = eta
    num_round = num_trees
    dtrain = xgb.DMatrix(train_x, train_y, feature_names = columns)
    dtest = xgb.DMatrix(test_x, test_y, feature_names = columns)
    train_labels = dtrain.get_label()
    ratio = float(np.sum(train_labels == 0)) / np.sum(train_labels == 1) 
    param['scale_pos_weight'] = ratio
    gpu_res = {}
    booster = xgb.train(param, dtrain, num_round, evals_result=gpu_res, evals = [], xgb_model = trained_model)    
    return booster

In [None]:
booster = None
best_score = 1
best_booster = None

for current_run in range(MAX_INTERACTIONS):
    X = StockPrices[astro_columns].values
    Y = StockPrices['TARGET_IS_SWING'].values
    total_test = xgb.DMatrix(X, feature_names = astro_columns)
    X_train_1, X_train_2, y_train_1, y_train_2 = ttsplit(X, Y, 
                                                         test_size=0.3, 
                                                         random_state=None,
                                                         shuffle=True)
    booster = create_booster_swing_trade(
        ETA, DEPTH, NUM_TREES, 
        X_train_1, y_train_1, 
        X_train_2, y_train_2, 
        astro_columns, booster)
    current_score = mse(booster.predict(total_test), Y)
    if current_score < best_score:
        best_score = current_score
        best_booster = booster
        #print(best_score)

booster = best_booster
print("Best Score:{}".format(best_score))

relevant_features = sorted( ((v,k) for k,v in booster.get_score().items()), reverse=True)
relevant_features

In [None]:
interaction = 0

def predict_swing_trade_score(row):
    global interaction
    matrix_to_predict = row[astro_columns].values
    matrix_to_predict = matrix_to_predict.reshape(1,-1)
    interaction = interaction + 1
    row_features = xgb.DMatrix(matrix_to_predict, feature_names = astro_columns)
    return booster.predict(row_features)[0]
    
StockPrices['PredictSwingTradeScore'] = StockPrices.apply(lambda x:predict_swing_trade_score(x), axis =1)

In [None]:
output_excel_file='./output/PETR4.Analisys.xlsx'
StockPrices.to_excel(output_excel_file)

In [None]:
swing_to_chart = []
for index, current_swing in StockPrices[StockPrices['PredictSwingTradeScore'] > 0.9].iterrows():
    swing_to_chart.append(dict(
        x0=current_swing['CorrectedDate'], 
        x1=current_swing['CorrectedDate'], 
        y0=0, 
        y1=1, 
        xref='x', 
        yref='paper',
        line_width=2))

In [None]:
fig = go.Figure(data=[go.Candlestick(
                x=StockPrices['CorrectedDate'],
                open=StockPrices['Open'],
                high=StockPrices['High'],
                low=StockPrices['Low'],
                close=StockPrices['Price'])])
fig.update_layout(
    title="PETR4 Swing Trade Opportunities detected by XGBoost",
    width=1000,
    height=500,
    xaxis_rangeslider_visible=False,
    shapes=swing_to_chart,
    margin=go.layout.Margin(
        l=0,
        r=0,
        b=0,
        t=30,
        pad=4
    ),    
)
fig.show()