# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import simfin as sf

In [2]:
# Local spark cluster specific imports
import findspark
findspark.init()

In [3]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.sql("select 'spark' as hello ")
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [4]:
# import of functions from custom modules
from get_prices import *
from get_fundamentals import *
from get_peers import *
from get_company_info import *
from analysis_utils import *

## Step 1: Scope the Project and Gather Data

### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included?

In [None]:
# IEX API calls


In [None]:
# Simfin API calls

## Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

#### Configuration ####
Define:
- time period for analysis

In [5]:
period_dict = {'start_date':2010,
                'end_date':2019}

#### 1.1 Stock information
- Ticker symbol list: "http://www.nasdaqtrader.com/dynamic/SymDir/nasdaqtraded.txt"
    - Symbol Look-Up/Directory Data Fields & Definitions: http://www.nasdaqtrader.com/trader.aspx?id=symboldirdefs
- Company information: Kaggle dataset from 2019 https://www.kaggle.com/marketahead/all-us-stocks-tickers-company-info-logos 
- Peer group information: IEX Cloud API https://iexcloud.io/docs/api/#peer-groups

##### 1.1.1 Ticker Symbol List

In [6]:
# get ticker symbol list
symbol_df = pd.read_csv("http://www.nasdaqtrader.com/dynamic/SymDir/nasdaqtraded.txt", sep='|')
# exclude test issues
symbol_df = symbol_df[(symbol_df['Test Issue'] == 'N')]
# keep only issuers with normal financial status (drops deficient, delinquent and bankrupt issuers)
symbol_df = symbol_df[symbol_df['Financial Status'].isna() | (symbol_df['Financial Status']=='N')]
# exclude ETFs
symbol_df = symbol_df[symbol_df['ETF']=='N']
symbol_list = symbol_df['NASDAQ Symbol'].tolist()
print('total number of symbols traded = {}'.format(len(symbol_list)))

Unnamed: 0,Nasdaq Traded,Symbol,Security Name,Listing Exchange,Market Category,ETF,Round Lot Size,Test Issue,Financial Status,CQS Symbol,NASDAQ Symbol,NextShares
0,Y,A,"Agilent Technologies, Inc. Common Stock",N,,N,100.0,N,,A,A,N
1,Y,AA,Alcoa Corporation Common Stock,N,,N,100.0,N,,AA,AA,N


##### 1.1.2 Company Info

In [8]:
company_info_df = load_company_info_from_disk(symbol_list)
company_info_df.head(1)

Unnamed: 0,ticker,company name,short name,industry,description,website,logo,ceo,exchange,market cap,sector,tag 1,tag 2,tag 3
0,A,Agilent Technologies Inc.,Agilent,Medical Diagnostics & Research,Agilent Technologies Inc is engaged in life sc...,http://www.agilent.com,A.png,Michael R. McMullen,New York Stock Exchange,24218070000.0,Healthcare,Healthcare,Diagnostics & Research,Medical Diagnostics & Research


In [None]:
create_pandas_profiling_report(company_info_df, 'company_info_df')

In [9]:
ticker_count = company_info_df['ticker'].nunique()
print(f'Number of companies in company_info_df: {ticker_count}')
# reduce symbol_list to those where company information is available
symbol_list = company_info_df['ticker'].unique().tolist()

Number of companies in company_info_df: 4545


In [None]:
# tmp - download stats
# stats_df = download_advanced_stats_data(symbol_list)

## TESTING: limit symbol list - to be deleted later

In [10]:
# limit symbol list to two tickers for testing
symbol_list = ['AAPL', 'GOOG']

##### 1.1.3 Peer group information


In [17]:
# initial download of peer data from API
#peer_df = download_peer_data(symbol_list)

In [11]:
# load downloaded peer data from disk
peer_df = get_peer_data_from_disk(symbol_list)
peer_df_shape = peer_df.shape
print(f'Shape of peer_df: {peer_df_shape}')

Shape of peer_df: (2, 3)


In [12]:
company_info_df_cols = company_info_df.columns
peer_df_cols = peer_df.columns
print(f'company_info_df: {company_info_df_cols}')
print()
print(f'peer_df: {peer_df_cols}')

company_info_df: Index(['ticker', 'company name', 'short name', 'industry', 'description',
       'website', 'logo', 'ceo', 'exchange', 'market cap', 'sector', 'tag 1',
       'tag 2', 'tag 3'],
      dtype='object')

peer_df: Index(['ticker', 'peer_string', 'peer_list'], dtype='object')


In [13]:
# add peer data to company info
company_info_df = company_info_df.merge(peer_df, 
                                        on = 'ticker',
                                        how='left',
                                        validate='1:1')
company_info_df.head(3)

Unnamed: 0,ticker,company name,short name,industry,description,website,logo,ceo,exchange,market cap,sector,tag 1,tag 2,tag 3,peer_string,peer_list
0,A,Agilent Technologies Inc.,Agilent,Medical Diagnostics & Research,Agilent Technologies Inc is engaged in life sc...,http://www.agilent.com,A.png,Michael R. McMullen,New York Stock Exchange,24218070000.0,Healthcare,Healthcare,Diagnostics & Research,Medical Diagnostics & Research,,
1,AA,Alcoa Corporation,Alcoa,Metals & Mining,Alcoa Corp is an integrated aluminum company. ...,http://www.alcoa.com,AA.png,Roy Christopher Harvey,New York Stock Exchange,5374967000.0,Basic Materials,Basic Materials,Aluminum,Metals & Mining,,
2,AAC,AAC Holdings Inc.,AAC,Health Care Providers,AAC Holdings Inc provides inpatient and outpat...,http://www.americanaddictioncenters.org,,Michael T. Cartwright,New York Stock Exchange,63720100.0,Healthcare,Healthcare,Medical Care,Health Care Providers,,


#### 1.2 Fundamental indicators 
- Return on Invested Capital (ROIC)
    - Source: Income Statement
        - net income: ni -> column 'Net Income'
        - dividend: di -> column 'Dividends Paid'
        - debt: de -> summed up with equity in column 'Total Liabilities & Equity'
        - equity: eq
    - Definition: ROIC = (ni - di) / (de + eq)
- Sales Growth Rate
    - Source: Income Statement
    - Definition: Sales is equal to column 'Revenue'
- Earnings per Share Growth Rate
    - Source: Income Statement
        - Earnings: ea -> column 'Net Income'
        - number of shares : sh -> column 'Shares (Basic)'
    - Definition: ea / sh
- Book Value per Share Growth Rate
    - Source: Balance Sheet
        - Total Equity: t_e -> column 'Total Equity'
        - Preferred Equity: p_e (not available)
        - number of shares : sh -> column 'Shares (Basic)'
    - Definition: (t_e - p_e) / sh
- Free Cash Flow Growth Rate
    - Source: 
        - Cash Flow: 
            - Cashflow from Operating Activities: cf_oa -> column 'Net Cash from Operating Activities'
            - Capital Expenditure: capex -> column 'Net Cash from Investing Activities'
        - Income Statement:
            - Interest Expenses: i_e -> exclude for simplicity, column 'Interest Expense, Net'
            - Tax shield on Interest Expense: t_i_e  -> exclude for simplicity
    - Definition: Free Cashflow (f_cf) = cf_oa + i_e - t_i_e - capex
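
The definitions above can be sketched in pandas as follows. This is a minimal illustration, not the project's `calculate_top5_kpi` implementation: the helper name `add_kpis` and the sample figures are invented, the input columns are named after the symbols in the list (`de_eq` stands for the combined debt and equity column), dividends and capital expenditure are assumed to be given as positive amounts, preferred equity is taken as zero because it is not available, and interest expense and its tax shield are left out as noted above.

```python
import pandas as pd


def add_kpis(df: pd.DataFrame) -> pd.DataFrame:
    """Add roic, eps, bvps and fcf columns (hypothetical helper)."""
    out = df.copy()
    # ROIC = (ni - di) / (de + eq); debt and equity arrive combined in one column
    out['roic'] = (out['ni'] - out['di']) / out['de_eq']
    # EPS = earnings / number of shares (earnings taken as net income)
    out['eps'] = out['ni'] / out['sh']
    # BVPS = (t_e - p_e) / sh, with p_e assumed to be 0
    out['bvps'] = out['t_e'] / out['sh']
    # FCF = cf_oa - capex (interest terms excluded for simplicity)
    out['fcf'] = out['cf_oa'] - out['capex']
    return out


# Invented figures for a single fiscal year, for illustration only
sample = pd.DataFrame({
    'ni': [100.0],
    'di': [20.0],
    'de_eq': [400.0],
    'sh': [10.0],
    't_e': [150.0],
    'cf_oa': [120.0],
    'capex': [30.0],
})
kpi_df = add_kpis(sample)
print(kpi_df[['roic', 'eps', 'bvps', 'fcf']])
```

The growth rates listed above would then be year-over-year changes of these columns per ticker.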

In [9]:
# initialize simfin API
init_simfin_api()

In [3]:
market='us'
variant='annual'
# download cashflow data from the SimFin server and load into a Pandas DataFrame.
cashflow_df = sf.load_cashflow(variant=variant, market=market)
cashflow_df = cashflow_df.reset_index()
# Download the data from the SimFin server and load into a Pandas DataFrame.
income_sm_df = sf.load_income(variant=variant, market=market)
income_sm_df = income_sm_df.reset_index()
# Download the data from the SimFin server and load into a Pandas DataFrame.
balance_st_df = sf.load_balance(variant=variant, market=market)
balance_st_df = balance_st_df.reset_index()
# create Pandas Profiling Report for each DataFrame
for df, df_name in zip([cashflow_df, income_sm_df, balance_st_df],
                        ['cashflow_df', 'income_sm_df', 'balance_st_df']):
    create_pandas_profiling_report(df, df_name)

Exception: The simfin data directory has not been set by the user. Please call the function sf.set_data_dir() first.

In [18]:
# combine fundamentals and calculate top5 kpis
fundamental_df = combine_fundamentals(symbol_list, period_dict)
fundamental_df = calculate_top5_kpi(fundamental_df)
fundamental_df.tail(3)

Dataset "us-cashflow-annual" on disk (19 days old).
- Loading from disk ... Done!
Dataset "us-income-annual" on disk (19 days old).
- Loading from disk ... Done!
Dataset "us-balance-annual" on disk (17 days old).
- Loading from disk ... Done!
Combined all fundamental data from financial statements to one Dataframe.
Calculated roic and added it to Dataframe
Calculated eps and added it to Dataframe
Calculated bvps and added it to Dataframe
Calculated fcf and added it to Dataframe
top5 KPIs added to fundamental data


Unnamed: 0,Ticker,Report Date_is,SimFinId,Currency,Fiscal Year,Fiscal Period_is,Publish Date_is,Restated Date_is,Shares (Basic)_is,Shares (Diluted)_is,...,Share Capital & Additional Paid-In Capital,Treasury Stock,Retained Earnings,Total Equity,Total Liabilities & Equity,Dividends Paid_clean,roic,eps,bvps,fcf
17,GOOG,2017-12-31,18,USD,2017,FY,2018-02-06,2018-02-06,692901000.0,750730000.0,...,40247000000.0,,113247000000.0,152502000000.0,197295000000,0.0,0.064178,18.273895,220.092048,69911000000.0
18,GOOG,2018-12-31,18,USD,2018,FY,2019-02-06,2019-02-06,695140000.0,750149000.0,...,45049000000.0,,134885000000.0,177628000000.0,232792000000,0.0,0.132032,44.215554,255.528383,76475000000.0
19,GOOG,2019-12-31,18,USD,2019,FY,2020-02-04,2020-02-04,692596000.0,745083000.0,...,50552000000.0,,152122000000.0,201442000000.0,275909000000,0.0,0.124472,49.585906,290.850655,84011000000.0


#### 1.3 Pricing information ####
The purpose of this data is to evaluate the so-called sticker prices and margin of safety based on current stock prices. It can also be used for backtesting criteria on historic data.
- Sticker price calculation
    - future period fp, by default 10 years
    - Sticker price = future market price / (1 + exp_rr)^fp
    - expected annual return rate exp_rr, by default 15%
    - future market price = future P/E * estimated future EPS
        - future P/E = min(pe_default, pe_5yr_avg)
            - default price per earnings pe_default: 2 * rule #1 growth rate in percent (see below)
            - 5 year average of annual price per earnings pe_5yr_avg
                - annual price per earnings pe = price/ eps
                    - annual price = mean of daily low prices in month December
        - estimated future EPS f_eps = current EPS * (1+ rule1_gr)^fp
            - rule #1 growth rate rule1_gr = min(bvps_gr_5yr, eps_gr_5yr)

- Margin of safety: half the sticker price.
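
The calculation chain above can be written out as a short function. This is a sketch under assumptions, not the project's `calculate_sticker_price`: the function name and return format are invented here, and `pe_default` is taken as twice the growth rate expressed in percent. The inputs are the GOOG 5-year figures that appear in the growth table of this notebook.

```python
def sticker_price(eps_curr, rule1_gr, pe_avg, fp=10, exp_rr=0.15):
    """Return intermediate and final values of the sticker price calculation."""
    # default P/E: twice the rule #1 growth rate in percent
    pe_default = 2 * 100 * rule1_gr
    # future P/E: the more conservative of default and historic average
    pe_future = min(pe_default, pe_avg)
    # compound current EPS over the future period
    eps_future = eps_curr * (1 + rule1_gr) ** fp
    price_future = pe_future * eps_future
    # discount the future price back with the expected annual return rate
    sticker = price_future / (1 + exp_rr) ** fp
    return {
        'pe_future': pe_future,
        'eps_future': eps_future,
        'price_future': price_future,
        'sticker_price': sticker,
        'mos': sticker / 2,  # margin of safety: half the sticker price
    }


# rule #1 growth rate = min(bvps_gr_5yr, eps_gr_5yr) = min(0.14, 0.30)
goog = sticker_price(eps_curr=49.585906, rule1_gr=min(0.14, 0.30), pe_avg=33.1)
print({name: round(value, 2) for name, value in goog.items()})
```

With these inputs the sketch gives a sticker price of roughly 1272 and a margin of safety of roughly 636.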

In [14]:
# Download historic stock prices for symbols
download_ticker_prices(symbol_list)

Total number of valid symbols downloaded = 2


In [19]:
# load ticker prices for symbols
price_df = load_ticker_prices(spark, symbol_list)
price_df.toPandas().tail()

root
 |-- Date: date (nullable = true)
 |-- Ticker: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: double (nullable = true)



Unnamed: 0,Date,Ticker,Open,High,Low,Close,Adj Close,Volume
5029,2021-04-28,GOOG,2407.14502,2452.37793,2374.850098,2379.909912,2379.909912,2986400.0
5030,2021-04-29,GOOG,2410.330078,2436.52002,2402.280029,2429.889893,2429.889893,1977700.0
5031,2021-04-30,GOOG,2404.48999,2427.139893,2402.159912,2410.120117,2410.120117,1956700.0
5032,2021-05-03,GOOG,2402.719971,2419.699951,2384.5,2395.169922,2395.169922,1688900.0
5033,2021-05-04,GOOG,2369.73999,2379.26001,2311.699951,2326.334961,2326.334961,1072839.0


In [20]:
# calculate annual price from historic price data
ann_price_df = calculate_annual_price(spark, price_df, period_dict)
ann_price_df.tail(3)

Unnamed: 0,year,Ticker,mean_low_price
15,2018,GOOG,1023.568163
16,2019,AAPL,68.467024
17,2019,GOOG,1334.03919


In [21]:
# calculate annual price per earnings
fundamental_df = calculate_annual_pe(ann_price_df, fundamental_df)
fundamental_df.tail(3)

Unnamed: 0,Ticker,Report Date_is,SimFinId,Currency,Fiscal Year,Fiscal Period_is,Publish Date_is,Restated Date_is,Shares (Basic)_is,Shares (Diluted)_is,...,Retained Earnings,Total Equity,Total Liabilities & Equity,Dividends Paid_clean,roic,eps,bvps,fcf,mean_low_price,pe
17,GOOG,2017-12-31,18,USD,2017,FY,2018-02-06,2018-02-06,692901000.0,750730000.0,...,113247000000.0,152502000000.0,197295000000,0.0,0.064178,18.273895,220.092048,69911000000.0,1036.52785,56.72178
18,GOOG,2018-12-31,18,USD,2018,FY,2019-02-06,2019-02-06,695140000.0,750149000.0,...,134885000000.0,177628000000.0,232792000000,0.0,0.132032,44.215554,255.528383,76475000000.0,1023.568163,23.149505
19,GOOG,2019-12-31,18,USD,2019,FY,2020-02-04,2020-02-04,692596000.0,745083000.0,...,152122000000.0,201442000000.0,275909000000,0.0,0.124472,49.585906,290.850655,84011000000.0,1334.03919,26.903596


In [22]:
# calculate growth kpi df
growth_df = calculate_growth_rates(fundamental_df, agg_func='mean')
growth_df.tail(3)

Calculated KPI growth from year to year.
Calculated 5 and 10 year growth rate


Unnamed: 0,Ticker,revenue_gr_curr,eps_curr,roic_gr_5yr,revenue_gr_5yr,eps_gr_5yr,bvps_gr_5yr,fcf_gr_5yr,pe_5yr,yrs_in_5yr,...,pe_default_5yr,roic_gr_10yr,revenue_gr_10yr,eps_gr_10yr,bvps_gr_10yr,fcf_gr_10yr,pe_10yr,yrs_in_10yr,rule1_gr_10yr,pe_default_10yr
0,AAPL,-0.020411,2.991446,0.02,0.08,0.15,0.02,-0.12,16.07,5,...,4.0,-0.02,0.19,0.24,0.14,0.1,15.23,10,0.14,28.0
1,GOOG,0.183001,49.585906,0.12,0.2,0.3,0.14,0.14,33.1,5,...,28.0,0.04,0.21,0.16,0.1,0.18,27.76,10,0.1,20.0


In [24]:
growth_df = calculate_sticker_price(growth_df, fp=10, exp_rr=0.15)
growth_df.tail(3)

Unnamed: 0,Ticker,revenue_gr_curr,eps_curr,roic_gr_5yr,revenue_gr_5yr,eps_gr_5yr,bvps_gr_5yr,fcf_gr_5yr,pe_5yr,yrs_in_5yr,...,fcf_gr_10yr,pe_10yr,yrs_in_10yr,rule1_gr_10yr,pe_default_10yr,pe_future,eps_future,price_future,sticker_price,mos
0,AAPL,-0.020411,2.991446,0.02,0.08,0.15,0.02,-0.12,16.07,5,...,0.1,15.23,10,0.14,28.0,4.0,3.646556,14.586223,3.605491,1.802746
1,GOOG,0.183001,49.585906,0.12,0.2,0.3,0.14,0.14,33.1,5,...,0.18,27.76,10,0.1,20.0,28.0,183.825927,5147.12595,1272.290815,636.145408


In [29]:
price_df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Ticker: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: double (nullable = true)



In [25]:
# filter for verification of mean calculation
# price_df.filter(
#                     (price_df['Date']>=F.to_date(F.lit('2020-12-01'))) &\
#                     (price_df['Date']<=F.to_date(F.lit('2020-12-31'))) &\
#                     (price_df['Ticker']=='AAPL')
#                 ).toPandas()['Low'].mean()
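
The same verification can be done in plain pandas once the prices are in a pandas DataFrame. The sketch below assumes the `Date`, `Ticker` and `Low` columns from the schema above; the helper name `december_mean_low` and the price values are invented for illustration.

```python
import pandas as pd


def december_mean_low(prices: pd.DataFrame, year: int) -> pd.Series:
    """Mean of daily low prices in December of `year`, per ticker."""
    dates = pd.to_datetime(prices['Date'])
    in_december = (dates.dt.year == year) & (dates.dt.month == 12)
    return prices[in_december].groupby('Ticker')['Low'].mean()


# Invented prices: only the two December rows should enter the mean
prices = pd.DataFrame({
    'Date': ['2020-11-30', '2020-12-01', '2020-12-31', '2021-01-04'],
    'Ticker': ['AAPL'] * 4,
    'Low': [100.0, 110.0, 130.0, 90.0],
})
dec_mean = december_mean_low(prices, 2020)
print(dec_mean)
```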

In [26]:
# Performing cleaning tasks here





## Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model
- Dimension tables:
    - company information, including peers
- Fact tables:
    - fundamental indicators
    - price history
    - growth KPIs
    - screener results

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model
- Data extraction pipeline: extract data from sources via APIs and store results in staging tables.
    - NASDAQ: stock symbol list
    - IEX Cloud source:
        - Company information data
        - Peer group data
    - Simfin source: fundamental data
        - Annual cashflow data
        - Annual income statement data
        - Annual balance sheet data
    - Yfinance source: historical stock price data
- Data processing pipeline: process data from staging tables to dimension and fact tables.
    - create company information dimension table with symbol list.
    - create price facts table
    - create fundamental facts table
    - create growth KPI facts table
    - create screener results KPI table

## Step 4: Run Pipelines to Model the Data
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [6]:
%%time
# download data from sources to staging folders
pipeline_staging()

Symbol data extracted...
total number of symbols traded = 8060
Company data loaded from disk...
Ticker price data extracted...
Total number of valid symbols downloaded = 4511
Wall time: 1h 22min 57s


In [6]:
%%time
period_dict = {'start_date':2010,
                'end_date':2019}

company_info_df, fundamental_df, growth_df, screener_df = pipeline_processing(spark, period_dict)

Number of stocks symbols in list: 6368
Company data loaded from disk...
Dataset "us-cashflow-annual" on disk (22 days old).
- Loading from disk ... Done!
Dataset "us-income-annual" on disk (22 days old).
- Loading from disk ... Done!
Dataset "us-balance-annual" on disk (20 days old).
- Loading from disk ... Done!
Combined all fundamental data from financial statements to one Dataframe.
Calculated roic and added it to Dataframe
Calculated eps and added it to Dataframe
Calculated bvps and added it to Dataframe
Calculated fcf and added it to Dataframe
top5 KPIs added to fundamental data
root
 |-- Date: date (nullable = true)
 |-- Ticker: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: double (nullable = true)

Calculated KPI growth from year to year.
Calculated 5 and 10 year growth rate
Wall time: 11min 13s


In [8]:
screener_df.head()

Unnamed: 0,Ticker,last_date,last_low_price,price_future,sticker_price,mos
0,ALXN,2021-05-05,168.789993,11071.69,2736.751,1368.376
1,CARS,2021-05-05,12.95,1809293000.0,447229600.0,223614800.0
2,ABMD,2021-05-05,299.779999,2648.559,654.6833,327.3416
3,OPRX,2021-05-05,47.619999,6767.95,1672.934,836.4668
4,SABR,2021-05-05,12.41,11930.05,2948.925,1474.463


In [17]:
growth_df[growth_df['Ticker']=='CARS'].iloc[0]

Ticker                          CARS
revenue_gr_curr            -0.083738
eps_curr                   -6.647123
roic_gr_5yr                -5.420000
revenue_gr_5yr             -0.010000
eps_gr_5yr                 -4.530000
bvps_gr_5yr                      inf
fcf_gr_5yr                 -0.140000
pe_5yr                     15.900000
yrs_in_5yr                  4.000000
rule1_gr_5yr               -4.530000
pe_default_5yr           -906.000000
roic_gr_10yr               -5.420000
revenue_gr_10yr            -0.010000
eps_gr_10yr                -4.530000
bvps_gr_10yr                     inf
fcf_gr_10yr                -0.140000
pe_10yr                    15.900000
yrs_in_10yr                        4
rule1_gr_10yr              -4.530000
pe_default_10yr          -906.000000
pe_future                -906.000000
eps_future           -1997012.410272
price_future       1809293243.706757
sticker_price       447229618.733932
mos                 223614809.366966
Name: 298, dtype: object
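
The `inf` in `bvps_gr_5yr` is consistent with a year-over-year percentage change taken from a base of zero: the change is infinite, and a mean over values that include `inf` is itself `inf`. The sketch below reproduces this with invented book values for a hypothetical ticker `XYZ` and shows one possible cleansing step, replacing infinite values with `NaN` before aggregating. Whether this matches the cause in the CARS data is an assumption.

```python
import numpy as np
import pandas as pd

# Invented book values per share; the first year is zero
bvps = pd.DataFrame({
    'Ticker': ['XYZ'] * 4,
    'Fiscal Year': [2016, 2017, 2018, 2019],
    'bvps': [0.0, 2.0, 3.0, 3.3],
})
# sort so that the change is calculated from one year to the next
bvps = bvps.sort_values(['Ticker', 'Fiscal Year'])
bvps['bvps_gr'] = bvps.groupby('Ticker')['bvps'].pct_change()

# growth from 0.0 to 2.0 is infinite, which dominates the mean
raw_mean = bvps['bvps_gr'].mean()

# replace infinite values before aggregating
cleaned = bvps['bvps_gr'].replace([np.inf, -np.inf], np.nan)
clean_mean = cleaned.mean()

print(raw_mean, round(clean_mean, 2))
```

Note also that the negative `rule1_gr_5yr` in the output above leads to a negative `pe_future` and a negative `eps_future`, whose product is the large positive `price_future`.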

In [14]:
company_info_df[company_info_df['ticker']=='CARS']

Unnamed: 0,ticker,company name,short name,industry,description,website,logo,ceo,exchange,market cap,sector,tag 1,tag 2,tag 3,peer_string,peer_list
898,CARS,Cars.com Inc.,Cars.com,Autos,Cars.com Inc is an online destination for buyi...,https://www.cars.com,CARS.png,Thomas Alex Vetter,New York Stock Exchange,1875338000.0,Consumer Cyclical,Consumer Cyclical,Auto & Truck Dealerships,Autos,"PBYA,PAXH,PERI,PHA-CA","[PBYA, PAXH, PERI, PHA-CA]"


In [7]:
# DEBUGGING BELOW +++++++++++++++++++++++++++++++++++++++++++++++++++++

# def calculate_growth_summary(df, gr_kpi_list, agg_func='mean'):
#     """[summary]

#     Args:
#         df ([type]): [description]
#         gr_kpi_list ([type]): [description]
#         agg_func (str, optional): [description]. Defaults to 'mean'.

#     Returns:
#         [type]: [description]
#     """    
#     # calculate 10 year and 5 year average
#     # get current year
#     curr_year = df['Fiscal Year'].max()
#     df_list = []
#     for period in [5, 10]:
#         start_year = curr_year - period
#         df_filtered = df[df['Fiscal Year']>start_year]
#         kpi_sum_list = [kpi + '_' + str(period) + 'yr' for kpi in gr_kpi_list]
#         rename_dict = dict(zip(gr_kpi_list, kpi_sum_list))
#         rename_dict['Fiscal Year'] = 'yrs_in_' + str(period) + 'yr'
#         agg_dict = {gr_kpi:agg_func for gr_kpi in gr_kpi_list}
#         agg_dict['Fiscal Year'] ='count'
#         df_filtered = df_filtered.groupby(['Ticker'])\
#                                         .agg(agg_dict)\
#                                         .reset_index()\
#                                         .round(2)
#         df_filtered = df_filtered.rename(columns=rename_dict)
#         # calculate rule1 growth rate
#         kpi_list = ['bvps_gr_', 'eps_gr_']
#         kpi_list = [(kpi + str(period) + 'yr') for kpi in kpi_list]
#         df_filtered[('rule1_gr_' + str(period)+ 'yr')] = df_filtered[kpi_list].min(axis=1)
#         df_filtered[('rule1_gr_' + str(period)+ 'yr')] = df_filtered[('rule1_gr_' + str(period)+ 'yr')].apply(lambda value: value if value > 0 else np.nan)
#         df_filtered['pe_default_'+ str(period)+ 'yr'] = 2*100*df_filtered[('rule1_gr_' + str(period)+ 'yr')]
#         df_list.append(df_filtered)
#     df_sum = pd.merge(df_list[0], df_list[1], how='outer', on='Ticker')
#     print('Calculated 5 and 10 year growth rate')
#     return df_sum

# def get_current_yr_kpi(df):
#     col_list = ['Ticker', 'revenue_gr', 'eps']
#     curr_yr = df['Fiscal Year'].max()
#     rename_dict = {col:(col + '_curr') for col in ['revenue_gr', 'eps']}
#     df_curr_yr = df[df['Fiscal Year']==curr_yr][col_list].reset_index(drop=True)
#     df_curr_yr = df_curr_yr.rename(columns=rename_dict)
#     return df_curr_yr

# def calculate_growth_rates(df, agg_func='mean'):
#     """[summary]

#     Args:
#         df ([type]): [description]
#         agg_func (str, optional): [description]. Defaults to 'mean'.

#     Returns:
#         [type]: [description]
#     """    
#     group_col_list = ['Ticker',
#                     'Fiscal Year']
#     kpi_col_list = ['roic',
#                     'revenue',
#                     'eps',
#                     'bvps',
#                     'fcf'
#                     ]
#     df = df.rename(columns={'Revenue':'revenue'})
#     column_list = group_col_list + kpi_col_list 
#     # add price per earnings to list, but do not calculate change
#     pe_df = df[group_col_list + ['pe']]
#     df = df[column_list]
#     # sort values to ensure that growth rate is calculated correctly
#     df = df.sort_values(by=group_col_list, ascending = True)
#     gr_kpi_list = [kpi + '_gr' for kpi in kpi_col_list]
#     # create new columns for kpi growth rate
#     df[gr_kpi_list] = df.groupby(['Ticker'])[kpi_col_list].pct_change()
#     df = df.merge(pe_df, on = group_col_list, how= 'left', validate='1:1')
#     gr_kpi_list.append('pe')
#     print('Calculated KPI growth from year to year.')
#     # extract current year kpi data
#     df_curr_yr = get_current_yr_kpi(df)
#     # calculate growth summary
#     df_growth = calculate_growth_summary(df, gr_kpi_list, agg_func=agg_func)
#     df = df_curr_yr.merge(df_growth, how='outer', on='Ticker',  validate='1:1')
#     # data cleansing: remove inf values from numerical columns
#     number_cols = df.select_dtypes(include='number').columns
#     df[number_cols] = df[number_cols].replace([np.inf, -np.inf], np.nan) 
#     return df

# def find_stocks_below_mos(spark, price_df, growth_df):
#     """[summary]

#     Args:
#         spark ([type]): [description]
#         price_df ([type]): [description]
#         growth_df ([type]): [description]

#     Returns:
#         [type]: [description]
#     """    
#     # price_df : group by Ticker and filter price_df for max date, keep low price
#     curr_price_df = price_df.select('Date', 'Ticker')\
#                             .groupBy('Ticker')\
#                             .agg(F.max('Date').alias('Date'))
#     curr_price_df = curr_price_df.join(price_df.select('Date', 'Ticker', 'Low'), 
#                                         ['Date', 'Ticker'], 
#                                         'left_outer')
#     curr_price_df = curr_price_df.withColumnRenamed('Date', 'last_date')\
#                                .withColumnRenamed('Low', 'last_low_price') 
#     # create Spark df from Pandas df growth_df
#     growth_df_spark = spark.createDataFrame(\
#                             growth_df[['Ticker', 'price_future', 'sticker_price', 'mos']])
#     # filter on condition price < mos
#     curr_price_df = curr_price_df.join(growth_df_spark,
#                                         'Ticker',
#                                         'left_outer')
#     curr_price_df = curr_price_df.filter(curr_price_df['last_low_price'] <= curr_price_df['mos'])
#     # drop rows with missing values
#     curr_price_df = curr_price_df.na.drop()
#     # convert to Pandas df
#     screener_df = curr_price_df.toPandas()
#     # return Pandas df
#     return screener_df

# def pipeline_processing(spark, period_dict, fp=10, exp_rr=0.15, symbol_list=['_all']):
#     # create company info_df
#     company_info_df = load_company_info_from_disk(symbol_list=symbol_list)
#     symbol_list = company_info_df['ticker'].unique().tolist()
#     peer_df = get_peer_data_from_disk(symbol_list)
#     company_info_df = company_info_df.merge(peer_df, 
#                                         on = 'ticker',
#                                         how='left',
#                                         validate='1:1')
#     # combine fundamentals and calculate top5 kpis
#     fundamental_df = combine_fundamentals(symbol_list, period_dict)
#     fundamental_df = calculate_top5_kpi(fundamental_df)
#     # load ticker prices for symbols
#     price_df = load_ticker_prices(spark, symbol_list)
#     # calculate annual price from historic price data
#     ann_price_df = calculate_annual_price(spark, price_df, period_dict)
#     # calculate annual price per earnings
#     fundamental_df = calculate_annual_pe(ann_price_df, fundamental_df)
#     # calculate growth kpi df
#     growth_df = calculate_growth_rates(fundamental_df, agg_func='mean')
#     growth_df = calculate_sticker_price(growth_df, fp=fp, exp_rr=exp_rr)
#     screener_df = find_stocks_below_mos(spark, price_df, growth_df)
#     return company_info_df, fundamental_df, growth_df, screener_df



period_dict = {'start_date':2010,
                'end_date':2019}

company_info_df, fundamental_df, growth_df, screener_df = pipeline_processing(spark, period_dict)
growth_df[growth_df['Ticker']=='CARS'].iloc[0]

Number of stocks symbols in list: 6368
Company data loaded from disk...
Dataset "us-cashflow-annual" on disk (23 days old).
- Loading from disk ... Done!
Dataset "us-income-annual" on disk (23 days old).
- Loading from disk ... Done!
Dataset "us-balance-annual" on disk (20 days old).
- Loading from disk ... Done!
Combined all fundamental data from financial statements to one Dataframe.
Calculated roic and added it to Dataframe
Calculated eps and added it to Dataframe
Calculated bvps and added it to Dataframe
Calculated fcf and added it to Dataframe
top5 KPIs added to fundamental data
Calculated KPI growth from year to year.
Calculated 5 and 10 year growth rate


Ticker                  CARS
revenue_gr_curr    -0.083738
eps_curr           -6.647123
roic_gr_5yr        -5.420000
revenue_gr_5yr     -0.010000
eps_gr_5yr         -4.530000
bvps_gr_5yr              NaN
fcf_gr_5yr         -0.140000
pe_5yr             15.900000
yrs_in_5yr          4.000000
rule1_gr_5yr             NaN
pe_default_5yr           NaN
roic_gr_10yr       -5.420000
revenue_gr_10yr    -0.010000
eps_gr_10yr        -4.530000
bvps_gr_10yr             NaN
fcf_gr_10yr        -0.140000
pe_10yr            15.900000
yrs_in_10yr                4
rule1_gr_10yr            NaN
pe_default_10yr          NaN
pe_future          15.900000
eps_future               NaN
price_future             NaN
sticker_price            NaN
mos                      NaN
Name: 298, dtype: object

In [9]:
screener_df.head()

Unnamed: 0,Ticker,last_date,last_low_price,price_future,sticker_price,mos
0,ALXN,2021-05-05,168.789993,11071.685791,2736.751398,1368.375699
1,ABMD,2021-05-05,299.779999,2648.558973,654.683271,327.341636
2,SABR,2021-05-05,12.41,11930.046852,2948.925125,1474.462563
3,FND,2021-05-05,111.550003,2714.119248,670.888769,335.444384
4,EA,2021-05-05,137.630005,1500.221008,370.831689,185.415845


In [12]:
eval_df = screener_df.merge(company_info_df.rename(columns={'ticker':'Ticker'}),
                        how='left',
                        on='Ticker',
                        validate='1:1')
eval_df.head(2)

Unnamed: 0,Ticker,last_date,last_low_price,price_future,sticker_price,mos,company name,short name,industry,description,...,logo,ceo,exchange,market cap,sector,tag 1,tag 2,tag 3,peer_string,peer_list
0,ALXN,2021-05-05,168.789993,11071.685791,2736.751398,1368.375699,Alexion Pharmaceuticals Inc.,Alexion Pharmaceuticals,Biotechnology,Alexion Pharmaceuticals Inc is a biopharmaceut...,...,ALXN.png,Ludwig N. Hantson,Nasdaq Global Select,27608210000.0,Healthcare,Healthcare,Biotechnology,,"GILD,REGN,VRTX,BIIB,QGEN,AGIO,RARE,SRPT,BIO.B,JNJ","[GILD, REGN, VRTX, BIIB, QGEN, AGIO, RARE, SRP..."
1,ABMD,2021-05-05,299.779999,2648.558973,654.683271,327.341636,ABIOMED Inc.,ABIOMED,Medical Devices,Abiomed Inc is a medical device company. It pr...,...,ABMD.png,Michael R. Minogue,Nasdaq Global Select,14887630000.0,Healthcare,Healthcare,Medical Devices,,"TFX,BSX,STXS,ATRC","[TFX, BSX, STXS, ATRC]"


In [25]:
eval_df.to_excel('../data/4_data_analysis/' + str(pd.to_datetime('today'))[:10] + '_eval_df.xlsx', index=False)

In [27]:
eval_df['mos'].describe(percentiles=[.25, .5, .75, .8, .9, .95])

count    8.900000e+01
mean     1.932307e+23
std      1.822935e+24
min      4.376009e+00
25%      9.204744e+01
50%      3.273416e+02
75%      1.474463e+03
80%      2.305800e+03
90%      8.875529e+03
95%      7.727330e+06
max      1.719753e+25
Name: mos, dtype: float64

In [28]:
growth_df[growth_df['Ticker']=='FIVN'].iloc[0]

Ticker                    FIVN
revenue_gr_curr       0.272999
eps_curr             -0.075400
roic_gr_5yr           2.740000
revenue_gr_5yr        0.260000
eps_gr_5yr            3.300000
bvps_gr_5yr           0.450000
fcf_gr_5yr            5.060000
pe_5yr            -2357.840000
yrs_in_5yr            5.000000
rule1_gr_5yr          0.450000
pe_default_5yr       90.000000
roic_gr_10yr          2.220000
revenue_gr_10yr       0.250000
eps_gr_10yr           2.610000
bvps_gr_10yr         -0.040000
fcf_gr_10yr           4.080000
pe_10yr           -1965.590000
yrs_in_10yr                  7
rule1_gr_10yr              NaN
pe_default_10yr            NaN
pe_future         -2357.840000
eps_future           -3.097804
price_future       7304.125676
sticker_price      1805.468159
mos                 902.734079
Name: 641, dtype: object
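
In the FIVN row both `pe_future` and `eps_future` are negative, so their product `price_future` comes out positive and the derived `sticker_price` and `mos` look plausible although they are not meaningful. One possible guard, sketched here as an assumption rather than the project's implementation, is to blank out the valuation columns unless both inputs are positive. The helper name `mask_invalid_valuations` is hypothetical; the values are copied from the FIVN output above and the GOOG output earlier in the notebook.

```python
import numpy as np
import pandas as pd


def mask_invalid_valuations(df: pd.DataFrame) -> pd.DataFrame:
    """Set valuation columns to NaN where future P/E or future EPS is not positive."""
    out = df.copy()
    valid = (out['pe_future'] > 0) & (out['eps_future'] > 0)
    valuation_cols = ['price_future', 'sticker_price', 'mos']
    out.loc[~valid, valuation_cols] = np.nan
    return out


growth = pd.DataFrame({
    'Ticker': ['FIVN', 'GOOG'],
    'pe_future': [-2357.84, 28.0],
    'eps_future': [-3.097804, 183.825927],
    'price_future': [7304.125676, 5147.12595],
    'sticker_price': [1805.468159, 1272.290815],
    'mos': [902.734079, 636.145408],
})
checked = mask_invalid_valuations(growth)
print(checked[['Ticker', 'sticker_price', 'mos']])
```

Rows masked this way would drop out of a screener that removes missing values before comparing the price with the margin of safety.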

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [28]:
# Perform quality checks here
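
A minimal sketch of what such checks could look like for the pandas tables produced by the pipeline. The function name `run_quality_checks`, the choice of checks (non-empty table, unique and complete key, no infinite numeric values) and the sample tables are assumptions for illustration, not the project's implementation.

```python
import numpy as np
import pandas as pd


def run_quality_checks(df: pd.DataFrame, key_cols: list, name: str) -> list:
    """Return a list of problems found; an empty list means all checks passed."""
    problems = []
    # count check: the table must contain rows
    if df.empty:
        problems.append(f'{name}: table is empty')
    # integrity check: the key must be unique
    if df.duplicated(subset=key_cols).any():
        problems.append(f'{name}: duplicate keys in {key_cols}')
    # integrity check: the key must not contain missing values
    if df[key_cols].isna().any().any():
        problems.append(f'{name}: missing values in {key_cols}')
    # value check: numeric columns must not contain infinite values
    numeric = df.select_dtypes(include='number')
    if np.isinf(numeric.to_numpy(dtype=float)).any():
        problems.append(f'{name}: infinite values in numeric columns')
    return problems


# Invented sample tables: one clean, one with a duplicate key and an inf value
good = pd.DataFrame({'Ticker': ['AAPL', 'GOOG'], 'mos': [1.8, 636.1]})
bad = pd.DataFrame({'Ticker': ['CARS', 'CARS'], 'mos': [np.inf, 2.0]})

print(run_quality_checks(good, ['Ticker'], 'good'))
print(run_quality_checks(bad, ['Ticker'], 'bad'))
```

Returning the problems as a list instead of raising on the first failure makes it possible to report all issues of a pipeline run at once.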

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

## Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.