In [2]:
from airflow import DAG
from datetime import datetime,timedelta
#from airflow.operators.python import PythonOperator

## Projects
import pandas as pd
import numpy as np
import joblib
import os
import time
import datetime
from datetime import timedelta,date,datetime
import configparser as cp
from sqlalchemy import create_engine
import mysql.connector
import joblib
from pytz import timezone
import pytz
import configparser as cp

# Stats
import math
import statsmodels.formula.api as smf
import statsmodels.api as sm

### Task 
* Added MySQL hook
* XCom

# Environment Set-up

In [3]:
## Airflow ##
# Task arguments for the linear-analysis DAG: owner, start date and retry policy.
linear_analysis_arg=dict(owner='airflow',
                         depends_on_past=False,
                         start_date=datetime(2022,3,1),
                         retries=1,
                         retry_delay=timedelta(minutes=10))

## Database ##
# TODO: update to MySQL hook in Airflow
# Credentials are read from an INI-style file; only the [ivan_db] section is used.
config=cp.ConfigParser()
config.read('/home/ubuntu/certi/db_login.txt')
db_config=config['ivan_db']

# One shared engine for every query in this notebook, bound to the STOCK_PRED schema.
engine=create_engine(
    'mysql+mysqlconnector://{0:s}:{1:s}@{2:s}/{3:s}'.format(
        db_config['userid'],
        db_config['pwd'],
        db_config['hostname'],
        'STOCK_PRED',
    )
)

# Ticker reference table; the two prints are a sanity check (row count vs. distinct symbols).
stock_mapping=pd.read_sql("""SELECT * 
                             FROM STOCK_PRED.NYSE_NASDAQ_TICKERS
                          """,con=engine)
print(stock_mapping.shape)
print(stock_mapping.Symbol.nunique())


## Regression ## 
def linear_reg_analysis_for(df):
    """Fit ``Close ~ DATE_ORDER`` by OLS on one stock's rows and summarise the fit.

    Args:
        df: DataFrame holding at least the columns ``Close``, ``DATE_ORDER``
            and ``Date`` for the rows to be modelled.

    Returns:
        A one-row DataFrame with the columns ``R_squared``, ``Coef`` (slope on
        ``DATE_ORDER``), ``P_values`` (p-value of that slope), ``Start_Date``,
        ``End_Date``, ``Num_records`` and ``Num_records_dist`` (number of
        distinct dates).
    """
    lr_model=smf.ols('Close ~ DATE_ORDER',data=df).fit()

    # params/pvalues are labelled by term name ('Intercept', 'DATE_ORDER').
    # Look the slope up by label: integer keys on a labelled Series rely on a
    # positional fallback that pandas has deprecated.
    slope_term='DATE_ORDER'

    # Every value is a one-element list so the frame is built as exactly one row.
    model_result={'R_squared':[lr_model.rsquared],
                  'Coef':[lr_model.params[slope_term]],
                  'P_values':[lr_model.pvalues[slope_term]],

                  'Start_Date':[df['Date'].min()],
                  'End_Date':[df['Date'].max()],
                  'Num_records':[df.shape[0]],
                  'Num_records_dist':[df.Date.nunique()]
                 }

    return pd.DataFrame(model_result)

(5991, 12)
5991


# Data Loading & Cleaning
* Save the final result as a pickle (`.pkl`) file

In [12]:
def stock_data_cleaning(min_latest_rows=5000):
    """Load the last 60 days of stock prices, clean them and save the result as a pickle.

    Reads ``STOCK_PRED.ALL_STOCK_HIST`` through the module-level ``engine``.
    When the most recent date has more than ``min_latest_rows`` rows, the data
    is cleaned (rows containing NAs dropped, stocks with any negative ``Close``
    removed, only stocks present on the latest date kept), a per-stock
    ``DATE_ORDER`` rank is added, ticker attributes from
    ``STOCK_PRED.NYSE_NASDAQ_TICKERS`` are merged in, and the frame is dumped
    to ``/home/ubuntu/data/stock_price_pred/ALL_STOCK_L60D.pkl``.
    Otherwise a data-issue message is printed and nothing is saved.

    Args:
        min_latest_rows: Row count the latest date must exceed for the load
            to be treated as complete. Defaults to 5000.

    Returns:
        None. Progress and data issues are reported through ``print``.
    """
    ## 1 Data Loading ##
    # 1.1 Stock data - last 60 days
    df=pd.read_sql("""SELECT * 
                      FROM STOCK_PRED.ALL_STOCK_HIST
                      WHERE DATE>=CURDATE()-INTERVAL 60 DAY
                             """,
                   con=engine)
    print(df.shape)
    print(df.Stock.nunique())
    print(df['Date'].max(),df['Date'].min())

    # An empty result has no latest date (max() is NaT), which cannot be
    # formatted with strftime further down, so stop here.
    if df.empty:
        print('Data issue: no rows returned for the last 60 days')
        return

    # Completeness check: count the rows of the most recent date once.
    latest_date=df.Date.max()
    latest_rows=df.loc[df.Date==latest_date,:].shape[0]
    if latest_rows<=min_latest_rows:
        print('Data issue: latest data {:s} only as {:,.0f} rows'.format(latest_date.strftime('%Y-%m-%d'),
                                                                         latest_rows
                                                                        ))
        return

    ## 2 Data Cleaning ##
    # 2.1 Remove NAs
    df_1=df.dropna(axis=0,how='any')

    # 2.2 Remove stocks with any negative Close price
    negative_stocks=df_1.loc[df_1.Close<0,'Stock'].unique()
    df_1=df_1.loc[~df_1.Stock.isin(negative_stocks),:]

    # 2.3 Keep active stocks (those with a row on the latest remaining date)
    active_stocks=df_1.loc[df_1.Date==df_1.Date.max(),'Stock'].to_list()
    df_2=df_1.loc[df_1.Stock.isin(active_stocks),:].reset_index(drop=True)

    # Cleaning can remove every row (e.g. a column that is entirely NA);
    # do not overwrite the saved pickle with an empty frame in that case.
    if df_2.empty:
        print('Data issue: no rows left after cleaning')
        return

    # 2.4 Add DATE_ORDER: 1, 2, 3, ... per stock in ascending date order
    df_2=df_2.assign(DATE_ORDER=df_2.groupby('Stock').Date.rank(method='first',ascending=True))

    stock_mapping=pd.read_sql("""SELECT * 
                                 FROM STOCK_PRED.NYSE_NASDAQ_TICKERS
                              """,con=engine)

    # 2.5 Merging ticker attributes, then ordering chronologically per stock
    df_3=(pd.merge(df_2,
                   stock_mapping.loc[:,['Symbol','Name','Country','IPOYear','Sector','Industry']],
                   how='left',
                   left_on='Stock',
                   right_on='Symbol'
                  )
            .drop(['Symbol'],axis=1)
            .sort_values(by=['Stock','Date'],ascending=True))

    ## 3. Saving ##
    joblib.dump(df_3,'/home/ubuntu/data/stock_price_pred/ALL_STOCK_L60D.pkl')

    print('{:,.0f} records saved, data as of {:s}'.format(df_3.shape[0],
                                                          df_3.Date.max().strftime('%Y-%m-%d')
                                                         ))

    

In [13]:
# Run the cleaning step once; it prints its progress and writes the cleaned pickle.
stock_data_cleaning()

(226383, 9)
5739
2022-03-15 00:00:00 2022-01-18 00:00:00
222,553 records saved, data as of 2022-03-15


In [5]:
# NOTE(review): this loads airflow_test.pkl, not the ALL_STOCK_L60D.pkl file that
# stock_data_cleaning() writes — confirm this is the intended file.
df=joblib.load('/home/ubuntu/data/stock_price_pred/airflow_test.pkl')
# Sanity check: overall shape and number of distinct stocks.
print(df.shape)
print(df.Stock.nunique())

(222483, 15)
5610


In [11]:
# Preview the first rows of the loaded frame.
df.head()

Unnamed: 0,Date,Open,High,Low,Close,Volume,SE,Stock,REFRESH_DATE,DATE_ORDER,Name,Country,IPOYear,Sector,Industry
6384,2022-01-03,159.0,159.440002,153.929993,156.479996,1606300.0,NYSE,A,2022-01-05 03:00:45,1.0,Agilent Technologies Inc. Common Stock,United States,1999.0,Capital Goods,Electrical Products
6385,2022-01-04,157.289993,158.860001,149.699997,151.190002,2233958.0,NYSE,A,2022-01-05 03:00:45,2.0,Agilent Technologies Inc. Common Stock,United States,1999.0,Capital Goods,Electrical Products
14353,2022-01-05,150.830002,153.100006,148.529999,148.600006,2370500.0,NYSE,A,2022-01-06 02:58:12,3.0,Agilent Technologies Inc. Common Stock,United States,1999.0,Capital Goods,Electrical Products
19925,2022-01-06,148.850006,149.960007,145.580002,149.119995,2298300.0,NYSE,A,2022-01-07 03:00:14,4.0,Agilent Technologies Inc. Common Stock,United States,1999.0,Capital Goods,Electrical Products
25498,2022-01-07,149.119995,149.729996,145.089996,145.149994,2050643.0,NYSE,A,2022-01-08 03:00:41,5.0,Agilent Technologies Inc. Common Stock,United States,1999.0,Capital Goods,Electrical Products


# Model Implementation

In [14]:
def impl_linear_reg():
    """Fit a per-stock linear trend on the cleaned prices and append the results to MySQL.

    Loads ``/home/ubuntu/data/stock_price_pred/ALL_STOCK_L60D.pkl``, runs
    ``linear_reg_analysis_for`` on every ``Stock`` group, derives ``WT_Coef``
    (R-squared times slope), the first and last ``Close`` per stock and the
    resulting ``growth_rate``, attaches ``Name`` and ``Industry`` from
    ``STOCK_PRED.NYSE_NASDAQ_TICKERS`` and appends the summary to the
    ``LINEAR_REG_L40`` table through the module-level ``engine``.

    Returns:
        None. Progress is reported through ``print``.
    """
    ## 1 Data Loading & Preprocessing ##
    # NOTE: joblib.load unpickles the file, so this path must only ever point
    # at a trusted file produced by this pipeline.
    df_3=joblib.load('/home/ubuntu/data/stock_price_pred/ALL_STOCK_L60D.pkl')
    print(df_3.shape)
    print(df_3.Date.max())

    # 'first'/'last' in step 3.2 are positional, so enforce chronological order
    # here instead of relying on how the pickle happened to be written.
    df_3=df_3.sort_values(by=['Stock','Date'],ascending=True)

    ## 2 Modeling Implementing ##
    linear_reg_sum=df_3.groupby(['Stock']).apply(linear_reg_analysis_for).reset_index(drop=False)
    print(linear_reg_sum.shape)

    ## 3. Processing ##
    # 3.1 Add new columns
    model_date=datetime.now(tz=pytz.utc).astimezone(timezone('US/Pacific'))
    linear_reg_sum=linear_reg_sum.assign(WT_Coef=lambda x:x.R_squared*x.Coef,
                                         Model_date=model_date)

    # 3.2 Additional tables
    stock_strt_end_price=df_3.groupby('Stock').agg(start_price=('Close','first'),
                                                   end_price=('Close','last')
                                                  ).reset_index(drop=False)

    # A start price of 0 makes the ratio +/-inf; store it as missing instead.
    linear_reg_sum_2=pd.merge(linear_reg_sum,
                              stock_strt_end_price,
                              how='left',
                              on='Stock'
                             ).assign(growth_rate=lambda x:((x.end_price-x.start_price)/x.start_price
                                                           ).replace([np.inf,-np.inf],np.nan))

    # Only Name and Industry are kept in the final table, so only those are pulled in.
    stock_mapping=pd.read_sql("""SELECT * 
                                 FROM STOCK_PRED.NYSE_NASDAQ_TICKERS
                              """,con=engine)
    linear_reg_sum_2=pd.merge(linear_reg_sum_2,
                              stock_mapping.loc[:,['Symbol','Name','Industry']],
                              how='left',
                              left_on='Stock',
                              right_on='Symbol')

    # 3.3 Reorder columns (this selection also discards helper columns such as
    # 'level_1' from the groupby index and 'Symbol' from the merge)
    linear_reg_sum_2=linear_reg_sum_2.loc[:,['Model_date',
                                             'Stock','Name','Industry',
                                             'R_squared','Coef','P_values','WT_Coef',
                                             'Start_Date','End_Date','start_price','end_price',
                                             'Num_records_dist','growth_rate']]

    print(linear_reg_sum_2.shape)


    ## 4. Load to MySQL ##
    # Rows are appended, so running this twice for the same data stores the results twice.
    linear_reg_sum_2.to_sql(name='LINEAR_REG_L40',
                            con=engine,
                            if_exists='append',
                            index=False,
                            chunksize=1000)

    print('{0:,.0f} records created; max stock price date: {1:s}; model implemented date: {2:s}'.format(linear_reg_sum_2.shape[0],
                                                                                                        linear_reg_sum_2.End_Date.max().strftime('%Y-%m-%d'),
                                                                                                        linear_reg_sum_2.Model_date.min().strftime('%Y-%m-%d %H:%M:%S')
                                                                                                       ))
    
    
    
    
    
    
    

In [15]:
# Run the modelling step once; it appends the regression summary to MySQL.
impl_linear_reg()

(222553, 15)
2022-03-15 00:00:00
(5612, 9)
(5612, 14)
5,612 records created; max stock price date: 2022-03-15; model implemented date: 2022-03-15 21:52:56
