## Update Stock Prices and Corporate Actions for each new trading day

This notebook includes modules to update the stock prices, make adjustments to historical data for splits and dividends, and run Point & Figure calculations on the new price data, all in one script.

In [None]:
# Import credentials

import json

# Read the credentials inside a context manager so the file handle is closed
# as soon as the JSON has been parsed.
with open("/. .<your file path here> . . /credentials.json") as f:
    credentials = json.load(f)

# The settings are looked up by position, so the order of the entries in
# credentials.json matters. Position 2 is not read by this cell.
credential_values = list(credentials.values())

file_path = credential_values[0]
intrinio_key = credential_values[1]
aws_key = credential_values[3]
aws_secret_key = credential_values[4]
rds_host = credential_values[5]
rds_user = credential_values[6]
rds_password = credential_values[7]
rds_database = credential_values[8]
rds_charset = credential_values[9]


In [None]:
# Import Intrinio libraries

import time
import intrinio_sdk as intrinio
from intrinio_sdk.rest import ApiException

# Store the Intrinio API key (loaded from credentials.json) in the SDK client configuration.
intrinio.ApiClient().configuration.api_key['api_key'] = intrinio_key

# Import Prefect library

from prefect.triggers import all_successful, all_failed
from prefect import task, Flow
import pendulum
from prefect.schedules import IntervalSchedule
from prefect.schedules.clocks import IntervalClock

# Import the usual Python libraries

from tqdm.notebook import tqdm, trange  # to be used to track progress in loop iterations
import pandas as pd
import numpy as np
import datetime as datetime
# NOTE: the next import rebinds two names that were imported above:
#   * `datetime` now refers to the datetime.datetime class, not the module;
#   * `time` now refers to the datetime.time class, not the `time` module
#     imported at the top of this cell, so a module-level `time.sleep(...)`
#     call made after this point will not find `sleep`.
from datetime import datetime, date, time, timedelta
import sys

# Import SQL libraries

import mysql.connector 
from mysql.connector import errorcode
from sqlalchemy import create_engine

# Import the AWS libraries

import boto3
from boto3.s3.transfer import TransferConfig
from boto3.s3.transfer import S3Transfer
import io
import pyarrow as pa
import pyarrow.parquet as pq

# Build the module-level S3 client from the keys loaded out of credentials.json.
client = boto3.client(
    's3',
    region_name='us-east-1',
    aws_access_key_id=aws_key,
    aws_secret_access_key=aws_secret_key,
)

# Local file path taken from the credentials file. A name assigned at module
# level is already global, so no `global` statement is needed here.
my_path = file_path


## Start price update process

In [None]:
# Fetch the last (max) date from the price history column

@task
def get_max_update_date():
    """Look up the most recent price date stored in ``price_data_historical``.

    Publishes ``lastPriceUpdate``, ``td_days`` and ``todayDate`` as module
    globals and prints a short summary of how stale the table is.

    Returns:
        tuple: ``(lastPriceUpdate, td_days)`` - the latest stored price date
        as a ``datetime.date`` and the number of days between it and today.

    Raises:
        ValueError: if ``MAX(date)`` is NULL, i.e. the table has no dated
            rows to measure from.
    """
    from datetime import date, datetime

    global lastPriceUpdate
    global td_days
    global todayDate

    mydb = mysql.connector.connect(
        host=rds_host,
        user=rds_user,
        password=rds_password,
        database=rds_database,
    )

    # Release the cursor and the connection even when the query fails.
    try:
        mycursor = mydb.cursor()
        try:
            mycursor.execute("SELECT MAX(date) FROM price_data_historical")
            max_date = mycursor.fetchall()[0][0]
        finally:
            mycursor.close()
    finally:
        mydb.close()

    if max_date is None:
        raise ValueError(
            "price_data_historical has no dated rows; cannot determine the last price update."
        )

    # Accept either a datetime or a plain date from the driver. datetime is a
    # subclass of date, so it has to be tested for explicitly.
    if isinstance(max_date, datetime):
        max_date = max_date.date()

    todayDate = date.today()
    lastPriceUpdate = max_date
    td_days = (todayDate - lastPriceUpdate).days

    print("The last day that prices were updated was", lastPriceUpdate.strftime('%m/%d/%Y'))
    print("That date was", td_days, "days ago.")

    return lastPriceUpdate, td_days


In [None]:
# Get new data for each ticker to append to the price history table

@task
def get_recent_price_data(lastPriceUpdate, td_days):
    """Download the exchange prices for every day since the last stored price date.

    For each of the ``td_days`` days following ``lastPriceUpdate`` the 'USCOMP'
    exchange prices are requested from the Intrinio API page by page, then the
    combined rows are de-duplicated, filtered and re-ordered for the database.

    Publishes ``df_prices`` (last page retrieved), ``nextDateString`` (last
    date requested) and ``df_price_update_total`` as module globals.

    Args:
        lastPriceUpdate: date of the most recent prices already stored.
        td_days: number of days to request after ``lastPriceUpdate``.

    Returns:
        pandas.DataFrame: the cleaned price rows, or an empty frame when the
        API returned nothing.
    """
    from datetime import timedelta
    from time import sleep

    global df_prices
    global nextDateString
    global df_price_update_total

    max_attempts = 3  # consecutive failed requests tolerated for one page
    page_frames = []

    # For each day from the last price update to today, retrieve the new security prices from the Intrinio API.
    for day_offset in tqdm(range(1, td_days + 1)):

        nextDate = lastPriceUpdate + timedelta(day_offset)
        nextDateString = nextDate.strftime("%Y-%m-%d")

        next_page = ''
        failed_attempts = 0

        while next_page is not None:

            try:
                response = intrinio.StockExchangeApi().get_stock_exchange_prices(
                    'USCOMP', date=nextDateString, page_size=10000, next_page=next_page
                )
            except Exception as exc:
                # A failed request leaves next_page unchanged, so an unbounded
                # retry would loop forever. Retry a few times, then move on.
                failed_attempts += 1
                print("Price request for", nextDateString, "failed:", exc)
                if failed_attempts >= max_attempts:
                    print("Giving up on", nextDateString, "after", max_attempts, "failed attempts.")
                    break
                sleep(1)
                continue

            failed_attempts = 0
            df_prices = pd.DataFrame([x.to_dict() for x in response.stock_prices])

            if df_prices.empty:
                print("No new prices available for ", nextDate.strftime('%m/%d/%Y'))
                break

            # Expand the nested security record into columns next to the prices.
            df_security = df_prices.security.apply(pd.Series)
            page_frames.append(pd.concat([df_prices, df_security], axis=1).drop(['security'], axis=1))

            next_page = response.next_page

    # Concatenate once instead of growing the frame on every page.
    if page_frames:
        df_price_update_total = pd.concat(page_frames, ignore_index=True, axis=0)
    else:
        df_price_update_total = pd.DataFrame()

    # If the API returns new prices, drop any duplicates and securities other than stocks, ADRS and ETFs, then
    # convert the intraperiod flag to an integer, rename the adj factor column, set the dates to datetime format
    # and reset the column order for uploading to the database.

    if len(df_price_update_total) > 0:

        cleaned = df_price_update_total.drop_duplicates(subset=['ticker', 'figi', 'date'], keep='last')
        cleaned = cleaned[cleaned.code.isin(['EQS', 'DR', 'ETF'])]
        # Work on an explicit copy so the column assignments below do not
        # target a slice of the unfiltered frame.
        cleaned = cleaned.dropna(subset=['figi']).copy()
        # NOTE(review): this only maps the string 'TRUE' to 1; if the SDK
        # returns real booleans every row becomes 0 - confirm against the API.
        cleaned['intraperiod'] = (cleaned['intraperiod'] == 'TRUE').astype(int)
        cleaned = cleaned.rename(columns={'factor': 'adj_factor'})
        cleaned['date'] = pd.to_datetime(cleaned['date'])
        df_price_update_total = cleaned[['ticker', 'figi', 'date', 'open', 'high', 'low', 'close',
                                         'volume', 'adj_open', 'adj_high', 'adj_low', 'adj_close', 'adj_volume',
                                         'adj_factor', 'split_ratio', 'change', 'percent_change',
                                         'fifty_two_week_high', 'fifty_two_week_low', 'intraperiod']]

        print("The initial price update dataframe is retrieved.")
        print("The shape of the price update DF is", df_price_update_total.shape)
        # The filters above can remove every row; min()/max() would then be NaT.
        if len(df_price_update_total) > 0:
            print("The date range in the update DF goes from ", df_price_update_total.date.min().strftime('%m/%d/%Y'), " to ",
                  df_price_update_total.date.max().strftime('%m/%d/%Y'))

    return df_price_update_total

    

In [None]:
# Get historical weighted average diluted shares outstanding for each ticker

def get_latest_shares_out_sdk(myFigi, myTicker):
    """Fetch the latest diluted shares outstanding records for one security.

    Requests up to two 'adjweightedavedilutedsharesos' records (newest first)
    through the Intrinio SDK, tags each with the ticker and FIGI, and appends
    them to the module-level ``shares_out_lists_combined`` list, which the
    caller must have initialised.

    Args:
        myFigi: FIGI used as the API identifier.
        myTicker: ticker recorded on each row and in ``bad_tickers`` on failure.

    Returns:
        list: the shared ``shares_out_lists_combined`` list.
    """
    # The notebook-level name `time` is rebound by `from datetime import ... time`,
    # so take `sleep` straight from the time module.
    from time import sleep

    global shares_out_list
    global shares_out_lists_combined

    tag = 'adjweightedavedilutedsharesos'

    # Collect into a local list: this function is run from a thread pool, and
    # rebinding a shared global list mid-call lets threads mix up their rows.
    ticker_rows = []

    try:
        response = intrinio.HistoricalDataApi().get_historical_data(
            myFigi, tag, frequency='', type='', start_date='', end_date='',
            sort_order='desc', page_size=2, next_page=''
        )

        for record in response.historical_data:
            # Add the ticker and figi values to the results
            row = record.to_dict()
            row['ticker'] = myTicker
            row['figi'] = myFigi
            ticker_rows.append(row)

    except Exception:
        # Track any tickers that do not have shares outstanding data available.
        # `bad_tickers` may not exist at module level yet, so create it on demand.
        globals().setdefault('bad_tickers', []).append(myTicker)

    # Extend once, after the loop, so no row is added to the shared list twice.
    shares_out_lists_combined.extend(ticker_rows)
    shares_out_list = ticker_rows

    sleep(0.5)
    return shares_out_lists_combined
    


In [None]:
# Alternative method for getting shares outstanding data by using the Intrinio web API instead of their Python SDK.

import requests

def get_latest_shares_out_webapi(myFigi, myTicker):
    """Fetch the latest diluted shares outstanding record via the Intrinio web API.

    Uses the first record of the 'adjweightedavedilutedsharesos' history,
    tags it with the ticker and FIGI, and appends it to the module-level
    ``shares_out_lists_combined`` list, which the caller must have initialised.
    A failed request, a non-200 response or an empty history records the
    ticker in ``bad_tickers`` instead.

    Args:
        myFigi: FIGI used as the API identifier.
        myTicker: ticker recorded on the row and in ``bad_tickers`` on failure.

    Returns:
        tuple: ``(shares_out_lists_combined, record)`` where ``record`` is the
        row added by this call, or ``None`` when nothing was retrieved.
    """
    # The notebook-level name `time` is rebound by `from datetime import ... time`,
    # so take `sleep` straight from the time module.
    from time import sleep

    global dict_item
    global shares_out_list
    global shares_out_lists_combined

    tag = 'adjweightedavedilutedsharesos'
    latest_record = None

    try:
        # Pass the query string through `params` so the values are URL-encoded,
        # and set a timeout so a stalled connection cannot block a worker forever.
        response = requests.get(
            f"https://api-v2.intrinio.com/historical_data/{myFigi}/{tag}",
            params={'page_size': 2, 'api_key': intrinio_key},
            timeout=30,
        )

        if response.status_code == 200:
            # Add the ticker and figi values to the results
            latest_record = response.json()['historical_data'][0]
            latest_record['ticker'] = myTicker
            latest_record['figi'] = myFigi

    except Exception:
        latest_record = None

    if latest_record is None:
        # Track any tickers that do not have shares outstanding data available.
        # `bad_tickers` may not exist at module level yet, so create it on demand.
        globals().setdefault('bad_tickers', []).append(myTicker)
    else:
        shares_out_list = [latest_record]
        shares_out_lists_combined.extend(shares_out_list)
        dict_item = latest_record

    sleep(0.5)
    return shares_out_lists_combined, latest_record

    

In [None]:
# Get shares out data for each ticker.

@task
def get_latest_shares_out_data(df_price_update_total):
    """Collect the most recent shares outstanding figure for every updated ticker.

    Runs ``get_latest_shares_out_sdk`` on a thread pool for each distinct
    (figi, ticker) pair in the price update, then keeps the newest record per
    ticker. Publishes ``df_latest_shares_out``, ``shares_out_lists_combined``
    and ``bad_tickers`` as module globals.

    Args:
        df_price_update_total: price update frame with 'figi' and 'ticker' columns.

    Returns:
        pandas.DataFrame: columns 'date', 'weighted_avg_shares_out', 'ticker'
        and 'figi' when prices were supplied (possibly with zero rows); a
        column-less empty frame when ``df_price_update_total`` is empty.
    """
    import concurrent.futures

    global df_latest_shares_out
    global shares_out_lists_combined
    # Module-level so the worker function can record failed tickers in it.
    global bad_tickers

    df_latest_shares_out = pd.DataFrame()
    bad_tickers = []
    shares_out_lists_combined = []

    if len(df_price_update_total) > 0:

        # Grab tickers and figis from the price history DF and drop any figi duplicates that might show up.
        arg_list = list(df_price_update_total[['figi', 'ticker']].drop_duplicates().to_records(index=False))

        # Use concurrent.futures to use multiple threads to retrieve shares out data.
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(get_latest_shares_out_sdk, *args) for args in arg_list]

        # The pool has shut down, so every future is finished. Report worker
        # errors instead of letting them disappear silently.
        worker_errors = [f.exception() for f in futures if f.exception() is not None]
        if worker_errors:
            print(len(worker_errors), "of", len(futures), "shares out lookups raised an error. First error:",
                  repr(worker_errors[0]))

        if shares_out_lists_combined:

            # Convert the shares out array to a dataframe
            shares = pd.DataFrame(shares_out_lists_combined)

            # Drop any duplicates, make sure the date column is in datetime format, rename the shares column and make
            # sure zeros are nulled out and any negative values are replaced with absolutes.

            # comment this statement out if the web api is used for shares out
            shares = shares.drop_duplicates(subset=['ticker', 'date'], keep='first').copy()

            shares['date'] = pd.to_datetime(shares['date'])
            shares = shares.rename(columns={'value': 'weighted_avg_shares_out'})
            shares['weighted_avg_shares_out'] = shares['weighted_avg_shares_out'].replace(0, np.nan).abs()

            # Isolate the most recent shares out figures for each ticker
            newest_date = shares.groupby('ticker')['date'].transform('max')
            df_latest_shares_out = shares[newest_date == shares['date']]

        else:
            # No shares data came back at all: return the expected columns
            # rather than failing on a column-less frame.
            df_latest_shares_out = pd.DataFrame(columns=['date', 'weighted_avg_shares_out', 'ticker', 'figi'])

    print("The shape of the shares out DF is ", df_latest_shares_out.shape)

    return df_latest_shares_out


In [None]:
# Merge the price data with the shares out data to create the final update dataframe.

@task
def create_complete_update_dataframe(df_latest_shares_out, df_price_update_total):
    """Join prices with shares outstanding and build the rows for the history table.

    Adds market cap, a key built from ticker + figi + date, today's date as
    the last update date and an empty last corporate action date. The result
    is also published as the module global ``df_price_update_complete``.

    Args:
        df_latest_shares_out: latest shares outstanding per ticker/figi.
        df_price_update_total: cleaned price update rows.

    Returns:
        pandas.DataFrame: the complete update frame, or an empty frame when
        there are no price rows.
    """
    global df_price_update_complete

    df_price_update_complete = pd.DataFrame()

    # Nothing to merge: hand back the empty frame as-is.
    if len(df_price_update_total) == 0:
        return df_price_update_complete

    output_columns = ['key_id', 'ticker', 'figi', 'date', 'open', 'high', 'low', 'close', 'volume',
                      'adj_open', 'adj_high', 'adj_low', 'adj_close', 'adj_volume',
                      'adj_factor', 'split_ratio', 'change', 'percent_change',
                      'fifty_two_week_high', 'fifty_two_week_low', 'market_cap',
                      'weighted_avg_shares_out', 'intraperiod', 'last_updated_date', 'last_corp_action_date']

    # Left join keeps every price row even when no shares figure is available.
    shares_columns = df_latest_shares_out[['weighted_avg_shares_out', 'ticker', 'figi']]
    combined = df_price_update_total.merge(shares_columns, on=['ticker', 'figi'], how='left')
    combined['market_cap'] = combined['adj_close'] * combined['weighted_avg_shares_out']
    combined['date'] = pd.to_datetime(combined['date'])
    combined = combined.sort_values(by=['ticker', 'date'])

    # Primary key plus bookkeeping dates; todayDate is read from the module globals.
    date_text = combined['date'].dt.strftime('%Y-%m-%d')
    combined['key_id'] = combined['ticker'] + combined['figi'] + date_text
    combined['last_updated_date'] = todayDate
    combined['last_corp_action_date'] = None

    df_price_update_complete = combined[output_columns]

    print("The shares outstanding are captured and market caps calculated for all tickers that have shares out data available.")
    print("The shape of the new DF is ", df_price_update_complete.shape)

    return df_price_update_complete



In [None]:
# Push the price update DF to a CSV file for troubleshooting if needed.

# df_price_update_total.to_csv(path_or_buf = my_path + "/df_price_update_total.csv", index=False)

# df_price_update_complete.to_csv(path_or_buf = my_path + "/df_price_update_complete.csv", index=False)



In [None]:
# Push the dataframe to CSV on S3 if you want to use AWS Lambda to take it from there and push it into 
# the RDS table.

@task
def push_data_to_S3(df_price_update_complete):
    """Upload the completed price update to S3 as a CSV file.

    Does nothing when the frame is empty. The object key ends with
    ``nextDateString`` (the last date requested by the price download); when
    that global is not set, today's date is used instead, which is the value
    the download loop ends on.

    Args:
        df_price_update_complete: the frame to upload.
    """
    import io
    from datetime import date

    # Decide from the frame that was actually passed in, not from a global
    # left behind by another task.
    if len(df_price_update_complete) == 0:
        return

    # Create the AWS client
    client = boto3.client(
        's3',
        aws_access_key_id=aws_key,
        aws_secret_access_key=aws_secret_key,
        region_name='us-east-1',
    )

    file_date = globals().get('nextDateString') or date.today().strftime("%Y-%m-%d")

    myBucket = 'bns-intrinio-data'
    myFileLocation = "price-data-daily/df_price_update_complete_" + file_date + ".csv"

    with io.StringIO() as csv_buffer:
        df_price_update_complete.to_csv(csv_buffer, index=False)

        response = client.put_object(
            Bucket=myBucket, Key=myFileLocation, Body=csv_buffer.getvalue()
        )

    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

    if status == 200:
        print(f"Successful S3 put_object response. Status - {status}")
    else:
        print(f"Unsuccessful S3 put_object response. Status - {status}")


In [None]:
# Or use SQLAlchemy to push the final dataframe into SQL DB on AWS RDS:

@task(trigger=all_successful)
def push_data_to_RDS(df_price_update_complete):
    """Append the completed price update to ``price_data_historical`` on RDS.

    Does nothing when the frame is empty.

    Args:
        df_price_update_complete: the rows to append.
    """
    from urllib.parse import quote_plus

    # Decide from the frame that was actually passed in, not from a global
    # left behind by another task.
    if len(df_price_update_complete) == 0:
        return

    # Set database credentials. The user name and password are URL-escaped so
    # characters such as '@', ':' or '/' cannot break the connection string.
    creds = {'usr': quote_plus(str(rds_user)),
             'pwd': quote_plus(str(rds_password)),
             'hst': rds_host,
             'prt': 3306,
             'dbn': rds_database}

    # MySQL connection string.
    connstr = 'mysql+mysqlconnector://{usr}:{pwd}@{hst}:{prt}/{dbn}'

    # Create sqlalchemy engine for MySQL connection.
    engine = create_engine(connstr.format(**creds))

    try:
        # Write DataFrame to MySQL using the engine (connection) created above.
        df_price_update_complete.to_sql(name='price_data_historical',
                                        con=engine,
                                        if_exists='append',
                                        index=False)
    finally:
        # Return the pooled connections whether or not the insert succeeded.
        engine.dispose()

    print("The new data has been appended to RDS. The number of new rows added is", df_price_update_complete.shape[0])


## Start corporate actions update process

In [None]:
# Fetch the last (max) corporate action date from the price history RDS table

@task
def get_max_corax_date():
    """Look up the most recent corporate action date in ``price_data_historical``.

    Publishes ``lastCoraxUpdate`` (the result row), ``td_days`` and
    ``todayDate`` as module globals. Exits via ``sys.exit(0)`` when the last
    corporate action update is not in the past.

    Returns:
        tuple: ``(lastCoraxUpdate, td_days)`` where ``lastCoraxUpdate`` is the
        one-element result row holding the latest date and ``td_days`` is the
        number of days between that date and today.

    Raises:
        ValueError: if ``MAX(last_corp_action_date)`` is NULL, i.e. no row
            has a corporate action date yet.
    """
    from datetime import date, datetime
    import mysql.connector

    global lastCoraxUpdate
    global td_days
    global todayDate

    mydb = mysql.connector.connect(
        host=rds_host,
        user=rds_user,
        password=rds_password,
        database=rds_database,
    )

    # Release the cursor and the connection even when the query fails.
    try:
        mycursor = mydb.cursor()
        try:
            mycursor.execute("SELECT MAX(last_corp_action_date) FROM price_data_historical")
            myresult = mycursor.fetchall()
        finally:
            mycursor.close()
    finally:
        mydb.close()

    lastCoraxUpdate = myresult[0]
    latest_action = lastCoraxUpdate[0]

    if latest_action is None:
        raise ValueError(
            "price_data_historical has no last_corp_action_date values; "
            "cannot determine the last corporate actions update."
        )

    # Accept either a datetime or a plain date from the driver. datetime is a
    # subclass of date, so it has to be tested for explicitly.
    if isinstance(latest_action, datetime):
        latest_action = latest_action.date()

    todayDate = date.today()
    td_days = (todayDate - latest_action).days

    # Print the number of days since last script run, or if this script is run too soon, exit the script.

    if td_days > 0:
        print("The last day that corp actions were updated was", lastCoraxUpdate[0])
        print("That date was", td_days, "day(s) ago.")
    else:
        print("The last corporate action update was today. Try again tomorrow.")
        sys.exit(0)

    return lastCoraxUpdate, td_days


In [None]:
# Fetch the list of adjusted tickers and figis

@task
def get_adjusted_tickers_figis(lastCoraxUpdate, td_days):
    """List the securities that had price adjustments since the last corporate actions run.

    For each of the ``td_days`` days starting at ``lastCoraxUpdate[0]`` the
    'USCOMP' price adjustments are requested from Intrinio; only securities
    with code EQS, DR or ETF are kept, together with their split ratio and
    adjustment factor. A day whose request or processing fails is reported
    and skipped. Exits via ``sys.exit(0)`` when no adjusted security is found.

    Publishes ``df_adjusted_tickers_total``, ``response`` and ``df_security``
    as module globals.

    Args:
        lastCoraxUpdate: row whose first element is the last corporate action date.
        td_days: number of days to check, starting at that date.

    Returns:
        pandas.DataFrame: one row per adjusted security and date with columns
        'ticker', 'figi', 'code', 'date', 'split_ratio' and 'adj_factor'.
    """
    global df_adjusted_tickers_total
    global response
    global df_security

    # One frame per day that produced adjustments. Starting each day from
    # scratch keeps a failed or empty day from re-adding the previous day's rows.
    daily_frames = []

    # For each day since the last corporate actions update, fetch all the tickers/figis with recent corp actions.
    for day_offset in tqdm(range(0, td_days)):

        nextDate = lastCoraxUpdate[0] + timedelta(day_offset)
        nextDateString = nextDate.strftime("%Y-%m-%d")

        try:
            response = intrinio.StockExchangeApi().get_stock_exchange_price_adjustments(
                'USCOMP', date=nextDateString, page_size=10000, next_page=''
            )

            if len(response.stock_price_adjustments) == 0:
                continue

            df_adjustments = pd.DataFrame([x.to_dict() for x in response.stock_price_adjustments])
            df_security = df_adjustments.security.apply(pd.Series)

            # Nothing usable for this day: move on to the next day rather
            # than abandoning the remaining days.
            if df_security.empty:
                print("No new adjustments available for ", nextDate.strftime('%m/%d/%Y'))
                continue

            # Filter the data for only stocks, ADRs and ETFs
            df_day = df_security[df_security['code'].isin(['EQS', 'DR', 'ETF'])][['ticker', 'figi', 'code']].copy()

            # Add a date column
            df_day['date'] = nextDateString

            # Get the new split ratios and adjustment factors for each ticker/figi
            df_data = df_adjustments[['split_ratio', 'factor']].copy()
            df_data['ticker'] = df_security['ticker']

            # Merge the data to a single dataframe
            df_day = pd.merge(df_day, df_data, on='ticker', how='left')

        except Exception as exc:
            # Best effort: report the problem and skip this day.
            print("Could not retrieve adjustments for", nextDateString, ":", exc)
            continue

        if not df_day.empty:
            daily_frames.append(df_day)

    # Add the daily lists to a total adjustments list.
    if daily_frames:
        df_adjusted_tickers_total = pd.concat(daily_frames, ignore_index=True, axis=0)
    else:
        df_adjusted_tickers_total = pd.DataFrame()

    # If there are no tickers to be adjusted across all days, quit the routine, else continue.
    if df_adjusted_tickers_total.shape[0] == 0:
        print("The number of adjusted EQS, DR or ETF securities is ", df_adjusted_tickers_total.shape[0])
        print("There is no need to proceed further")
        sys.exit(0)

    # Sort by date and ticker, and drop any duplicates or NaNs
    df_adjusted_tickers_total = df_adjusted_tickers_total.sort_values(by=['date', 'ticker'], ascending=True)
    df_adjusted_tickers_total = df_adjusted_tickers_total.drop_duplicates(keep='first')
    df_adjusted_tickers_total = df_adjusted_tickers_total.dropna(axis=0)
    df_adjusted_tickers_total = df_adjusted_tickers_total.rename(columns={'factor': 'adj_factor'})

    print("The number of adjusted securities is ", df_adjusted_tickers_total.shape[0])
    print("The date range in the update list DF goes from ", df_adjusted_tickers_total.date.min(), " to ",
          df_adjusted_tickers_total.date.max())

    return df_adjusted_tickers_total


In [None]:
# Fetch the historical adjustment factors and split ratios for the adjusted tickers from the historical price data RDS table

@task
def get_adj_factors_splits(df_adjusted_tickers_total):
    
    import mysql.connector
    
    global df_splits_factors

    figi_list = df_adjusted_tickers_total['figi'].tolist()
    
    mydb = mysql.connector.connect(
      host = rds_host,
      user = rds_user,
      password = rds_password,
      database = rds_database
    )

    mycursor = mydb.cursor()

    mycursor.execute("SELECT date, ticker, figi, adj_factor, split_ratio FROM price_data_historical WHERE figi in" + str(tuple(figi_list)))

    myresult = mycursor.fetchall()

    df_splits_factors = pd.DataFrame(myresult, columns = ['date', 'ticker', 'figi', 'adj_factor', 'split_ratio']).sort_values(by = ['figi', 'date'], ascending = False)

    df_splits_factors = pd.concat([df_splits_factors, df_adjusted_tickers_total[['date', 'ticker', 'figi', 'split_ratio', 'adj_factor']].copy()])
    df_splits_factors['date'] = pd.to_datetime(df_splits_factors['date'])
    df_splits_factors = df_splits_factors.sort_values(by = ['ticker', 'date'], ascending = False)
    df_splits_factors = df_splits_factors.drop_duplicates(keep = 'first')

    print("The shape of the historical splits and adjustments DF is ", df_splits_factors.shape)

    return df_splits_factors


In [None]:
# Get the new historical prices for each ticker

def get_historical_prices(myFigi, myTicker):
    """Download the full daily price history for one security.

    Pages through the Intrinio security prices up to ``todayDate`` (a module
    global), tags each row with the ticker and FIGI reported by the API, and
    appends the rows to the module-level ``adjusted_prices_total`` list, which
    the caller must have initialised. If a request fails, the ticker is
    recorded in ``bad_tickers``, paging stops, and the rows fetched so far are
    kept.

    Args:
        myFigi: FIGI used as the API identifier.
        myTicker: ticker recorded in ``bad_tickers`` on failure.

    Returns:
        list: the shared ``adjusted_prices_total`` list.
    """
    global adjusted_prices_list
    global adjusted_prices_total

    # Collect into a local list: this function is run from a thread pool, and
    # rebinding a shared global list mid-call lets threads mix up their rows.
    ticker_prices = []
    next_page = ''

    while next_page is not None:

        try:
            response = intrinio.SecurityApi().get_security_stock_prices(
                myFigi, start_date='', end_date=todayDate, frequency='daily',
                page_size=10000, next_page=next_page
            )

            for price in response.stock_prices:
                # Add ticker and figi to the results
                row = price.to_dict()
                row['ticker'] = response.security.ticker
                row['figi'] = response.security.figi
                ticker_prices.append(row)

            next_page = response.next_page

        except Exception:
            # Track tickers that do not have any price data available.
            # `bad_tickers` may not exist at module level yet, so create it on demand.
            globals().setdefault('bad_tickers', []).append(myTicker)
            # There is no fresh response to take the next page token from, so
            # stop paging instead of retrying the same page forever.
            break

    # Return adjusted prices
    adjusted_prices_total.extend(ticker_prices)
    adjusted_prices_list = ticker_prices

    return adjusted_prices_total


In [None]:
# Fetch historical prices for adjusted tickers and add some extra calculations to match those in the history table

@task
def get_adjusted_price_data(df_adjusted_tickers_total):
    """Rebuild the price history for every security that had a corporate action.

    Runs ``get_historical_prices`` on a thread pool for each (figi, ticker)
    pair, merges in the split ratios and adjustment factors from the module
    global ``df_splits_factors``, drops rows with non-positive adjusted prices
    and recomputes change, percent change and the 260-row rolling high/low.

    Publishes ``df_adjusted_prices_total``, ``arg_list``,
    ``adjusted_prices_total`` and ``bad_tickers`` as module globals.

    Args:
        df_adjusted_tickers_total: adjusted securities with 'figi' and 'ticker' columns.

    Returns:
        pandas.DataFrame: the recalculated price history.

    Raises:
        ValueError: if no prices were retrieved for any security.
    """
    import concurrent.futures

    global df_adjusted_prices_total
    global arg_list
    global adjusted_prices_total
    # Module-level so the worker function can record failed tickers in it.
    global bad_tickers

    bad_tickers = []
    adjusted_prices_total = []

    arg_list = list(df_adjusted_tickers_total[['figi', 'ticker']].to_records(index=False))

    # Use concurrent.futures to use multiple threads to retrieve price data.
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(get_historical_prices, *args) for args in arg_list]

    # The pool has shut down, so every future is finished. Report worker
    # errors instead of letting them disappear silently.
    worker_errors = [f.exception() for f in futures if f.exception() is not None]
    if worker_errors:
        print(len(worker_errors), "of", len(futures), "price history lookups raised an error. First error:",
              repr(worker_errors[0]))

    if not adjusted_prices_total:
        raise ValueError("No historical prices were retrieved for the adjusted tickers.")

    # Convert the price rows to a dataframe, drop any duplicates, set the date column to datetime format,
    # combine the prices with split and adjustment factors, then fill any missing factors with 1. Also get rid of
    # any rows with non-positive prices. (Again, market data is always dirty, LOL!)
    prices = pd.DataFrame(adjusted_prices_total)
    prices['date'] = pd.to_datetime(prices['date'])
    prices = prices.drop_duplicates(subset=['ticker', 'date'], keep='first')
    prices = pd.merge(prices, df_splits_factors, on=['date', 'ticker', 'figi'], how='left')
    prices[['adj_factor', 'split_ratio']] = prices[['adj_factor', 'split_ratio']].fillna(1)
    # Copy so the new columns below are added to an independent frame.
    prices = prices[(prices[['adj_open', 'adj_high', 'adj_low', 'adj_close']] > 0).all(1)].copy()

    # Add change, pct_change, and 52 week high/low columns. Sort by date once;
    # the results are aligned back to `prices` by index.
    by_date = prices.sort_values('date')
    closes_by_ticker = by_date.groupby(['ticker']).adj_close

    prices['change'] = closes_by_ticker.diff()
    prices['percent_change'] = by_date.groupby(['figi']).adj_close.pct_change().replace({np.inf: np.nan, -np.inf: np.nan})

    prices['fifty_two_week_high'] = closes_by_ticker.rolling(window=260).max().reset_index(0, drop=True)
    prices['fifty_two_week_low'] = closes_by_ticker.rolling(window=260).min().reset_index(0, drop=True)

    df_adjusted_prices_total = prices

    print("The shape of the historical price data DF is ", df_adjusted_prices_total.shape)
    print("The earliest date is ", df_adjusted_prices_total['date'].min())

    return df_adjusted_prices_total


In [None]:
# Get historical weighted average diluted shares outstanding for each ticker

def get_historical_shares_out(myFigi, myTicker):
    """Fetch the diluted shares outstanding history for one security.

    Requests the 'adjweightedavedilutedsharesos' records (newest first, up to
    10000) through the Intrinio SDK, tags each with the ticker and FIGI, and
    appends them to the module-level ``shares_out_lists_combined`` list, which
    the caller must have initialised.

    Args:
        myFigi: FIGI used as the API identifier.
        myTicker: ticker recorded on each row and in ``bad_tickers`` on failure.

    Returns:
        tuple: ``(shares_out_lists_combined, rows)`` where ``rows`` is the
        list of records added by this call (empty when nothing was retrieved).
    """
    # The notebook-level name `time` is rebound by `from datetime import ... time`,
    # so take `sleep` straight from the time module.
    from time import sleep

    global shares_out_list
    global shares_out_lists_combined

    tag = 'adjweightedavedilutedsharesos'

    # Collect into a local list: this function is run from a thread pool, and
    # a shared global list could still hold another call's rows after a failure.
    ticker_rows = []

    try:
        response = intrinio.HistoricalDataApi().get_historical_data(
            myFigi, tag, frequency='', type='', start_date='', end_date='',
            sort_order='desc', page_size=10000, next_page=''
        )

        for record in response.historical_data:
            # Add ticker and figi to the results
            row = record.to_dict()
            row['ticker'] = myTicker
            row['figi'] = myFigi
            ticker_rows.append(row)

    except Exception:
        # Track tickers that do not have any shares out data available.
        # `bad_tickers` may not exist at module level yet, so create it on demand.
        globals().setdefault('bad_tickers', []).append(myTicker)

    shares_out_lists_combined.extend(ticker_rows)
    shares_out_list = ticker_rows

    sleep(0.5)
    return shares_out_lists_combined, ticker_rows


In [None]:
# Get the historical shares outstanding data for all tickers. Since shares out are reported quarterly, resample
# the data to show daily records.

@task
def get_historical_shares_out_data(df_adjusted_tickers_total):
    """Build a daily shares outstanding history for every adjusted security.

    Runs ``get_historical_shares_out`` on a thread pool for each
    (figi, ticker) pair, cleans the records and forward-fills them onto a
    daily calendar per ticker. Publishes ``df_hist_shares_out``,
    ``shares_out_lists_combined`` and ``bad_tickers`` as module globals.

    Args:
        df_adjusted_tickers_total: adjusted securities with 'figi' and 'ticker' columns.

    Returns:
        pandas.DataFrame: daily rows including 'ticker', 'date', 'figi' and
        'weighted_avg_shares_out' (zero rows when no shares data was retrieved).
    """
    import concurrent.futures

    global df_hist_shares_out
    global shares_out_lists_combined
    # Module-level so the worker function can record failed tickers in it.
    global bad_tickers

    bad_tickers = []
    shares_out_lists_combined = []

    arg_list = list(df_adjusted_tickers_total[['figi', 'ticker']].to_records(index=False))

    # Use concurrent.futures to use multiple threads to retrieve shares out data.
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(get_historical_shares_out, *args) for args in arg_list]

    # The pool has shut down, so every future is finished. Report worker
    # errors instead of letting them disappear silently.
    worker_errors = [f.exception() for f in futures if f.exception() is not None]
    if worker_errors:
        print(len(worker_errors), "of", len(futures), "shares out lookups raised an error. First error:",
              repr(worker_errors[0]))

    if not shares_out_lists_combined:
        # No shares data at all: return a typed, empty frame so a later merge
        # on ticker/figi/date still works.
        df_hist_shares_out = pd.DataFrame({
            'ticker': pd.Series(dtype='object'),
            'date': pd.Series(dtype='datetime64[ns]'),
            'figi': pd.Series(dtype='object'),
            'weighted_avg_shares_out': pd.Series(dtype='float64'),
        })
        print("The shape of the shares out DF is ", df_hist_shares_out.shape)
        return df_hist_shares_out

    # Convert the shares out array to a dataframe, drop any duplicates, rename the values column, replace zeros with
    # NaNs, and replace any negative shares out numbers with their absolute values.
    shares = pd.DataFrame(shares_out_lists_combined)
    shares = shares.drop_duplicates(subset=['ticker', 'date'], keep='first')
    shares['date'] = pd.to_datetime(shares['date'])
    shares = shares.rename(columns={'value': 'weighted_avg_shares_out'})
    shares['weighted_avg_shares_out'] = shares['weighted_avg_shares_out'].replace(0, np.nan).abs()

    # Set date as index and resample each ticker to daily records, forward
    # filling between the reported dates. (`convention` is omitted: it only
    # applies to a PeriodIndex, and this index is a DatetimeIndex.)
    resampled = shares.set_index('date').groupby('ticker').resample('D').ffill()

    # Depending on the pandas version the grouping column may also be present
    # as a regular column; drop it and restore 'ticker' from the index so the
    # result is the same either way.
    resampled = resampled.drop(columns='ticker', errors='ignore').reset_index()

    df_hist_shares_out = resampled

    print("The shape of the shares out DF is ", df_hist_shares_out.shape)

    return df_hist_shares_out

In [None]:
# Use left join to add shares out to history dataframe and calculate market cap

@task
def combine_transform_adjusted_data(df_adjusted_prices_total, df_hist_shares_out):
    
    global df_adjusted_prices_complete

    df_adjusted_prices_complete = pd.merge(df_adjusted_prices_total, df_hist_shares_out, on=['ticker', 'figi', 'date'], how='left')
    df_adjusted_prices_complete = df_adjusted_prices_complete.sort_values(by = ['ticker', 'date'], ascending = True)

    df_adjusted_prices_complete['weighted_avg_shares_out'] = df_adjusted_prices_complete.groupby('ticker')['weighted_avg_shares_out'].transform(lambda x: x.ffill())
    df_adjusted_prices_complete['market_cap'] = df_adjusted_prices_complete['adj_close'] * df_adjusted_prices_complete['weighted_avg_shares_out']

    # Add last update date and primary key column, reset the data types for each column to be MySQL compliant, 
    # then reset the column order.

    df_adjusted_prices_complete['last_updated_date'] = todayDate
    df_adjusted_prices_complete['last_corp_action_date'] = todayDate
    df_adjusted_prices_complete['date'] = pd.to_datetime(df_adjusted_prices_complete['date'])
    df_adjusted_prices_complete['key_id'] = df_adjusted_prices_complete['ticker'] + df_adjusted_prices_complete['figi'] + df_adjusted_prices_complete['date'].dt.strftime('%Y-%m-%d')
    df_adjusted_prices_complete = df_adjusted_prices_complete.drop_duplicates(subset = ['key_id'], keep = 'first')
    #df_adjusted_prices_complete = df_adjusted_prices_complete.where(df_adjusted_prices_complete.notnull(), None)
    df_adjusted_prices_complete = df_adjusted_prices_complete.convert_dtypes()
    df_adjusted_prices_complete['date'] = df_adjusted_prices_complete['date'].dt.date
    df_adjusted_prices_complete = df_adjusted_prices_complete.astype({'open':'Float32', 'high':'Float32', 'low':'Float32', 'close':'Float32', 'volume':'Int32', 'adj_open':'Float32', 'adj_high':'Float32', 'adj_low':'Float32', 'adj_close':'Float32', 'adj_volume':'Int32', 'adj_factor':'Float32', 'split_ratio':'Int32', 'change':'Float32', 'percent_change':'Float32', 'fifty_two_week_high':'Float32', 'fifty_two_week_low':'Float32'})

    df_adjusted_prices_complete = df_adjusted_prices_complete[['key_id', 'ticker', 'figi', 'date', 'open', 'high', 'low', 'close', 'volume', 'adj_open', 'adj_high', 
                                                               'adj_low', 'adj_close', 'adj_volume', 'adj_factor', 'split_ratio', 'change', 'percent_change', 
                                                               'fifty_two_week_high', 'fifty_two_week_low', 'market_cap', 'weighted_avg_shares_out', 'intraperiod', 
                                                               'last_updated_date', 'last_corp_action_date']]

    print("The complete corp actions DF shape is ", df_adjusted_prices_complete.shape)

    return df_adjusted_prices_complete


In [None]:

# df_adjusted_prices_complete.to_csv(path_or_buf = my_path + "/df_adjusted_prices_complete.csv", index=False)


In [None]:
# Push the dataframe to CSV on S3 for backup and/or archive purposes.

@task
def export_data_to_S3(df_adjusted_prices_complete):
    """Upload the adjusted price history to S3 as a CSV backup.

    The frame is serialized in memory and written to the 'bns-intrinio-data'
    bucket under a key that embeds the module-level `todayDate`. The upload
    goes through the module-level boto3 `client`.

    Args:
        df_adjusted_prices_complete: DataFrame of adjusted price records.

    Returns:
        None. The outcome of the upload is only printed.
    """
    import io

    bucket_name = 'bns-intrinio-data'
    object_key = f"price-data-daily/df_adjusted_prices_{todayDate!s}.csv"

    # Render the CSV into memory; nothing is written to local disk.
    with io.StringIO() as csv_buffer:
        df_adjusted_prices_complete.to_csv(csv_buffer, index=False)
        csv_text = csv_buffer.getvalue()

    response = client.put_object(Bucket=bucket_name, Key=object_key, Body=csv_text)
    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

    if status != 200:
        print(f"Unsuccessful S3 put_object response. Status - {status}")
        return

    print(f"Successful S3 put_object response. Status - {status}")
    print("New adjusted history data sucessfully posted to S3.")


### Update the History Table with new adjusted data

In [None]:
# Select and save pre-adjusted records in S3 prior to deletion. Just in case we need to undo the operation.

@task
def save_preadjusted_records_to_S3(df_adjusted_prices_complete):
    """Back up the current history rows of every adjusted FIGI to S3.

    Selects all rows of `price_data_historical` whose FIGI appears in
    `df_adjusted_prices_complete`, in roughly ten queries, and uploads them as
    a CSV to the 'bns-intrinio-data' bucket so the delete/insert that follows
    can be undone.

    Args:
        df_adjusted_prices_complete: DataFrame of adjusted price records. Its
            'figi' column selects the rows to back up and its column labels
            are reused for the backup frame.

    Returns:
        The HTTP status code of the S3 upload (200 on success). The value is
        also stored in the global `save_records_completion_status`.
    """
    import mysql.connector
    import io

    global save_records_completion_status

    # Plain Python strings, so the bound parameters do not depend on the
    # pandas/numpy scalar type of the column.
    figi_list = [str(figi) for figi in df_adjusted_prices_complete['figi'].unique()]
    preadjusted_records = []

    # Aim for about ten queries. With fewer than ten FIGIs the integer division
    # is 0, and range() rejects a zero step, so clamp the chunk size to 1.
    chunk_size = max(1, len(figi_list) // 10)

    mydb = mysql.connector.connect(
      host = rds_host,
      user = rds_user,
      password = rds_password,
      database = rds_database
    )

    try:
        for start in range(0, len(figi_list), chunk_size):
            chunk = figi_list[start:start + chunk_size]

            # One placeholder per FIGI: the driver does the quoting, which
            # also covers the single-element case without a special branch.
            placeholders = ", ".join(["%s"] * len(chunk))

            mycursor = mydb.cursor()
            try:
                mycursor.execute(
                    "SELECT * FROM price_data_historical WHERE figi IN (" + placeholders + ")",
                    tuple(chunk),
                )
                preadjusted_records.extend(mycursor.fetchall())
            finally:
                mycursor.close()
    finally:
        mydb.close()

    df_preadjusted_records = pd.DataFrame(preadjusted_records, columns = df_adjusted_prices_complete.columns)

    myBucket = 'bns-intrinio-data'
    myFileLocation = "price-data-daily/df_preadjusted_records_" + str(todayDate) + ".csv"

    with io.StringIO() as csv_buffer:
        df_preadjusted_records.to_csv(csv_buffer, index=False)

        response = client.put_object(
            Bucket = myBucket, Key = myFileLocation, Body=csv_buffer.getvalue()
        )

    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
    if status == 200:
        print(f"Successful S3 put_object response. Status - {status}")
        print(df_preadjusted_records.shape[0], "Data record(s) saved on S3.")
    else:
        print(f"Unsuccessful S3 put_object response. Status - {status}")

    save_records_completion_status = status

    return save_records_completion_status


In [None]:
# Delete records to be updated from SQL table. The SQL Connector library does not have a reliable "Upsert" function,
# so we need to delete then replace the data.

@task
def delete_preadjusted_records(save_records_completion_status):
    """Delete the history rows that are about to be replaced.

    Removes every row of `price_data_historical` whose FIGI appears in the
    module-level `df_adjusted_prices_complete` frame, in a single committed
    statement.

    Args:
        save_records_completion_status: Value produced by the backup step. It
            is not inspected here; it only expresses the ordering dependency.

    Returns:
        The string "Deleted.", which is also stored in the global
        `delete_records_status`.
    """
    import mysql.connector

    global delete_records_status

    figi_values = [str(figi) for figi in df_adjusted_prices_complete['figi'].unique()]
    deleted_rows = 0

    # An empty IN () list is a SQL syntax error, so only talk to the database
    # when there is something to delete.
    if figi_values:
        mydb = mysql.connector.connect(
          host = rds_host,
          user = rds_user,
          password = rds_password,
          database = rds_database
        )

        try:
            mycursor = mydb.cursor()

            # Bound parameters instead of str(tuple(...)): the tuple repr of a
            # single FIGI is "('X',)", whose trailing comma is invalid SQL.
            placeholders = ", ".join(["%s"] * len(figi_values))
            sql_delete_query = "DELETE FROM price_data_historical WHERE figi IN (" + placeholders + ")"

            mycursor.execute(sql_delete_query, tuple(figi_values))
            mydb.commit()

            deleted_rows = mycursor.rowcount
            mycursor.close()
        finally:
            mydb.close()

    print(deleted_rows, "Data record(s) deleted.")
    delete_records_status = "Deleted."

    return delete_records_status



In [None]:
# Insert new updated records in the SQL history table.

@task
def insert_new_records(delete_records_status):
    """Insert the adjusted price rows into price_data_historical.

    Reads the module-level `df_adjusted_prices_complete` frame and writes it
    in roughly ten batches, committing after each batch.

    Args:
        delete_records_status: Value produced by the delete step. It is not
            inspected here; it only expresses the ordering dependency.

    Returns:
        The string "Inserted. Done.", which is also stored in the global
        `insert_records_status`.
    """
    import pymysql.cursors

    global insert_records_status

    rowCount = 0

    sql_insert_query = """
    INSERT INTO price_data_historical 
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    df = df_adjusted_prices_complete.copy()

    # With fewer than ten rows the integer division is 0, and range() rejects
    # a zero step, so clamp the batch size to 1.
    chunk_size = max(1, len(df) // 10)

    connection = pymysql.connect(host = rds_host,
                                 user = rds_user, 
                                 password = rds_password, 
                                 database = rds_database,
                                 charset = rds_charset,
                                 cursorclass=pymysql.cursors.DictCursor)

    try:
        mycursor = connection.cursor()

        for pos in tqdm(range(0, len(df), chunk_size)):

            # Positional slice of the next batch of rows.
            data = df.iloc[pos:pos + chunk_size].values.tolist()
            mycursor.executemany(sql_insert_query, data)
            connection.commit()
            rowCount = rowCount + mycursor.rowcount
    finally:
        connection.close()

    print(rowCount, "Data records inserted.")
    insert_records_status = "Inserted. Done."

    return insert_records_status


## Start the P&F update process

In [None]:
# Fetch the last (max) date from the Point & Figure history table

@task
def get_max_pnf_date():
    """Look up the most recent date stored in the Point & Figure history table.

    Queries MAX(date) from `base_pnf_data_historical`, then publishes three
    module-level globals: `lastPnFUpdate` (that date), `todayDate` (today) and
    `td_days` (whole days between the two).

    Returns:
        Tuple of (lastPnFUpdate, td_days, todayDate).
    """
    from datetime import date

    global lastPnFUpdate
    global td_days
    global todayDate

    connection = mysql.connector.connect(
        host=rds_host,
        user=rds_user,
        password=rds_password,
        database=rds_database,
    )
    cursor = connection.cursor()
    cursor.execute("SELECT MAX(date) FROM base_pnf_data_historical")

    # Single row, single column; the value is reduced to a plain date.
    first_row = cursor.fetchall()[0]
    lastPnFUpdate = first_row[0].date()

    todayDate = date.today()
    td_days = (todayDate - lastPnFUpdate).days

    print("The last day that prices were updated was", lastPnFUpdate.strftime('%m/%d/%Y'))
    print("That date was", td_days, "days ago.")

    return lastPnFUpdate, td_days, todayDate


In [None]:
# Get new data from the Price History table for each ticker to append to the P&F history table.

@task
def get_price_data(todayDate, lastPnFUpdate):
    """Fetch price history rows between the last P&F update and today.

    The rows are loaded from `price_data_historical`, given empty Point &
    Figure columns, and reduced to the column layout of the P&F table.

    Side effects: sets the globals `myResultData` (raw rows) and
    `df_price_data` (the returned frame), and resets the global
    `df_price_update_total` to an empty DataFrame.

    Args:
        todayDate: End of the date range (inclusive); must support strftime.
        lastPnFUpdate: Start of the date range (inclusive); must support
            strftime.

    Returns:
        DataFrame of price rows with placeholder P&F columns.
    """
    global df_price_data
    global df_price_update_total
    global myResultData

    df_price_update_total = pd.DataFrame()

    connection = mysql.connector.connect(
        host=rds_host,
        user=rds_user,
        password=rds_password,
        database=rds_database,
    )
    cursor = connection.cursor()

    range_start = lastPnFUpdate.strftime('%Y-%m-%d')
    range_end = todayDate.strftime('%Y-%m-%d')
    cursor.execute(
        f"SELECT * FROM price_data_historical WHERE date BETWEEN '{range_start}' AND '{range_end}';"
    )
    myResultData = cursor.fetchall()

    price_table_columns = [
        'key_id', 'ticker', 'figi', 'date', 'open', 'high', 'low', 'close', 'volume',
        'adj_open', 'adj_high', 'adj_low', 'adj_close', 'adj_volume', 'adj_factor',
        'split_ratio', 'change', 'percent_change', 'fifty_two_week_high',
        'fifty_two_week_low', 'market_cap', 'weighted_avg_shares_out', 'intraperiod',
        'last_updated_date', 'last_corp_action_date',
    ]
    df_price_data = pd.DataFrame(myResultData, columns=price_table_columns)

    # P&F fields start out empty; only `reversal` starts at 0.
    empty_pnf_fields = (
        'plot_symbol', 'signal_name', 'high_point', 'last_high_point', 'prev_high_point',
        'low_point', 'last_low_point', 'prev_low_point', 'entry_x', 'entry_o',
        'next_entry', 'stop_loss', 'target_price',
    )
    for field in empty_pnf_fields:
        df_price_data[field] = np.nan
    df_price_data['reversal'] = 0

    # Keep only the columns of the P&F layout, in that table's order.
    pnf_layout = [
        'key_id', 'date', 'figi', 'ticker', 'open', 'high', 'low', 'close', 'change',
        'percent_change', 'volume', 'plot_symbol', 'reversal', 'signal_name',
        'high_point', 'last_high_point', 'prev_high_point', 'low_point',
        'last_low_point', 'prev_low_point', 'entry_x', 'entry_o', 'next_entry',
        'stop_loss', 'target_price', 'last_updated_date', 'last_corp_action_date',
    ]
    df_price_data = df_price_data[pnf_layout]

    print("The shape of the new price data DF is", df_price_data.shape)

    return df_price_data


In [None]:
# Get the last P&F record for each stock from the historical database. They are combined with the new price data in the next step.

@task
def get_last_records(lastPnFUpdate):
    """Load the P&F rows stored for the last update date.

    Selects every row of `base_pnf_data_historical` whose date equals
    `lastPnFUpdate` and stores the result in the global `df_last_records`.

    Args:
        lastPnFUpdate: Date of the last P&F update; must support strftime.

    Returns:
        DataFrame of the P&F records for that date.
    """
    global df_last_records

    connection = mysql.connector.connect(
        host=rds_host,
        user=rds_user,
        password=rds_password,
        database=rds_database,
    )
    cursor = connection.cursor()

    last_date_text = lastPnFUpdate.strftime('%Y-%m-%d')
    cursor.execute(f"SELECT * FROM base_pnf_data_historical WHERE date = '{last_date_text}'")
    rows = cursor.fetchall()

    pnf_layout = [
        'key_id', 'date', 'figi', 'ticker', 'open', 'high', 'low', 'close', 'change',
        'percent_change', 'volume', 'plot_symbol', 'reversal', 'signal_name',
        'high_point', 'last_high_point', 'prev_high_point', 'low_point',
        'last_low_point', 'prev_low_point', 'entry_x', 'entry_o', 'next_entry',
        'stop_loss', 'target_price', 'last_updated_date', 'last_corp_action_date',
    ]
    df_last_records = pd.DataFrame(rows, columns=pnf_layout)

    print("The shape of the last active records DF is", df_last_records.shape)

    return df_last_records


In [None]:
# Join the new price data with the last active records from the history database to get the starting P&F values
# for the new data.

@task
def join_records(df_price_data, df_last_records):
    """Combine new price rows with the last stored P&F rows.

    The two frames are stacked, de-duplicated on 'key_id' and ordered by
    ticker and date. The result is also stored in the global `df_pnf_update`.

    Args:
        df_price_data: New price rows with placeholder P&F columns.
        df_last_records: P&F rows from the last update date.

    Returns:
        The combined, de-duplicated DataFrame sorted by ticker then date.
    """
    global df_pnf_update

    stacked = pd.concat([df_price_data, df_last_records])

    # Sorting on plot_symbol before keeping the first duplicate decides which
    # copy of a repeated key_id survives (pandas sorts NaN last by default).
    ordered = stacked.sort_values(by=['date', 'ticker', 'plot_symbol'])
    unique_rows = ordered.drop_duplicates(subset=['key_id'], keep='first')

    df_pnf_update = unique_rows.sort_values(by=['ticker', 'date'])

    print("The shape of the new combined DF is", df_pnf_update.shape)

    return df_pnf_update


In [None]:
def generate_pnf_calcs(myFigi):
    """Compute Point & Figure fields for every row of one security.

    Reads the rows for `myFigi` from the module-level `df_pnf_update` frame
    (not passed as an argument). The first row seeds the running state (high
    and low points, entry prices, target price) and is left unchanged. Every
    later row is filled in from its close price, the previous row's plot
    symbol and signal, and the running state.

    A reversal needs a move of more than boxSize * reversalBoxes (6%) against
    the current column's extreme.

    If the previous row has neither an 'X' nor an 'O' plot symbol, no plot
    symbol or signal is assigned to the current row.

    Args:
        myFigi: FIGI code selecting the rows of `df_pnf_update` to process.

    Returns:
        A list of row value lists (`DataFrame.values.tolist()`), including the
        unchanged first row, in the column order of `df_pnf_update`.
    """
    
    boxSize = .02
    reversalBoxes = 3
    reversalAmount = boxSize * reversalBoxes   # fractional move needed to reverse a column

    new_data_list = []
    
    data = df_pnf_update.loc[df_pnf_update['figi'] == myFigi].copy()
    data.reset_index(drop = True, inplace = True)   # so data.loc[i, ...] below addresses row position i
    
    # Seed the running state from the first row of this security.
    # NOTE(review): the caller is expected to have sorted the frame so that this is the last
    # stored P&F record for the ticker -- confirm against join_records.

    high_point = data['high_point'].iloc[0]
    low_point = data['low_point'].iloc[0]
    last_high_point = data['last_high_point'].iloc[0]
    last_low_point = data['last_low_point'].iloc[0]
    entry_x = data['entry_x'].iloc[0]
    entry_o = data['entry_o'].iloc[0]
    prev_high_point = data['prev_high_point'].iloc[0]
    prev_low_point = data['prev_low_point'].iloc[0]
    target_price = data['target_price'].iloc[0]

    # Start the loop on the second row, loop through each row's close price after that.
    for i in range(1, len(data)):

        if data['plot_symbol'].iloc[i - 1] == 'X':   # If the previous Plot Symbol is "X", then:

            if data['close'].iloc[i] >= data['close'].iloc[i - 1]:     # If current price >= previous price, then:
                data.loc[i, 'plot_symbol'] = 'X'        # Today's Plot Symbol = "X",
                data.loc[i, 'signal_name'] = data['signal_name'].iloc[i - 1]    # and copy yesterday's signal to today.

                if data['close'].iloc[i] > high_point:    # And if today's price is higher than the running high point,
                    high_point = data['close'].iloc[i]       # then today's price becomes the high point,
                    data.loc[i, 'signal_name'] = data['signal_name'].iloc[i - 1]   # and yesterday's signal is copied again (same value as above).

                if data['close'].iloc[i] > last_high_point:  # And if today's price is higher than the high point from the last X column,
                    data.loc[i, 'signal_name'] = "BUY"           # then today's signal = "BUY".

            elif data['close'].iloc[i] < high_point * (1 - reversalAmount):     # Else if today's price is below the running high point times (1 - reversal),
                data.loc[i, 'plot_symbol'] = 'O'                                     # the Plot Symbol reverses to "O",
                low_point = data['close'].iloc[i]                                   # the low point restarts at today's price,
                data.loc[i, 'reversal'] = 1                                         # reversal = 1,
                prev_high_point = last_high_point                                        # prev_high_point keeps the older last_high_point for the Target Price calc below,
                last_high_point = high_point                                               # last_high_point = the high point of the column that just ended,
                entry_o = data['close'].iloc[i - 1]                                 # and entry_o = previous row's close, used in the next_entry and stop_loss calcs.

                if data['close'].iloc[i] < last_low_point:   # And if today's price is lower than the low point from the last O column,
                    data.loc[i, 'signal_name'] = "SELL"          # then today's signal = "SELL".
                else:
                    data.loc[i, 'signal_name'] = data['signal_name'].iloc[i - 1]   # Else copy yesterday's signal to today.

            else:
                data.loc[i, 'plot_symbol'] = 'X'  # Else, Plot Symbol = "X" (price is down but not enough to trigger a reversal)
                data.loc[i, 'signal_name'] = data['signal_name'].iloc[i - 1]   # and copy yesterday's signal to today.


        if data['plot_symbol'].iloc[i - 1] == 'O':   # If the previous Plot Symbol is "O", then:

            if data['close'].iloc[i] < data['close'].iloc[i - 1]:            # If current price < previous price (strictly lower), then:
                data.loc[i, 'plot_symbol'] = 'O'         # Today's Plot Symbol = "O",
                data.loc[i, 'signal_name'] = data['signal_name'].iloc[i - 1]   # and copy yesterday's signal to today.

                if data['close'].iloc[i] < low_point:       # And if today's price is lower than the running low point,
                    low_point = data['close'].iloc[i]         # then today's price becomes the low point,
                    data.loc[i, 'signal_name'] = data['signal_name'].iloc[i - 1]   # and yesterday's signal is copied again (same value as above).

                if data['close'].iloc[i] < last_low_point:   # And if today's price is lower than the low point from the last O column,
                    data.loc[i, 'signal_name'] = "SELL"         # then today's signal = "SELL".


            elif data['close'].iloc[i] > low_point * (1 + reversalAmount):       # Else if today's price is above the running low point times (1 + reversal),
                data.loc[i, 'plot_symbol'] = 'X'                                       # the Plot Symbol reverses to "X",
                high_point = data['close'].iloc[i]                                    # the high point restarts at today's price,
                data.loc[i, 'reversal'] = 1                                           # reversal = 1,
                prev_low_point = last_low_point                                            # prev_low_point keeps the older last_low_point for the Target Price calc below,
                last_low_point = low_point                                                   # last_low_point = the low point of the column that just ended,
                entry_x = data['close'].iloc[i - 1]                                   # and entry_x = previous row's close, used in the next_entry and stop_loss calcs.

                if data['close'].iloc[i] > last_high_point:  # And if today's price is higher than the high point from the last X column,
                    data.loc[i, 'signal_name'] = "BUY"          # then today's signal = "BUY".

                else:
                    data.loc[i, 'signal_name'] = data['signal_name'].iloc[i - 1]     # Else copy yesterday's signal to today.

            else:
                data.loc[i, 'plot_symbol'] = 'O'  # Else, Plot Symbol = "O" (price is flat or up but not enough to trigger a reversal)
                data.loc[i, 'signal_name'] = data['signal_name'].iloc[i - 1]   # and copy yesterday's signal to today.

        # Record the running state on today's row.
        data.loc[i, 'high_point'] = high_point            # high_point = current "high_point"
        data.loc[i, 'low_point'] = low_point             # low_point = current "low_point"
        data.loc[i, 'last_high_point'] = last_high_point  # last_high_point = current "last_high_point"
        data.loc[i, 'last_low_point'] = last_low_point    # last_low_point = current "last_low_point"
        data.loc[i, 'prev_high_point'] = prev_high_point  # prev_high_point = current "prev_high_point"
        data.loc[i, 'prev_low_point'] = prev_low_point    # prev_low_point = current "prev_low_point"

        if data['signal_name'].iloc[i] == "BUY":

            next_entry = entry_o * (1 + boxSize)         # next_entry = one box above entry_o (the close just before the last X-to-O reversal)
            data.loc[i, 'next_entry'] = next_entry
            stop_loss = entry_x * (1 - boxSize)          # stop_loss = one box below entry_x (the close just before the last O-to-X reversal)
            data.loc[i, 'stop_loss'] = stop_loss

            if data['signal_name'].iloc[i - 1] == "SELL":
                target_price = ((last_high_point - prev_low_point) * reversalBoxes) + prev_low_point   # On the switch from SELL to BUY: the distance from prev_low_point up to last_high_point,
                                                                                                # multiplied by reversalBoxes, added to prev_low_point. Once calculated, it does not
                                                                                                # change for the balance of the current BUY signal.
            data.loc[i, 'target_price'] = target_price

        else:
            # Any signal other than "BUY" (including a missing signal) takes this branch.
            next_entry = entry_x * (1 - boxSize)         # next_entry = one box below entry_x (the close just before the last O-to-X reversal)
            data.loc[i, 'next_entry'] = next_entry
            stop_loss = entry_o * (1 + boxSize)          # stop_loss = one box above entry_o (the close just before the last X-to-O reversal)
            data.loc[i, 'stop_loss'] = stop_loss

            if data['signal_name'].iloc[i - 1] == "BUY":
                target_price = prev_high_point - ((prev_high_point - last_low_point) * reversalBoxes)  # On the switch from BUY to SELL: the distance from prev_high_point down to last_low_point,
                                                                                                # multiplied by reversalBoxes, subtracted from prev_high_point. Once calculated, it does not
                                                                                                # change for the balance of the current SELL signal.
            data.loc[i, 'target_price'] = target_price
            
        data.loc[i, 'entry_x'] = entry_x            # entry_x = current "entry_x"
        data.loc[i, 'entry_o'] = entry_o            # entry_o = current "entry_o"

    # Hand the rows back as plain lists.
    data_list = data.values.tolist()
    new_data_list.extend(data_list)
    
    return new_data_list


In [None]:
# Run all the calculations and prepare final dataframe.

@task
def run_all_calcs(df_pnf_update):
    """Run the P&F calculations for every FIGI and build the frame to load.

    `generate_pnf_calcs` is mapped over the unique FIGI codes with a
    multiprocessing pool. The combined result is written to a local CSV under
    `my_path`, and the rows dated `lastPnFUpdate` (already in the database)
    are removed. The result is also stored in the global `df_pnf_update_load`.

    Note: `generate_pnf_calcs` reads the module-level `df_pnf_update`, not
    this function's argument, so the two must refer to the same data.

    Args:
        df_pnf_update: Combined frame of new price rows and last P&F rows.
            Only its 'figi' column is used directly here.

    Returns:
        DataFrame of new P&F rows to load. It is empty when there is no data
        newer than `lastPnFUpdate`.
    """
    global df_pnf_update_load

    from multiprocessing import Pool
    import time

    start_time = time.time()

    figi_list = df_pnf_update['figi'].unique().tolist() # Get the list of FIGI codes to run the calculations against.

    p = Pool()
    try:
        result = p.map(generate_pnf_calcs, figi_list)  # Use multiprocessing pool to spread the work over all available chip cores
    finally:
        # Release the worker processes even when a calculation fails.
        p.close()
        p.join()

    elapsed_time = time.time() - start_time

    print("Elapsed time was", round(elapsed_time/60, 2), "minutes.")

    # Flatten the per-FIGI row lists into one list of rows.
    new_data_list = [row for figi_rows in result for row in figi_rows]

    myColumns = ['key_id', 'date', 'figi', 'ticker', 'open', 'high', 'low', 'close', 'change', 'percent_change', 'volume', 'plot_symbol', 'reversal', 
                 'signal_name', 'high_point', 'last_high_point', 'prev_high_point', 'low_point', 'last_low_point', 'prev_low_point', 'entry_x', 'entry_o', 
                 'next_entry', 'stop_loss', 'target_price', 'last_updated_date','last_corp_action_date']

    df_pnf_data = pd.DataFrame(new_data_list, columns = myColumns)  # Save the pool results list to a dataframe

    # Save the dataframe to a CSV file in case you need to refer to it later.
    df_pnf_data.to_csv(path_or_buf = my_path + "/df_pnf_data_update_" + todayDate.strftime('%Y-%m-%d') + ".csv", index=False)

    print("The intermediate dataframe shape is ", df_pnf_data.shape)

    # Make sure the date column is in datetime format and remove the records from the last trading day so there is no overlap with the database.
    df_pnf_update_load = df_pnf_data.copy()
    df_pnf_update_load['date'] = pd.to_datetime(df_pnf_update_load['date'])

    # The maximum of an empty date column is NaT, which cannot be normalized,
    # so only stamp the rows when there is a real date.
    latest_date = df_pnf_update_load['date'].max()
    if pd.notna(latest_date):
        df_pnf_update_load['last_updated_date'] = pd.to_datetime(latest_date).normalize()

    df_pnf_update_load = df_pnf_update_load[df_pnf_update_load['date'] != lastPnFUpdate.strftime('%Y-%m-%d')]

    print("The shape of the dataframe to load is ", df_pnf_update_load.shape)

    # Confirm that the date range for the new data is what you expect to see.
    # On a day without new prices the frame is empty and min()/max() are NaT,
    # which does not support strftime, so report that case explicitly.
    if df_pnf_update_load.empty:
        print("There are no new records to load.")
    else:
        startDate = df_pnf_update_load['date'].min().strftime('%Y-%m-%d')
        endDate = df_pnf_update_load['date'].max().strftime('%Y-%m-%d')
        print("The date range of the dataframe to load goes from ", startDate, " to ", endDate)

    return df_pnf_update_load


In [None]:
# Push the dataframe to CSV on S3 if you want to use AWS Lambda to take it from there and push it into 
# the RDS table.

@task
def push_pnf_data_to_S3(df_pnf_update_load):
    """Upload the new P&F rows to S3 as a CSV file.

    Nothing is uploaded when the frame has no rows. Otherwise a fresh boto3
    S3 client is created and the CSV is written to the 'bns-intrinio-data'
    bucket under a key that embeds the module-level `todayDate`.

    Args:
        df_pnf_update_load: DataFrame of new P&F rows.

    Returns:
        None. The outcome of the upload is only printed.
    """
    import io

    # Nothing to upload on a day without new rows.
    if not len(df_pnf_update_load) > 0:
        return

    s3_client = boto3.client(
        's3',
        aws_access_key_id=aws_key,
        aws_secret_access_key=aws_secret_key,
        region_name='us-east-1',
    )

    bucket_name = 'bns-intrinio-data'
    date_text = todayDate.strftime('%Y-%m-%d')
    object_key = f"price-data-daily/df_pnf_update_load_{date_text}.csv"

    # Render the CSV into memory; nothing is written to local disk.
    with io.StringIO() as csv_buffer:
        df_pnf_update_load.to_csv(csv_buffer, index=False)
        csv_text = csv_buffer.getvalue()

    response = s3_client.put_object(Bucket=bucket_name, Key=object_key, Body=csv_text)
    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")

    outcome = "Successful" if status == 200 else "Unsuccessful"
    print(f"{outcome} S3 put_object response. Status - {status}")


In [None]:
# Use SQLAlchemy to push the final dataframe into SQL DB on AWS RDS:

@task
def push_pnf_data_to_RDS(df_pnf_update_load):
    """Append the new P&F rows to base_pnf_data_historical on RDS.

    Nothing is written when the frame has no rows.

    Args:
        df_pnf_update_load: DataFrame of new P&F rows whose columns match the
            target table.

    Returns:
        None.
    """
    from urllib.parse import quote_plus

    if len(df_pnf_update_load) > 0:

        # Set database credentials. User and password are URL-escaped because
        # they are embedded in a connection URL, where characters such as
        # '@', ':' or '/' would otherwise be read as URL delimiters.
        creds = {'usr': quote_plus(rds_user),
                 'pwd': quote_plus(rds_password),
                 'hst': rds_host,
                 'prt': 3306,
                 'dbn': rds_database}

        # MySQL connection string.
        connstr = 'mysql+mysqlconnector://{usr}:{pwd}@{hst}:{prt}/{dbn}'

        # Create sqlalchemy engine for MySQL connection.
        engine = create_engine(connstr.format(**creds))

        try:
            # Write DataFrame to MySQL using the engine (connection) created above.
            df_pnf_update_load.to_sql(name='base_pnf_data_historical', 
                                                  con=engine, 
                                                  if_exists='append', 
                                                  index=False)
        finally:
            # Release the pooled connections held by this short-lived engine.
            engine.dispose()

        print("The new data has been appended to RDS. The number of new rows added is", df_pnf_update_load.shape[0])
        

## Run the Prefect schedule process

In [None]:
# Set up the daily run schedule.

# One run per day, anchored at 21:00 America/New_York starting 2021-12-21.
schedule = IntervalSchedule(
    interval=timedelta(days=1),
    start_date=pendulum.datetime(2021, 12, 21, 21, 0, 0, tz="America/New_York"),
)

In [None]:
# Run the ETL update flow.

if __name__ == "__main__":

    # Build the Prefect flow. Notes on how this cell is wired:
    #  - Each task name is rebound to the value returned by calling the task
    #    inside the Flow context, so after this cell the names refer to the
    #    flow-bound objects.
    #  - The data arguments (lastPriceUpdate, td_days, df_price_update_total, ...)
    #    are module-level globals read when this cell runs, not the outputs of
    #    the upstream tasks. They must already exist in the notebook namespace;
    #    the tasks exchange their real results through `global` assignments.
    #  - `upstream_tasks=[...]` is what chains the steps into a fixed order.
    with Flow("Stock-Data-Update-ETL", schedule) as flow:

        # Stock price updates
        get_max_update_date = get_max_update_date()
        get_recent_price_data = get_recent_price_data(lastPriceUpdate, td_days, upstream_tasks=[get_max_update_date])
        get_latest_shares_out_data = get_latest_shares_out_data(df_price_update_total, upstream_tasks=[get_recent_price_data])
        create_complete_update_dataframe = create_complete_update_dataframe(df_latest_shares_out, 
                                                                            df_price_update_total, 
                                                                            upstream_tasks=[get_latest_shares_out_data])
        # The S3 and RDS pushes both hang off the same upstream task.
        data_to_s3 = push_data_to_S3(df_price_update_complete, upstream_tasks=[create_complete_update_dataframe])
        data_to_rds = push_data_to_RDS(df_price_update_complete, upstream_tasks=[create_complete_update_dataframe])
        
        # Corp actions updates (start only after the price rows are in RDS)
        get_max_corax_date = get_max_corax_date(upstream_tasks=[data_to_rds])
        get_adjusted_tickers_figis = get_adjusted_tickers_figis(lastCoraxUpdate, td_days, upstream_tasks=[get_max_corax_date])
        get_adj_factors_splits = get_adj_factors_splits(df_adjusted_tickers_total,upstream_tasks=[get_adjusted_tickers_figis])
        get_adjusted_price_data = get_adjusted_price_data(df_adjusted_tickers_total, upstream_tasks=[get_adjusted_tickers_figis])
        get_historical_shares_out_data = get_historical_shares_out_data(df_adjusted_tickers_total, upstream_tasks=[get_adjusted_price_data])
        combine_transform_adjusted_data = combine_transform_adjusted_data(df_adjusted_prices_total, df_hist_shares_out, 
                                                                          upstream_tasks=[get_historical_shares_out_data])
        # Back up (export, then save pre-adjusted rows) before the delete/insert pair.
        export_data_to_S3 = export_data_to_S3(df_adjusted_prices_complete, upstream_tasks=[combine_transform_adjusted_data])
        save_preadjusted_records_to_S3 = save_preadjusted_records_to_S3(df_adjusted_prices_complete, upstream_tasks=[export_data_to_S3])
        delete_preadjusted_records = delete_preadjusted_records(save_records_completion_status, upstream_tasks=[save_preadjusted_records_to_S3])
        insert_new_records = insert_new_records(delete_records_status, upstream_tasks=[delete_preadjusted_records])

        # Point & Figure data update (starts after the adjusted rows are re-inserted)
        get_max_pnf_date = get_max_pnf_date(upstream_tasks=[insert_new_records])
        get_price_data = get_price_data(todayDate, lastPnFUpdate, upstream_tasks=[get_max_pnf_date])
        get_last_records = get_last_records(lastPnFUpdate, upstream_tasks=[get_price_data])
        join_records = join_records(df_price_data, df_last_records, upstream_tasks=[get_last_records])
        run_all_calcs = run_all_calcs(df_pnf_update, upstream_tasks=[join_records])

        push_pnf_data_to_S3 = push_pnf_data_to_S3(df_pnf_update_load,upstream_tasks=[run_all_calcs])
        push_pnf_data_to_RDS = push_pnf_data_to_RDS(df_pnf_update_load, upstream_tasks=[run_all_calcs])
        
    # NOTE(review): in Prefect the reference tasks decide the flow's final state, so
    # only the price push to RDS counts toward success here -- confirm this is intended.
    flow.set_reference_tasks([data_to_rds])


In [None]:
# Start the flow. `flow` only exists if the cell above ran with __name__ == "__main__".
# NOTE(review): with a schedule attached, Prefect's flow.run() is expected to keep
# waiting for each scheduled run rather than return after one pass -- confirm.
flow.run()

## Extra or alternative functions if needed

In [None]:
# Run only the insert step by hand, outside the flow. The global
# `delete_records_status` must already exist in the notebook namespace.
insert_new_records.run(delete_records_status)

In [None]:
# Test Price Update ETL process.

# Run every task once, in order, outside the Prefect flow and time the whole pass.
# Each step publishes its result through module-level globals, and the next
# line passes those globals on, so the statement order must not be changed.

# Re-import the `time` module: the notebook header runs
# `from datetime import datetime, date, time, timedelta` after `import time`,
# which leaves the name `time` bound to datetime.time.
import time
start_time = time.time()

# Stock price updates
get_max_update_date.run()
get_recent_price_data.run(lastPriceUpdate, td_days)
get_latest_shares_out_data.run(df_price_update_total)
create_complete_update_dataframe.run(df_latest_shares_out, df_price_update_total)
push_data_to_S3.run(df_price_update_complete)
push_data_to_RDS.run(df_price_update_complete)

# Corporate actions updates
get_max_corax_date.run()
get_adjusted_tickers_figis.run(lastCoraxUpdate, td_days)
get_adj_factors_splits.run(df_adjusted_tickers_total)
get_adjusted_price_data.run(df_adjusted_tickers_total)
get_historical_shares_out_data.run(df_adjusted_tickers_total)
combine_transform_adjusted_data.run(df_adjusted_prices_total, df_hist_shares_out)
export_data_to_S3.run(df_adjusted_prices_complete)
save_preadjusted_records_to_S3.run(df_adjusted_prices_complete)
delete_preadjusted_records.run(save_records_completion_status)
insert_new_records.run(delete_records_status)

# P&F data update
get_max_pnf_date.run()
get_price_data.run(todayDate, lastPnFUpdate)
get_last_records.run(lastPnFUpdate)
join_records.run(df_price_data, df_last_records)
run_all_calcs.run(df_pnf_update)
push_pnf_data_to_S3.run(df_pnf_update_load)
push_pnf_data_to_RDS.run(df_pnf_update_load)


# Report the wall-clock duration of the full pass in minutes.
end_time = time.time()
elapsed_time = end_time - start_time
print("Elapsed time was", round(elapsed_time/60, 2), "minutes.")


In [None]:
# Troubleshoot get shares out.

# Troubleshooting aid: request the two most recent shares-outstanding records
# for the first 500 (figi, ticker) pairs and show them as a DataFrame.

# The notebook header leaves `time` bound to datetime.time, so make sure the
# module is in scope for time.sleep() below.
import time

arg_list = list(df_price_update_total[['figi', 'ticker']].drop_duplicates().to_records(index = False))[0:500]

shares_out_lists_combined = []
bad_tickers = []

tag = 'adjweightedavedilutedsharesos'
frequency = ''
start_date = ''
end_date = ''
sort_order = 'desc'
page_size = 2
next_page = ''

for record in arg_list:

    myFigi = record[0]
    myTicker = record[1]
    identifier = myFigi

    try:
        # `type` is passed as a literal so the builtin type() is not shadowed
        # for the rest of the notebook.
        response = intrinio.HistoricalDataApi().get_historical_data(identifier, tag, frequency=frequency, type='', start_date=start_date, end_date=end_date, sort_order=sort_order, page_size=page_size, next_page=next_page)
        shares_out_data = response.historical_data

        # Add the ticker and figi values to each result.
        shares_out_list = [
            dict(entry.to_dict(), ticker=myTicker, figi=myFigi)
            for entry in shares_out_data
        ]

        # Extend once per ticker; extending inside the per-record loop would
        # add the earlier records of the same ticker again on every pass.
        shares_out_lists_combined.extend(shares_out_list)

    except Exception:

        # Track any tickers that do not have shares outstanding data available.
        # Catching Exception (not a bare except) keeps Ctrl-C working.
        bad_tickers.append(myTicker)

    time.sleep( 0.5 )

pd.DataFrame(shares_out_lists_combined)


In [None]:
# Alternative method for inserting new updated records in the SQL history table.

def insert_new_records(df_price_update_complete):
    """Insert the rows of a price frame into price_data_historical.

    Alternative to the task of the same name: defining it rebinds
    `insert_new_records` in the notebook namespace. The frame is written in
    roughly ten batches, committing after each batch.

    Args:
        df_price_update_complete: DataFrame whose columns match the 25 columns
            of the target table, in table order.

    Returns:
        The string "Inserted. Done.", which is also stored in the global
        `insert_records_status`.
    """
    import pymysql.cursors

    global insert_records_status

    rowCount = 0

    sql_insert_query = """
    INSERT INTO price_data_historical 
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    df = df_price_update_complete.copy()

    # With fewer than ten rows the integer division is 0, and range() rejects
    # a zero step, so clamp the batch size to 1.
    chunk_size = max(1, len(df) // 10)

    connection = pymysql.connect(host = rds_host,
                                 user = rds_user, 
                                 password = rds_password, 
                                 database = rds_database,
                                 charset = rds_charset,
                                 cursorclass=pymysql.cursors.DictCursor)

    try:
        mycursor = connection.cursor()

        for pos in tqdm(range(0, len(df), chunk_size)):

            # Positional slice of the next batch of rows.
            data = df.iloc[pos:pos + chunk_size].values.tolist()
            mycursor.executemany(sql_insert_query, data)
            connection.commit()
            rowCount = rowCount + mycursor.rowcount
    finally:
        connection.close()

    print(rowCount, "Data records inserted.")
    insert_records_status = "Inserted. Done."

    return insert_records_status
