<a href="https://colab.research.google.com/github/BClarke94/MSCAI/blob/main/B9AI108_RobertClarke_CA2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Data Acquisition Pipeline**

Robert Clarke 10596311

https://colab.research.google.com/drive/1_2LHqX-MpuzyfxiGUK3XEwj2PfZ6bqSA?usp=sharing

The goal of this data acquisition pipeline is to gather 15-minute stock price data for later use. Technical indicators are added to the data on a rolling basis over the 15-minute bars, and the result is used to update a MySQL database on GFS for later retrieval.

Using mysql-connector for database interactions

In [None]:
#downloading latest version of mysql-connector-python
%%sh
pip install mysql-connector-python



Data acquisition section of the project

In [None]:
#Imports and defining user request for desired stock data
import numpy as np
import pandas as pd
pd.set_option('display.max_columns', None)
import csv
import requests


def get_ticker():
  """Ask the user for stock tickers on one line.

  The whitespace-separated words are stored in the module-level
  ``ticker_list`` (so later cells can use it) and also returned.
  """
  global ticker_list
  for prompt_line in ('Please enter stock tickers seperated by a space:',
                      'Maximum of 2 at a time:',
                      'e.g. MSFT TSLA: '):
    print(prompt_line)
  ticker_list = input().split()
  return ticker_list

In [None]:
#Verifying that the user input is strictly alphabetical (ASCII letters only) and at most
#MAX_TICKER_LENGTH characters long, and that no more than MAX_TICKERS tickers were entered
MAX_TICKERS = 2
MAX_TICKER_LENGTH = 6

get_ticker()
while len(ticker_list) > MAX_TICKERS:
  print("Please enter a valid number of stock tickers: ")
  get_ticker()

# Build a new list instead of calling ticker_list.remove() inside the loop:
# removing from a list while iterating over it skips the element that follows
# each removed one, so a second invalid ticker could slip through unchecked.
valid_tickers = []
for ticker in ticker_list:
  if len(ticker) > MAX_TICKER_LENGTH or not (ticker.isascii() and ticker.isalpha()):
    print(ticker, "Not a valid stock name removing from list")
  else:
    valid_tickers.append(ticker)
ticker_list = valid_tickers

if not ticker_list:
  print("No valid stock tickers were entered, please re-run this cell")

Please enter stock tickers seperated by a space:
Maximum of 2 at a time:
e.g. MSFT TSLA: 
MSFT NVDA


In [None]:
#Code to get 15 minute time frame data on a stock of user choice

#API only allows 15 minute calls for 1 month at a time, getting the previous 60 days worth of data using the following timeFrame list 
#Can easily add or retract dates and times in case more are needed however free rates of the api usage still apply and as such more is not reccomended 
timeFrame = ['year1month1', 'year1month2']
def get_stock_data(ticker, time_frame=None, api_key=None):
    """Download 15-minute intraday bars for one ticker from the Alpha Vantage API.

    Args:
        ticker: stock symbol to request.
        time_frame: iterable of slice names to request. Defaults to the
            module-level ``timeFrame`` list, read at call time so edits to that
            list are still picked up.
        api_key: Alpha Vantage API key. Defaults to the ALPHAVANTAGE_API_KEY
            environment variable, falling back to the key this notebook shipped with.

    Returns:
        One flat list of CSV rows (each a list of strings) covering all slices,
        in request order. The response for each slice may start with its own
        header row; dataPrep removes those.

    Raises:
        requests.HTTPError: if any request comes back with an HTTP error status.
    """
    import os

    if time_frame is None:
        time_frame = timeFrame
    if api_key is None:
        # NOTE(review): this fallback key is published in the notebook; rotate it
        # and rely on the ALPHAVANTAGE_API_KEY environment variable instead.
        api_key = os.environ.get('ALPHAVANTAGE_API_KEY', '33ELI0Y2XQJY92EA')

    rows = []
    with requests.Session() as session:
        for time_slice in time_frame:
            # Passing params lets requests URL-encode the user-supplied ticker
            # instead of splicing it into the URL by hand.
            response = session.get(
                'https://www.alphavantage.co/query',
                params={
                    'function': 'TIME_SERIES_INTRADAY_EXTENDED',
                    'symbol': ticker,
                    'interval': '15min',
                    'slice': time_slice,
                    'apikey': api_key,
                },
                timeout=60,
            )
            # Fail loudly instead of parsing an HTTP error page as CSV data.
            response.raise_for_status()
            decoded_content = response.content.decode('utf-8')
            rows.extend(csv.reader(decoded_content.splitlines(), delimiter=','))
    return rows

In [None]:
#Defining the data preparation step. Adding in technical indicators for a more thorough evaluation of the market line by line
def dataPrep(api_list):
  """Build a DataFrame of 15-minute bars with technical indicators from raw CSV rows.

  Args:
    api_list: list of CSV rows (lists of strings) as produced by get_stock_data.
      The first row must be the header (time, open, high, low, close, volume).
      Further header rows (one per requested slice) may appear later and are removed.

  Returns:
    DataFrame with the original columns (``volume`` converted to numbers; the
    price columns are kept as the API's strings) plus 10-bar Bollinger bands
    (mean +/- 2 std) and 50/30/20/10-bar simple moving averages of ``open``.
    Rows are ordered newest bar first. The oldest 49 bars, which have no full
    50-bar history, are dropped, as is any other row containing a missing value.

  Raises:
    ValueError: if api_list is empty or its first row has no 'time' column
      (for example when the API answered with an error message instead of CSV).
  """
  if not api_list:
    raise ValueError("dataPrep received no rows from the API")
  df = pd.DataFrame(api_list)
  # The first row is the CSV header; use it for the column names.
  df.columns = list(df.iloc[0])
  if 'time' not in df.columns:
    raise ValueError("Unexpected API response, first row was: " + str(api_list[0]))

  # Remove every header row (one per slice). Only the header itself is dropped;
  # the row after it is a real bar and must be kept.
  df = df.loc[df['time'] != 'time'].copy()
  df['volume'] = pd.to_numeric(df['volume'])

  # The API lists the newest bar first, but rolling windows look at earlier
  # rows. Compute on chronological order so each indicator only uses the bar
  # itself and the bars before it (no look-ahead). Timestamps are
  # 'YYYY-MM-DD HH:MM:SS' strings, so a text sort is chronological.
  df = df.sort_values('time', kind='mergesort')
  open_price = pd.to_numeric(df['open'])

  #Technical indicators include bollinger band and simple moving average for varying time periods
  window10 = open_price.rolling(10)
  sma10 = window10.mean()
  std10 = window10.std()
  df['BollingerBand_Upper'] = sma10 + 2 * std10
  df['BollingerBand_Lower'] = sma10 - 2 * std10
  df['SimpleMovingAverage50'] = open_price.rolling(50).mean()
  df['SimpleMovingAverage30'] = open_price.rolling(30).mean()
  df['SimpleMovingAverage20'] = open_price.rolling(20).mean()
  df['SimpleMovingAverage10'] = sma10

  # Restore the API's newest-first order; later cells compare/insert in this order.
  df = df.sort_values('time', ascending=False, kind='mergesort')

  # The oldest 49 bars lack a full 50-bar window (NaN SMA50) and are removed here.
  return df.dropna()


#df = dataPrep(flat_list)  

In [None]:
#Building one dataframe per ticker: get_stock_data downloads the raw rows, dataPrep adds the indicators,
#and the result is stored in ticker_dict under the ticker name
#User inputed data will be used as a key which is a security concern although as they are limited to alphabetical characters exposure should be limited
ticker_dict = {}
for symbol in ticker_list:
  raw_rows = get_stock_data(symbol)
  ticker_dict[symbol] = dataPrep(raw_rows)


In [None]:
#Printing every prepared dataframe to check the output looks as expected
for frame in ticker_dict.values():
  print(frame)


0                    time           open           high            low  \
51    2021-12-20 07:30:00          319.8          319.8         319.38   
52    2021-12-20 07:15:00          319.8          319.9          319.2   
53    2021-12-20 07:00:00         319.36         319.49         318.81   
54    2021-12-20 06:45:00          319.3          319.3          319.3   
55    2021-12-20 06:30:00         318.98         318.98         318.98   
...                   ...            ...            ...            ...   
2528  2021-10-22 05:15:00  310.282722082  310.282722082  310.232813328   
2529  2021-10-22 05:00:00  310.232813328  310.582174604  310.232813328   
2530  2021-10-22 04:45:00  310.152959322  310.152959322  310.152959322   
2531  2021-10-22 04:30:00   310.13299582   310.13299582  309.893433802   
2532  2021-10-22 04:15:00  309.703780538  309.703780538  309.703780538   

0             close  volume  BollingerBand_Upper  BollingerBand_Lower  \
51           319.65   16688           

**Uploading the data to MySQL version 5.7 on GFS**

In [None]:
#Defining the connection parameters for connecting to GFS MYSQL database 
#note that SSL Tokens which have been provided must be added into ssl folder or the path must be adapted 
#for remote login to the database 
import os
from getpass import getpass

import mysql.connector
from mysql.connector.constants import ClientFlag

# Credentials come from the environment (or an interactive prompt) rather than
# being stored in the notebook, where they end up in version control and shared copies.
# NOTE(review): the root password committed with earlier versions of this notebook
# should be considered leaked and rotated.
config = {
    'user': os.environ.get('MYSQL_USER', 'root'),
    'password': os.environ.get('MYSQL_PASSWORD') or getpass('MySQL password: '),
    'host': os.environ.get('MYSQL_HOST', '34.142.43.249'),
    'client_flags': [ClientFlag.SSL],
    'ssl_ca': 'ssl/server-ca.pem',
    'ssl_cert': 'ssl/client-cert.pem',
    'ssl_key': 'ssl/client-key.pem'
}
try:
  cnxn = mysql.connector.connect(**config)
except mysql.connector.Error as err:
  # Catch only connector errors and show the reason, instead of a bare except
  # that also hides typos and keyboard interrupts.
  print('There was an error with login please check password and username or SSL Certs are correctly implemented')
  print(err)

In [None]:
#Used once to create the `stockdata` database (not a table) on the server; it is unnecessary to
#recreate it, as all data is stored in per-ticker tables within this database.
#The code is kept as a string literal so it never executes (running the cell only echoes the string).
#NOTE(review): the loop would run CREATE DATABASE once per ticker, so a second iteration would fail
#because the database already exists; a single `CREATE DATABASE IF NOT EXISTS stockdata` would suffice.
"""cursor = cnxn.cursor()
for i in ticker_list:
  cursor.execute('CREATE DATABASE stockdata' )
cnxn.close()"""

"cursor = cnxn.cursor()\nfor i in ticker_list:\n  cursor.execute('CREATE DATABASE stockdata' )\ncnxn.close()"

In [None]:
#Pointing the connection config at the stockdata database, which holds one table per ticker
config.update(database='stockdata')

In [None]:
#Setting up cursor and connection using the already defined config (which now names the stockdata database)
# Close the connection left open by the login-check cell (or by a previous run of
# this cell) before replacing it, so re-running cells does not pile up idle sessions.
try:
  if cnxn.is_connected():
    cnxn.close()
except NameError:
  # The login check never produced a connection, so there is nothing to close.
  pass
cnxn = mysql.connector.connect(**config)
# buffered=True fetches each result set immediately, so a new query can be issued
# before the previous result has been fully read.
cursor = cnxn.cursor(buffered=True)


In [None]:
#Cleaning some old data and can be used in case of table needing to be dropped

#cursor.execute("DROP TABLE IF EXISTS NVDA;")
#cnxn.commit()


In [None]:
#Creating a table named after the ticker for each value of ticker_list (skipped if it already exists)
# Column order must match the INSERT used to upload data. Everything is stored as
# VARCHAR(255), matching the tables that already exist in the database.
stock_table_columns = ('time', 'open', 'high', 'low', 'close', 'volume',
                       'BollingerBand_Upper', 'BollingerBand_Lower',
                       'SimpleMovingAverage50', 'SimpleMovingAverage30',
                       'SimpleMovingAverage20', 'SimpleMovingAverage10')
column_definitions = ", ".join("`" + column + "` VARCHAR(255)" for column in stock_table_columns)
for i in ticker_list:
  # Table names cannot be passed as query parameters, so re-check the ticker
  # and backtick-quote it before splicing it into the statement.
  if not (i.isascii() and i.isalpha()):
    print("Skipping " + i + ": not a valid table name")
    continue
  try:
    cursor.execute("CREATE TABLE IF NOT EXISTS `" + i + "` (" + column_definitions + ")")
    cnxn.commit()
  except mysql.connector.Error as err:
    # Report the real database error instead of silently guessing that the table exists.
    print("Something went wrong with " + i + " continuing through the list")
    print(err)

In [None]:
#Listing the tables in the database to confirm the creation succeeded (or that they already existed);
#each row is printed as the tuple returned by the cursor
cursor.execute("SHOW TABLES")
cnxn.commit()
for row in cursor.fetchall():
  print(row)
   


('AAPL',)
('MSFT',)
('NVDA',)
('WDAY',)


In [None]:
#Checking whether the newest fetched bar is already stored in the database,
#then removing every ticker that has nothing new from the update list
to_be_removed = []
for i in ticker_list:
  frame = ticker_dict[i]
  if frame.empty:
    print(i + " has no prepared data, removing from update list")
    to_be_removed.append(i)
    continue
  # Ask for the latest stored timestamp explicitly: "SELECT * ... LIMIT 1" has no
  # ORDER BY, so it returns whichever row the server reads first, not the newest one.
  # Timestamps are 'YYYY-MM-DD HH:MM:SS' text, so MAX() and string comparison are chronological.
  cursor.execute("SELECT MAX(time) FROM `" + i + "`;")
  (last_db_time,) = cursor.fetchone()
  if last_db_time is None:
    # Empty (newly created) table: it definitely needs the data.
    continue
  if frame['time'].max() <= last_db_time:
    print(i + " data is up to date removing from update list")
    to_be_removed.append(i)
for i in to_be_removed:
  ticker_list.remove(i)
if len(ticker_list) == 0:
  print('All stock data up to date, please refrain from updating the database')

All stock data up to date, please refrain from updating the database


In [None]:
# Tickers that still need uploading after the freshness check
print(f"{ticker_list}")

['MSFT', 'NVDA']


In [None]:
#Updating the database itself with the data gathered from the API call.
#Only bars newer than the latest stored one are inserted, so re-running this cell
#(or fetching overlapping 60-day windows on later days) does not duplicate rows.
insert_columns = ('time', 'open', 'high', 'low', 'close', 'volume',
                  'BollingerBand_Upper', 'BollingerBand_Lower',
                  'SimpleMovingAverage50', 'SimpleMovingAverage30',
                  'SimpleMovingAverage20', 'SimpleMovingAverage10')
for i in ticker_list:
  query = ("INSERT INTO `" + i + "` (" + ", ".join(insert_columns) + ") "
           "VALUES (" + ", ".join(["%s"] * len(insert_columns)) + ")")
  cursor.execute("SELECT MAX(time) FROM `" + i + "`")
  (last_db_time,) = cursor.fetchone()
  frame = ticker_dict[i]
  if last_db_time is not None:
    # 'YYYY-MM-DD HH:MM:SS' strings compare chronologically.
    frame = frame[frame['time'] > last_db_time]
  # Select the columns explicitly so the value order always matches the column list,
  # and store everything as text like the existing VARCHAR columns.
  rows = list(frame[list(insert_columns)].astype(str).itertuples(index=False, name=None))
  if rows:
    # One batched call instead of a round trip per row.
    cursor.executemany(query, rows)
    cnxn.commit()
  print(str(len(rows)) + " records inserted in " + i + " table")

Records inserted in MSFT table
Records inserted in NVDA table


In [None]:
#Printing the newest rows of each table to confirm the update
for i in ticker_list:
  # Without ORDER BY, LIMIT 5 returns arbitrary rows; sort on the text timestamp
  # ('YYYY-MM-DD HH:MM:SS', so text order is chronological) to show the latest bars.
  cursor.execute("SELECT * FROM `" + i + "` ORDER BY time DESC LIMIT 5;")
  result = cursor.fetchall()
  print(i)
  for x in result:
    print(x)