In [1]:
from flask import Flask, render_template, request
import pandas as pd
import json
import plotly
import plotly.express as px
import yfinance as yf
from datetime import datetime, timedelta
import yahooquery as yq
import finnhub
from stocksymbol import StockSymbol
import requests
from pymongo import MongoClient
from pymongo import DESCENDING
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, DateType, DoubleType
from pyspark.sql import SparkSession
import os
import sys

In [4]:
# Set API keys
finnhub_client = finnhub.Client(api_key="cgp4v61r01qlmgv2d5rgcgp4v61r01qlmgv2d5s0") # Set Finnhub API key
alpha_api_key = "ACBH7HRXH41LYM33" # Set Alpha Vantage API key

# Create an instance of StockSymbol class and get the list of US stock symbols
ss = StockSymbol('833b416e-4456-4eba-a64e-8c55feb10154') # Create an instance of the StockSymbol class with a specific UUID
symbol_list_us = ss.get_symbol_list(market="US") # Get the list of US stock symbols using the get_symbol_list method

# Connect to MongoDB database
uri = "mongodb+srv://rn2578:Qwerty123@stock-dashboard.etjwqzz.mongodb.net/?retryWrites=true&w=majority"
client = MongoClient(uri) # Connect to a local instance of MongoDB
db = client.companies_db # Create a database named companies_db

# Create a Spark environment 
os.environ['PYSPARK_PYTHON'] = sys.executable # Set PYSPARK_PYTHON environment variable to the current Python executable path
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable # Set PYSPARK_DRIVER_PYTHON environment variable to the current Python executable path
spark = SparkSession.builder.getOrCreate() # Create a SparkSession instance
sc = spark.sparkContext # Create a Spark context

This function is only called when a company does not exist in our database already. It will fetch the various static company info (that don't change much over time like sector, industry, etc.) using the stock ticker by making api calls to various sources like yahooquery, alphavantage. It will then make transformation on the data like calculate ROI, ROE and ROA and convert it to a uniform dictionary format which is then inserted into the 'company_info' collection.

In [5]:
def getCompanyInfo(symbol, api_key):
    # Retrieve company data from Yahoo Finance using yfinance library
    data = yq.Ticker(symbol).asset_profile[symbol]

    # Handle exceptions in case some company data is missing
    try:
        state = data['state']
    except:
        state = ''
    try:
        address2 = data['address2']
    except:
        address2 = ''

    # Find the short name and long name of the company from the symbol_list_us
    filtered_list = [(company['shortName'], company['longName']) for company in symbol_list_us if company['symbol'] == symbol]
    short_name, long_name = filtered_list[0]

    # Define the URLs to retrieve the financial statement data from Alpha Vantage API
    bs_url = f"https://www.alphavantage.co/query?function=BALANCE_SHEET&symbol={symbol}&apikey={api_key}"
    is_url = f"https://www.alphavantage.co/query?function=INCOME_STATEMENT&symbol={symbol}&apikey={api_key}"
    cf_url = f"https://www.alphavantage.co/query?function=CASH_FLOW&symbol={symbol}&apikey={api_key}"

    # Retrieve the financial statement data from Alpha Vantage API and convert it to pandas DataFrames
    bs_data = requests.get(bs_url).json()
    bs_df = pd.DataFrame(bs_data["annualReports"]).T
    is_data = requests.get(is_url).json()
    is_df = pd.DataFrame(is_data["annualReports"]).T
    cf_data = requests.get(cf_url).json()
    cf_df = pd.DataFrame(cf_data["annualReports"]).T

    # Calculate financial ratios from the financial statement data
    net_income = float(is_df.loc['netIncome',0])
    total_assets = float(bs_df.loc['totalAssets', 0])
    total_equity = float(bs_df.loc['totalAssets', 0])
    cash_flow_operating = float(cf_df.loc['operatingCashflow',0])

    roi = net_income / total_assets
    roa = net_income / total_equity
    roe = cash_flow_operating / total_equity

    # Create a dictionary of company information
    company_dict = {"symbol": symbol, "shortName": short_name, "longName": long_name, "address1": data['address1'], "address2": address2, "city": data['city'], 
                    "state": state, "zip" : data['zip'], "country": data['country'], "website": data['website'], "industry": data['industry'], 
                    "sector": data['sector'], "longBusinessSummary": data['longBusinessSummary'], "ROI": round(roi,3), "ROA": round(roa,3), 
                    "ROE": round(roe,3), "CEO": data['companyOfficers'][0]['name']}

    # Insert the company information dictionary into a MongoDB collection named "company_info"
    collection = db['company_info']
    collection.insert_one(company_dict)
    print(f"Created new entry for {symbol}.")


We use this function to get the dynamic data which changes everyday, like stock prices, volume and news. If the company does not exist, a collection will be created and the corresponsding data for that from 2010 to today would be inserted in the collection. If the company already exists, it will set the start time as 1 day after the last date of entry and end date to today's date and fetch all the data and insert the latest records.

In [6]:
def getDaily(companyName):
    # If the company name is 'FB', change it to 'META'
    if companyName == 'FB':
        companyName = 'META'
        
    # Check if the collection already exists
    if companyName in db.list_collection_names():
        print("The collection already exists.")
        
        # Find the most recent document in the collection
        my_collection = db[companyName]
        most_recent_document = my_collection.find().sort('Date', DESCENDING).limit(1)
        
        # Set the start time for the next data retrieval to one day after the most recent document
        start_time = (most_recent_document[0]['Date'] + timedelta(days=1)).strftime('%Y-%m-%d')
        
        # Set the end time for the next data retrieval to today
        end_time = (datetime.today()).strftime('%Y-%m-%d')
        
        # If start time is equal to end time, return (no need to retrieve data)
        if start_time == end_time:
            return
        
        # Print start and end time for the next data retrieval
        print(start_time, end_time)
        
    else:
        # Create the collection if it doesn't already exist
        db.create_collection(companyName)
        print("The collection has been created.")
        
        # Set the start time for the first data retrieval to January 1st, 2010
        start_time='2010-01-01'
        
        # Set the end time for the first data retrieval to yesterday's date
        end_time = (datetime.today() - timedelta(days=1)).strftime('%Y-%m-%d')
        
    # Try to download the stock data for the specified company and time range
    try:
        stockData = yf.download(companyName, start=start_time, end=end_time)
    except:
        # If the download fails, set the start time to January 1st, 2015 and try again
        start_time='2015-01-01'
        stockData = yf.download(companyName, start=start_time, end=end_time)
    
    try:
        # Reset the index of the stock data
        stockData = stockData.reset_index()
        
        # Keep only the columns for date, adjusted close price, and trading volume
        stockData = stockData.loc[:, ['Date', 'Adj Close', 'Volume']]
        
        # Convert the stock data to a dictionary of records and keep only the most recent record
        json_dict = stockData.to_dict(orient='records')
        for i in json_dict[-1:]:
            
            # Set the start and end dates for the news retrieval to five days before and the date of the most recent stock record, respectively
            end_date = i['Date'].strftime('%Y-%m-%d')
            start_date = (i['Date'] - timedelta(days=5)).strftime('%Y-%m-%d')
            
            # Try to retrieve news articles for the specified company and time range
            try:
                news = finnhub_client.company_news(companyName, _from=start_date, to=end_date)
                
                # Convert the news articles to a pandas DataFrame and keep only the most recent five articles
                df = pd.DataFrame(news[:5])
                
                # Convert the datetime column to a pandas datetime object
                df['datetime'] = df['datetime'].apply(lambda x: datetime.fromtimestamp(x))
                
                # Keep only the columns for datetime, headline, image, summary, and url
                df = df.loc[:, ['datetime', 'headline', 'image', 'summary', 'url']]
                
                # Convert the news articles to a dictionary of records and add them
                i['news'] = df.to_dict(orient='records')
            except:
                i['news'] = [] # If there was an exception, assign an empty list to i['news']
        collection = db[companyName] # Get the collection for the current company name
        if len(json_dict)==1:  # If only one record was inserted into the collection
            collection.insert_many(json_dict) # Insert the record into the collection
            print(f"Latest info for {companyName} updated.") # Print message indicating latest info was updated
        else: # If more than one record was inserted into the collection
            collection.insert_many(json_dict)  # Insert all records into the collection
            print(f"All {companyName} data inserted.")  # Print message indicating all data was inserted
    except:
        return

This function fetches the data from our database and creates a graph of the date vs volume of stock using plotly that is returned on the front-end.

In [7]:
def volume_graph(stock, startdate, enddate):
    # If stock is FB, change to META
    if stock == 'FB':
        stock = 'META'
        
    # Get the collection from MongoDB using the stock name
    collection = db[stock]
    
    # Query to find documents between startdate and enddate
    query = {"Date":{ "$gte":datetime.strptime(startdate, '%Y-%m-%d'), '$lt':datetime.strptime(enddate, '%Y-%m-%d')}}
    
    # Find documents with fields Date and Volume only
    result = collection.find(query, { "_id": 0, "Date": 1, "Volume": 1 })

    #using Pandas
    # Convert query result to a pandas dataframe
    df = pd.DataFrame(result)
    # Compute the maximum and minimum volume values in the DataFrame
    max = (df['Volume'].max())
    min = (df['Volume'].min())

    #using spark
    # Define schema with DateType and DoubleType
    # schema = StructType([
    #      StructField("Date", DateType()),
    #      StructField("Volume", DoubleType())
    #  ])
    # Convert result to a Spark DataFrame using the defined schema
    # df = spark.createDataFrame(result, schema)
    # max = df.agg({"Volume": "max"}).collect()[0][0]
    # min = df.agg({"Volume": "min"}).collect()[0][0]

    # Compute the range and margin to adjust the y-axis range of the plot
    range = max - min
    margin = range * 0.05
    max = max + margin
    min = min - margin

    # Create an area plot using the Plotly Express library
    fig = px.area(df, x='Date', y="Volume",
            hover_data=("Date","Volume"), 
            range_y=(min,max), template="plotly_dark" )

    # Convert the plot to a JSON-encoded string using the PlotlyJSONEncoder class
    graphJSON = json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)

    # Return the JSON-encoded plot string
    return graphJSON

This function fetches the data from our database and creates a graph of the date vs adjusted close price of stock using plotly that is returned on the front-end.

In [8]:
# Define function to generate graph data
def close_graph(stock, startdate, enddate):
    
    # Rename stock symbol if it is 'FB'
    if stock == 'FB':
        stock = 'META'
    
    # Connect to database and query for stock data between given dates
    collection = db[stock]    # Connect to MongoDB collection for the given stock symbol
    query = {"Date":{ "$gte":datetime.strptime(startdate, '%Y-%m-%d'), '$lt':datetime.strptime(enddate, '%Y-%m-%d')}}    # Define query to retrieve stock data between given start and end dates
    result = collection.find(query, { "_id": 0, "Date": 1, "Adj Close": 1 })    # Execute query and retrieve date and adjusted close price data for the given stock symbol
    
    #using Pandas
    # Convert query result to a pandas dataframe
    df = pd.DataFrame(result)
    # Compute the maximum and minimum volume values in the DataFrame
    max = (df['Adj Close'].max())
    min = (df['Adj Close'].min())

    #using spark
    # Define schema with DateType and DoubleType
    # schema = StructType([
    #      StructField("Date", DateType()),
    #      StructField("Adj Close", DoubleType())
    #  ])
    # Convert result to a Spark DataFrame using the defined schema
    # df = spark.createDataFrame(result, schema)
    # max = df.agg({"Adj Close": "max"}).collect()[0][0]
    # min = df.agg({"Adj Close": "min"}).collect()[0][0]
    
    # Calculate range and margin for adjusted close prices
    range = max - min
    margin = range * 0.05
    max = max + margin
    min = min - margin
    
    # Generate and customize graph using Plotly Express
    fig = px.area(df, x='Date', y="Adj Close",
        hover_data=("Date","Adj Close"), 
        range_y=(min,max), template="plotly_dark" )
    
    # Convert graph data to JSON format
    graphJSON = json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)
    
    # Return graph data as a JSON string
    return graphJSON

This function fetches the latest new data stored in our database when the getNews end point is called from thr front-end.

In [9]:
# Define function to retrieve news articles for a given stock symbol
def getNews(stock):
    
    # Rename stock symbol if it is 'FB'
    if stock == 'FB':
        stock = 'META'
    
    # Connect to MongoDB collection for the given stock symbol and retrieve the latest document
    collection = db[stock]    # Connect to MongoDB collection for the given stock symbol
    last_document = collection.find().sort('Date', DESCENDING).limit(1)    # Sort documents in descending order by 'Date' field and retrieve the latest document
    
    # Retrieve news articles from the latest document
    news = last_document[0]['news']
    
    # Format datetime field in each news article to a more readable format
    for item in news:
        item['datetime'] = item['datetime'].strftime('%Y-%m-%d %H:%M:%S')
    
    # Return list of news articles
    return news

This function ensures that there is no duplicate insertion in the database.

In [10]:
# Define function to check if stock symbol is in the MongoDB collection 'company_info'
def check_db(stock):
    
    # Rename stock symbol if it is 'FB'
    if stock == 'FB':
        stock = 'META'
    
    # Connect to MongoDB collection 'company_info' and check if the stock symbol exists in the collection
    collection = db['company_info']
    query = {"symbol": stock}
    result = collection.find_one(query, { "_id": 0})
    
    # If stock symbol exists in the collection, retrieve its daily stock prices
    if result is not None:
        print(f"{stock} is in the collection")
        getDaily(stock)
    
    # If stock symbol does not exist in the collection, retrieve its company information and daily stock prices
    else:
        print(f"{stock} is not in the collection")
        getCompanyInfo(stock, alpha_api_key)    # Retrieve company information using Alpha Vantage API
        getDaily(stock)    # Retrieve daily stock prices using Alpha Vantage API
        
        # Check if stock symbol now exists in the collection
        query = {"symbol": stock}
        result = collection.find_one(query, { "_id": 0})
    
    # Return the company information for the stock symbol
    return result

In [11]:
# Initialize Flask app and set the path for the static files
app = Flask(__name__, static_folder='static')

# Define route for the root directory
@app.route('/', methods=['GET'])

# Define function for rendering the index.html template with a list of stock symbols
def index():
    return render_template('index.html', symbols = symbol_list_us)

In [12]:
# Define route for displaying a particular endpoint
@app.route('/display/<endpoint>')

# Define function for handling different endpoints
def display(endpoint):   
    if endpoint == "getClose":
        # If endpoint is 'getClose', return the result of the 'close_graph' function with the specified stock and date range
        return close_graph(request.args.get('stock'),request.args.get('startdate'),request.args.get('enddate'))
    elif endpoint == "getVolume":
        # If endpoint is 'getVolume', return the result of the 'volume_graph' function with the specified stock and date range
        return volume_graph(request.args.get('stock'),request.args.get('startdate'),request.args.get('enddate'))
    elif endpoint == "getInfo":
        # If endpoint is 'getInfo', get the stock information for the specified stock from the database and return it as a JSON object
        stock = request.args.get('stock')       
        return json.dumps(check_db(stock))
    elif endpoint =="getNews":
        # If endpoint is 'getNews', get the latest news for the specified stock from the database and return it as a JSON object
        stock = request.args.get('stock')   
        return json.dumps(getNews(stock))
    else:
        # If endpoint is not recognized, return an error message
        return "Bad endpoint", 400

In [13]:
app.run(host='localhost', port=5020)

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://localhost:5020
Press CTRL+C to quit
127.0.0.1 - - [03/May/2023 18:20:00] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [03/May/2023 18:20:01] "GET /lib/chart/chart.min.js HTTP/1.1" 404 -
127.0.0.1 - - [03/May/2023 18:20:01] "GET /lib/easing/easing.min.js HTTP/1.1" 404 -
127.0.0.1 - - [03/May/2023 18:20:01] "GET /lib/waypoints/waypoints.min.js HTTP/1.1" 404 -
127.0.0.1 - - [03/May/2023 18:20:01] "GET /lib/owlcarousel/owl.carousel.min.js HTTP/1.1" 404 -
127.0.0.1 - - [03/May/2023 18:20:01] "GET /lib/tempusdominus/js/moment.min.js HTTP/1.1" 404 -
127.0.0.1 - - [03/May/2023 18:20:01] "GET /lib/tempusdominus/js/moment-timezone.min.js HTTP/1.1" 404 -
127.0.0.1 - - [03/May/2023 18:20:01] "GET /static/lib/tempusdominus/css/tempusdominus-bootstrap-4.min.css HTTP/1.1" 200 -
127.0.0.1 - - [03/May/2023 18:20:01] "GET /static/lib/owlcarousel/assets/owl.carousel.min.css HTTP/1.1" 200 -
127.0.0.1 - - [03/May/2023 18:20:01] "GET /static/css/bootstrap.min.css HTTP/1.1" 200 -
127.0.0.1 - - [

AAPL is in the collection
The collection already exists.
2023-04-22 2023-05-03
TSLA is not in the collection
Created new entry for TSLA.
The collection has been created.
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


127.0.0.1 - - [03/May/2023 18:20:15] "GET /display/getInfo?stock=AAPL HTTP/1.1" 200 -


All AAPL data inserted.


127.0.0.1 - - [03/May/2023 18:20:22] "GET /display/getInfo?stock=TSLA HTTP/1.1" 200 -


All TSLA data inserted.


127.0.0.1 - - [03/May/2023 18:20:26] "GET /display/getClose?stock=TSLA&startdate=2023-04-03&enddate=2023-05-03 HTTP/1.1" 200 -
127.0.0.1 - - [03/May/2023 18:20:26] "GET /display/getClose?stock=AAPL&startdate=2023-04-03&enddate=2023-05-03 HTTP/1.1" 200 -
127.0.0.1 - - [03/May/2023 18:20:27] "GET /display/getVolume?stock=TSLA&startdate=2023-04-03&enddate=2023-05-03 HTTP/1.1" 200 -
127.0.0.1 - - [03/May/2023 18:20:27] "GET /display/getVolume?stock=AAPL&startdate=2023-04-03&enddate=2023-05-03 HTTP/1.1" 200 -
127.0.0.1 - - [03/May/2023 18:20:27] "GET /display/getNews?stock=TSLA HTTP/1.1" 200 -
127.0.0.1 - - [03/May/2023 18:20:27] "GET /display/getNews?stock=AAPL HTTP/1.1" 200 -


## ETL Diagram

![ETLdiagram.png | width=10](attachment:ETLdiagram.png)

## Screenshot of Interface for interacting with the application

![ScreenshotOfUI.png | width=10](attachment:ScreenshotOfUI.png)