# Job Interview Assignment Submission
## by Craig Barbisan

**Problem Description:**
For every transaction ID, find the last price published at or before the time of that transaction. The output should be in CSV format, where each row stores the transaction ID, the price of that transaction, and the time at which the transaction was executed.

**Additional Notes:**
* The published and purchase times of each currency price and transaction are unrelated. If a transaction is executed at time ‘x’, then the price of that transaction is published at time ‘x – k’, where k can be zero or any other number of units of time. Don’t try to match the transaction time with an exact published time.
* Since we work with Google Cloud and Apache Beam to process large streaming data, implementing the solution in Apache Beam or Spark is preferred. However, feel free to use anything else which you feel is best.

**Data Files:**

Two data files in CSV format were provided as inputs for this assignment:

1.   prices-2019-02-13.csv - Contains a list of the prices of an unnamed currency throughout the course of the day on Feb 13, 2019. The file includes these fields in the following order:

>> CURRENCY_PRICE - The price of the currency<br/>
> PUBLISHED_TIME - The date/time at which CURRENCY_PRICE was first effective.

2.   transactions-2019-02-13.csv - Contains a list of currency purchase transactions, throughout the course of the day on Feb 13, 2019. The file includes these fields in the following order:

>> TRANSACTION_ID - A numeric identifier for a currency purchase transaction.<br/>
> TRANSACTION_TIME - The date/time at which the transaction occurred.

**Data Profiling:**

The following is a list of findings from data profiling prior to solution design:

* TRANSACTION_ID is NOT a unique identifier in the transaction file.
* The combination of TRANSACTION_ID and TRANSACTION_TIME is unique. Thus we can look up different prices for the same TRANSACTION_ID, depending on when the transaction occurred.
* PUBLISHED_TIME for prices IS unique, so we should never find more than 1 price to apply to a transaction.
* All timezones in both TRANSACTION_TIME and PUBLISHED_TIME are UTC.
* Both the transaction data and the price data span multiple days (Feb 13 and Feb 14), despite the single date in the file names.
* For the entire range of PUBLISHED_TIME values, there is a price in the file for every second in that range.
* PUBLISHED_TIME and TRANSACTION_TIME are measured down to the nanosecond.
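
The uniqueness findings above could be checked along the following lines. This is only a minimal sketch using the Python standard library: the `duplicate_keys` helper and the sample rows are hypothetical and are not part of the original notebook, and a real check would read the rows from the two CSV files.

```python
from collections import Counter


def duplicate_keys(rows, key):
    """Return the sorted key values that occur more than once in rows."""
    counts = Counter(key(row) for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)


# Hypothetical sample rows in the shape (TRANSACTION_ID, TRANSACTION_TIME).
sample_rows = [
    ('1', '2019-02-13 10:00:00.000000001 UTC'),
    ('1', '2019-02-13 10:00:05.000000000 UTC'),
    ('2', '2019-02-13 10:00:07.000000000 UTC'),
]

# TRANSACTION_ID alone repeats, but the (ID, time) pair does not.
ids_repeated = duplicate_keys(sample_rows, key=lambda r: r[0])
pairs_repeated = duplicate_keys(sample_rows, key=lambda r: (r[0], r[1]))
```

With the sample rows, `ids_repeated` lists the ID that appears twice and `pairs_repeated` is empty, which mirrors the first two findings.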




# Solution Design

## Approach

1. Bounded PCollections will be used for the input data, 1 each for prices and transactions.
2. Parse all elements of each PCollection into tuples of typed values.
3. Index each price tuple with its PUBLISHED_TIME value truncated to the second, in format YYYYMMDDHHMISS. Because the index has one-second granularity, many price tuples share the same index value.
4. Group the indexed prices by index value and create a dictionary view of the result so it can be used as a side input. This provides the capability to look up the currency prices that were published within a particular second, and significantly reduces the number of elements to be processed when searching for a price.
5. For each transaction, look up all prices for the second when the transaction occurred, and for the previous second. The previous second is required for the scenario where the transaction occurred in second x, prior to the first price published in second x. In this case, we pick up the last price published in second x-1.
6. Add the looked-up price to the transaction tuple.
7. Format the transaction tuples and output to a file.
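
Steps 3 to 5 can be sketched in plain Python, without Beam, to show the bucketing idea in isolation. This is an illustration under stated assumptions rather than the notebook's implementation: timestamps are represented as integer nanoseconds since the epoch instead of `attodatetime` objects, and the names `to_epoch_nanos`, `build_index` and `lookup_price` are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timezone

NANOS_PER_SECOND = 1_000_000_000


def to_epoch_nanos(text):
    """Parse 'YYYY-MM-DD HH:MM:SS.nnnnnnnnn UTC' into integer nanoseconds."""
    whole = datetime.strptime(text[0:19], '%Y-%m-%d %H:%M:%S')
    whole = whole.replace(tzinfo=timezone.utc)
    return int(whole.timestamp()) * NANOS_PER_SECOND + int(text[20:29])


def build_index(prices):
    """Group (price, published_nanos) tuples by the whole second they fall in."""
    index = defaultdict(list)
    for price, published in prices:
        index[published // NANOS_PER_SECOND].append((price, published))
    return index


def lookup_price(index, txn_nanos):
    """Return the last price published at or before txn_nanos, or None."""
    second = txn_nanos // NANOS_PER_SECOND
    # Only the transaction's own second and the previous one are searched.
    candidates = index.get(second - 1, []) + index.get(second, [])
    eligible = [(published, price) for price, published in candidates
                if published <= txn_nanos]
    return max(eligible)[1] if eligible else None
```

For example, with prices published at 10:00:00.5, 10:00:01.25 and 10:00:01.75, a transaction at 10:00:01.1 falls before the first price of its own second, so the lookup returns the price from the previous second.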



## Pipeline Data Flow
![](https://s3pzxq.bn.files.1drv.com/y4m3LMTOQxIEcrNc0_GFZ_Zk9kMNx-1l-20tzh5TElTFl0250yMcAHpyB7x5uSxuFbGf6uWGxgXNBYIcfB5n5vb280vqGNTbxKbjIQky_1U_Vz9XmzeqVXqrqmAg77152YUTsajhnA99-KTm1fiSKbam6SYz7U899KMXdp5Zj4_-Qkmc7JpOcCt8Q1i-PpoNcHA7zT6IjLogiZIxib2H7jGhg?width=660&height=643&cropmode=none)

# Assumptions

1. Since all timestamps are captured in UTC, there is no requirement for translating date/time values across timezones.

2. Since TRANSACTION_ID is not a unique identifier, multiple prices may be assigned to the same transaction ID (though only 1 price is assigned per record).

3. There will be at least 1 price record in the price file for every second within the range of date/time values in the transaction file. This assumption seemed safe, since in the test file there were typically between 20 and 40 prices within any given second.
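
Assumption 3 is what makes the two-second lookup window sufficient, so it may be worth verifying on new input files. The following is a minimal sketch, assuming timestamps have already been converted to integer nanoseconds since the epoch; the `seconds_without_price` helper is hypothetical and not part of the notebook.

```python
NANOS_PER_SECOND = 1_000_000_000


def seconds_without_price(published_nanos, txn_nanos):
    """Return whole seconds in the transaction range with no published price."""
    covered = {t // NANOS_PER_SECOND for t in published_nanos}
    first = min(txn_nanos) // NANOS_PER_SECOND
    last = max(txn_nanos) // NANOS_PER_SECOND
    return [s for s in range(first, last + 1) if s not in covered]
```

An empty result means the assumption holds for the given files. A non-empty result lists the seconds for which a transaction could fail to find a price within the window.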

# Setup

This cell contains code for environment setup, which includes:
1. Installing required Python packages.
2. Creating a folder to store data files.
3. Setting up file names.
4. Downloading input files.

In [0]:
# Run a shell command.
def run(cmd):
    print('>> {}'.format(cmd))
    !{cmd}
    print('')

# Install dependencies
run('pip -q install apache-beam[interactive]')
run('pip -q install attotime')

# Create local folder to contain data files
run('mkdir -p data')

# Data file names
txn_filename = './data/transactions-2019-02-13.csv'
price_filename = './data/prices-2019-02-13.csv'
output_filename = './data/out'

# Download the input files into the local file system.
import os
import urllib.request as rq
rq.urlretrieve('https://cbarbisan.netlify.com/prices-2019-02-13.csv', price_filename)
rq.urlretrieve('https://cbarbisan.netlify.com/transactions-2019-02-13.csv', txn_filename)

# Helper Functions and Classes

In [0]:
import apache_beam as beam
from attotime import attodatetime, attotimedelta

# For converting the data file representation of a date/time value
# to a datetime object which supports nanoseconds.
def to_datetime(d):
    return attodatetime(
        year=int(d[0:4]),
        month=int(d[5:7]),
        day=int(d[8:10]),
        hour=int(d[11:13]),
        minute=int(d[14:16]),
        second=int(d[17:19]),
        microsecond=int(d[20:26]),
        nanosecond=int(d[26:29]),
    )

# Splits the record into separate typed fields in a tuple
def parse_record(line):
    rec = line.split(',')
    dt = to_datetime(rec[1])
    return (float(rec[0]), dt)

# Creates an index value using a portion of the datetime
# argument formatted as YYYYMMDDHHMISS
def create_index(dt):
    return (dt.strftime('%Y%m%d%H%M%S'))

# Creates a tuple whose elements are:
#   1. index value
#   2. price record tuple
def index_price(p):
    return(create_index(p[1]), p)

# DoFn for a ParDo transform which gets applied to elements in the
# transaction PCollection.
# Uses a portion of the TRANSACTION_TIME value to lookup all price tuples
# which were published in the same second as the transaction, and the second
# prior. Searches through those tuples to find the last price published before
# the transaction and includes it in the transaction tuple.
class EnhanceTransaction(beam.DoFn):
    def process(self, element, lkp):
        txn_time = element[1]
        lkp_key = create_index(txn_time)
        lkp_prev_key = create_index(txn_time - attotimedelta(0,1))

        # Combine the price tuples from the previous second and from the
        # transaction's own second. Either bucket may be missing, and grouped
        # values are not guaranteed to be lists, so normalize both.
        listPrices = list(lkp.get(lkp_prev_key) or []) \
            + list(lkp.get(lkp_key) or [])

        # Find the price with the latest published time that is not after
        # txn_time. Each price is a tuple of (price, published time).
        # Grouped values arrive in no guaranteed order, so track the best
        # match found so far rather than the previously visited element.
        published_price = 0
        best_published_time = to_datetime('1970-01-01 00:00:00.000000000 UTC')
        for price in listPrices:
            published_time = price[1]
            if (txn_time >= published_time) \
                and (published_time > best_published_time):
                    published_price = price[0]
                    best_published_time = published_time

        yield element + (published_price,)

# Formats the transaction tuple to match the output file requirements.
def format_output(e):
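    # Zero-pad the fractional second to 9 digits (6 for microseconds, 3 for
    # nanoseconds) so that, for example, 42 microseconds is not written as '42'.
    fraction = '{:06d}{:03d}'.format(int(e[1].microsecond), int(e[1].nanosecond))
    return ','.join((str(int(e[0])), str(e[2]), \
            e[1].strftime('%Y-%m-%d %H:%M:%S.') + fraction + ' UTC'))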
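
The search inside `EnhanceTransaction` scans every candidate price linearly. With only a few dozen prices per second this is cheap, but a binary search is a possible alternative if buckets were larger. The sketch below is not what the notebook does: it assumes the published times in a bucket have been sorted in advance, and the `last_at_or_before` helper is hypothetical.

```python
from bisect import bisect_right


def last_at_or_before(sorted_times, prices, txn_time):
    """Return the price whose published time is the latest one <= txn_time.

    sorted_times must be in ascending order, and prices[i] must be the price
    published at sorted_times[i]. Returns None when nothing qualifies.
    """
    idx = bisect_right(sorted_times, txn_time)
    return prices[idx - 1] if idx else None
```

`bisect_right` returns the insertion point after any entry equal to `txn_time`, so a price published at exactly the transaction time is still selected, which matches the "prior or equal" requirement.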

# Data Pipeline



In [0]:
# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:

    pcPrice = (
        pipeline | 'Read price records' >> beam.io.ReadFromText(price_filename, skip_header_lines=1)
        | 'Parse price records' >> beam.Map(parse_record)
        | 'Index price' >> beam.Map(index_price)
        | 'Group by key' >> beam.GroupByKey()
    )

    vPrice = beam.pvalue.AsDict(pcPrice)

    pcTxn = (
        pipeline | 'Read txn records' >> beam.io.ReadFromText(txn_filename, skip_header_lines=1, strip_trailing_newlines=True)
        | 'Parse txn records' >> beam.Map(parse_record)
        | 'Enhance transaction' >> beam.ParDo(EnhanceTransaction(), vPrice)
        | 'Format output' >> beam.Map(format_output)
        | beam.io.WriteToText(output_filename, file_name_suffix='.csv', \
               header='TRANSACTION_ID,CURRENCY_PRICE,TRANSACTION_TIME')
    )