In [None]:
#
# imports:
# forex_python is included in the Pipfile
# run 'pipenv install' then 'pipenv shell' and finally 'jupyter lab'
#
import logging
from forex_python.converter import CurrencyRates
import csv
from datetime import datetime
from datetime import strptime
import decimal
from pprint import pprint

# setup logging
log = logging.getLogger()
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)
log.setLevel(logging.INFO)

ImportError: cannot import name 'strptime' from 'datetime' (/usr/local/Cellar/python@3.10/3.10.7/Frameworks/Python.framework/Versions/3.10/lib/python3.10/datetime.py)

In [None]:
#
# loading the csv file into a list of dicts
#
def load_transactions_csv(filename):
    with open(filename,'r') as csvfile:
        reader = csv.DictReader(csvfile)
        transactions = list(reader)
        log.info(f"loaded {len(transactions)} transaction records from {filename}") 
        return transactions

In [None]:
def save_transactions_csv(filename):
    with open(filename, 'w', newline='') as csvfile:
        # we're going to add some fields to this CSV as we go, let's account for them in here
        fieldnames = ['id', 'customer_id', 'customer_country_code', 'processed_at', 'currency', 'amount', 'transaction_date', 'daily_exchange_rate', 'calculated_value_in_usd', 'inflation_adjust_value_in_usd']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter=',')
        writer.writeheader()
        for transaction in transactions:
           writer.writerow(transaction)

In [None]:
transactions = load_transactions_csv('transactions_amended.csv')

In [None]:
#
# take a look at a snippet of what we have
#
for transaction in transactions[0:2]:
    print(transaction)

In [None]:
#
# Call this method to check on our progress getting through the ordered list of transactions
#
def rows_remaining(key_to_check):
    count = 0
    for transaction in transactions:
        if transaction[key_to_check] == '':
            count += 1
    log.info(f"Transactions missing a {key_to_check}: {count} out of {len(transactions)} total")

In [None]:
#
# the currency column looks a little choppy. let's see what we're dealing with
#
def list_currencies():
    currencies = []
    for transaction in transactions:
        if transaction['currency'] not in currencies:
            currencies.append(transaction['currency'])

    print(currencies)
    
list_currencies()

In [None]:
#
# normalize the choppy currency column so that it uses the forex-python standard currency codes
# we also have the 'customer_country' column to go off of, but we can't assume customers in Britain
# always pay in pounds, and the same goes for customers in germany always paying with euros 
#
def convert_currency_code(code):
    code = code.lower()
    normalized_code = ""
    if code in ["€", "eur", "euro",]:
        normalized_code = "EUR"
    elif code in ["$", "usd", "us dollars", "dollars",]:
        normalized_code = "USD"
    elif code in ["£", "gbp", "british pound", "pounds",]:
        normalized_code = "GBP"
    elif code in ["¥", "jpy", "japanese yen", "yen",]:
        normalized_code = "JPY"
    # add more currency codes as needed
    else:
        log.warning(f"Currency '{code}' not recognized")
        normalized_currency_code = code # return something if code is not recognized
        
    return normalized_code

In [None]:
#
# for those transaction where the amount is 0.0, '', or None, we'll remove the transaction from our list
#
def zero_value_transaction(amount):
    if amount == '0.0' or amount == '' or amount == None:
        return True
    else:
        return False

In [None]:
#
# separate date and time. these look pretty standard, so let's just try/catch in case a future CSV has
# some rough data in this column. we wouldn't want to propagate that data to the next steps of the data pipeline
#
def date_from_timestamp(timestamp):
    try:
        dt = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S %Z')
        iso_date = dt.date().isoformat()
        iso_time = dt.time().strftime('%H:%M:%S')
    except:
        print("Error converting ", timestamp)
        raise SystemExit()
    else:
        return iso_date

In [None]:
def calculate_value_in_usd(exchange_rate, amount):
    try:
        value_in_usd = float(exchange_rate) * float(amount)
    except:
        log.error(f"Error converting {exchange_rate} or {amount} to float")
    return str(value_in_usd)

In [None]:
#
# do some preprocessing:
#   - change currency codes to conform to API standards (EUR, GBP, USD)
#   - remove zero-value (0.0, '') transactions
#   - add a date field with only the day, since we don't need the time for this forex conversion
#
#for transaction in transactions:
#    transaction['currency'] = convert_currency_code(transaction['currency'])
#    if zero_value_transaction(transaction['amount']):
#        transactions.remove(transaction)
#    transaction.update({"transaction_date": date_from_timestamp(transaction['processed_at'])})        

In [None]:
#
# checkpoint
#
save_transactions_csv('transactions_amended.csv')
transactions = load_transactions_csv('transactions_amended.csv')

null_transactions = 0
for transaction in transactions:
    if transaction['amount'] == '0.0' or transaction['amount'] == '':
        null_transactions += 1

log.info(f"There are currently {null_transactions} transactions with an invalid amount field in the list")
log.info(f"The currency codes listed in the transactions are {list_currencies()}")


In [None]:
save_transactions_csv('transactions_amended.csv')

In [47]:
#
# tabulate inflation adjusted USD value for each transaction
#   - add a field to each transaction - 'daily_exchange_rate' containing the historical exchange rate on that day
#   - add a field to each transaction - 'calculated_value_in_usd' based on the exchange rate on the day
#   - add a field to each transaction - 'inflation_adjust_value_in_usd' adjusted for inflation
#
# we'll store the exchange rates we lookup in the file 'transactions_amended.csv'
#
forex_converter = CurrencyRates()

for i, transaction in enumerate(transactions):
    #
    # do some preprocessing:
    #   - change currency codes to conform to API standards (EUR, GBP, USD)
    #   - remove zero-value (0.0, '') transactions
    #   - add a date field with only the day, since we don't need the time for this forex conversion
    #   - sort the list of transactions by 'transaction_date' and 'currency'
    
    #
    transaction['currency'] = convert_currency_code(transaction['currency'])
    if zero_value_transaction(transaction['amount']):
        log.info(f"Removing transaction {transaction['id']} because it has a zero value")
        transactions.remove(transaction)
        break
    transaction.update({"transaction_date": date_from_timestamp(transaction['processed_at'])})        
    transactions = sorted(transactions, key=lambda x: x['transaction_date'], x['currency'])

    # do some fetching and calculations
    #   - fetch the exchange rate on the day of each transaction
    #   - calculate usd value of transactions
    #   - fetch CPI for the day of each transaction
    #   - apply the CPI factor to the USD amount of the transaction to get the inflation-adjusted amount
   
    # fill in the daily_exchange_rate field for each transaction
    if not transaction['daily_exchange_rate']:
        if transaction['currency'] == 'USD':       # no currency conversion to do in this case
            transaction.update({"daily_exchange_rate": 1.0})
        else:
            try:                    # fail we may
                log.info(f"Fetching exchange rate for {transaction['currency']} to USD on {transaction['transaction_date']}")
                transaction.update({"daily_exchange_rate": forex_converter.get_rate(transaction['currency'], 'USD', 
                                                           datetime.strptime(transaction['transaction_date'], '%Y-%m-%d').date())})

                # optimistically loop through the remaining transactions looking for the exact date+currency combo
                for lookahead in transactions[i+1:]:
                    if (lookahead['transaction_date'] == transaction['transaction_date'] and 
                        lookahead['currency'] == transaction['currency']):
                            lookahead.update({"daily_exchange_rate": transactions['daily_exchange_rate']})
                            log.info(f"Setting exchange rate for transaction {transaction['id']} based on previous fetched value")
                    else:  # the transaction list is sorted, so if we're looking at another date, we need to fetch it exchange rate
                        break
            except:                 # save we must 
                log.error(f"Error attempting to calculate USD value for {transaction}")
                save_transactions_csv('transactions_amended.csv')
    
    # if we have an exchange rate and a valid amount, calculate value in usd
    if not transaction['calculated_value_in_usd']:
        if transaction['daily_exchange_rate'] and transaction['amount']:
            log.info(f"Setting calculated_value_in_usd for transaction {transaction['id']}")
            transaction['calculated_value_in_usd'] = calculate_value_in_usd(
                transaction['daily_exchange_rate'], transaction['amount'])
        
    

2023-03-03 19:30:03,496 root         INFO     Fetching exchange rate for EUR to USD on 2021-11-05
2023-03-03 19:30:04,396 root         ERROR    Error attempting to calculate USD value for {'id': '4314494369952', 'customer_id': '5778687852704', 'customer_country_code': 'DE', 'processed_at': '2021-11-05 07:08:29 UTC', 'currency': 'EUR', 'amount': '24.41', 'transaction_date': '2021-11-05', 'daily_exchange_rate': '', 'calculated_value_in_usd': '', 'inflation_adjust_value_in_usd': ''}
2023-03-03 19:30:21,260 root         INFO     Fetching exchange rate for GBP to USD on 2021-09-12
2023-03-03 19:30:22,072 root         ERROR    Error attempting to calculate USD value for {'id': '4184418091168', 'customer_id': '5582348714144', 'customer_country_code': 'GB', 'processed_at': '2021-09-12 16:26:37 UTC', 'currency': 'GBP', 'amount': '19.92', 'transaction_date': '2021-09-12', 'daily_exchange_rate': '', 'calculated_value_in_usd': '', 'inflation_adjust_value_in_usd': ''}
2023-03-03 19:30:23,383 root  

In [48]:
save_transactions_csv('transactions_amended.csv')

In [49]:
rows_remaining('daily_exchange_rate')
rows_remaining('calculated_value_in_usd')

2023-03-03 19:31:36,373 root         INFO     Transactions missing a daily_exchange_rate: 22 out of 55135 total
2023-03-03 19:31:36,396 root         INFO     Transactions missing a calculated_value_in_usd: 22 out of 55135 total


In [50]:
len(transactions)

55135

In [51]:
print(transactions[2])

{'id': '780128321595', 'customer_id': '1137657708603', 'customer_country_code': 'DE', 'processed_at': '2018-12-11 20:55:56 UTC', 'currency': 'EUR', 'amount': '23.55', 'transaction_date': '2018-12-11', 'daily_exchange_rate': '1.1379', 'calculated_value_in_usd': '26.797545', 'inflation_adjust_value_in_usd': ''}
