# Data Cleaning (ETL)
**Input**: Raw data stored in MongoDB

**Output**: Clean data stored in PostgreSQL

The Data Cleaning step consists of multiple data transformations with the goal of making the data "ready to talk" :)

Some of the transformations involved include:
- Convert from dictionary to tabular data structure
- Remove duplicates (intra-series and inter-series)
- Check and convert data types and formats
- Null handling
- Data interpolation (the data is collected at different frequencies, and we want to bring all of it to a uniform monthly frequency)
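
As a rough illustration of the first four transformations, here is a minimal pandas sketch on a made-up raw document. The document layout (an `observations` list with `date` and `value` strings, and `"."` standing in for a missing value) is an assumption for the example, not necessarily the exact shape stored in MongoDB.

```python
import pandas as pd

# Made-up raw document, shaped like a dictionary coming out of MongoDB
raw = {
    "_id": "EXAMPLE",
    "observations": [
        {"date": "2020-01-01", "value": "1.5"},
        {"date": "2020-01-01", "value": "1.5"},  # duplicate row
        {"date": "2020-02-01", "value": "."},    # non-numeric placeholder
        {"date": "2020-03-01", "value": "2.5"},
    ],
}

# Dictionary -> tabular structure
df = pd.DataFrame(raw["observations"], columns=["date", "value"])

# Remove duplicates inside the series
df = df.drop_duplicates()

# Check and convert data types (unparseable values become NaN)
df["date"] = pd.to_datetime(df["date"])
df["value"] = pd.to_numeric(df["value"], errors="coerce")

# Null handling: here we simply drop incomplete rows
df = df.dropna().reset_index(drop=True)
df["name"] = raw["_id"]
print(df)
```

Interpolation to a monthly frequency is handled separately, because it depends on the original frequency of each series.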

After this step is done, we save the transformed data in PostgreSQL, which serves as our Data Warehouse.

We evaluated different options to choose where to save our data:
- AWS Aurora Serverless / On-demand DB instance
- AWS RDS PostgreSQL On-demand DB instance
- AWS Redshift

Performance-wise, Redshift would probably be the better choice: it is a columnar database optimized for analytics and capable of parallel processing.
We could also use Apache Spark for data transformation to improve performance.

In this project, as said before, we tried to keep things simple, so we chose PostgreSQL as our data destination and the pandas library as our ETL engine.

### FRED Series Selection
First, we export the ids, titles and notes of the series collected in MongoDB, keeping only those with a popularity of at least 30.

In [1]:
# move to root to simplify imports
%cd ..

C:\Users\marco\PycharmProjects\portfolio-optimization


In [2]:
from pymongo import MongoClient
import pandas as pd

connection_string = "mongodb://localhost:27017"
client = MongoClient(connection_string)

database = client['portfolio']
collection = database['fred_series']
datasets = collection.find({})

df_l = []

for d in datasets:

    # dict.get returns None when the key is missing
    _id = d.get("_id")
    title = d.get("title")
    notes = d.get("notes")

    if d["popularity"] >= 30:
        df_l.append({"id": _id, "title": title, "notes": notes})

df = pd.DataFrame(df_l, columns=["id", "title", "notes"])

print("# Series:", len(df))
df.head()

# Series: 1810


| | id | title | notes |
|---|---|---|---|
| 0 | NROU | Noncyclical Rate of Unemployment | Starting with the July, 2021 report: An Update... |
| 1 | NROUST | Natural Rate of Unemployment (Short-Term) (DIS... | This series last appeared in the February, 202... |
| 2 | BABANAICSRETSAUS | Business Applications: Retail Trade in the Uni... | The core business applications series that cor... |
| 3 | BABATOTALSAUS | Business Applications: Total for All NAICS in ... | The core business applications series that cor... |
| 4 | EXPINF10YR | 10-Year Expected Inflation | The Federal Reserve Bank of Cleveland estimate... |


To shortlist the series available from FRED, we made the following decisions:
- Median over Mean
- Seasonally Adjusted over Non Seasonally Adjusted
- Series taken from OECD excluded
- Single-state data excluded (we are only interested in US data at the federal level)
- Monthly granularity over others
- Industry/sector detailed data excluded
- DISCONTINUED series excluded
- Countries other than USA excluded
- Series with data starting after 1995 excluded
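
Some of these rules can be expressed as simple filters on the series metadata. The sketch below is only an illustration on a toy table: the column names (`frequency_short`, `seasonal_adjustment_short`, `observation_start`) and the ids are assumptions for the example, and rules phrased as "X over Y" are really preferences between alternatives, so applying them as hard filters is a simplification. The shortlist used in this notebook may well have been compiled by hand.

```python
import pandas as pd

# Toy metadata table; column names and ids are assumptions for this sketch
meta = pd.DataFrame(
    [
        {"id": "A", "title": "Example Rate",
         "seasonal_adjustment_short": "SA", "observation_start": "1960-01-01"},
        {"id": "B", "title": "Example Rate (DISCONTINUED)",
         "seasonal_adjustment_short": "SA", "observation_start": "1960-01-01"},
        {"id": "C", "title": "Example Index",
         "seasonal_adjustment_short": "NSA", "observation_start": "1970-01-01"},
        {"id": "D", "title": "Recent Example Index",
         "seasonal_adjustment_short": "SA", "observation_start": "2005-01-01"},
    ]
)

keep = (
    # DISCONTINUED series excluded
    ~meta["title"].str.contains("DISCONTINUED", case=False)
    # Seasonally adjusted only (simplification of "SA over NSA")
    & (meta["seasonal_adjustment_short"] == "SA")
    # Series with data starting after 1995 excluded
    & (pd.to_datetime(meta["observation_start"]) < pd.Timestamp("1996-01-01"))
)
shortlist = meta.loc[keep, "id"].tolist()
print(shortlist)
```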

We then save a list of the "good" series in a new document in MongoDB.

In [3]:
collection = database['fred_datasets']
datasets = collection.find({"_id":"shortlist_fred"}).next()["shortlist"]
print("# Series:", len(datasets))

# Series: 175


### OECD Series Selection
As we did for FRED, we export all the distinct combinations of indicator, subject and measure. In this case there is only one dataset (DP_LIVE), but it contains many combinations for each indicator, and we want to shortlist those.

In [4]:
from configparser import ConfigParser
parser = ConfigParser()
_ = parser.read("credentials.cfg")
username = parser.get("mongo_db", "username")
password = parser.get("mongo_db", "password")

connection_string = f"mongodb+srv://{username}:{password}@cluster0.3dxfmjo.mongodb.net/?" \
                    f"retryWrites=true&w=majority"
client = MongoClient(connection_string)

database = client['portfolio']
oecd_collection = database['oecd_datasets']

dataset = oecd_collection.find({"_id":"DP_LIVE"}).next()

def mongo_to_dataframe(dataset):
    
    data = dataset['dataset']
    values = []
    obs = data['dataSets'][0]['observations']

    for e in obs:
        i = [int(x) for x in e.split(":")]
        i.append(obs[e][0])
        values.append(i)

    obs_cols = data['structure']['dimensions']['observation']
    replacement = {}
    cols = []
    for c in obs_cols:
        n = c["name"]
        replacement[n] = {i: x["name"] for i, x in enumerate(c['values'])}
        cols.append(n)

    df = pd.DataFrame(values, columns=cols + ["Value"])
    for c in cols:
        df[c] = df[c].replace(replacement[c])
    
    return df

df = mongo_to_dataframe(dataset)
df = df[["Indicator","Subject","Measure"]].drop_duplicates()

print("# Series:", len(df))
df.head()

# Series: 988


| | Indicator | Subject | Measure |
|---|---|---|---|
| 0 | Agricultural support | Total support (TSE) | % of GDP |
| 36 | Agricultural support | Producer support (PSE) | % of gross farm receipts |
| 72 | Crop production | Rice | Tonnes/hectare |
| 113 | Crop production | Wheat | Tonnes/hectare |
| 154 | Gross domestic product (GDP) | Total | Million US dollars |
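
The key idea in `mongo_to_dataframe` is that each observation key, such as `"0:1"`, is a colon-separated list of positions, one per dimension, and each position points into that dimension's list of values. The toy payload below is made up, but it follows the same nesting that the function reads (`dataSets[0].observations` and `structure.dimensions.observation`), so it may help to see the decoding in isolation.

```python
import pandas as pd

# Toy payload with the same nesting that mongo_to_dataframe reads
data = {
    "dataSets": [{"observations": {"0:0": [1.0], "0:1": [2.0], "1:1": [3.0]}}],
    "structure": {"dimensions": {"observation": [
        {"name": "Indicator", "values": [{"name": "GDP"}, {"name": "CPI"}]},
        {"name": "Time", "values": [{"name": "2020"}, {"name": "2021"}]},
    ]}},
}

dims = data["structure"]["dimensions"]["observation"]
rows = []
for key, obs in data["dataSets"][0]["observations"].items():
    positions = [int(p) for p in key.split(":")]
    # Position i indexes into the list of values of dimension i
    labels = [dim["values"][pos]["name"] for dim, pos in zip(dims, positions)]
    # The first element of each observation is the measured value
    rows.append(labels + [obs[0]])

df_toy = pd.DataFrame(rows, columns=[dim["name"] for dim in dims] + ["Value"])
print(df_toy)
```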


To shortlist the series available from the OECD, we made the following decisions:
- General "Subject" over segmented "Subject" (e.g. Total over Male/Female or Financial/Non-financial)
- Industry/sector detailed data excluded
- Series with data starting after 1995 excluded

We then save a list of the "good" series in a new document in MongoDB.

In [5]:
datasets = oecd_collection.find({"_id":"oecd_shortlist"}).next()["data"]
print("# Series:", len(datasets))

# Series: 106


### FRED + OECD Series union
At this point we "merge" the features coming from FRED and the OECD and remove the duplicates.

When in doubt, we keep the series that has more data or the one with the finest granularity (e.g. monthly over quarterly).
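
The tie-breaking rule can be sketched as a sort followed by `drop_duplicates`. Everything in the example is hypothetical: the `concept` label that marks overlapping series, the series names and the observation counts are made up, and giving frequency priority over the amount of data is an assumption, since the rule above does not fix an order.

```python
import pandas as pd

# Made-up candidates; "concept" is a hypothetical label marking series that overlap
candidates = pd.DataFrame(
    [
        {"concept": "x", "series": "FRED_X", "frequency": "Quarterly", "n_obs": 300},
        {"concept": "x", "series": "OECD_X", "frequency": "Monthly", "n_obs": 250},
        {"concept": "y", "series": "FRED_Y", "frequency": "Monthly", "n_obs": 700},
        {"concept": "y", "series": "OECD_Y", "frequency": "Monthly", "n_obs": 400},
    ]
)

# Lower rank = finer granularity
freq_rank = {"Monthly": 0, "Quarterly": 1, "Annual": 2}

chosen = (
    candidates.assign(freq_rank=candidates["frequency"].map(freq_rank))
    # Finest granularity first, then the series with more observations
    .sort_values(["concept", "freq_rank", "n_obs"], ascending=[True, True, False])
    .drop_duplicates("concept", keep="first")
)
print(chosen[["concept", "series"]])
```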

We save the new shortlists in two documents named shortlist_fred_v2 and oecd_shortlist_v2.

In [6]:
fred_collection = database['fred_datasets']
fred_shortlist = fred_collection.find({"_id":"shortlist_fred_v2"}).next()["shortlist"]
print("# Series FRED:", len(fred_shortlist))

oecd_shortlist = oecd_collection.find({"_id":"oecd_shortlist_v2"}).next()["data"]
print("# Series OECD:", len(oecd_shortlist))

# Series FRED: 142
# Series OECD: 67


## ETL
Now we are ready for the ETL process (Extract, Transform, Load):
- Extract the selected data from MongoDB, which contains our raw data
- Transform the data using pandas
- Load it into PostgreSQL

### FRED

In [7]:
import requests

fred_documents = fred_collection.find({"_id":{"$in": fred_shortlist}})
fred_api_key = parser.get("fred", "fred_api_key")

def retrieve_series_metadata(series_id, api_key):
    url = f"https://api.stlouisfed.org/fred/series?series_id={series_id}" \
            f"&api_key={api_key}&file_type=json"
    
    from portfolio_optimization.data_collection import request_with_retries
    
    r = request_with_retries(url)
    return r.json()['seriess'][0]

# We create a transformed Dataframe for each document and then concat them together
list_of_dfs = []

for d in fred_documents:

    # Retrieve Metadata (Unit of measure and frequency)
    try:
        metadata = retrieve_series_metadata(d["_id"], fred_api_key)
    except Exception:
        # Skip series whose metadata cannot be retrieved
        continue
        
    units = metadata["units"]
    frequency = metadata["frequency"]
    
    # Build Dataframe
    tmp = []
    for o in d["observations"]:
        tmp.append({key: o[key] for key in ["date","value"]})
    df = pd.DataFrame(tmp, columns=["date","value"])
    
    # Convert Types
    df["value"] = pd.to_numeric(df["value"], errors="coerce")
    df["date"] = pd.to_datetime(df["date"])
                  
    # Adjust % values
    if units is not None and "Percent" in units:
        df["value"] /= 100
    
    # Interpolate values
    # Starting from weekly frequency we take the mean to get the monthly value.
    # Starting from a lower frequency (quarterly or annual) we use ffill to "copy" the value
    # into the missing months. We use this method instead of linear interpolation because
    # quarterly and annual data is already made up of the average of the monthly observations!
    # This way each period keeps the same average before and after the transformation.
    if "Weekly" in frequency:
        df = df.set_index(["date"]).resample('MS').mean().reset_index()
    if "Quarterly" in frequency or "Annual" in frequency:
        df = df.set_index(["date"]).resample('MS').ffill().reset_index()
    
    # Remove rows with any null present
    df = df.dropna()
    df["name"] = d["_id"]
                  
    list_of_dfs.append(df)

df = pd.concat(list_of_dfs)
df["source"] = "FRED"
print("# features", len(df["name"].unique()))
print("# rows:", len(df))
df.tail()

# features 123
# rows: 80441


| | date | value | name | source |
|---|---|---|---|---|
| 386 | 2021-09-01 | 0.321 | WFRBST01134 | FRED |
| 387 | 2021-10-01 | 0.322 | WFRBST01134 | FRED |
| 388 | 2021-11-01 | 0.322 | WFRBST01134 | FRED |
| 389 | 2021-12-01 | 0.322 | WFRBST01134 | FRED |
| 390 | 2022-01-01 | 0.319 | WFRBST01134 | FRED |
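
The choice of `ffill` over linear interpolation for quarterly and annual series can be checked on a small made-up example. With forward fill, the three months of a quarter all carry the quarterly value, so their mean is unchanged. With linear interpolation the months drift towards the next quarter and the mean changes.

```python
import pandas as pd

# Made-up quarterly series, indexed by the first day of each quarter
quarterly = pd.DataFrame(
    {"value": [3.0, 6.0, 9.0]},
    index=pd.to_datetime(["2020-01-01", "2020-04-01", "2020-07-01"]),
)

# Upsample to month start with the two strategies
monthly_ffill = quarterly.resample("MS").ffill()
monthly_linear = quarterly.resample("MS").interpolate(method="linear")

# Compare the average of the first quarter (January to March)
q1 = slice("2020-01-01", "2020-03-01")
mean_ffill = monthly_ffill.loc[q1, "value"].mean()
mean_linear = monthly_linear.loc[q1, "value"].mean()

print(mean_ffill)   # 3.0, same as the quarterly value
print(mean_linear)  # 4.0, shifted towards the next quarter
```

Note that resampling stops at the last timestamp of the series, so the last quarter (or year) only gets its first month filled in.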


### OECD

In [8]:
dataset = oecd_collection.find({'_id':"DP_LIVE"}).next()
df = mongo_to_dataframe(dataset)

# Transform date
# monthly data has format 'YYYY-MM-DD' and is already fine
# quarterly data has format 'YYYY-Q1' and must be transformed
# yearly data has format 'YYYY' and must be transformed
from portfolio_optimization.helper import oecd_time_to_datetime
df["Time"] = df.apply(lambda row: oecd_time_to_datetime(row, "Time"), axis=1)

# We create a transformed Dataframe for each combination of indicator,subject,measure
# and then concat them together
list_of_dfs = []

for s in oecd_shortlist:
    
    # Take only shortlisted combination of indicator,subject,measure
    temp_df = df[(df["Indicator"] == s["INDICATOR"]) & 
                 (df["Subject"] == s["SUBJECT"]) & 
                 (df["Measure"] == s["MEASURE"])]
    
    # Interpolate values
    # Each combination can have one or multiple different frequencies.
    # If we have the monthly frequency, we just take that
    # If we don't, we use the same strategy as for FRED, we use ffill to "copy" the value
    frequencies = temp_df["Frequency"].unique()
    if "Monthly" in frequencies:
        temp_df = temp_df[temp_df["Frequency"] == "Monthly"]
    elif "Quarterly" in frequencies:
        temp_df = temp_df[temp_df["Frequency"] == "Quarterly"]
        temp_df = temp_df.set_index(["Time"]).resample('MS').ffill().reset_index()
    elif "Annual" in frequencies:
        temp_df = temp_df[temp_df["Frequency"] == "Annual"]
        temp_df = temp_df.set_index(["Time"]).resample('MS').ffill().reset_index()
        
    list_of_dfs.append(temp_df)

df = pd.concat(list_of_dfs)
df = df.dropna()

# Adjust % values
def convert_percentage(row):
    for el in ["%", "percentage"]:
        if el in row["Measure"].lower():
            return row["Value"] / 100
    return row["Value"]

df["Value"] = df.apply(lambda row: convert_percentage(row), axis=1)

# Concat combination into a single column
def concat_column_values(row, columns):
    return ' | '.join(list(row[columns]))

df['name'] = df.apply(lambda row: concat_column_values(row, ["Indicator", "Subject", "Measure"]), 
                      axis=1)

# Drop useless columns
df = df.drop(["Indicator", "Subject", "Measure", "Country", "Frequency"], axis=1)
df = df.reset_index(drop=True)

df['source'] = 'OECD'
df = df.rename(columns={"Time": "date", "Value":"value"})
df = df[["date", "value", "name", "source"]]

print("# features", len(df["name"].unique()))
print("# rows:", len(df))
df.tail()

# features 67
# rows: 42125


| | date | value | name | source |
|---|---|---|---|---|
| 42120 | 2022-06-01 | 120.8771 | Unit labour costs \| By persons employed \| 2015... | OECD |
| 42121 | 2022-07-01 | 122.7363 | Unit labour costs \| By persons employed \| 2015... | OECD |
| 42122 | 2022-08-01 | 122.7363 | Unit labour costs \| By persons employed \| 2015... | OECD |
| 42123 | 2022-09-01 | 122.7363 | Unit labour costs \| By persons employed \| 2015... | OECD |
| 42124 | 2022-10-01 | 123.7721 | Unit labour costs \| By persons employed \| 2015... | OECD |
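
The date conversion in the cell above is delegated to `oecd_time_to_datetime`, whose source is not shown here. The function below is a hypothetical stand-in, not the project's helper: it takes a single period string instead of a row, and it assumes quarterly periods look like `YYYY-Q1` and yearly periods like `YYYY`, as described in the comments of the cell. Each period is mapped to its first day.

```python
import pandas as pd


def parse_oecd_period(period: str) -> pd.Timestamp:
    """Hypothetical helper: map an OECD time period to the first day of that period."""
    period = period.strip()
    if "-Q" in period:
        # Quarterly, e.g. "2020-Q3" -> 2020-07-01
        year, quarter = period.split("-Q")
        month = 3 * (int(quarter) - 1) + 1
        return pd.Timestamp(year=int(year), month=month, day=1)
    if len(period) == 4 and period.isdigit():
        # Yearly, e.g. "2020" -> 2020-01-01
        return pd.Timestamp(year=int(period), month=1, day=1)
    # Anything else is left to the pandas date parser
    return pd.to_datetime(period)


print(parse_oecd_period("2020-Q3"))
print(parse_oecd_period("2020"))
print(parse_oecd_period("2020-05-01"))
```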


### Yahoo Finance

In [9]:
# Take data from MongoDB
yf_collection = database['yf_target_datasets']
yf_datasets = yf_collection.find({})

# Build Dataframe
list_of_df = []

for d in yf_datasets:
    tmp = []
    for dd in d["data"]:
        tmp.append({"date":dd["Date"], "value":dd["Close"], "name":dd["ticker"]})
    list_of_df.append(pd.DataFrame(tmp, columns=["date","value","name"]))

df = pd.concat(list_of_df)

# Convert Types
df["value"] = pd.to_numeric(df["value"], errors="coerce")
df["date"] = pd.to_datetime(df["date"],unit='ms')

# Drop nulls
df = df.dropna()

df["source"] = "yahoo_finance"
df = df[["date", "value", "name", "source"]]

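# For the most recent month we get 2 values: 1 for the start of the month, 1 for today's date.
# We want a single row per month dated at the start of the month: resampling to 'MS'
# with mean() collapses the two rows into one (the value is their average)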
df = df.set_index(["date"]).groupby(["name","source"]).resample('MS')\
    .mean().reset_index()


print("# targets", len(df["name"].unique()))
print("# rows:", len(df))
df.tail()

# targets 11
# rows: 2878


| | name | source | date | value |
|---|---|---|---|---|
| 2873 | ^TNX | yahoo_finance | 2022-12-01 | 3.879 |
| 2874 | ^TNX | yahoo_finance | 2023-01-01 | 3.529 |
| 2875 | ^TNX | yahoo_finance | 2023-02-01 | 3.916 |
| 2876 | ^TNX | yahoo_finance | 2023-03-01 | 3.494 |
| 2877 | ^TNX | yahoo_finance | 2023-04-01 | 3.413 |
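
It is worth being explicit about what the monthly resampling does when a month has two rows. The made-up example below compares `mean()`, which is what the cell above uses, with `first()`, which would keep the start-of-month value as is. The prices are invented and only one ticker is shown.

```python
import pandas as pd

# Made-up prices: April has both a start-of-month row and a "today" row
prices = pd.DataFrame(
    {
        "date": pd.to_datetime(["2023-03-01", "2023-04-01", "2023-04-14"]),
        "value": [10.0, 12.0, 14.0],
        "name": "^TNX",
    }
)

grouped = prices.set_index("date").groupby("name")["value"].resample("MS")

# One row per ticker and month, dated at the start of the month
monthly_mean = grouped.mean().reset_index()    # April -> average of 12 and 14
monthly_first = grouped.first().reset_index()  # April -> start-of-month value

print(monthly_mean)
print(monthly_first)
```

Which of the two is preferable depends on whether the current, partial month should reflect the latest price or only the first observation.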


### Investing.com

In [10]:
# Take data from MongoDB
investing_collection = database['investing_target_datasets']
investing_datasets = investing_collection.find({})

# Build Dataframe
list_of_df = []

for d in investing_datasets:
    
    name = d["_id"]
    tmp = []
    for dd in d["data"]:
        tmp.append({"date":dd["Date"], "value":dd["Price"], "name":name})

    list_of_df.append(pd.DataFrame(tmp, columns=["date","value","name"]))

df = pd.concat(list_of_df)

# Convert Types
df["value"] = df["value"].astype(str)
df["value"] = pd.to_numeric(df["value"].str.replace(",",""), errors="coerce")
df["date"] = pd.to_datetime(df["date"])

# Drop nulls
df = df.dropna()

df["source"] = "investing"
df = df[["date", "value", "name", "source"]]

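# For the most recent month we get 2 values: 1 for the start of the month, 1 for today's date.
# We want a single row per month dated at the start of the month: resampling to 'MS'
# with mean() collapses the two rows into one (the value is their average)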
df = df.set_index(["date"]).groupby(["name","source"]).resample('MS')\
    .mean().reset_index()

print("# targets", len(df["name"].unique()))
print("# rows:", len(df))
df.tail()

# targets 3
# rows: 1516


| | name | source | date | value |
|---|---|---|---|---|
| 1511 | WHEAT | investing | 2022-11-01 | 795.5 |
| 1512 | WHEAT | investing | 2022-12-01 | 792.0 |
| 1513 | WHEAT | investing | 2023-01-01 | 761.25 |
| 1514 | WHEAT | investing | 2023-02-01 | 705.5 |
| 1515 | WHEAT | investing | 2023-03-01 | 686.4 |
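
The type conversion in this cell casts the prices to strings before removing the thousands separator. A small made-up example shows why the order matters: on a column that mixes strings and numbers, the `.str` accessor returns NaN for the non-string entries, so values that were already numeric would be lost without the cast.

```python
import pandas as pd

# Made-up prices mixing formatted strings, a bad value and an actual float
prices = pd.Series(["1,234.50", "795.5", "n/a", 686.4])

# Cast to str first, then strip the thousands separator and convert
clean = pd.to_numeric(
    prices.astype(str).str.replace(",", "", regex=False), errors="coerce"
)

# Without the cast, the float entry is turned into NaN by the .str accessor
without_cast = pd.to_numeric(
    prices.str.replace(",", "", regex=False), errors="coerce"
)

print(clean)
print(without_cast)
```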


### Storing Data in PostgreSQL
We save transformed data in PostgreSQL on AWS RDS, which we use as a Data Warehouse.

https://aws.amazon.com/rds/postgresql/

To store a pandas DataFrame we use the psycopg2 library.

Below is an example of how to connect to PostgreSQL and insert a DataFrame into a specific table.

In [11]:
import sys
import traceback

import psycopg2
from psycopg2 import extras

DB_USER = parser.get("postgresql", "DB_USER")
DB_PASS = parser.get("postgresql", "DB_PASS")
DB_HOST = parser.get("postgresql", "DB_HOST")
DB_NAME = parser.get("postgresql", "DB_NAME")
DB_PORT = parser.get("postgresql", "DB_PORT")

def get_connection():
    # This calls psycopg2.connect passing connection parameters
    return psycopg2.connect(user=DB_USER, password=DB_PASS, host=DB_HOST,
                            dbname=DB_NAME, port=DB_PORT)

def insert_df_into_table(df, tablename):
    # PostgreSQL does not like nan: cast to object first,
    # otherwise None is coerced back to NaN in numeric columns
    df = df.astype(object).where(pd.notnull(df), None)

    tuples = [tuple(x) for x in df.to_numpy()]
    cols = ','.join(list(df.columns))

    # Table and column names are interpolated as-is: only pass trusted values
    query = "INSERT INTO %s(%s) VALUES %%s ON CONFLICT DO NOTHING;" % (tablename, cols)

    conn = get_connection()
    cursor = conn.cursor()
    try:
        extras.execute_values(cursor, query, tuples)
        conn.commit()
    except Exception as error:
        traceback.print_exc()
        print(f"Error: {error}", file=sys.stderr)
        conn.rollback()
    finally:
        # Always release the cursor and the connection
        cursor.close()
        conn.close()
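
The NaN handling at the top of `insert_df_into_table` can be tried without a database. The helper below is a hypothetical, database-free sketch of that single step, written with a plain comprehension so that the result does not depend on how pandas treats `None` in numeric columns. psycopg2 adapts Python `None` to SQL `NULL`, which is why missing values are converted before the insert.

```python
import numpy as np
import pandas as pd


def df_to_tuples(df):
    """Hypothetical helper: list of row tuples with missing values as None."""
    return [
        tuple(None if pd.isna(v) else v for v in row)
        for row in df.itertuples(index=False, name=None)
    ]


# Made-up rows, one of which has a missing value
df_example = pd.DataFrame({"name": ["A", "B"], "value": [1.5, np.nan]})
rows = df_to_tuples(df_example)
print(rows)
```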

At this point our transformed data is stored in PostgreSQL and we can start analysing it.

Let's continue with the next step, EDA (Exploratory Data Analysis).

[Go to EDA](eda.ipynb)