# Udacity DE Nanodegree - Capstone Project - ETL

# Introduction

This Jupyter notebook contains the entire ETL code for the Udacity DE Nanodegree - Capstone Project.

# Library initialization

In [1]:
import os.path
import time

import numpy as np
import pandas as pd
from sqlalchemy import create_engine

import proj_funcs



# Data gathering

## ETH-USD Trading Data
Minute-by-minute ETH-USD trading data is downloaded from [CryptoDataDownload's website](https://www.cryptodatadownload.com/data/bitstamp/), as shown in the following section:

## Data download

Let's download the ETH-USD Trading Data CSVs:

In [2]:
# Download Ethereum price and volume data if it doesn't exist

eth_bitstamp_urls = [
    "https://www.cryptodatadownload.com/cdd/Bitstamp_ETHUSD_2021_minute.csv",
    "https://www.cryptodatadownload.com/cdd/Bitstamp_ETHUSD_2020_minute.csv",
    "https://www.cryptodatadownload.com/cdd/Bitstamp_ETHUSD_2019_minute.csv"
]

proj_funcs.getTradingData(eth_bitstamp_urls)
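
The body of `getTradingData` is not shown in this notebook. As a rough sketch of the "download only if it doesn't exist" idea, a helper along these lines could be used. The name `download_if_missing`, its parameters, and the file naming are assumptions made for illustration, not the actual `proj_funcs` implementation:

```python
import os
from urllib.request import urlretrieve


def download_if_missing(urls, dest_dir, fetch=urlretrieve):
    """Download each URL into dest_dir unless the file is already there.

    Hypothetical stand-in for proj_funcs.getTradingData. `fetch` is
    injectable so the logic can be exercised without network access.
    Returns the list of files that were actually downloaded.
    """
    os.makedirs(dest_dir, exist_ok=True)
    downloaded = []
    for url in urls:
        # Use the last path component of the URL as the local file name
        filename = url.rsplit("/", 1)[-1]
        target = os.path.join(dest_dir, filename)
        if os.path.exists(target):
            continue  # already on disk, skip the request
        fetch(url, target)
        downloaded.append(target)
    return downloaded
```

Calling `download_if_missing(eth_bitstamp_urls, "eth-usd")` twice would only hit the network the first time, which keeps repeated notebook runs cheap.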

## Data exploration

Now, let's open the recently downloaded CSV files and describe them:

In [3]:
proj_funcs.describeTradingData("eth-usd")


****ETHUSD_2019_minute.csv****




         https://www.CryptoDataDownload.com
count                              525600.0
unique                             299560.0
top                                     0.0
freq                               182088.0
missing                                 0.0
   
****ETHUSD_2020_minute.csv****




         https://www.CryptoDataDownload.com
count                              487723.0
unique                             332567.0
top                                     0.0
freq                               141599.0
missing                                 0.0
   
****ETHUSD_2021_minute.csv****




         https://www.CryptoDataDownload.com
count                              484275.0
unique                             449260.0
top                                     0.0
freq                                29311.0
missing                                 0.0
   


Something is wrong with these files: each one is read as a single column named after the website's URL. A look at one of the files shows that its first line contains neither the headers nor relevant data:

![title](./img/img_1.png)

Thus, that line will be skipped with the ```skiprows``` argument of ```pd.read_csv```, which is passed through the ```sk_rows``` argument of ```describeTradingData```. Let's try that:

In [4]:
proj_funcs.describeTradingData("eth-usd", sk_rows=1)


****ETHUSD_2019_minute.csv****
                 unix                 date   symbol           open  \
count    5.255990e+05               525599   525599  525599.000000   
unique            NaN               525599        1            NaN   
top               NaN  2019-12-31 23:59:00  ETH/USD            NaN   
freq              NaN                    1   525599            NaN   
mean     1.562069e+09                  NaN      NaN     180.735784   
std      9.103650e+06                  NaN      NaN      50.623448   
min      1.546301e+09                  NaN      NaN     100.530000   
25%      1.554185e+09                  NaN      NaN     141.160000   
50%      1.562069e+09                  NaN      NaN     173.500000   
75%      1.569953e+09                  NaN      NaN     209.020000   
max      1.577837e+09                  NaN      NaN     363.170000   
missing  0.000000e+00                    0        0       0.000000   

                  high            low          close     V
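
The effect of `skiprows` can be reproduced on a small in-memory file. This is only an illustrative sketch: the sample rows are made up and contain fewer columns than the real CSV files.

```python
import io

import pandas as pd

# Tiny in-memory stand-in for one of the downloaded files: a banner line
# followed by the real header and data (values are made up for illustration).
raw = (
    "https://www.CryptoDataDownload.com\n"
    "unix,date,symbol,open\n"
    "1546300800,2019-01-01 00:00:00,ETH/USD,130.5\n"
    "1546300860,2019-01-01 00:01:00,ETH/USD,130.7\n"
)

# Without skiprows, the banner line is taken as the header row
without_skip = pd.read_csv(io.StringIO(raw))
# With skiprows=1, parsing starts at the real header
with_skip = pd.read_csv(io.StringIO(raw), skiprows=1)

print(list(without_skip.columns))
print(list(with_skip.columns))
```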

Good news here: there are no missing values in any of the files. Now, let's analyze the output above, variable by variable:

- **unix**: this is the UNIX timestamp of the time the data row was generated. Its max and min values are in line with plausible values for a UNIX timestamp;
- **date**: this is the datetime timestamp of the time the data row was generated. The number of unique values is equal to the number of rows, so all of the values in this column are unique. Nevertheless, we need to run further testing on this column regarding the formatting of the date;
- **symbol**: the currency pair of the data. In this case, it should always be ```ETH/USD```, which is the case for all of the files, as per above's output;
- **open**, **high**, **low** and **close**: these are the opening, highest, lowest and closing prices (USD per ETH) for the minute timeframe starting at **unix**. The maximum and minimum prices seem to be in line with general domain knowledge of the ```ETH/USD``` market;
- **Volume ETH** and **Volume USD**: the number of ETH and USD that were transacted on the minute timeframe starting at **unix**.

Now, let's go to the data testing section.


### Data quality testing

The following data quality tests will be performed after changing column names to ones that are more descriptive:

- *Data types must match the following, respectively*: int64, object, object, float64, float64, float64, float64, float64, float64;
- *Unique value for the currency_pair column should be 'ETH/USD'*;
- *Unix timestamps can't be greater than now's timestamp*;
- *Unix timestamp must correspond to its respective datetime timestamp (in line with previous concerns regarding the **date** field)*;
- *The timelapse between two consecutive data rows should be 60 seconds*.
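
The `proj_funcs` helpers are not listed in this notebook. As a rough sketch of what two of the checks above involve, the hypothetical functions below (`check_unique_val` and `check_diff_equal_to` are illustrative names, not the project's own code) run against a made-up sample with one missing minute:

```python
import pandas as pd


def check_unique_val(df, column, expected):
    """True when `column` holds exactly one distinct value equal to `expected`."""
    values = df[column].unique()
    return bool(len(values) == 1 and values[0] == expected)


def check_diff_equal_to(df, column, expected):
    """True when every non-null value in `column` equals `expected`.

    The first row of a .diff() column is NaN, so nulls are dropped first.
    """
    return bool((df[column].dropna() == expected).all())


# Made-up sample with one missing minute between the 2nd and 3rd rows.
sample = pd.DataFrame({
    "unix_timestamp": [1546300800, 1546300860, 1546300980],
    "currency_pair": ["ETH/USD"] * 3,
})
sample["diff"] = sample["unix_timestamp"].diff()

print(check_unique_val(sample, "currency_pair", "ETH/USD"))
print(check_diff_equal_to(sample, "diff", 60))
```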

In [5]:

dfs = [pd.read_csv("eth-usd/" + file, skiprows=1)
        for file in os.listdir("eth-usd")]
eth_data = pd.concat(dfs, 
                        axis=0).rename(
                                        columns={
                                                "unix":"unix_timestamp",
                                                "date":"date_time",
                                                "symbol":"currency_pair",
                                                "open":"open_price",
                                                "high":"high_price",
                                                "low":"low_price",
                                                "close":"close_price",
                                                'Volume ETH':'volume_1',
                                                'Volume USD':'volume_2'})

# Order data for further timelapse data test
eth_data = eth_data.reset_index(drop=True)
eth_data.sort_values(by='unix_timestamp', ascending=True, inplace=True)
eth_data = eth_data.reset_index(drop=True)

# Data types must match expected ones, as per data_types list:
data_types = [np.int64, object, object, np.float64, np.float64,
                np.float64, np.float64, np.float64, np.float64]
check_data_types = proj_funcs.checkDataTypes(eth_data, data_types)

# Unique value for the currency_pair column should be 'ETH/USD':
check_unique_val = proj_funcs.checkUniqueVal(eth_data, 'currency_pair', 'ETH/USD')

# Unix timestamps can't be greater than now's timestamp:
check_max_unix_timestamp = proj_funcs.checkMaxUnixTimestamp(
                                                            eth_data, 
                                                            'unix_timestamp')

# Unix timestamp must correspond to datetime timestamp
eth_data['unix_date'] = pd.to_datetime(
                                        eth_data['unix_timestamp'], 
                                        unit = 's'
                                    ) 
check_unix_with_time_cols = proj_funcs.checkUnixWithTimeCols(
                                                             eth_data, 
                                                             'unix_date',
                                                             'date_time'
                                                            )

# The timelapse between two consecutive data rows should be 60 seconds
# NOTE: the column 'diff' will have a value of nan in the first row because
# there are no rows before the first one.
eth_data['diff'] = eth_data['unix_timestamp'].diff()
check_diff_equal_to = proj_funcs.checkDiffEqualTo(eth_data, 'diff', 60)

if all([check_data_types, check_unique_val, check_max_unix_timestamp, 
        check_unix_with_time_cols, check_diff_equal_to]):

        print("All tests passed!")
else:
        print("Not all tests passed.")




Data types check OK!
Unique value check for column 'currency_pair' OK!
Unix Timestamp value check for column 'unix_timestamp' OK!
Unix Timestamp value check for columns 'unix_date' and 'date_time' OK!
Error: The difference between two consecutive data rows should be 60 for column diff.
Not all tests passed.


All but the timelapse data test have been passed. Let's delve into the infringing rows to see what happens (let's not forget that the first row of the ```diff``` column in the above code snippet has a value of ```nan```, so it will be removed from this analysis):

In [6]:
unique_diff = list(eth_data['diff'][1:eth_data.shape[0]-1].unique())
unique_diff = [i for i in unique_diff if i != 60]

for i in unique_diff:    
    print("--------------------------------------------------")
    print("Rows with diff = " + str(int(i)))
    print(eth_data[eth_data['diff'] == i])
    print("")


--------------------------------------------------
Rows with diff = 120
         unix_timestamp            date_time currency_pair  open_price  \
525599       1577836860  2020-01-01 00:01:00       ETH/USD      128.63   
1013321      1609459260  2021-01-01 00:01:00       ETH/USD      738.63   

         high_price  low_price  close_price   volume_1      volume_2  \
525599       128.63     128.63       128.63   0.000000      0.000000   
1013321      740.50     738.63       740.50  15.054058  11147.529979   

                  unix_date   diff  
525599  2020-01-01 00:01:00  120.0  
1013321 2021-01-01 00:01:00  120.0  

--------------------------------------------------
Rows with diff = 2359080
        unix_timestamp            date_time currency_pair  open_price  \
996515      1608450840  2020-12-20 07:54:00       ETH/USD      656.15   

        high_price  low_price  close_price   volume_1      volume_2  \
996515      656.36     656.15       656.36  18.000252  11814.645298   

          

There are 2 values other than 60 seconds: 120 and 2359080. Let's explore the vicinities of the infringing rows:

In [7]:
for i in unique_diff:  
    indexes = eth_data[eth_data['diff'] == i].index
    print("Vicinities for instances of diff = ", str(i))
    for j in indexes:
        print(eth_data.loc[j-3:j+3, ['unix_timestamp', 'date_time', 'diff']]) 
    print("")

Vicinities for instances of diff =  120.0
        unix_timestamp            date_time   diff
525596      1577836620  2019-12-31 23:57:00   60.0
525597      1577836680  2019-12-31 23:58:00   60.0
525598      1577836740  2019-12-31 23:59:00   60.0
525599      1577836860  2020-01-01 00:01:00  120.0
525600      1577836920  2020-01-01 00:02:00   60.0
525601      1577836980  2020-01-01 00:03:00   60.0
525602      1577837040  2020-01-01 00:04:00   60.0
         unix_timestamp            date_time   diff
1013318      1609459020  2020-12-31 23:57:00   60.0
1013319      1609459080  2020-12-31 23:58:00   60.0
1013320      1609459140  2020-12-31 23:59:00   60.0
1013321      1609459260  2021-01-01 00:01:00  120.0
1013322      1609459320  2021-01-01 00:02:00   60.0
1013323      1609459380  2021-01-01 00:03:00   60.0
1013324      1609459440  2021-01-01 00:04:00   60.0

Vicinities for instances of diff =  2359080.0
        unix_timestamp            date_time       diff
996512      1606091640  2020-11-

From the output above, a couple of conclusions can be drawn:
- The 120 ```diff``` instances correspond to the transition between different years. It seems there's no data for ```2020-01-01 00:00:00``` and ```2021-01-01 00:00:00```. It is possible that the exchange's API stops working due to some unintended bug or some scheduled maintenance at the end of the year;
- The 2359080 ```diff``` instance definitely corresponds to a big gap in the data that should be accounted for and addressed at a further stage as future work.
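
One way to quantify such gaps, sketched here under the assumption that every timestamp is aligned to the minute, is to compare the observed timestamps with the full one-minute grid between the first and last rows. `missing_minutes` is a hypothetical helper and the sample values are illustrative:

```python
import numpy as np
import pandas as pd


def missing_minutes(unix_timestamps):
    """Return the expected one-minute timestamps absent from the input.

    `unix_timestamps` is any iterable of epoch seconds aligned to the minute.
    """
    observed = np.sort(np.asarray(list(unix_timestamps), dtype="int64"))
    # Full grid of minutes between the first and last observed timestamps
    expected = np.arange(observed[0], observed[-1] + 60, 60, dtype="int64")
    return np.setdiff1d(expected, observed)


# Sample mirroring the 120-second gap at the turn of the year described above.
sample = [1577836680, 1577836740, 1577836860, 1577836920]
gaps = missing_minutes(sample)
print(pd.to_datetime(gaps, unit="s"))
```

Applied to `eth_data['unix_timestamp']`, the length of the returned array would give the number of missing minutes, which is a starting point for deciding how to fill or flag them.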

Data will be persisted in the next section.

### Data persistence

Let's persist these data in a containerized, local Postgres DB. NOTE: this can be done on any postgres-based DB; simply change the endpoint data accordingly:

In [8]:
# Create data schema and trading table

endpoint = 'postgresql+psycopg2://postgres:postgres@localhost:5432/postgres'
query = """
                  DROP SCHEMA IF EXISTS crypto CASCADE;
                  CREATE SCHEMA IF NOT EXISTS crypto;
                  --DROP TABLE IF EXISTS crypto.staging_trading
                  CREATE TABLE IF NOT EXISTS crypto.staging_trading (    
                  unix_timestamp BIGINT
                  , date_time TIMESTAMP
                  , currency_pair VARCHAR(20)
                  , open_price NUMERIC
                  , high_price NUMERIC
                  , low_price NUMERIC
                  , close_price NUMERIC
                  , volume_1 NUMERIC
                  , volume_2 NUMERIC
                  );
                  """
eth_data = eth_data[["unix_timestamp", "date_time", "currency_pair",
    "open_price", "high_price", "low_price", "close_price",
    'volume_1', 'volume_2']]
table = "staging_trading"
table_schema = "crypto"

proj_funcs.persistToDb(endpoint, query, eth_data, table, table_schema)
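
The body of `persistToDb` is not shown here. The general pattern (run the DDL, then append the DataFrame rows) can be sketched as follows. Note the assumptions: an in-memory SQLite database stands in for Postgres so the example is self-contained, the schema argument is omitted, `persist_to_db` is a hypothetical name, and the sample rows are made up:

```python
import sqlite3

import pandas as pd


def persist_to_db(conn, ddl, df, table):
    """Run the DDL script, then append the DataFrame rows to `table`."""
    conn.executescript(ddl)
    df.to_sql(table, conn, if_exists="append", index=False)


ddl = """
DROP TABLE IF EXISTS staging_trading;
CREATE TABLE staging_trading (
    unix_timestamp BIGINT,
    date_time TIMESTAMP,
    close_price NUMERIC
);
"""
# Made-up rows with a subset of the real columns
sample = pd.DataFrame({
    "unix_timestamp": [1546300800, 1546300860],
    "date_time": ["2019-01-01 00:00:00", "2019-01-01 00:01:00"],
    "close_price": [130.5, 130.7],
})

conn = sqlite3.connect(":memory:")
persist_to_db(conn, ddl, sample, "staging_trading")
row_count = conn.execute("SELECT COUNT(*) FROM staging_trading").fetchone()[0]
print(row_count)
conn.close()
```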


Now that the ether trading data source has been taken care of, let's proceed with the Etherscan data source.

## Etherscan Network Data

### Data Download

This datasource comes from an API ([Documentation](https://docs.etherscan.io/)). In order to gather data from it, a request should be made to the corresponding endpoint by using a start date, an end date and an API key previously gathered from Etherscan's website. For this, the ```getEtherScanResults``` function was created. Let's test this function and its outputs:

In [9]:
start_date = '2019-02-01'
end_date = '2019-02-27'
API_KEY = "2URV7G2WBBE7INN2EV1N83D4YAX65NVY7Q"
url_template = 'https://api.etherscan.io/api?module=stats&action=chainsize&startdate={}&enddate={}&clienttype=geth&syncmode=default&sort=asc&apikey={}'

print(proj_funcs.getEtherScanResults(start_date, end_date, API_KEY, url_template))


   blockNumber chainTimeStamp     chainSize clientType syncMode
0      7156164     2019-02-01  184726421279       Geth  Default
1      7161012     2019-02-02  184726654448       Geth  Default
2      7165949     2019-02-03  184726769049       Geth  Default
3      7170805     2019-02-04  186974874323       Geth  Default
4      7175752     2019-02-05  186974913559       Geth  Default
5      7180654     2019-02-06  186974923662       Geth  Default
6      7185533     2019-02-07  186974939161       Geth  Default
7      7190465     2019-02-08  186974941698       Geth  Default
8      7195382     2019-02-09  188036647969       Geth  Default
9      7200281     2019-02-10  188036745122       Geth  Default
10     7204582     2019-02-11  189825908951       Geth  Default
11     7208771     2019-02-12  189825138765       Geth  Default
12     7213043     2019-02-13  191029008588       Geth  Default
13     7217203     2019-02-14  191029044688       Geth  Default
14     7221377     2019-02-15  191029043
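
For orientation, the request/response handling can be sketched without touching the network. The function names below are hypothetical, and the payload shape (a `status` flag plus a `result` list of records with string values) is an assumption about the API response rather than something shown in this notebook:

```python
import json

import pandas as pd


def build_chainsize_url(url_template, start_date, end_date, api_key):
    """Fill the three placeholders of the URL template in order."""
    return url_template.format(start_date, end_date, api_key)


def parse_chainsize_response(payload):
    """Convert a decoded JSON payload into a DataFrame.

    Assumes the payload carries a "status" flag and a "result" list of
    records; returns the string "invalid response" otherwise.
    """
    result = payload.get("result")
    if payload.get("status") != "1" or not isinstance(result, list):
        return "invalid response"
    return pd.DataFrame(result)


# Hand-written payload in the assumed shape (values taken from the output above).
payload = json.loads("""
{"status": "1", "message": "OK", "result": [
  {"blockNumber": "7156164", "chainTimeStamp": "2019-02-01",
   "chainSize": "184726421279", "clientType": "Geth", "syncMode": "Default"}
]}
""")
print(parse_chainsize_response(payload))
print(parse_chainsize_response({"status": "0", "result": "Error!"}))
```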

Now that the function has been tested, let's gather all the data from 2019-01-01 onwards:

In [10]:
etherscan_data = pd.DataFrame(columns = [
                                          'blockNumber',
                                          'chainTimeStamp', 
                                          'chainSize', 
                                          'clientType', 
                                          'syncMode'
                                          ]
                              )

max_abs_date = max(eth_data['date_time']).split(' ')[0]
min_abs_date = min(eth_data['date_time']).split(' ')[0]
datelist = pd.date_range(start=min_abs_date,end=max_abs_date).tolist()

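for i in range((len(datelist)//30) + 1):
  
    # Slice the next window of up to 30 consecutive days
    date_range = [d.strftime('%Y-%m-%d') for d in datelist[(i*30):((i+1)*30)]]
    if not date_range:
        # len(datelist) is an exact multiple of 30: nothing left to request
        break
    start_date = date_range[0]
    end_date = date_range[-1]
    response_df = proj_funcs.getEtherScanResults(start_date, end_date, 
                                        API_KEY, url_template)
    # Skip windows for which the helper returned the "invalid response" string
    if isinstance(response_df, str) and response_df == "invalid response":
        continue
    etherscan_data = pd.concat([etherscan_data, response_df], ignore_index=True)
    time.sleep(0.3)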


etherscan_data = etherscan_data.\
                  rename(columns={
                                  "blockNumber":"block_number",
                                  "chainTimeStamp":"chain_time_stamp",
                                  "chainSize":"chain_size",
                                  "clientType":"client_type",
                                  "syncMode":"sync_mode"}
                        )
etherscan_data


Unnamed: 0,block_number,chain_time_stamp,chain_size,client_type,sync_mode
0,7111289,2019-01-23,180427595683,Geth,Default
1,7116274,2019-01-24,180427599016,Geth,Default
2,7121375,2019-01-25,181437999437,Geth,Default
3,7126462,2019-01-26,181438824915,Geth,Default
4,7131517,2019-01-27,181439384907,Geth,Default
...,...,...,...,...,...
1042,13705423,2021-11-29,1170199295600,Geth,Default
1043,13711627,2021-11-30,1172134023323,Geth,Default
1044,13717878,2021-12-01,1174116456497,Geth,Default
1045,13724121,2021-12-02,1176081509097,Geth,Default
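
The 30-day batching used in the loop above can also be expressed as a small generator. This is an alternative sketch with a hypothetical name (`date_windows`), using only the standard library, not the code that produced the output above:

```python
from datetime import date, timedelta


def date_windows(start, end, size=30):
    """Yield (first_day, last_day) ISO-string pairs covering start..end inclusive."""
    current = start
    while current <= end:
        # Clamp the last window so it never runs past `end`
        last = min(current + timedelta(days=size - 1), end)
        yield current.isoformat(), last.isoformat()
        current = last + timedelta(days=1)


windows = list(date_windows(date(2019, 1, 1), date(2019, 3, 5)))
for first_day, last_day in windows:
    print(first_day, last_day)
```

Because the generator stops as soon as `current` passes `end`, it never produces an empty window, whatever the length of the date range.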


Let's go, then, to the Data Exploration section.

### Data exploration

Let's see how the data looks:

In [11]:
### Data exploration

desc_data = etherscan_data.describe(include='all')
desc_null = etherscan_data.isnull().sum().to_frame(name = 'missing').T
print(pd.concat([desc_data, desc_null]))



        block_number chain_time_stamp    chain_size client_type sync_mode
count           1047             1047          1047        1047      1047
unique          1047             1046          1040           1         1
top          7111289       2020-08-01  229824047924        Geth   Default
freq               1                2             4        1047      1047
missing            0                0             0           0         0


Again, good news: no missing values in this dataframe. Now, let's analyze the output above, variable by variable:

- **block_number**: the number of blocks generated in the Ethereum network, as of **chain_time_stamp** date (hence the "Blockchain" moniker). This column seems to be in line with the general domain knowledge about what that number should be;
- **chain_time_stamp**: the date corresponding to the moment the data row was gathered. It seems there's duplicate data for one of the days, as the count of rows is greater than the count of unique values by 1;
- **chain_size**: the size (in bytes) of the Ethereum blockchain. All of the values should be unique here (it's nearly impossible for the Ethereum blockchain to remain the same size from one day to the next, as there are always some transactions being performed), yet only 1040 of the 1047 values are unique;
- **client_type**: the Ethereum blockchain client that was used to extract the data row from the Ethereum blockchain (a list of the clients can be found [here](https://media.consensys.net/an-definitive-list-of-ethereum-developer-tools-2159ce865974)). Geth was chosen due to familiarity with it, but any client can be chosen. All of the rows have the same ```Geth``` value, so no concerns here;
- **sync_mode**: the way the Ethereum network's state is recreated (```FAST```, by default, i.e. the Ethereum blockchain is not generated incrementally, block by block). Similar to **client_type**, all of the rows have the same ```Default``` value, so no concerns here.

Now, let's go to the data testing section.

### Data quality testing

First, the concerns found in the data exploration section will be addressed: 

- Duplicate **chain_time_stamp** values;
- Duplicate **chain_size** values.

Let's begin with the first concern:

In [12]:
etherscan_dt_count = pd.value_counts(etherscan_data.chain_time_stamp).to_frame().reset_index()
repeat_date = etherscan_dt_count[etherscan_dt_count['chain_time_stamp'] == 2]['index']
etherscan_data[etherscan_data['chain_time_stamp'] == repeat_date[0]]


Unnamed: 0,block_number,chain_time_stamp,chain_size,client_type,sync_mode
556,10570514,2020-08-01,473642779532,Geth,Default
557,10570758,2020-08-01,473651278977,Geth,Default


There seems to be some sort of double counting on that day. There are a couple of ways to deal with this:

- Ask the source of these data for clarification: the best way, but it's uncertain whether the source is reachable;
- Interpolate **block_number** and **chain_size** values: it may be a good way of combining the two rows, but the resulting row is not the truth;
- Eliminate one of the rows: easy, retains one of the two versions of the truth, but completely ignores the other.

For the sake of this project, let's go with the third alternative:

In [13]:
etherscan_data = etherscan_data[etherscan_data['block_number'] != 10570514].reset_index(drop=True)
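
A more general alternative to filtering on a hard-coded block number is `drop_duplicates`. The sketch below assumes that keeping the row with the highest block number per date is acceptable; the first and last block numbers in the sample are made up:

```python
import pandas as pd

# Sample reproducing the duplicated date shown above
# (the first and last block numbers are made up for illustration).
sample = pd.DataFrame({
    "block_number": [10564000, 10570514, 10570758, 10577000],
    "chain_time_stamp": ["2020-07-31", "2020-08-01", "2020-08-01", "2020-08-02"],
})

# Sort so that, within each date, the highest block number comes last,
# then keep only that last row per date.
deduplicated = (
    sample.sort_values("block_number")
          .drop_duplicates(subset="chain_time_stamp", keep="last")
          .reset_index(drop=True)
)
print(deduplicated)
```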


One of the rows has been erased successfully. Now, let's delve into the second concern:

In [14]:
etherscan_chain_count = pd.value_counts(etherscan_data.chain_size).to_frame().reset_index().head(10)
repeat_chain = etherscan_chain_count[etherscan_chain_count['chain_size'] > 1]['index']
etherscan_data[~etherscan_data['chain_size'].isin(repeat_chain)]


Unnamed: 0,block_number,chain_time_stamp,chain_size,client_type,sync_mode
0,7111289,2019-01-23,180427595683,Geth,Default
1,7116274,2019-01-24,180427599016,Geth,Default
2,7121375,2019-01-25,181437999437,Geth,Default
3,7126462,2019-01-26,181438824915,Geth,Default
4,7131517,2019-01-27,181439384907,Geth,Default
...,...,...,...,...,...
1042,13705423,2021-11-29,1170199295600,Geth,Default
1043,13711627,2021-11-30,1172134023323,Geth,Default
1044,13717878,2021-12-01,1174116456497,Geth,Default
1045,13724121,2021-12-02,1176081509097,Geth,Default


Most of the duplicate chain_size cases have to do with consecutive days, which suggests there may be some technical bug related to the client, or maybe some human error that is not immediately verifiable. Thus, for the sake of this project, the data will be left that way, but in a real-life scenario, it is advisable to delve further into these inconsistencies (including the ones discussed in the first concern).

The following formal data quality tests will be performed on this data source:

- *Data types must match the following, respectively*: int64, object, int64, object, object;
- *Unique value for the client_type and sync_mode columns should be 'Geth' and 'Default', respectively*;
- *chain_time_stamp cannot be more recent than now*;
- *The timelapse between two date-ordered, consecutive data rows should be 1 day*;
- *Given the order in the previous item, both block_number and chain_size should grow (or at least remain the same) as time passes*.
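
The one-day spacing test listed above could be sketched as follows. `check_daily_spacing` is a hypothetical helper, not a function from `proj_funcs`, and the dates are made up for illustration:

```python
import pandas as pd


def check_daily_spacing(df, column):
    """True when consecutive dates in `column` are exactly one day apart."""
    dates = pd.to_datetime(df[column]).sort_values()
    # The first diff is NaT because there is no previous row, so drop it
    gaps = dates.diff().dropna()
    return bool((gaps == pd.Timedelta(days=1)).all())


complete = pd.DataFrame(
    {"chain_time_stamp": ["2019-01-23", "2019-01-24", "2019-01-25"]}
)
with_gap = pd.DataFrame({"chain_time_stamp": ["2019-01-23", "2019-01-25"]})

print(check_daily_spacing(complete, "chain_time_stamp"))
print(check_daily_spacing(with_gap, "chain_time_stamp"))
```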

In [15]:

# Order data for further timelapse data test
etherscan_data = etherscan_data.reset_index(drop=True)
etherscan_data.sort_values(by='chain_time_stamp', ascending=True, inplace=True)
etherscan_data = etherscan_data.reset_index(drop=True)

#Type conversions for better workability
etherscan_data = etherscan_data.astype({
                                        'block_number': 'int64',
                                        'chain_size': 'int64' 
                                        })

# Data types must match expected ones, as per data_types list:
columns = list(etherscan_data.columns)    
data_types = [np.int64, object, np.int64, object, object] #pd.Timestamp
check_data_types = proj_funcs.checkDataTypes(etherscan_data, data_types)


# Unique value for the client_type and sync_mode columns 
# should be 'Geth' and 'Default', respectively:
check_unique_val_1 = proj_funcs.checkUniqueVal(etherscan_data, 
                                                'client_type', 'Geth')
check_unique_val_2 = proj_funcs.checkUniqueVal(etherscan_data, 
                                                'sync_mode', 'Default')

# chain_time_stamp cannot be more recent than now
etherscan_data_dupl = etherscan_data.copy()
etherscan_data_dupl['chain_time_stamp'] = \
    pd.to_datetime(etherscan_data_dupl['chain_time_stamp'])
etherscan_data_dupl['chain_time_stamp'] = \
    (etherscan_data_dupl['chain_time_stamp'] -
         pd.Timestamp("1970-01-01")) // pd.Timedelta('1s')  
check_max_unix_timestamp = \
    proj_funcs.checkMaxUnixTimestamp(etherscan_data_dupl, 'chain_time_stamp')

# block_number should grow (or at least remain the same) as time passes 
etherscan_data['block_diff'] = etherscan_data['block_number'].diff()
check_diff_greater_equal_to_1 = \
    proj_funcs.checkDiffGreaterEqualTo(etherscan_data, 'block_diff', 0)

# chain_size should grow (or at least remain the same) as time passes 
etherscan_data['chain_diff'] = etherscan_data['chain_size'].diff()
check_diff_greater_equal_to_2 = \
    proj_funcs.checkDiffGreaterEqualTo(etherscan_data, 'chain_diff', 0)

if all([check_data_types, check_unique_val_1, check_unique_val_2, 
        check_max_unix_timestamp, check_diff_greater_equal_to_1,
        check_diff_greater_equal_to_2]):

        print("All tests passed!")
else:
        print("Not all tests passed.")

Data types check OK!
Unique value check for column 'client_type' OK!
Unique value check for column 'sync_mode' OK!
Unix Timestamp value check for column 'chain_time_stamp' OK!
Difference between values in the block_diff column equal to 0: OK!
Error: The difference between two consecutive data rows should be 0 for column chain_diff.
Not all tests passed.


All but the monotonically increasing chain size check have passed. It can be seen that there are some occasions in which the chain size decreases from one day to another. Let's explore these:

In [16]:
etherscan_data[etherscan_data['chain_diff']<0]


Unnamed: 0,block_number,chain_time_stamp,chain_size,client_type,sync_mode,block_diff,chain_diff
20,7208771,2019-02-12,189825138765,Geth,Default,4189.0,-7.701860e+05
23,7221377,2019-02-15,191029043090,Geth,Default,4174.0,-1.598000e+03
26,7233897,2019-02-18,193137877427,Geth,Default,4147.0,-2.737614e+07
33,7263727,2019-02-25,196296727498,Geth,Default,4063.0,-2.890617e+07
43,7318905,2019-03-07,201604491690,Geth,Default,6414.0,-2.674129e+08
...,...,...,...,...,...,...,...
965,13214122,2021-09-13,1031570854573,Geth,Default,6544.0,-7.280540e+08
987,13355750,2021-10-05,1072281226367,Geth,Default,6372.0,-5.715962e+08
1012,13515242,2021-10-30,1117086238770,Geth,Default,6370.0,-1.219012e+09
1024,13591634,2021-11-11,1138287080480,Geth,Default,6379.0,-1.299758e+09


There seems to be no evident pattern for this behavior. For the sake of this project, we'll leave it the way it is, but in a real-life scenario, this should be taken care of. Let's persist the data, then.

### Data persistence

Let's persist these data in the same DB as with the trading data (an ```id``` surrogate key will be added):

In [17]:
# Create data schema and etherscan table

endpoint = 'postgresql+psycopg2://postgres:postgres@localhost:5432/postgres'
query ="""
        DROP TABLE IF EXISTS crypto.staging_etherscan;
        CREATE TABLE IF NOT EXISTS crypto.staging_etherscan(
            id SERIAL,
            block_number INT,
            date_time TIMESTAMP,
            chain_size BIGINT,
            client_type VARCHAR(50),
            sync_mode VARCHAR(50)
            );
        """   
etherscan_data = etherscan_data[['block_number', 
                                'chain_time_stamp', 
                                'chain_size', 
                                'client_type', 
                                'sync_mode']]
etherscan_data = etherscan_data.rename(columns={'chain_time_stamp':
                                                'date_time'})
table = "staging_etherscan"
table_schema = "crypto"

proj_funcs.persistToDb(endpoint, query, etherscan_data, table, table_schema)

# Data Model Creation

A new view will be created to serve as the main fact table, which is the one to query whenever analytical needs must be addressed:

In [18]:
# Create data schema and fact table
engine = create_engine(
          'postgresql+psycopg2://postgres:postgres@localhost:5432/postgres'
          )
fact_tbl_creation = """
                        DROP VIEW IF EXISTS crypto.fact_analytical;
                        CREATE VIEW crypto.fact_analytical AS(

                            WITH etherscan AS (
                                SELECT 
                                    e.block_number			
                                    , DATE_TRUNC('day', e.date_time) date_truncated			
                                    , e.chain_size
                                FROM 
                                    crypto.staging_etherscan e
                            ),
                            
                            trading AS (
                                SELECT 
                                    t.date_time
                                    , DATE_TRUNC('day', t.date_time) date_truncated
                                    , t.open_price
                                    , t.high_price
                                    , t.low_price
                                    , t.close_price
                                    , t.volume_1
                                    , t.volume_2			
                                    
                                FROM 
                                    crypto.staging_trading t
                            )
                            
                            SELECT 
                                *
                            FROM 
                                trading t 
                            
                            LEFT JOIN 
                                etherscan e	 
                            
                            USING 
                                (date_truncated)
                        );
                         """
with engine.connect() as con:
    con.execute(fact_tbl_creation)
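
The join logic of the view (truncate both timestamps to the day, then left-join the daily network data onto the minute-level trading data) can be illustrated in pandas. This is only a sketch of the same idea, not part of the pipeline; the prices are made up, while the block numbers come from the Etherscan output shown earlier:

```python
import pandas as pd

trading = pd.DataFrame({
    "date_time": pd.to_datetime([
        "2019-01-23 00:00:00", "2019-01-23 00:01:00", "2019-01-24 00:00:00",
    ]),
    "close_price": [117.0, 117.2, 116.5],  # made-up prices
})
etherscan = pd.DataFrame({
    "date_time": pd.to_datetime(["2019-01-23", "2019-01-24"]),
    "block_number": [7111289, 7116274],
})

# Equivalent of DATE_TRUNC('day', date_time) on both sides
trading["date_truncated"] = trading["date_time"].dt.floor("D")
etherscan = etherscan.rename(columns={"date_time": "date_truncated"})

# LEFT JOIN ... USING (date_truncated): every minute row gets its day's values
fact = trading.merge(etherscan, on="date_truncated", how="left")
print(fact)
```

Every minute-level row keeps its own prices and inherits the block number of its day, which is why the daily values repeat across all rows of the same date.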

Let's run a sample query based on the view created above to check if it's working fine:

In [19]:
# Run a sample query against the fact view
engine = create_engine(
          'postgresql+psycopg2://postgres:postgres@localhost:5432/postgres'
          )
test_query = """
                -- Select the block number from the time Ethereum had the highest price
                SELECT 
                    block_number 
                    
                FROM 
                    crypto.fact_analytical 

                WHERE 
                    high_price = (
                        SELECT 
                            max(high_price) 
                        FROM 
                            crypto.fact_analytical
                    );
                    """
with engine.connect() as con:
    results = pd.read_sql(test_query, con=con)  
    
results

Unnamed: 0,block_number
0,13585255


It can be seen that the block number was 13585255 when Ethereum had its highest price in the data. Thus, it's working fine!