## Load Libs

In [2]:
import pandas as pd
import os
import csv
import database_helper as dh

## Define Data Source

Make sure you save the following raw files under the same folder
1. Employer master.csv (download by provider)
2. Payment transactions.csv (download from provider)
3. pythonsqlite.db (manually created)

In [4]:
# if not exist create a db file
if not os.path.exists('../data/pythonsqlite.db'):
    with open('../data/pythonsqlite.db', 'w'): pass

In [5]:
employer = "../data/Employer master.csv"
payment = "../data/Payment transactions.csv"
database = r'../data/pythonsqlite.db'

In [6]:
# keep raw data without any processing, easy for future data validation
raw_employer = pd.read_csv(employer)
raw_payment = pd.read_csv(payment)

## Database Solution

This solution is to save the datasets to sqlite3 database then use SQL language to get aggregation report

### Data Check

 This activity helps us to design the table structure for all data sources

In [7]:
# copy raw data to do data check
employer_df = raw_employer.copy()
payment_df = raw_payment.copy()

In [8]:
# show employer first 5 raws
employer_df.head()

Unnamed: 0,Employer_No,Effective_From,Effective_To,Status,Tier
0,100269,2002-06-29,2016-05-23,Open,3
1,100269,2016-05-24,2018-11-02,Open,3
2,100269,2018-11-03,2018-11-04,Open,3
3,100269,2018-11-05,2018-11-13,Open,3
4,100269,2018-11-14,2018-11-21,Open,3


In [9]:
employer_df.columns

Index(['Employer_No', 'Effective_From', 'Effective_To', 'Status', 'Tier'], dtype='object')

In [10]:
employer_df.dtypes

Employer_No        int64
Effective_From    object
Effective_To      object
Status            object
Tier               int64
dtype: object

In [11]:
# show employer first 5 raws
payment_df.head()

Unnamed: 0,Employer_No,Cash_Received_Date,Total_Amt
0,100951,6/08/2018,2000.0
1,100029,26/09/2018,1539.47
2,100807,15/10/2018,430.0
3,100130,3/01/2019,942.05
4,100147,6/11/2018,525.7


In [12]:
payment_df.columns

Index(['Employer_No', 'Cash_Received_Date', 'Total_Amt'], dtype='object')

In [13]:
payment_df.dtypes

Employer_No             int64
Cash_Received_Date     object
Total_Amt             float64
dtype: object

### Dtata Processing

Based on the analysis, we need make sure the data format is consistent aross all data sources. Because SQLlite could not do data comparsion, need do data string comparsion. 

In [14]:
# copy raw data to do data processing
payment_df = raw_payment.copy()

In [15]:
payment_df['Cash_Received_Date'] = pd.to_datetime(payment_df['Cash_Received_Date'], format='%d/%m/%Y')

In [18]:
payment_df.head()

Unnamed: 0,Employer_No,Cash_Received_Date,Total_Amt
0,100951,2018-08-06,2000.0
1,100029,2018-09-26,1539.47
2,100807,2018-10-15,430.0
3,100130,2019-01-03,942.05
4,100147,2018-11-06,525.7


In [19]:
payment_df.to_csv('../data/Payment transactions Processed.csv', index = False)

In [20]:
payment_processed = '../data/Payment transactions Processed.csv'

### Setup Database

This will use the helper functions defined in the database_helper module

In [21]:
# employee table
sql_create_employer_table = """
CREATE TABLE IF NOT EXISTS employer (
    Employer_No integer,
    Effective_From text,
    Effective_To text,
    Status text,
    Tier integer
);
"""
# payment transaction table
sql_create_payment_table = """
CREATE TABLE IF NOT EXISTS payment (
    Employer_No integer,
    Cash_Received_Date text,
    Total_Amt Numeric
);
"""

In [22]:
# create a database connection
conn = dh.create_connection(database)

dh.delete_all_employers(conn)
dh.delete_all_payments(conn)

# create employer rows
employer_file = open(employer)
employer_reader = csv.reader(employer_file)
next(employer_reader, None) # drop header
employer_rows = ((int(row[0]),row[1],row[2],row[3],int(row[4])) for row in employer_reader)

payment_file = open(payment_processed)
payment_reader = csv.reader(payment_file)
next(payment_reader, None) # drop header
payment_rows =  ((int(row[0]),row[1],float(row[2])) for row in payment_reader)
    
if conn is not None:
    # create tables
    # create employer table
    dh.create_table(conn, sql_create_employer_table)
    print("Success! Employer table has been created.")

    # create payment table
    dh.create_table(conn, sql_create_payment_table)
    print("Success! Payment table has been created.")
    
    # insert employers
    dh.insert_employer(conn,employer_rows)
    print("Success! Employer rows has been inserted.")
    
    # insert payment transactions
    dh.insert_payment(conn, payment_rows)
    print("Success! Payment table has been inserted.")
    
    conn.close()
else:
    print("Error! cannot create the database connection.")


Success! Employer table has been created.
Success! Payment table has been created.
Success! Employer rows has been inserted.
Success! Payment table has been inserted.


### Table Check

In [23]:
# Check total employer rows in raw data
len(employer_df)

5786

In [24]:
conn = dh.create_connection(database)
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM employer")
row = cur.fetchall()
print(row[0][0]) 

5786


In [25]:
# Check total payment transaction rows in raw data
len(payment_df)

14853

In [26]:
conn = dh.create_connection(database)
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM payment")
row = cur.fetchall()
print(row[0][0])

14853


## Generate report

The report takes the form of a table, with the following columns:
1. Tier – the employer tier (i.e. segmentation category) of this row
2. Month end date – the end date for the month of this row
3. Num payments – the total number of superannuation contributions paid for this month/tier
4. Amount of payments – the total amount of superannuation contributions paid for this month/tier
5. New employers – newly open employer accounts introduced within this month at this tier
6. Open emloyers at EOM (end of month) – number of employer accounts open at the month end date at this tier

In [27]:
sql_monthly_payments = """
WITH RECURSIVE
  cnt(x) AS (
     SELECT 0
     UNION ALL
     SELECT x+1 FROM cnt
      LIMIT (SELECT ROUND(((julianday('2018-12-01') - julianday('2018-01-01'))/30) + 1))
  ), 
  
MonthTable AS(
SELECT
1 as flag,
month_start, 
DATE(month_start,'start of month','+1 month','-1 day') as month_end
FROM (SELECT DATE(julianday('2018-01-01'), '+' || x || ' month') as month_start FROM cnt)), 

Tier AS (
SELECT DISTINCT 
1 as flag,
tier
FROM employer
ORDER BY Tier
), 

EmployerPayment AS (
SELECT
P.*,
E.Tier, 
MT.month_end
FROM payment P 
INNER JOIN Employer E ON P.Employer_No = E.Employer_No AND replace(P.Cash_Received_Date,'-','') BETWEEN replace(E.Effective_From,'-','') AND replace(E.Effective_To,'-','') 
INNER JOIN MonthTable MT ON replace(P.Cash_Received_Date,'-','') BETWEEN replace(MT.month_start,'-','') AND replace(MT.month_end,'-','')
)

SELECT 
T.tier,
MT.month_end, 
COUNT(EP.Total_Amt) AS num_payments,
ROUND(SUM(EP.Total_Amt),2) AS amount_of_payments
FROM Tier T
INNER JOIN MonthTable MT ON T.flag = MT.flag
LEFT JOIN EmployerPayment EP ON EP.tier = T.tier AND EP.month_end = MT.month_end
GROUP BY 
T.tier,
MT.month_end

ORDER BY 
T.tier,
MT.month_end

"""

In [36]:
sql_monthly_new_employers = """
WITH RECURSIVE
  cnt(x) AS (
     SELECT 0
     UNION ALL
     SELECT x+1 FROM cnt
      LIMIT (SELECT ROUND(((julianday('2018-12-01') - julianday('2018-01-01'))/30) + 1))
  ), 
  
MonthTable AS(
SELECT
1 as flag,
month_start, 
DATE(month_start,'start of month','+1 month','-1 day') as month_end
FROM (SELECT DATE(julianday('2018-01-01'), '+' || x || ' month') as month_start FROM cnt)), 

Tier AS (
SELECT DISTINCT 
1 as flag,
tier
FROM employer
ORDER BY Tier
), 

NewEmployer AS (
SELECT
E.*, 
MT.month_end
FROM Employer E
INNER JOIN MonthTable MT ON replace(E.Effective_From,'-','') BETWEEN replace(MT.month_start,'-','') AND replace(MT.month_end,'-','') AND E.Status = 'Open'
)

SELECT 
T.tier,
MT.month_end, 
COUNT(DISTINCT NE.Employer_No) AS new_employers
FROM Tier T
INNER JOIN MonthTable MT ON T.flag = MT.flag
LEFT JOIN NewEmployer NE ON NE.tier = T.tier AND NE.month_end = MT.month_end

GROUP BY 
T.tier,
MT.month_end

ORDER BY 
T.tier,
MT.month_end
"""

In [37]:
sql_monthly_open_employers = """
WITH RECURSIVE
  cnt(x) AS (
     SELECT 0
     UNION ALL
     SELECT x+1 FROM cnt
      LIMIT (SELECT ROUND(((julianday('2018-12-01') - julianday('2018-01-01'))/30) + 1))
  ), 
  
MonthTable AS(
SELECT
1 as flag,
month_start, 
DATE(month_start,'start of month','+1 month','-1 day') as month_end
FROM (SELECT DATE(julianday('2018-01-01'), '+' || x || ' month') as month_start FROM cnt)), 

Tier AS (
SELECT DISTINCT 
1 as flag,
tier
FROM employer
ORDER BY Tier
), 

OpenEmployerEOM AS (
SELECT
E.*, 
MT.month_end
FROM Employer E
INNER JOIN MonthTable MT ON replace(E.Effective_FROM,'-','') <= replace(MT.month_end,'-','') AND replace(E.Effective_To,'-','') >= replace(MT.month_end,'-','') AND E.Status = 'Open'
)


SELECT 
T.tier,
MT.month_end, 
COUNT(DISTINCT OE.Employer_No) AS open_employers_eom

FROM Tier T
INNER JOIN MonthTable MT ON T.flag = MT.flag
LEFT JOIN OpenEmployerEOM OE ON OE.tier = T.tier AND OE.month_end = MT.month_end
GROUP BY 
T.tier,
MT.month_end

ORDER BY 
T.tier,
MT.month_end
"""

### Result Check

In [38]:
conn = dh.create_connection(database)
payments_df = pd.read_sql_query(sql_monthly_payments, conn)
payments_df.head()

Unnamed: 0,tier,month_end,num_payments,amount_of_payments
0,1,2018-01-31,40,66912.91
1,1,2018-02-28,30,77029.77
2,1,2018-03-31,0,
3,1,2018-04-30,45,77125.12
4,1,2018-05-31,38,100050.32


In [39]:
conn = dh.create_connection(database)
new_employers_df = pd.read_sql_query(sql_monthly_new_employers, conn)
new_employers_df.head()

Unnamed: 0,tier,month_end,new_employers
0,1,2018-01-31,0
1,1,2018-02-28,0
2,1,2018-03-31,1
3,1,2018-04-30,2
4,1,2018-05-31,0


In [40]:
conn = dh.create_connection(database)
open_employers_df = pd.read_sql_query(sql_monthly_open_employers, conn)
open_employers_df.head()

Unnamed: 0,tier,month_end,open_employers_eom
0,1,2018-01-31,47
1,1,2018-02-28,47
2,1,2018-03-31,48
3,1,2018-04-30,50
4,1,2018-05-31,50


In [44]:
report_inner = pd.merge(payments_df, new_employers_df, on=["tier", "month_end"])
report_inner.head()

Unnamed: 0,tier,month_end,num_payments,amount_of_payments,new_employers
0,1,2018-01-31,40,66912.91,0
1,1,2018-02-28,30,77029.77,0
2,1,2018-03-31,0,,1
3,1,2018-04-30,45,77125.12,2
4,1,2018-05-31,38,100050.32,0


In [45]:
report_final = pd.merge(report_inner , open_employers_df, on=["tier", "month_end"])
report_final.head()

Unnamed: 0,tier,month_end,num_payments,amount_of_payments,new_employers,open_employers_eom
0,1,2018-01-31,40,66912.91,0,47
1,1,2018-02-28,30,77029.77,0,47
2,1,2018-03-31,0,,1,48
3,1,2018-04-30,45,77125.12,2,50
4,1,2018-05-31,38,100050.32,0,50


In [46]:
report_final.columns = ['Tier','Month end date','Num payments','Amount of payments','New employers','Open employers at EOM']

In [47]:
report_final['Month end date'] = pd.to_datetime(report_final['Month end date'])
report_final['Month end date'] = report_final['Month end date'].dt.strftime('%d/%m/%Y')

In [49]:
report_final.to_csv('../result/monthly_employer_report.csv',index=False)