# Data Engineering Capstone Project

#### Project Summary
This capstone project analyses and processes data from the Brazilian stock market from 2013 to the present, together with
Brazilian economic data from the [World Bank](https://data.worldbank.org/).

With this data, it is possible to extract insights and relate country-level indicators to the stock market,
in order to answer questions such as:
* Does the stock market influence the real economy?
* Is the population's level of education (e.g., literacy rate) related to GDP growth?
* Does the rise of companies on the stock market benefit the population in some way?


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
!pip install bovespa
!pip install pyspark
!pip install wbgapi



In [2]:
import os

import bovespa
import etl_functions
import wbgapi as wb
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType, MapType
from pyspark.sql import types as T
from pyspark.sql import functions as F

### Step 1: Scope the Project and Gather Data

#### Scope
This capstone project analyses and processes data from the Brazilian stock market from 2013 to the present, together with
Brazilian economic data from the [World Bank](https://data.worldbank.org/).

#### Describe and Gather Data
Describe the data sets you're using. Where did it come from? What type of information is included?

Bovespa is the main Brazilian stock exchange. This dataset contains all trading data since 2013.

Each record includes the trading date, company name, stock code, and price data such as the opening, closing, high, and low prices.

The World Bank collects data on countries around the world, going back to 1960.
It covers the environment, education, GDP, the economy, and social indicators.

Sources:
* Bovespa historical quotes: http://www.b3.com.br/en_us/market-data-and-indices/data-services/market-data/historical-data/equities/historical-quotes/
* World Bank data for Brazil (CSV): https://api.worldbank.org/v2/en/country/BRA?downloadformat=csv

To keep things simple, the results are written to local disk.

In [3]:
# JAVA_HOME must point to a local Java installation; adjust this path for your machine
os.environ['JAVA_HOME'] = '/home/charl3ff/.sdkman/candidates/java/current'
spark = SparkSession.builder.enableHiveSupport()\
    .appName("Capstone")\
    .getOrCreate()

### ETL stock market

In [4]:
# Load raw trading data from Bovespa: each line of a COTAHIST file becomes one row in the `value` column

input_data = "sample_data/"
trading_file = input_data + "COTAHIST*.txt"

df_data = spark.read.text(trading_file)

In [5]:
schema = StructType([
    StructField('date', DateType(), True),
    StructField('year', IntegerType(), True),
    StructField('month', IntegerType(), True),
    StructField('day', IntegerType(), True),
    StructField('money_volume', T.DoubleType(), True),
    StructField('volume', T.IntegerType(), True),
    StructField('stock_code', StringType(), True),
    StructField('company_name', StringType(), True),
    StructField('price_open', DoubleType(), True),
    StructField('price_close', DoubleType(), True),
    StructField('price_mean', DoubleType(), True),
    StructField('price_high', DoubleType(), True),
    StructField('price_low', DoubleType(), True),
    StructField('variation', DoubleType(), True)
])

In [6]:
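# Parse each fixed-width line into a struct matching `schema`
@F.udf(returnType=schema)
def record_to_dict(row):
    """
    Parse a raw COTAHIST line with bovespa.Record and return its fields as a dict.
    :param row: (string) fixed-width line from a Bovespa COTAHIST file
    :return: (dict) parsed fields, or None when the line cannot be parsed
    """
    try:
        record = bovespa.Record(row)
    except Exception:
        # Lines that bovespa.Record cannot parse produce a null struct
        return None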
    return {
        'date': record.date, 'year': record.date.year,
        'month': record.date.month, 'day': record.date.day,
        'money_volume': record.volume,
        'volume': record.quantity,
        'stock_code': record.stock_code, 'company_name': record.company_name,
        'price_open': record.price_open, 'price_close': record.price_close,
        'price_mean': record.price_mean, 'price_high': record.price_high,
        'price_low': record.price_low
    }
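
COTAHIST files are fixed-width: each line is one record, and fields are identified by their character positions. The `bovespa` package takes care of the full layout inside `record_to_dict`; the sketch below only illustrates the idea in plain Python for four fields. The slice positions follow B3's published COTAHIST layout as we understand it and are an assumption to verify against the official document; `parse_quote_line` and `FIELDS` are hypothetical names. Lines that are not quote records, such as the file header and trailer, return `None`, which is also what `record_to_dict` does for any line `bovespa.Record` rejects.

```python
from datetime import datetime
from typing import Optional

# Hypothetical subset of the COTAHIST layout as 0-based [start, end) slices.
# Check these positions against B3's official layout document before relying on them.
FIELDS = {
    "date": (2, 10),           # trading date, YYYYMMDD
    "stock_code": (12, 24),    # ticker, space-padded
    "company_name": (27, 39),  # short company name, space-padded
    "price_open": (56, 69),    # opening price, two implied decimal places
}


def parse_quote_line(line: str) -> Optional[dict]:
    """Parse one quote record; return None for header, trailer or malformed lines."""
    if line[0:2] != "01":  # "01" marks a quote; "00"/"99" are header/trailer
        return None
    raw = {name: line[start:end] for name, (start, end) in FIELDS.items()}
    try:
        return {
            "date": datetime.strptime(raw["date"], "%Y%m%d").date(),
            "stock_code": raw["stock_code"].strip(),
            "company_name": raw["company_name"].strip(),
            "price_open": int(raw["price_open"]) / 100,
        }
    except ValueError:
        return None
```

Skipping non-quote lines explicitly (instead of relying on a parse failure) makes it clear why such rows end up empty.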

In [7]:
dict_df = df_data.select(record_to_dict('value').alias('dict'))
# Flatten the struct into one column per field
trading_df = dict_df.select("dict.*")

In [8]:
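# variation = (price_close - price_open) / 100: the absolute daily price change scaled down by 100
trading_df = trading_df.withColumn('variation', (trading_df['price_close'] - trading_df['price_open']) / 100)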
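
Because `variation` is an absolute price change divided by 100, it grows with the price level of the security: GGBR4 on 2020-01-02 (open 20.13, close 20.76) gets about 0.0063, while securities quoted in the thousands reach values such as 77.0 for IBOVP102. If a measure comparable across stocks is wanted, the percentage change from the opening price is a common alternative. The sketch below uses a hypothetical helper name; in PySpark the same formula can be written as a column expression, `(F.col('price_close') - F.col('price_open')) / F.col('price_open') * 100`.

```python
from typing import Optional


def pct_variation(price_open: Optional[float], price_close: Optional[float]) -> Optional[float]:
    """Percentage change from open to close; None if either price is missing or open is 0."""
    if price_open is None or price_close is None or price_open == 0:
        return None
    return (price_close - price_open) / price_open * 100


print(round(pct_variation(20.13, 20.76), 2))      # GGBR4: 3.13
print(round(pct_variation(13800.0, 21500.0), 2))  # IBOVP102: 55.8
```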

In [9]:
trading_df.printSchema()

root
 |-- date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- money_volume: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- stock_code: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- price_open: double (nullable = true)
 |-- price_close: double (nullable = true)
 |-- price_mean: double (nullable = true)
 |-- price_high: double (nullable = true)
 |-- price_low: double (nullable = true)
 |-- variation: double (nullable = true)



In [10]:
trading_df = trading_df.repartition("stock_code")

In [11]:
trading_df.orderBy(F.col('variation'), ascending=False).show(10)

+----------+----+-----+---+------------+------+----------+------------+----------+-----------+----------+----------+---------+---------+
|      date|year|month|day|money_volume|volume|stock_code|company_name|price_open|price_close|price_mean|price_high|price_low|variation|
+----------+----+-----+---+------------+------+----------+------------+----------+-----------+----------+----------+---------+---------+
|2020-03-11|2020|    3| 11|   1866254.0|   120|  IBOVP102|       IBOVE|   13800.0|    21500.0|  15552.11|   21500.0|  13000.0|     77.0|
|2020-03-09|2020|    3|  9|  1.429667E8|  8260|  IBOVP105|       IBOVE|   15728.0|    22000.0|  17308.31|   22000.0|  15328.0|    62.72|
|2020-03-11|2020|    3| 11| 5.5915076E7|  5015|   IBOVO96|       IBOVE|    7740.0|    14000.0|  11149.56|   14000.0|   7320.0|     62.6|
|2020-03-18|2020|    3| 18|    484500.0|    28|   IBOVP85|       IBOVE|   16000.0|    22000.0|  17303.57|   22000.0|  16000.0|     60.0|
|2020-03-11|2020|    3| 11|    280428.0| 

In [12]:
trading_df.orderBy(F.col('money_volume'), ascending=False).show(10)

+----------+----+-----+---+---------------+---------+----------+------------+----------+-----------+----------+----------+---------+--------------------+
|      date|year|month|day|   money_volume|   volume|stock_code|company_name|price_open|price_close|price_mean|price_high|price_low|           variation|
+----------+----+-----+---+---------------+---------+----------+------------+----------+-----------+----------+----------+---------+--------------------+
|2020-12-16|2020|   12| 16| 4.298379966E10|   365580|    IBOV11|    IBOVESPA|  117577.0|   117577.0|  117577.0|  117577.0| 117577.0|                 0.0|
|2020-04-15|2020|    4| 15|3.2720218273E10|   412109|    IBOV11|    IBOVESPA|   79397.0|    79397.0|   79397.0|   79397.0|  79397.0|                 0.0|
|2020-02-12|2020|    2| 12|2.2856435636E10|   195436|    IBOV11|    IBOVESPA|  116951.0|   116951.0|  116951.0|  116951.0| 116951.0|                 0.0|
|2020-06-17|2020|    6| 17|2.0103332942E10|   209231|    IBOV11|    IBOVESPA

In [13]:
trading_df.show(10)

+----------+----+-----+---+------------+--------+----------+------------+----------+-----------+----------+----------+---------+--------------------+
|      date|year|month|day|money_volume|  volume|stock_code|company_name|price_open|price_close|price_mean|price_high|price_low|           variation|
+----------+----+-----+---+------------+--------+----------+------------+----------+-----------+----------+----------+---------+--------------------+
|2020-01-02|2020|    1|  2|2.80483738E8|13591400|     GGBR4|      GERDAU|     20.13|      20.76|     20.63|     20.79|    20.12|0.006300000000000025|
|2020-01-02|2020|    1|  2|    23011.02|    1751|    NVHO11|FII NOVOHORI|     13.01|      13.02|     13.14|     13.25|     13.0|9.999999999999786E-5|
|2020-01-02|2020|    1|  2|    10306.76|      73|    CGAS3F|      COMGAS|    142.88|      140.3|    141.18|    142.88|    140.0|-0.02579999999999984|
|2020-01-02|2020|    1|  2|     34460.3|   11854|    DMMO3F|       DOMMO|       3.0|       2.92|    

### Insights from the trading data
With the data loaded, we can start extracting insights, for example:

In [14]:
# Most traded stocks in terms of money
trading_df.orderBy('money_volume', ascending=False).show(10)

+----------+----+-----+---+---------------+---------+----------+------------+----------+-----------+----------+----------+---------+--------------------+
|      date|year|month|day|   money_volume|   volume|stock_code|company_name|price_open|price_close|price_mean|price_high|price_low|           variation|
+----------+----+-----+---+---------------+---------+----------+------------+----------+-----------+----------+----------+---------+--------------------+
|2020-12-16|2020|   12| 16| 4.298379966E10|   365580|    IBOV11|    IBOVESPA|  117577.0|   117577.0|  117577.0|  117577.0| 117577.0|                 0.0|
|2020-04-15|2020|    4| 15|3.2720218273E10|   412109|    IBOV11|    IBOVESPA|   79397.0|    79397.0|   79397.0|   79397.0|  79397.0|                 0.0|
|2020-02-12|2020|    2| 12|2.2856435636E10|   195436|    IBOV11|    IBOVESPA|  116951.0|   116951.0|  116951.0|  116951.0| 116951.0|                 0.0|
|2020-06-17|2020|    6| 17|2.0103332942E10|   209231|    IBOV11|    IBOVESPA
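
Ordering individual rows ranks single trading days, which is why the same `IBOV11` code fills the top of the list. To rank codes over the whole period, sum `money_volume` per `stock_code` first; in PySpark that could be `trading_df.groupBy('stock_code').agg(F.sum('money_volume').alias('total_money_volume')).orderBy(F.desc('total_money_volume'))`. The plain-Python sketch below shows the same aggregation on hypothetical rows; `top_by_total` is an illustrative name.

```python
from collections import defaultdict


def top_by_total(rows, key="stock_code", value="money_volume", n=3):
    """Sum `value` per `key` and return the n largest totals, in descending order."""
    totals = defaultdict(float)
    for row in rows:
        if row.get(value) is not None:  # skip rows without a parsed value
            totals[row[key]] += row[value]
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:n]


# Hypothetical rows; only the two columns used here are filled in.
rows = [
    {"stock_code": "GGBR4", "money_volume": 100.0},
    {"stock_code": "NVHO11", "money_volume": 50.0},
    {"stock_code": "GGBR4", "money_volume": 30.0},
    {"stock_code": "DMMO3F", "money_volume": None},
]
print(top_by_total(rows, n=2))  # [('GGBR4', 130.0), ('NVHO11', 50.0)]
```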

In [15]:
# Largest gains within a single day (by the `variation` column)
trading_df.orderBy('variation', ascending=False).show(10)

+----------+----+-----+---+------------+------+----------+------------+----------+-----------+----------+----------+---------+---------+
|      date|year|month|day|money_volume|volume|stock_code|company_name|price_open|price_close|price_mean|price_high|price_low|variation|
+----------+----+-----+---+------------+------+----------+------------+----------+-----------+----------+----------+---------+---------+
|2020-03-11|2020|    3| 11|   1866254.0|   120|  IBOVP102|       IBOVE|   13800.0|    21500.0|  15552.11|   21500.0|  13000.0|     77.0|
|2020-03-09|2020|    3|  9|  1.429667E8|  8260|  IBOVP105|       IBOVE|   15728.0|    22000.0|  17308.31|   22000.0|  15328.0|    62.72|
|2020-03-11|2020|    3| 11| 5.5915076E7|  5015|   IBOVO96|       IBOVE|    7740.0|    14000.0|  11149.56|   14000.0|   7320.0|     62.6|
|2020-03-18|2020|    3| 18|    484500.0|    28|   IBOVP85|       IBOVE|   16000.0|    22000.0|  17303.57|   22000.0|  16000.0|     60.0|
|2020-03-11|2020|    3| 11|    280428.0| 

In [16]:
# Rows
print('Amount of rows', trading_df.count())

Amount of rows 1251648


### ETL World Bank

The `wbgapi` package is the World Bank's official Python library for its [open data](https://data.worldbank.org/) API.
It makes retrieving data easy, so there is no need to download the files manually.

The goal is to fetch Brazil's data so that the stock market can be compared with the real economy.

In [17]:
# First three data sources available through the World Bank API
wb.source.info().items[:3]

[{'id': '1',
  'lastupdated': '2019-10-23',
  'name': 'Doing Business',
  'code': 'DBS',
  'databid': '3001',
  'description': '',
  'url': '',
  'dataavailability': 'Y',
  'metadataavailability': 'Y',
  'concepts': '3'},
 {'id': '2',
  'lastupdated': '2021-04-26',
  'name': 'World Development Indicators',
  'code': 'WDI',
  'databid': '2',
  'description': '',
  'url': '',
  'dataavailability': 'Y',
  'metadataavailability': 'Y',
  'concepts': '3'},
 {'id': '3',
  'lastupdated': '2020-09-28',
  'name': 'Worldwide Governance Indicators',
  'code': 'WGI',
  'databid': '1181',
  'description': '',
  'url': '',
  'dataavailability': 'Y',
  'metadataavailability': 'Y',
  'concepts': '3'}]

In [18]:
metric_to_schema = {
    'GC.DOD.TOTL.GD.ZS': 'debt',
    'GC.XPN.TOTL.CN': 'total_expense',
    'NY.GDP.MKTP.KD.ZG': 'gdp_growth',
    'NY.GDP.MKTP.CD': 'gdp',
    'NY.GDP.PCAP.CD': 'gdp_per_capita',
    'SP.POP.TOTL': 'population',
    'SP.DYN.LE00.IN': 'life_expectancy',
    'GC.XPN.TOTL.GD.ZS': 'expense_per_gdp',
    'FI.RES.TOTL.CD': 'total_reserves',
    'SE.ADT.LITR.ZS': 'pop_literacy_rate',
    'SE.XPD.TOTL.GD.ZS': 'expenditure_education_per_gdp',
    'BX.KLT.DINV.CD.WD': 'foreign_investment'
}

In [19]:
wb.series.info().items[:3]

[{'id': 'AG.AGR.TRAC.NO', 'value': 'Agricultural machinery, tractors'},
 {'id': 'AG.CON.FERT.PT.ZS',
  'value': 'Fertilizer consumption (% of fertilizer production)'},
 {'id': 'AG.CON.FERT.ZS',
  'value': 'Fertilizer consumption (kilograms per hectare of arable land)'}]

In [20]:
# Fetch the selected indicators for Brazil as a pandas DataFrame
# (one row per series, one column per year), then reshape it to one row per year
df_raw = wb.data.DataFrame(metric_to_schema.keys(), economy='BRA', numericTimeKeys=True)
economy_data = df_raw.rename(metric_to_schema, axis=0)
years = list(economy_data.columns)
economy_data = economy_data.transpose()
economy_data['year'] = years
economy_data

series,foreign_investment,total_reserves,debt,total_expense,expense_per_gdp,gdp,gdp_growth,gdp_per_capita,pop_literacy_rate,expenditure_education_per_gdp,life_expectancy,population,year
1960,,4.328488e+08,,,,1.516557e+10,,210.109899,,,54.143,72179226.0,1960
1961,,4.711562e+08,,,,1.523685e+10,10.275912,205.040768,,,54.634,74311343.0,1961
1962,,3.360009e+08,,,,1.992629e+10,5.216059,260.425653,,,55.130,76514328.0,1962
1963,,3.550423e+08,,,,2.302148e+10,0.874673,292.252136,,,55.627,78772657.0,1963
1964,,2.454876e+08,,,,2.121189e+10,3.485582,261.666620,,,56.121,81064571.0,1964
...,...,...,...,...,...,...,...,...,...,...,...,...,...
2016,7.429462e+10,3.649840e+11,73.418804,2.285774e+12,36.459641,1.795700e+12,-3.275917,8710.096690,92.808441,6.31404,75.230,206163058.0,2016
2017,6.888507e+10,3.739555e+11,,2.387158e+12,36.260700,2.062831e+12,1.322869,9925.386238,93.075821,6.32255,75.456,207833831.0,2017
2018,7.816257e+10,3.747097e+11,,2.481853e+12,36.025396,1.885483e+12,1.317224,9001.234249,93.227501,,75.672,209469333.0,2018
2019,6.917415e+10,3.568864e+11,,2.572835e+12,35.453513,1.839758e+12,1.136586,8717.186278,,,75.881,211049527.0,2019
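
`wb.data.DataFrame` returns one row per series and one column per year, which is why the cell transposes the frame and adds a `year` column. The reshape can be sketched without pandas as follows; the function name is hypothetical, and missing values become `None` where pandas would show `NaN`.

```python
def series_by_year_to_rows(table):
    """Reshape {series: {year: value}} into one dict per year, sorted by year.

    Years missing for a series get None (pandas would show NaN).
    """
    years = sorted({year for values in table.values() for year in values})
    return [
        {"year": year, **{series: values.get(year) for series, values in table.items()}}
        for year in years
    ]


# Two series and two years taken from the output above.
table = {
    "gdp_growth": {2016: -3.275917, 2017: 1.322869},
    "population": {2016: 206163058.0, 2017: 207833831.0},
}
for row in series_by_year_to_rows(table):
    print(row)
# {'year': 2016, 'gdp_growth': -3.275917, 'population': 206163058.0}
# {'year': 2017, 'gdp_growth': 1.322869, 'population': 207833831.0}
```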


In [21]:
economy_df = spark.createDataFrame(economy_data)
economy_df.printSchema()

root
 |-- foreign_investment: double (nullable = true)
 |-- total_reserves: double (nullable = true)
 |-- debt: double (nullable = true)
 |-- total_expense: double (nullable = true)
 |-- expense_per_gdp: double (nullable = true)
 |-- gdp: double (nullable = true)
 |-- gdp_growth: double (nullable = true)
 |-- gdp_per_capita: double (nullable = true)
 |-- pop_literacy_rate: double (nullable = true)
 |-- expenditure_education_per_gdp: double (nullable = true)
 |-- life_expectancy: double (nullable = true)
 |-- population: double (nullable = true)
 |-- year: long (nullable = true)



In [22]:
economy_df.show(10)

+------------------+--------------+----+-------------+---------------+-------------------+-----------------+----------------+-----------------+-----------------------------+---------------+-----------+----+
|foreign_investment|total_reserves|debt|total_expense|expense_per_gdp|                gdp|       gdp_growth|  gdp_per_capita|pop_literacy_rate|expenditure_education_per_gdp|life_expectancy| population|year|
+------------------+--------------+----+-------------+---------------+-------------------+-----------------+----------------+-----------------+-----------------------------+---------------+-----------+----+
|               NaN|    4.328488E8| NaN|          NaN|            NaN|1.51655699125199E10|              NaN|210.109899384623|              NaN|                          NaN|         54.143|7.2179226E7|1960|
|               NaN|   4.7115615E8| NaN|          NaN|            NaN| 1.5236854859469E10|  10.275911554301| 205.04076826426|              NaN|                          NaN

### Step 2: Explore and Assess the Data
#### Explore the Data
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [23]:
# Not necessary
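
Although no cleaning step is applied, the economy output above already shows missing values: early years have `NaN` for indicators such as `debt` and `foreign_investment`. After `spark.createDataFrame` converts the pandas frame, those gaps appear as `NaN` doubles rather than `null`, so a missing-value check should look for both (in PySpark, `F.isnan(col)` and `col.isNull()`). A plain-Python sketch of such a per-column count, with a hypothetical function name and rows loosely based on the output above (one gap written as `None` to show both cases):

```python
import math


def count_missing(rows, columns):
    """Count None and NaN values per column."""
    counts = {column: 0 for column in columns}
    for row in rows:
        for column in columns:
            value = row.get(column)
            if value is None or (isinstance(value, float) and math.isnan(value)):
                counts[column] += 1
    return counts


rows = [
    {"year": 1960, "debt": float("nan"), "gdp": 1.516557e10},
    {"year": 2017, "debt": None, "gdp": 2.062831e12},
]
print(count_missing(rows, ["debt", "gdp"]))  # {'debt': 2, 'gdp': 0}
```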


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

### Trading Table

| Feature      | Type    |
| ------------ |:-------:|
| date         | date    |
| year         | integer |
| month        | integer |
| day          | integer |
| money_volume | double  |
| volume       | integer |
| stock_code   | string  |
| company_name | string  |
| price_open   | double  |
| price_close  | double  |
| price_mean   | double  |
| price_high   | double  |
| price_low    | double  |
| variation    | double  |

### Economy Table

| Feature        | Type  |
| ------------- |:-------------:|
| foreign_investment            | double |
| total_reserves                | double |
| debt                          | double |
| total_expense                 | double |
| expense_per_gdp               | double |
| gdp                           | double |
| gdp_growth                    | double |
| gdp_per_capita                | double |
| pop_literacy_rate             | double |
| expenditure_education_per_gdp | double |
| life_expectancy               | double |
| population                    | double |
| year                          | long   |
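
Both tables carry a `year` column, which is the natural key for relating the stock market to the economy (for example, yearly traded value against `gdp_growth`). In PySpark this would be a `groupBy('year')` aggregation on the trading table followed by a `join` with the economy table on `year`; the key is `integer` in one table and `long` in the other, and Spark widens the integer side when comparing them. A plain-Python sketch of the same idea, with a hypothetical function name and illustrative trading values:

```python
from collections import defaultdict


def yearly_join(trades, economy):
    """Sum money_volume per year and attach that year's economy metrics.

    Inner join on `year`: years missing from either side are dropped.
    """
    volume_by_year = defaultdict(float)
    for trade in trades:
        if trade.get("money_volume") is not None:
            volume_by_year[trade["year"]] += trade["money_volume"]
    economy_by_year = {row["year"]: row for row in economy}
    return [
        {**economy_by_year[year], "total_money_volume": total}
        for year, total in sorted(volume_by_year.items())
        if year in economy_by_year
    ]


trades = [  # illustrative values
    {"year": 2016, "money_volume": 10.0},
    {"year": 2016, "money_volume": 5.0},
    {"year": 2017, "money_volume": 7.0},
    {"year": 2020, "money_volume": 3.0},  # no economy row for 2020, so it is dropped
]
economy = [  # gdp_growth values from the World Bank output above
    {"year": 2016, "gdp_growth": -3.275917},
    {"year": 2017, "gdp_growth": 1.322869},
]
for row in yearly_join(trades, economy):
    print(row)
```

With an inner join, trading years that the World Bank has not published yet simply drop out of the result.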

#### 3.2 Mapping Out Data Pipelines

1. Read trading data from Bovespa
2. Parse it, then load it into Spark
3. Process the data, creating custom columns
4. Write it as Parquet
5. Read economy data from the World Bank as a pandas DataFrame
6. Parse it, then load it into Spark
7. Process the data, creating custom columns
8. Write it as Parquet
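
The pipeline writes both tables as Parquet. Spark's `DataFrameWriter.partitionBy` lays the output out in Hive-style directories named `column=value`, which lets later queries skip whole partitions when they filter on those columns; the 100x scenario in Step 5 relies on this kind of date-based partitioning. Whether `etl_functions` partitions its output is not shown here. The sketch below only reproduces the directory naming for one row, as `trading_df.write.partitionBy('year', 'month').parquet(path)` would lay it out; the function name and the `trading` subdirectory are hypothetical.

```python
import posixpath


def partition_path(base, row, partition_cols):
    """Build the Hive-style directory a row would land in, e.g. base/year=2020/month=3."""
    parts = [f"{col}={row[col]}" for col in partition_cols]
    return posixpath.join(base, *parts)


row = {"date": "2020-03-11", "year": 2020, "month": 3, "stock_code": "IBOVP102"}
print(partition_path("sample_data/output/trading", row, ["year", "month"]))
# sample_data/output/trading/year=2020/month=3
```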


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

The complete flow can be found in `etl.py`.

In [24]:
# Pipeline functions; the complete flow is in etl.py

def process_economy_data(spark, output_data):
    """
    ETL Brazilian economy data from world bank
    :param spark: (SparkSession) spark session instance
    :param output_data: (string) output file path
    :return: spark dataframe representing economy table
    """

    economy_df = etl_functions.create_economy_df()
    economy_df = spark.createDataFrame(economy_df)
    return etl_functions.create_economy_table(economy_df, output_data)


def process_trading_data(spark, trading_files, output_data):
    """
    ETL trading data.
    :param spark: (SparkSession) spark session instance
    :param trading_files: (string) input file path
    :param output_data: (string) output file path
    :return: spark dataframe of trading data
    """

    trading_df = spark.read.text(paths=trading_files)

    trading_df = etl_functions.raw_trading_to_spark(trading_df)
    trading_df = etl_functions.trading_columns(trading_df)

    return etl_functions.create_trading_table(trading_df, output_data)

def etl():
    """Run the trading and economy pipelines on the sample data, then run the quality checks."""
    input_data = "sample_data"
    output_data = "sample_data/output"

    trading_files = os.path.join(input_data, "COTAHIST_A*.txt")

    trading_df = process_trading_data(spark, trading_files, output_data)
    economy_df = process_economy_data(spark, output_data)

    etl_functions.quality_check(economy_df, 'economy')
    etl_functions.quality_check(trading_df, 'trading')
    etl_functions.quality_check_column(trading_df, 'stock_code')
    etl_functions.quality_check_column(trading_df, 'date')
    etl_functions.quality_check_column(trading_df, 'volume')
    etl_functions.quality_check_column(economy_df, 'gdp')
    etl_functions.quality_check_column(economy_df, 'year')


#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [25]:
# Perform quality checks here
etl_functions.quality_check(economy_df, 'economy')
etl_functions.quality_check(trading_df, 'trading')
etl_functions.quality_check_column(trading_df, 'stock_code')
etl_functions.quality_check_column(trading_df, 'date')
etl_functions.quality_check_column(trading_df, 'volume')
etl_functions.quality_check_column(economy_df, 'gdp')
etl_functions.quality_check_column(economy_df, 'year')

Data quality check passed for economy with 61 records.
Data quality check passed for trading with 1251648 records.
Data quality check passed for table stock_code with 1251648 records.
Data quality check passed for table date with 1251648 records.
Data quality check passed for table volume with 1251646 records.
Data quality check passed for table gdp with 61 records.
Data quality check passed for table year with 61 records.
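
`etl_functions` is not included in this notebook, so its checks are not shown. Judging by the messages above, `quality_check` counts the rows of a table and `quality_check_column` counts non-null values in a column (1,251,646 non-null `volume` values out of 1,251,648 rows). A minimal count-based check in that spirit could raise on an empty result; the function names below are hypothetical, and in PySpark the non-null count would be `df.where(F.col(column).isNotNull()).count()`.

```python
def check_count(name, count):
    """Fail loudly when a table or column has no records; otherwise report the count."""
    if count < 1:
        raise ValueError(f"Data quality check failed for {name}: no records.")
    print(f"Data quality check passed for {name} with {count} records.")


def non_null_count(rows, column):
    """Count rows where `column` is present and not None."""
    return sum(1 for row in rows if row.get(column) is not None)


rows = [{"volume": 120}, {"volume": None}, {"volume": 8260}]
check_count("volume", non_null_count(rows, "volume"))
# Data quality check passed for volume with 2 records.
```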


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of
what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Trading Table

Data comes from [Bovespa](http://www.b3.com.br/en_us/), the only regulated Brazilian exchange.

| Feature      | Description  |
| ------------:|:------------:|
| date         | trading date                                                   |
| year         | year of the trading date                                       |
| month        | month of the trading date                                      |
| day          | day of the trading date                                        |
| money_volume | total traded value across all transactions of the day          |
| volume       | total quantity of securities traded                            |
| stock_code   | stock code (ticker) identifying the traded security            |
| company_name | name of the company represented by the stock                   |
| price_open   | opening price of the day                                       |
| price_close  | closing price of the day                                       |
| price_mean   | mean traded price of the day                                   |
| price_high   | highest price of the day                                       |
| price_low    | lowest price of the day                                        |
| variation    | (price_close - price_open) / 100                               |

### Economy Table

Data comes from [World Bank Open Data](https://data.worldbank.org/).

| Feature                       | World Bank indicator | Description |
| ----------------------------- | -------------------- |:-----------:|
| foreign_investment            | BX.KLT.DINV.CD.WD    | foreign direct investment, net inflows (current US$)    |
| total_reserves                | FI.RES.TOTL.CD       | total reserves, including gold (current US$)            |
| debt                          | GC.DOD.TOTL.GD.ZS    | central government debt, total (% of GDP)               |
| total_expense                 | GC.XPN.TOTL.CN       | government expense (current local currency units)       |
| expense_per_gdp               | GC.XPN.TOTL.GD.ZS    | government expense (% of GDP)                           |
| gdp                           | NY.GDP.MKTP.CD       | GDP (current US$)                                       |
| gdp_growth                    | NY.GDP.MKTP.KD.ZG    | annual GDP growth (%)                                   |
| gdp_per_capita                | NY.GDP.PCAP.CD       | GDP divided by total population (current US$)           |
| pop_literacy_rate             | SE.ADT.LITR.ZS       | adult literacy rate (% of people aged 15 and above)     |
| expenditure_education_per_gdp | SE.XPD.TOTL.GD.ZS    | government expenditure on education (% of GDP)          |
| life_expectancy               | SP.DYN.LE00.IN       | life expectancy at birth (years)                        |
| population                    | SP.POP.TOTL          | total population                                        |
| year                          |                      | year the metrics refer to                               |

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:

#### Scenarios

* The data was increased by 100x.
Create Airflow pipelines that run Spark jobs partitioned by date, since the current pipeline is not well optimized for multiple worker nodes.
Airflow integrates easily with many systems, including Spark, S3, and a large number of databases, and it has a user-friendly web UI.

* The data populates a dashboard that must be updated on a daily basis by 7am every day.
Schedule an AWS Lambda function or an Airflow pipeline to run early enough each morning that the data is refreshed before 7am.

* The database needed to be accessed by 100+ people.
Send the data to AWS Kinesis and connect it to a data visualization tool such as Tableau or Power BI, which would offload heavy aggregations from the database.

##### Technologies

**Spark** scales to datasets that do not fit in a single machine's memory, unlike single-machine tools such as pandas,
and it is flexible when it comes to applying custom functions (UDFs). Best of all, it can be queried with SQL.

Spark can also write the output directly to S3 or another data lake.

##### Data update

Economy data may be updated yearly, because the World Bank takes time to collect it.
Trading data could be updated more often: this project reads B3's annual history files (`COTAHIST_A*.txt`), but B3 also publishes monthly and daily files.

##### Data schema

A star schema is used. Although it increases data redundancy, which is usually an anti-pattern, it improves aggregation performance because fewer joins are needed.
Joins are still required, but far fewer than in a snowflake schema.
The proposed analysis relies heavily on `groupBy`, `orderBy`, and similar operations.
