# ETF Research Data Project
### A Data Engineering Project

## Project Summary
The target user is a Canadian passive investor who wants to invest in North American stocks. Because thousands of ETFs are available in the market, the investor wants to start with a top-down analysis of the different sectors in both Canada and the US.

### Glossary:
* North America: Only the US and Canada are included
* Passive Investor: An investor who is likely to invest in ETFs or mutual funds that track a particular index
* Benchmark: A benchmark is the standard against which the performance of a security, mutual fund, ETF, or portfolio manager is evaluated. Benchmarks differ in focus, including geographic location, industry sector, and investment style. A benchmark is often made up of individual securities or of other benchmarks.
* NTR Index: A Net Total Return index tracks capital gains as well as cash distribution income, with distributions reinvested net of withholding taxes.
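For reference, the standard relationship between a price index and a total return index can be written as follows. $P_t$ is the price index level on day $t$, $D_t$ is the cash distribution paid that day expressed in index points, and $\tau$ is a withholding tax rate. The exact methodology Nasdaq uses for its NTR indexes is not covered here.

$$TR_t = TR_{t-1} \cdot \frac{P_t + D_t}{P_{t-1}}, \qquad NTR_t = NTR_{t-1} \cdot \frac{P_t + (1 - \tau)\,D_t}{P_{t-1}}$$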

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
!pip install quandl
import pandas as pd
import quandl
import os
```



## Project Scope
### Data Collection

All data are collected from Quandl with its Python API. 
Two datasets are used:

#### 1. Index Past Performance
When selecting an industry and location, benchmarks for each industry and location are the most suitable way to track past performance. The NASDAQ OMX Global Index Data is used for this project.

Two datasets are used for the index data:
1. Past Daily Index Data
2. Index list data

##### Index Data Columns:
1. Date: Date in mm/dd/yyyy format
2. DATABASE_CODE: Database code
3. DATASET_CODE: Dataset code 
4. Index Value: Close value of given date
5. High: Highest value of given date
6. Low: Lowest value of given date
7. Total Market Value: Market value of index
8. Dividend Market Value: Total Dividend received

##### Index List Columns:
1. Name: Name of the index
2. SYMBOL: Current symbol
3. OLD SYMBOL: Previously used symbol

Source for Index List: https://www.quandl.com/data/NASDAQOMX-NASDAQ-OMX-Global-Index-Data/documentation
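The notebook below starts from CSV extracts that were already downloaded. The sketch that follows shows one way such an extract could be assembled with `quandl.get`, assuming that each index symbol is its own Quandl dataset under the `NASDAQOMX` database code. The helper name `fetch_history` and its injectable `getter` parameter are hypothetical. The `getter` parameter exists only so the function can be exercised without network access.

```python
import pandas as pd


def fetch_history(dataset_codes, database_code="NASDAQOMX", getter=None, start_date=None):
    """Download each Quandl dataset and stack them into one long table with
    Date, DATABASE_CODE and DATASET_CODE columns, like the CSV extracts."""
    if getter is None:
        # Requires `pip install quandl`; an API key can be set beforehand with
        # quandl.ApiConfig.api_key = "..."
        import quandl
        getter = quandl.get
    frames = []
    for code in dataset_codes:
        # Only pass start_date when one is given (used for incremental loads)
        kwargs = {"start_date": start_date} if start_date else {}
        # quandl.get takes "DATABASE/DATASET" and returns a DataFrame indexed by Date
        df = getter(f"{database_code}/{code}", **kwargs)
        df = df.reset_index()  # turn the Date index into a regular column
        df.insert(1, "DATABASE_CODE", database_code)
        df.insert(2, "DATASET_CODE", code)
        frames.append(df)
    return pd.concat(frames, ignore_index=True)
```

For example, `fetch_history(["NQCA55CADT"])` would fetch one Canadian index. The same helper should also work for the Federal Reserve series by passing `database_code="FRED"`, for example `fetch_history(["GDP", "INDPRO"], database_code="FRED")`.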

#### 2. Federal Reserve Economic Data
Economic data is highly valuable for sector analysis, because traditional industries such as consumer staples and consumer discretionary are closely tied to economic conditions. Federal Reserve economic data is therefore collected as well to supplement the sector analysis.

Two datasets are used for the economic data:
1. Economic Indicator Schema
2. Economic Indicator with value

##### Economic Indicator Schema Columns:
1. CODE: Dataset code
2. INDICATOR: The name of Indicator
3. TYPE: Type of indicator

##### Economic Indicator with Value Columns:
1. Date: Date in mm/dd/yyyy format
2. DATABASE_CODE: Database code
3. DATASET_CODE: Dataset code 
4. Value: Value of the indicator

Source for Economic Indicator List: https://www.quandl.com/data/FRED-Federal-Reserve-Economic-Data/documentation

### Expected Outcome
1. Update daily index value when market is open
2. Check if new Federal Reserve data is available and update it
3. Enable investor to conduct analysis with both index and economic data

Note: Quandl is an alternative data platform that provides datasets for making investment decisions.

### Step 2: Data Exploration and Cleaning

#### 2.1 Data Exploration

```python
# Read Index Data
index_list = pd.read_csv('datasets/indexes_list.csv')
index_value = pd.read_csv('datasets/north_america_index.csv')
```

#### Checking Duplicate for index_list data 

```python
index_list.head(5)
```

| | #NAME | SYMBOL | OLD SYMBOL |
|---|---|---|---|
| 0 | Nasdaq Asia Basic Materials AUD Index | NQASIA55AUD | NQASIA1000AUD |
| 1 | Nasdaq Asia Basic Materials AUD NTR Index | NQASIA55AUDN | NQASIA1000AUDN |
| 2 | Nasdaq Asia Basic Materials AUD TR Index | NQASIA55AUDT | NQASIA1000AUDT |
| 3 | Nasdaq Asia Basic Materials CAD Index | NQASIA55CAD | NQASIA1000CAD |
| 4 | Nasdaq Asia Basic Materials CAD NTR Index | NQASIA55CADN | NQASIA1000CADN |


```python
# Identify Duplicate value for index list if any
index_list['#NAME'].value_counts().head(5)
```

```
Nasdaq Japan Food              36
Nasdaq Latin America Food      36
Nasdaq DM MEA Personal Care    36
Nasdaq DM ASPA Food            36
Nasdaq EM Asia Food            36
Name: #NAME, dtype: int64
```

```python
index_list['SYMBOL'].value_counts().head(5)
```

```
NQES60EURN           1
NQUSL451010N         1
NQUSM45101015EURT    1
NQEU35CADT           1
NQASIA15LMN          1
Name: SYMBOL, dtype: int64
```

```python
index_list['OLD SYMBOL'].value_counts().head(5)
```

```
NQCN4000AUDT      1
NQASIA6000JPYT    1
NQUSM2795GBPT     1
NQDM2300LMJPYN    1
NQUSL7535JPYT     1
Name: OLD SYMBOL, dtype: int64
```

Duplicates exist in the #NAME column, so they need to be removed.
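Every SYMBOL count above is 1. Rows that share a #NAME may therefore still be distinct indexes with different symbols, rather than exact copies. Before dropping on #NAME alone, a quick check like the one below shows how many distinct symbols each repeated name covers. `summarize_name_duplicates` is a hypothetical helper. If `distinct_symbols` is greater than 1 for a name, dropping by name would also discard distinct indexes.

```python
import pandas as pd


def summarize_name_duplicates(df, name_col="#NAME", key_col="SYMBOL"):
    """For every name that appears more than once, report how many rows
    carry it and how many distinct symbols those rows have."""
    # keep=False marks every member of a duplicated group, not just the repeats
    dupes = df[df.duplicated(subset=[name_col], keep=False)]
    return (
        dupes.groupby(name_col)[key_col]
        .agg(rows="size", distinct_symbols="nunique")
        .reset_index()
    )
```

Usage: `summarize_name_duplicates(index_list).sort_values("distinct_symbols", ascending=False).head()`.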

```python
# Remove duplicate
index_list = index_list.drop_duplicates(subset=['#NAME'])
```

```python
# Now check again if duplicate still exists
index_list['#NAME'].value_counts().head(5)
```

```
Nasdaq Hungary Utilities Large Mid Cap JPY NTR Index                1
Nasdaq US Large Cap Telecommunications Equipment CAD Index          1
Nasdaq Turkey Real Estate Large Mid Cap GBP Index                   1
Nasdaq Turkey Health Care Large Mid Cap GBP Index                   1
Nasdaq US Food Retailers and Wholesalers Large Mid Cap AUD Index    1
Name: #NAME, dtype: int64
```

#### Checking Missing Value for Index List

```python
# Check Missing Value
index_list['#NAME'].isna().value_counts()
```

```
False    55406
Name: #NAME, dtype: int64
```

```python
# Check Missing Value
index_list['SYMBOL'].isna().value_counts()
```

```
False    55338
True        68
Name: SYMBOL, dtype: int64
```

```python
# Check Missing Value
index_list['OLD SYMBOL'].isna().value_counts()
```

```
False    42702
True     12704
Name: OLD SYMBOL, dtype: int64
```

SYMBOL is the key used to extract data through the API, while OLD SYMBOL is not needed for extraction. Only the 68 rows with a missing 'SYMBOL' will therefore be dropped.

```python
index_list = index_list.dropna(axis=0, subset=['SYMBOL'])
```

```python
# Check Missing Value on SYMBOL again
index_list['SYMBOL'].isna().value_counts()
```

```
False    55338
Name: SYMBOL, dtype: int64
```
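The cleaned list still covers indexes from every region, while the project only needs Canada and the US. Judging from the symbols shown in this notebook (for example NQCA55CADT and NQUSL451010N), Canadian and US index symbols appear to start with `NQCA` and `NQUS`. Assuming that convention holds, the list could be narrowed as follows. `north_america_only` is a hypothetical helper.

```python
import pandas as pd


def north_america_only(index_list):
    """Keep rows whose SYMBOL starts with NQCA (assumed Canada) or NQUS (assumed US)."""
    # str.match anchors the regex at the start of each string; na=False drops missing symbols
    mask = index_list["SYMBOL"].str.match(r"NQ(CA|US)", na=False)
    return index_list[mask]
```

Usage: `na_index_list = north_america_only(index_list)`. The prefix rule should be spot-checked against the index names before it is relied on.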

#### Checking Duplicates for index_value data

```python
index_value.head(5)
```

| | Date | DATABASE_CODE | DATASET_CODE | Index Value | High | Low | Total Market Value | Dividend Market Value |
|---|---|---|---|---|---|---|---|---|
| 0 | 9/21/2020 | NASDAQOMX | NQCA55CADT | 1498.71 | 1498.71 | 1498.71 | 348000000000.0 | 49505456 |
| 1 | 9/22/2020 | NASDAQOMX | NQCA55CADT | 1504.47 | 1504.47 | 1504.47 | 349000000000.0 | 0 |
| 2 | 9/23/2020 | NASDAQOMX | NQCA55CADT | 1426.91 | 1426.91 | 1426.91 | 331000000000.0 | 0 |
| 3 | 9/24/2020 | NASDAQOMX | NQCA55CADT | 1463.58 | 1463.58 | 1463.58 | 339000000000.0 | 0 |
| 4 | 9/25/2020 | NASDAQOMX | NQCA55CADT | 1462.38 | 1462.38 | 1462.38 | 339000000000.0 | 0 |


```python
index_value.groupby(['Date','DATASET_CODE']).count().head(5)
```

| Date | DATASET_CODE | DATABASE_CODE | Index Value | High | Low | Total Market Value | Dividend Market Value |
|---|---|---|---|---|---|---|---|
| 10/1/2020 | NQCA10CADT | 1 | 1 | 1 | 1 | 1 | 1 |
| 10/1/2020 | NQCA10LMCADT | 1 | 1 | 1 | 1 | 1 | 1 |
| 10/1/2020 | NQCA15CADT | 2 | 2 | 2 | 2 | 2 | 2 |
| 10/1/2020 | NQCA20CADT | 2 | 2 | 2 | 2 | 2 | 2 |
| 10/1/2020 | NQCA30CADT | 1 | 1 | 1 | 1 | 1 | 1 |


Some date and index combinations appear more than once (a count of 2 above), so duplicates exist.

```python
# Remove duplicate
index_value = index_value.drop_duplicates(subset=['Date','DATASET_CODE'])
```
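At this point the Date column is still a string, so `drop_duplicates` only catches exact textual matches. For example, '9/1/2020' and '09/01/2020' would be treated as different dates. Parsing the column first, as in this sketch, makes the check independent of formatting. It also produces the DATE values the schema in Step 3 expects. `dedupe_on_parsed_date` is a hypothetical helper, and it assumes the m/d/yyyy format seen in the extract.

```python
import pandas as pd


def dedupe_on_parsed_date(df, date_col="Date", key_col="DATASET_CODE"):
    """Parse m/d/yyyy date strings, then drop repeated (date, dataset) pairs."""
    out = df.copy()
    # %m and %d accept both zero-padded and unpadded values, e.g. "9/1/2020"
    out[date_col] = pd.to_datetime(out[date_col], format="%m/%d/%Y")
    return out.drop_duplicates(subset=[date_col, key_col])
```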

```python
# Check again
index_value.groupby(['Date','DATASET_CODE']).count().head(5)
```

| Date | DATASET_CODE | DATABASE_CODE | Index Value | High | Low | Total Market Value | Dividend Market Value |
|---|---|---|---|---|---|---|---|
| 10/1/2020 | NQCA10CADT | 1 | 1 | 1 | 1 | 1 | 1 |
| 10/1/2020 | NQCA10LMCADT | 1 | 1 | 1 | 1 | 1 | 1 |
| 10/1/2020 | NQCA15CADT | 1 | 1 | 1 | 1 | 1 | 1 |
| 10/1/2020 | NQCA20CADT | 1 | 1 | 1 | 1 | 1 | 1 |
| 10/1/2020 | NQCA30CADT | 1 | 1 | 1 | 1 | 1 | 1 |


```python
# Check if Index Value = High = Low
same_value = (
    (index_value['Index Value'] == index_value['High'])
    & (index_value['Index Value'] == index_value['Low'])
)
same_value.value_counts()
```

```
True     47813
False       67
dtype: int64
```

As the comparison shows, High or Low differs from Index Value in only 67 of 47,880 rows. Intraday movement is therefore not significant, and a long-term passive investor does not act on daily movements, so only the 'Index Value' column will be kept.
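To put a number on "not significant", the High-to-Low range can be measured relative to Index Value before those columns are dropped. The following is a minimal sketch, and `intraday_spread_summary` is a hypothetical name.

```python
import pandas as pd


def intraday_spread_summary(index_value):
    """Count rows where High or Low differs from Index Value and report the
    largest (High - Low) / Index Value ratio across all rows."""
    differs = (index_value["High"] != index_value["Index Value"]) | (
        index_value["Low"] != index_value["Index Value"]
    )
    relative_spread = (index_value["High"] - index_value["Low"]) / index_value["Index Value"]
    return {
        "rows_differing": int(differs.sum()),
        "max_relative_spread": float(relative_spread.max()),
    }
```

A `max_relative_spread` of 0.02, for instance, would mean the widest intraday range was 2% of the index level.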

```python
# Drop unnecessary columns and check
index_value = index_value.drop(columns=['High','Low'])
index_value.columns
```

```
Index(['Date', 'DATABASE_CODE', 'DATASET_CODE', 'Index Value',
       'Total Market Value', 'Dividend Market Value'],
      dtype='object')
```

#### Checking Missing value for index_value data

```python
index_value.isna().any()
```

```
Date                     False
DATABASE_CODE            False
DATASET_CODE             False
Index Value              False
Total Market Value       False
Dividend Market Value    False
dtype: bool
```

The extracted index values contain no missing values.

```python
# Read in the data here
eco_indicator_mapping = pd.read_csv('datasets/economic indicator schema.csv')
eco_indicator_value = pd.read_csv('datasets/economic_indicator_value.csv')
```

#### Checking Duplicate for Economic Indicator List

```python
eco_indicator_mapping.head(5)
```

| | CODE | INDICATOR | TYPE |
|---|---|---|---|
| 0 | GDP | Gross Domestic Product | Growth |
| 1 | GDPC1 | Real Gross Domestic Product | Growth |
| 2 | GDPPOT | Real Potential Gross Domestic Product | Growth |
| 3 | CPIAUCSL | Consumer Price Index for All Urban Consumers: ... | Prices and Inflation |
| 4 | CPILFESL | Consumer Price Index for All Urban Consumers: ... | Prices and Inflation |


```python
# Identify Duplicate value for economic indicator list if any
eco_indicator_mapping['CODE'].value_counts().head(5)
```

```
NROU      1
GDPPOT    1
T5YIE     1
HOUST     1
DGS30     1
Name: CODE, dtype: int64
```

```python
eco_indicator_mapping['INDICATOR'].value_counts().head(5)
```

```
All Employees: Total nonfarm                         1
Effective Federal Funds Rate                         1
Industrial Production Index                          1
Real Median Household Income in the United States    1
M2 Money Stock                                       1
Name: INDICATOR, dtype: int64
```

```python
eco_indicator_mapping['TYPE'].value_counts().head(5)
```

```
Employment                12
Other                     10
Interest Rates             8
Income and Expenditure     7
Money Supply               5
Name: TYPE, dtype: int64
```

There are no duplicates in 'CODE' or 'INDICATOR'. Duplicates in 'TYPE' are expected, since several indicators share a type.

#### Checking Missing Values for Economic Indicator List

```python
eco_indicator_mapping.isna().any()
```

```
CODE         False
INDICATOR    False
TYPE         False
dtype: bool
```

#### Checking Duplicate for Economic Indicator Value

```python
eco_indicator_value.head(5)
```

| | Date | DATABASE_CODE | DATASET_CODE | Value |
|---|---|---|---|---|
| 0 | 1/1/1947 | FRED | GDP | 243.164 |
| 1 | 4/1/1947 | FRED | GDP | 245.968 |
| 2 | 7/1/1947 | FRED | GDP | 249.585 |
| 3 | 10/1/1947 | FRED | GDP | 259.745 |
| 4 | 1/1/1948 | FRED | GDP | 265.742 |


```python
eco_indicator_value.groupby(['Date','DATASET_CODE']).count().head(5)
```

| Date | DATASET_CODE | DATABASE_CODE | Value |
|---|---|---|---|
| 1/1/1919 | INDPRO | 1 | 1 |
| 1/1/1920 | INDPRO | 1 | 1 |
| 1/1/1921 | INDPRO | 1 | 1 |
| 1/1/1922 | INDPRO | 1 | 1 |
| 1/1/1923 | INDPRO | 1 | 1 |


```python
# Remove duplicate if any
eco_indicator_value = eco_indicator_value.drop_duplicates(subset=['Date','DATASET_CODE'])
```

#### Checking Missing Values for Economic Indicator Value

```python
eco_indicator_value.isna().any()
```

```
Date             False
DATABASE_CODE    False
DATASET_CODE     False
Value            False
dtype: bool
```

#### Data exploration and cleaning are complete. The cleaned data is saved to output/

```python
index_list.to_csv('output/index_list_updated.csv', index=False)
index_value.to_csv('output/index_value_updated.csv', index=False)
eco_indicator_mapping.to_csv('output/eco_indicator_mapping.csv', index=False)
eco_indicator_value.to_csv('output/eco_indicator_value.csv', index=False)
```

### Step 3: Data Model

#### 3.1 Data Model Design

#### Standard SQL Database vs NoSQL

A standard relational (SQL) database is chosen over a NoSQL database for the following reasons:
1. Given the scope of this analysis, only indexes and economic indicators are analyzed, so the variety of data is limited.
2. The queries will vary widely. A NoSQL database such as Cassandra is modeled around a known set of queries (typically one table per query pattern), so supporting many different query types for investment analysis would be inefficient.
3. A standard SQL database is more cost-effective for a small-scale personal project.

Factors that drive the variety of queries:
1. Time series: different time periods can be selected
2. Geographical location: US or Canada
3. Sector: a different combination of economic indicators may be used for each sector
4. Correlation analysis between sector indexes and economic indicators

##### Star-Schema
Economic indicators and indexes are selected from eco_indicator_list and index_list. However, the value tables extracted from the API carry only codes, so important information (such as the index name or the indicator type) is missing from them. A star schema is therefore used: information from the list and value tables is combined into fact tables, so that everything needed for research is accessible in one place.

#### 3.2 Final Conceptual Data Model

<img src="images/ERD.jpg">

#### Schema Design
The diagram above shows the star schema, including both the dimension and fact tables of the data model. When conducting research, index_fact and eco_fact will be used.

##### Dimension Tables:

###### index_value:
1. date: Date in mm/dd/yyyy format (DATE) - Primary Key
2. dataset_code: Dataset code (VARCHAR) - Primary Key & Foreign Key (references index_list(symbol))
3. database_code: Database code (VARCHAR)
4. index_value: Close value of given date (NUMERIC)
5. total_market_value: Market value of index (NUMERIC)
6. dividend_market_value: Total Dividend received (NUMERIC)

###### index_list:
1. index_name: Name of the index (VARCHAR)
2. symbol: Current symbol (VARCHAR) - Primary Key
3. old_symbol: Previously used symbol (VARCHAR)

###### eco_indicator_list:
1. code: Dataset code (VARCHAR) - Primary Key
2. indicator_name: The name of Indicator (VARCHAR)
3. type: Type of indicator (VARCHAR)

###### eco_indicator_value:
1. date: Date in mm/dd/yyyy format (DATE) - Primary Key
2. dataset_code: Dataset code (VARCHAR) - Primary Key & Foreign Key (references eco_indicator_list(code))
3. database_code: Database code (VARCHAR)
4. indicator_value: Value of the indicator (NUMERIC)
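The dimension tables above could be expressed as DDL roughly as follows. This is a sketch, not the project's actual table-creation script. Column types follow the lists above, and VARCHAR lengths are left out. SQLite is used only so the example runs anywhere: it enforces the primary and foreign keys once `PRAGMA foreign_keys` is on, whereas Redshift accepts them without enforcing them.

```python
import sqlite3

# Column names and types follow the dimension table lists in Step 3
DDL = """
CREATE TABLE IF NOT EXISTS index_list (
    symbol      VARCHAR PRIMARY KEY,
    index_name  VARCHAR,
    old_symbol  VARCHAR
);
CREATE TABLE IF NOT EXISTS index_value (
    date                  DATE    NOT NULL,
    dataset_code          VARCHAR NOT NULL REFERENCES index_list (symbol),
    database_code         VARCHAR,
    index_value           NUMERIC,
    total_market_value    NUMERIC,
    dividend_market_value NUMERIC,
    PRIMARY KEY (date, dataset_code)
);
CREATE TABLE IF NOT EXISTS eco_indicator_list (
    code           VARCHAR PRIMARY KEY,
    indicator_name VARCHAR,
    type           VARCHAR
);
CREATE TABLE IF NOT EXISTS eco_indicator_value (
    date            DATE    NOT NULL,
    dataset_code    VARCHAR NOT NULL REFERENCES eco_indicator_list (code),
    database_code   VARCHAR,
    indicator_value NUMERIC,
    PRIMARY KEY (date, dataset_code)
);
"""


def create_dimension_tables(conn):
    # SQLite only checks foreign keys when this pragma is enabled on the connection
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(DDL)
```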

##### Fact Tables:

When conducting sector analysis, the actual name of each index is needed, because the dataset_code alone does not make clear which index a row belongs to. Only one index provider is used at the moment, so database_code can be left out of index_fact. It is kept in the index_value table so that indexes from other providers can be distinguished if they are added in the future.

###### index_fact:
1. index_name 
2. date
3. dataset_code
4. index_value
5. total_market_value
6. dividend_market_value
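The join behind index_fact can be sketched in pandas, assuming the column names from the CSV extracts in Step 2. Setting `validate="many_to_one"` makes the merge fail loudly if index_list still contains a repeated SYMBOL. `build_index_fact` is a hypothetical name.

```python
import pandas as pd


def build_index_fact(index_value, index_list):
    """Attach the human-readable index name to each daily index value."""
    fact = index_value.merge(
        index_list[["SYMBOL", "#NAME"]],
        how="inner",
        left_on="DATASET_CODE",
        right_on="SYMBOL",
        validate="many_to_one",  # raises MergeError if a SYMBOL appears twice
    )
    fact = fact.rename(columns={
        "#NAME": "index_name", "Date": "date", "DATASET_CODE": "dataset_code",
        "Index Value": "index_value", "Total Market Value": "total_market_value",
        "Dividend Market Value": "dividend_market_value",
    })
    # Keep only the index_fact columns, in the order listed above
    return fact[["index_name", "date", "dataset_code", "index_value",
                 "total_market_value", "dividend_market_value"]]
```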

For economic analysis, the name and category of each economic indicator are needed, so 'indicator_name' and 'type' are added to the fact table.

###### eco_fact:
1. indicator_name
2. date
3. code
4. value
5. type


#### 3.3 Data Pipelines
1. Extract data from API 
2. Transform the data into the format required by the schema
3. Create a connection to the database
4. Load initial data into the dimension tables
5. Load initial data into the fact tables with data from dimension table
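Step 5 can be done inside the database with a single `INSERT ... SELECT` that joins the two tables. The sketch below shows this for eco_fact. It assumes that dataset_code in eco_indicator_value holds the same codes as eco_indicator_list.code (for example GDP, as in the sample data). It runs against SQLite for illustration. The statement uses only standard SQL and should carry over to Redshift, although that has not been tested here.

```python
import sqlite3

# Copy each indicator value into eco_fact together with its name and type
ECO_FACT_INSERT = """
INSERT INTO eco_fact (indicator_name, date, code, value, type)
SELECT l.indicator_name, v.date, l.code, v.indicator_value, l.type
FROM eco_indicator_value AS v
JOIN eco_indicator_list  AS l ON v.dataset_code = l.code
"""


def load_eco_fact(conn):
    conn.execute(ECO_FACT_INSERT)
    conn.commit()
```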

### Step 4: Running the Data Pipeline
#### 4.1 Create the data model

Please refer to creat_tables.py for the data model creation process.

An example of data insertion is provided below. For detailed process, please refer to etl.py.

#### Example of Inserting the Index List

```python
# Read the cleaned index list saved at the end of Step 2 (no missing or repeated symbols)
index_list = pd.read_csv('output/index_list_updated.csv')
# Reorder the columns to match the table and rename them to the schema's column names
df_to_insert = index_list[['SYMBOL', '#NAME', 'OLD SYMBOL']].rename(
    columns={'SYMBOL': 'symbol', '#NAME': 'index_name', 'OLD SYMBOL': 'old_symbol'}
)
df_to_insert.head(5)
```

| | symbol | index_name | old_symbol |
|---|---|---|---|
| 0 | NQASIA55AUD | Nasdaq Asia Basic Materials AUD Index | NQASIA1000AUD |
| 1 | NQASIA55AUDN | Nasdaq Asia Basic Materials AUD NTR Index | NQASIA1000AUDN |
| 2 | NQASIA55AUDT | Nasdaq Asia Basic Materials AUD TR Index | NQASIA1000AUDT |
| 3 | NQASIA55CAD | Nasdaq Asia Basic Materials CAD Index | NQASIA1000CAD |
| 4 | NQASIA55CADN | Nasdaq Asia Basic Materials CAD NTR Index | NQASIA1000CADN |


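```python
import configparser
import psycopg2

# Assumed form of index_list_insert (defined in the project's SQL module, not shown)
index_list_insert = "INSERT INTO index_list (symbol, index_name, old_symbol) VALUES (%s, %s, %s)"

config = configparser.ConfigParser()
config.read('credential.cfg')
# [CLUSTER] must list host, dbname, user, password and port in this order
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CLUSTER'].values()))
cur = conn.cursor()

# Replace NaN (e.g. a missing OLD SYMBOL) with None so it is stored as SQL NULL
rows = df_to_insert.astype(object).where(df_to_insert.notna(), None)
for row in rows.itertuples(index=False):
    cur.execute(index_list_insert, row)
conn.commit()
conn.close()
```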

#### 4.2 Data Quality Checks

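The data checks are mainly done in two places:
1. Initial duplicate and missing-value checks (see Step 2)
2. Integrity constraints on the relational database during inserts:
   * A unique (primary) key identifies each row
   * The DATE type is used for all date columns
   * The NUMERIC type is used for all value columns, and VARCHAR for the remaining columns

Note that Amazon Redshift treats primary key, unique, and foreign key constraints as informational only and does not enforce them (NOT NULL is enforced). On Redshift, uniqueness therefore relies on the deduplication done in Step 2.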
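Source/count and key checks can also be expressed as SQL queries that run after each load. The sketch below uses hypothetical names, and its column names follow Step 3, with dataset_code assumed to be the join column in both value tables. It returns a list of failure messages, which is empty when every check passes. SQLite is used in the example only so that it is runnable.

```python
import sqlite3

# Each check is (description, SQL returning one count that should be 0)
KEY_CHECKS = [
    ("index_value rows whose dataset_code is missing from index_list",
     """SELECT COUNT(*) FROM index_value v
        LEFT JOIN index_list l ON v.dataset_code = l.symbol
        WHERE l.symbol IS NULL"""),
    ("duplicate (date, dataset_code) keys in index_value",
     """SELECT COUNT(*) FROM (
            SELECT date, dataset_code FROM index_value
            GROUP BY date, dataset_code HAVING COUNT(*) > 1) d"""),
    ("eco_indicator_value rows whose dataset_code is missing from eco_indicator_list",
     """SELECT COUNT(*) FROM eco_indicator_value v
        LEFT JOIN eco_indicator_list l ON v.dataset_code = l.code
        WHERE l.code IS NULL"""),
]
TABLES = ("index_list", "index_value", "eco_indicator_list", "eco_indicator_value")


def run_quality_checks(conn, min_rows=1):
    failures = []
    cur = conn.cursor()
    # Completeness: every table should have received data
    for table in TABLES:
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        if cur.fetchone()[0] < min_rows:
            failures.append(f"{table} has fewer than {min_rows} rows")
    # Integrity: keys the warehouse may not enforce on its own
    for description, sql in KEY_CHECKS:
        cur.execute(sql)
        count = cur.fetchone()[0]
        if count:
            failures.append(f"{count} {description}")
    return failures
```

In a pipeline, a non-empty result would typically stop the run before any fact tables are refreshed.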

#### 4.3 Data dictionary 
Please refer to data_dictionary.md for the data dictionary.

### Step 5: Project Write Up

#### 5.1 Tools/Technology Selection

1. Data Exploration: 
Pandas DataFrames are used for the initial exploration of the raw data. At this data size they are easy to use and fast to read and write, without the overhead of a distributed framework such as Spark.

2. Data Pipeline:
The ETL is mainly written in Python. The raw data is relatively clean after exploration, so pandas is more than sufficient for the task, and Python connects easily to AWS Redshift.

3. Data Storage:
AWS Redshift is the data store selected for the project. Its resize and concurrency scaling features let the database expand its capacity as needed. With appropriate SORTKEY and DISTKEY settings, queries stay fast as large amounts of data accumulate in the schema.

#### 5.2 Data Update Frequency

It depends on the analytics need. Index values are updated every trading day, so for the most up-to-date data everything could be refreshed daily. Economic indicators, however, are released on different schedules, from daily for some interest-rate series (e.g. DGS30) to quarterly for GDP. An alternative strategy is to update everything weekly, or whenever the data is needed for analysis. Long-term investors tend not to revise their investment strategy or decisions often, so a weekly update should be more than enough to serve the purpose.

#### 5.3 Plan for future

##### 5.3.1 Data Amount increase by 100x

If stock markets in more regions need to be analyzed, the data volume will grow. The database currently uses less than 1 GB. Even if the data grows 100x as it accumulates over time and more regions are added, the current Redshift configuration can still hold it. If the data grows further, the Redshift cluster can be resized to meet the demand.

##### 5.3.2 The data populates a dashboard that must be updated daily by 7 am

Airflow would be a good choice for scheduling the pipeline to finish by 7 am every day, as it offers:
1. One platform for an overview of the data pipeline processes (DAGs)
2. Task dependencies that are easily managed in a DAG
3. Easy-to-use scheduling, including configurable retries
4. Alerting that notifies data engineers by email

The data pipeline for updating the dashboard would likely include the following steps:
1. Check the most recent date for each index and economic indicator used (Python)
2. Download data starting after the most recent date (Python)
3. Load the downloaded data into the cluster (Airflow with Python)
4. Refresh the dashboard with the new data
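Steps 1 and 2 amount to finding, for each series, the day after its most recent stored date. The following is a minimal pandas sketch that assumes the m/d/yyyy Date format used in the extracts. The resulting ISO date strings could be passed as the `start_date` argument of `quandl.get`. `next_start_dates` is a hypothetical name.

```python
import pandas as pd


def next_start_dates(df, date_col="Date", code_col="DATASET_CODE"):
    """Map each dataset code to the day after its latest stored date (YYYY-MM-DD)."""
    dates = pd.to_datetime(df[date_col], format="%m/%d/%Y")
    latest = dates.groupby(df[code_col]).max()  # most recent date per series
    return (latest + pd.Timedelta(days=1)).dt.strftime("%Y-%m-%d").to_dict()
```

In production, the latest dates would more likely come from a `SELECT dataset_code, MAX(date) ... GROUP BY dataset_code` query against the warehouse than from a CSV.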

##### 5.3.3 The database needs to be accessed by 100+ people

AWS Redshift's concurrency scaling feature automatically adds cluster capacity when more users query the database at the same time. It should be able to serve 100+ users comfortably while keeping costs flexible, since the extra capacity is only added when it is needed.