# Environment Setup
Set up the environment by installing and importing the necessary libraries. `findspark` adds PySpark to `sys.path` at runtime so that `pyspark` can be imported from this kernel.

In [188]:
!pip install pandas
!pip install findspark



In [1]:
# solution
import inspect

import findspark
findspark.init()

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as sf

import pandas as pd

# Data Preview
Skip this section if you are already familiar with the data used in this challenge.

Use pandas to take a quick look at the raw data before transforming it with PySpark.

In [2]:
# comment the line below after running it
!unzip -a data/senior_data_eng_code_challenge.zip -d data

Archive:  data/senior_data_eng_code_challenge.zip
replace data/mcc_data.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: ^C


In [2]:
# Location of the extracted challenge files, relative to the notebook.
data_path = 'data'

# Load both raw CSVs with pandas for a quick look (the Spark section reads them again).
data_mcc = pd.read_csv('{}/mcc_data.csv'.format(data_path))
data_tx = pd.read_csv('{}/acme_december_2018.csv'.format(data_path))

In [3]:
# Preview the first rows of the merchant category code lookup.
data_mcc.head(5)

Unnamed: 0,id,code
0,2284,MCC000
1,2285,MCC001
2,2288,MCC002
3,2290,MCC003
4,2297,MCC004


In [4]:
# Preview the first rows of the transaction data.
data_tx.head(5)

Unnamed: 0,date,merchant_category_id,payment_method,backend_name,card_brand,card_type,card_country_issuer_code,amount
0,2018-12-06,2083.0,payment01,b02,brand01,type02,A029,269.460308
1,2018-12-06,1020.0,payment01,b02,brand03,type01,A029,24.507962
2,2018-12-12,2083.0,payment01,b02,brand03,type02,A110,329.700464
3,2018-12-22,2083.0,payment01,b02,brand03,type02,A029,9705.023678
4,2018-12-19,2167.0,payment03,b04,,,,463.348967


In [5]:
# Row counts of the two raw inputs (print joins its arguments with a single space).
print('Transaction Count:', len(data_tx))
print('Merchant Category Count:', len(data_mcc))

Transaction Count: 1117112
Merchant Category Count: 261


# Implementation via PySpark

In [6]:
# Create the Spark session, or reuse it when this cell is re-run. Constructing
# pyspark.SparkContext directly fails on a second run ("Cannot run multiple SparkContexts
# at once"), and SQLContext is a deprecated wrapper around SparkSession.
spark = pyspark.sql.SparkSession.builder.appName('Acme Accounting Monthly Report Pipeline').getOrCreate()
sc = spark.sparkContext
# SparkSession exposes the same .read / .createDataFrame API the old SQLContext did,
# so later cells that use `sql` keep working unchanged.
sql = spark

# Load the raw transactions and the merchant category code lookup as Spark DataFrames.
df_acme_tx = sql.read.csv(data_path + '/acme_december_2018.csv', inferSchema=True, header=True)
df_mcc = sql.read.csv(data_path + '/mcc_data.csv', inferSchema=True, header=True)

### Lookup Data Preparation

In [7]:
# Transaction-type lookup: (card type, issuer locality, card brand) -> transaction type code.
# One tuple per lookup row, in the order the rows appear in the generated DataFrame.
tx_type_lookup_rows = [
    ('type01', 'Local', 'brand01', '010001'),
    ('type01', 'Local', 'brand02', '010002'),
    ('type01', 'Local', 'brand03', '010003'),
    ('type01', 'Local', 'other',   '010004'),
    ('type01', 'Inter', 'brand01', '010011'),
    ('type01', 'Inter', 'brand02', '010012'),
    ('type01', 'Inter', 'brand03', '010013'),
    ('type01', 'Inter', 'other',   '010014'),
    ('type02', 'Local', 'brand01', '010021'),
    ('type02', 'Local', 'brand02', '010022'),
    ('type02', 'Local', 'brand03', '010023'),
    ('type02', 'Local', 'other',   '010024'),
    ('type02', 'Inter', 'brand01', '010031'),
    ('type02', 'Inter', 'brand02', '010032'),
    ('type02', 'Inter', 'brand03', '010033'),
    ('type02', 'Inter', 'other',   '010034'),
]
# NOTE(review): the 'other' rows only match transactions whose card_brand is literally 'other';
# nothing maps unlisted brands (e.g. brand04) to 'other' before the join — confirm intent.

# Column-wise views of the same table, kept for any cell that reads the individual lists
# (list_card_band holds card brands; the name is historical).
list_card_type, list_country, list_card_band, list_transaction_type_code = (
    list(column) for column in zip(*tx_type_lookup_rows))

# Every lookup column is a non-nullable string.
tx_type_lookup_field = [StructField(field_name, StringType(), False)
                        for field_name in ('card_type_lkp', 'country_lkp',
                                           'card_brand_lkp', 'transaction_type_code_lkp')]

tx_type_lookup_schema = StructType(tx_type_lookup_field)

df_tx_type_lookup = sql.createDataFrame(tx_type_lookup_rows, schema=tx_type_lookup_schema)

# payment_method -> service system type; payment01 is 'CPF', the others 'OTH'.
service_system_type_mapping = {
    'payment01': 'CPF',
    'payment02': 'OTH',
    'payment03': 'OTH',
    'payment04': 'OTH',
}

# card_brand -> merchant business type code.
merchant_business_type_mapping = {
    'brand01': '30101',
    'brand02': '30102',
    'brand03': '30103',
    'brand04': '30104',
}

### Define Necessary Functions for Column Generation and Transformation

In [8]:
# dataframe transformation functions
def with_fi_code(df, fi_code='42'):
    """Add a constant `fi_code` column to every row.

    :param df: Spark DataFrame to extend.
    :param fi_code: string written to every row; defaults to '42', the value this report
        has always used (presumably the reporting financial institution's code).
    :return: a new DataFrame with the `fi_code` string column added.
    """
    return df.withColumn('fi_code', sf.lit(fi_code))

def with_date(df):
    """Replace `date` with the last day of its month, as a 'yyyy-MM-dd' string.

    Every transaction is reported under its month-end date (e.g. 2018-12-06 -> 2018-12-31).
    """
    month_end = sf.last_day(sf.col('date'))
    return df.withColumn('date', sf.date_format(month_end, 'yyyy-MM-dd'))

def with_service_system_type(df):
    """Add `service_system_type` by mapping `payment_method` via `service_system_type_mapping`.

    Payment methods missing from the mapping, and null payment methods, get ''.
    """
    payment_method = sf.col('payment_method')
    # Fold the dict into a CASE WHEN chain. It is a native Spark expression, so Catalyst can
    # optimise it and rows are not serialised out to a Python worker as they are for a UDF.
    service_system_type = sf.lit('')
    for method, system_type in service_system_type_mapping.items():
        service_system_type = sf.when(payment_method == method, system_type).otherwise(service_system_type)
    return df.withColumn('service_system_type', service_system_type)

def with_merchant_business_type(df):
    """Add `merchant_business_type` for CPF transactions from `merchant_business_type_mapping`.

    CPF rows get the code mapped from their `card_brand` ('' for unmapped or null brands).
    Every non-CPF row, including rows with a null service_system_type, gets ''.
    Requires the `service_system_type` column, so run after with_service_system_type().
    """
    card_brand = sf.col('card_brand')
    # Fold the dict into a CASE WHEN chain (native Spark expression instead of a Python UDF).
    brand_code = sf.lit('')
    for brand, business_type in merchant_business_type_mapping.items():
        brand_code = sf.when(card_brand == brand, business_type).otherwise(brand_code)

    # A null service_system_type makes the condition null, which falls through to ''.
    is_cpf = sf.col('service_system_type') == 'CPF'
    return df.withColumn('merchant_business_type', sf.when(is_cpf, brand_code).otherwise(''))

def with_merchant_category_code(df, df_lookup):
    """Add `merchant_category_code` by left-joining `merchant_category_id` onto the MCC lookup.

    :param df: transactions; must already have `service_system_type` and `merchant_category_id`.
    :param df_lookup: MCC lookup with an `id` column (join key) and a `code` column.
    :return: `df` plus `merchant_category_code`. CPF transactions whose id has no lookup
        match (or is null) get '9999'; other unmatched transactions keep a null code.
    """
    # Only the join key and the code are needed. Selecting just these keeps other lookup
    # columns (e.g. an unnamed index column read from the CSV header) out of the result,
    # where they would duplicate same-named transaction columns.
    lookup = df_lookup.select('id', 'code')
    joined = df.join(lookup, df['merchant_category_id'] == lookup['id'], how='left')

    code = joined['code']
    # Native CASE WHEN instead of a Python UDF. A null service_system_type makes the
    # condition null, which falls through to `code`, exactly as the UDF's False did.
    merchant_category_code = (sf.when((joined['service_system_type'] == 'CPF') & code.isNull(), sf.lit('9999'))
                              .otherwise(code))
    return joined.withColumn('merchant_category_code', merchant_category_code).drop('code', 'id')

def with_transaction_type(df, df_lookup):
    """Add `transaction_type` from the (card_type, card_country, card_brand) lookup table.

    Also adds the helper column `card_country`. It is 'Local' when card_country_issuer_code
    is 'A029', otherwise (including null) 'Inter'. Rows with no lookup match get '099999'.

    :param df: transactions with card_type, card_brand and card_country_issuer_code columns.
    :param df_lookup: lookup with card_type_lkp, country_lkp, card_brand_lkp and
        transaction_type_code_lkp columns.
    :return: `df` plus `card_country` and `transaction_type`; the lookup columns are dropped.
    """
    issuer_code = sf.col('card_country_issuer_code')
    # A null issuer code compares as null and falls through to 'Inter', matching the old UDF.
    df = df.withColumn('card_country', sf.when(issuer_code == 'A029', 'Local').otherwise('Inter'))

    # NOTE(review): card_brand is matched verbatim, so lookup rows keyed on 'other' only match
    # a literal 'other' brand. Other brands (e.g. brand04) fall back to '099999'. Confirm intent.
    matches = ((df['card_type'] == df_lookup['card_type_lkp'])
               & (df['card_brand'] == df_lookup['card_brand_lkp'])
               & (df['card_country'] == df_lookup['country_lkp']))
    joined = df.join(df_lookup, matches, how='left')

    # Native coalesce instead of a Python UDF for the "no match" default.
    joined = joined.withColumn('transaction_type',
                               sf.coalesce(joined['transaction_type_code_lkp'], sf.lit('099999')))

    return joined.drop('card_type_lkp', 'card_brand_lkp', 'country_lkp', 'transaction_type_code_lkp')

def with_sum_amount_n_transaction_count(df):
    """Aggregate transactions into one row per report key.

    Grouping keys: date, service_system_type, transaction_type, merchant_business_type and
    merchant_category_code. Outputs are `amount` (sum, rounded to 2 dp), `number` (count of
    non-null amounts) and the helper `average_amount` (mean, rounded to 2 dp).
    """
    report_keys = ('date', 'service_system_type', 'transaction_type',
                   'merchant_business_type', 'merchant_category_code')
    total = sf.round(sf.sum('amount'), 2).alias('amount')
    tx_count = sf.count('amount').alias('number')
    # NOTE(review): the average is rounded *before* it is bucketed into the terminal average
    # amount range, so a mean less than 0.005 above a band boundary lands in the lower band.
    mean = sf.round(sf.avg('amount'), 2).alias('average_amount')
    return df.groupBy(*report_keys).agg(total, tx_count, mean)

# get terminal average amount range column
# (inclusive upper bound, band code) pairs in ascending order. Averages above the last
# bound fall into TERMINAL_AVERAGE_AMOUNT_TOP_BAND.
TERMINAL_AVERAGE_AMOUNT_BANDS = (
    (500, '94560000001'),
    (1000, '94560000002'),
    (2000, '94560000003'),
    (5000, '94560000004'),
    (10000, '94560000005'),
    (30000, '94560000006'),
)
TERMINAL_AVERAGE_AMOUNT_TOP_BAND = '94560000007'


def get_terminal_average_amount_range(average_amount):
    """Map an average transaction amount to its terminal-average-amount range code.

    Bands are inclusive on their upper bound: <=500 -> '94560000001', <=1000 -> '94560000002',
    <=2000 -> '94560000003', <=5000 -> '94560000004', <=10000 -> '94560000005',
    <=30000 -> '94560000006', anything larger -> '94560000007'.

    :param average_amount: a number, or None (e.g. a null Spark average). Comparing None with
        a number would raise TypeError inside the UDF and fail the whole job.
    :return: the band code string, or None when `average_amount` is None.
    """
    if average_amount is None:
        return None
    for upper_bound, band_code in TERMINAL_AVERAGE_AMOUNT_BANDS:
        if average_amount <= upper_bound:
            return band_code
    return TERMINAL_AVERAGE_AMOUNT_TOP_BAND

# Spark UDF wrapper around get_terminal_average_amount_range, returning a string band code.
udf_get_terminal_average_amount_range = sf.udf(get_terminal_average_amount_range, returnType=StringType())

def with_terminal_average_amount_range(df):
    """Add `terminal_average_amount_range`, the band code for each row's `average_amount`."""
    band = udf_get_terminal_average_amount_range(sf.col('average_amount'))
    return df.withColumn('terminal_average_amount_range', band)
    

### Apply the Prepared Functions to Build the Formatted Report DataFrame

In [29]:
# Derive the per-transaction reporting columns; every helper returns a new DataFrame.
df_report = with_date(df_acme_tx)
df_report = with_service_system_type(df_report)
df_report = with_merchant_business_type(df_report)
df_report = with_merchant_category_code(df_report, df_mcc)
df_report = with_transaction_type(df_report, df_tx_type_lookup)

# Remove source and helper columns that are not part of the report before aggregating.
df_report = df_report.drop('merchant_category_id', 'payment_method', 'backend_name', 'card_brand',
                           'card_type', 'card_country_issuer_code', 'card_country')

# Aggregate to one row per report key, then add the constant fi_code and the average-amount
# band. The helper average_amount column is dropped once it has been bucketed.
df_report = with_terminal_average_amount_range(with_fi_code(with_sum_amount_n_transaction_count(df_report)))
df_report = df_report.drop('average_amount')

# Final column order of the report file.
column_order_list = [
    'fi_code', 'date', 'service_system_type', 'transaction_type', 'merchant_business_type',
    'merchant_category_code', 'amount', 'number', 'terminal_average_amount_range',
]
df_report = df_report.select(column_order_list)

In [44]:
# (A commented-out experiment that wrote the report row by row via collect() / a UDF used to
#  live here. It was superseded by the pandas-based writer in the next section and removed.
#  It opened a file per row without closing it and appended on every run.)

### Write the Formatted Dataframe to CSV

In [12]:
# Bring the aggregated Spark report to the driver as a pandas DataFrame for file writing.
# toPandas() collects every row onto the driver. That is fine here because the report is
# already aggregated (the record count printed later is a few hundred rows); revisit if it grows.
pandas_df_report = df_report.toPandas()

In [13]:
# Write one pipe-delimited, CRLF-terminated file per reporting date: ACME_YYYYMMDD.csv.

# pandas 1.5 renamed to_csv's `line_terminator` to `lineterminator`, and pandas 2.0 removed the
# old spelling (L5 installs an unpinned pandas). Use whichever keyword this pandas accepts.
_to_csv_params = inspect.signature(pd.DataFrame.to_csv).parameters
_line_terminator_kw = 'lineterminator' if 'lineterminator' in _to_csv_params else 'line_terminator'

# A row without a date cannot be assigned to a file (and groupby would silently skip it).
if pandas_df_report['date'].isna().any():
    raise ValueError('report contains rows without a date; cannot choose an output file for them')

# One pass with groupby instead of re-filtering the whole frame for every distinct date.
for report_date, rows_for_date in pandas_df_report.groupby('date'):
    file_name = 'ACME_{0}.csv'.format(str(report_date).replace('-', ''))
    rows_for_date.to_csv(file_name, sep='|', index=False, **{_line_terminator_kw: '\r\n'})

In [14]:
# see result
!file ACME_20181231.csv
!cat ACME_20181231.csv 

ACME_20181231.csv: ASCII text, with CRLF line terminators
fi_code|date|service_system_type|transaction_type|merchant_business_type|merchant_category_code|amount|number|terminal_average_amount_range
42|2018-12-31|CPF|010013|30103|MCC065|2264.53|3|94560000002
42|2018-12-31|CPF|010001|30101|MCC193|571.58|5|94560000001
42|2018-12-31|CPF|099999|30103|MCC044|630.74|2|94560000001
42|2018-12-31|CPF|010001|30101|MCC031|50678.63|237|94560000001
42|2018-12-31|CPF|010001|30101|MCC023|253506.72|186|94560000003
42|2018-12-31|CPF|010023|30103|MCC236|84912.2|6|94560000006
42|2018-12-31|CPF|099999|30103|MCC184|408.0|3|94560000001
42|2018-12-31|CPF|010032|30102|MCC090|133464.6|73|94560000003
42|2018-12-31|CPF|010031|30101|MCC125|422198.0|89|94560000004
42|2018-12-31|CPF|010021|30101|MCC259|121795.92|8|94560000006
42|2018-12-31|CPF|010032|30102|9999|1578985.27|2500|94560000002
42|2018-12-31|CPF|010001|30101|MCC041|1706.64|2|94560000002
42|2018-12-31|CPF|010021|30101|MCC033|6791.76|27|94560000001
42|2018-

In [11]:
print('Reporting Records:', df_report.count())

Reporting Records: 788


In [30]:
# Stop the SparkContext created in the Spark section. Spark DataFrames in this notebook
# cannot be evaluated afterwards, so keep this as the last cell.
sc.stop()