<h1><font size=14><center>Data Engineering Part</center></font></h1>
First, load the necessary libraries:

In [3]:
import pandas as pd
import urllib.request, json
import boto3
import os

Load the access key and secret access key, in case they are provided by the user:

In [109]:
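# Read "name=value" pairs from the key file, if the user supplied one.
myvars = {}
if os.path.exists("rootkey.csv"):
    with open("rootkey.csv") as myfile:
        for line in myfile:
            name, var = line.partition("=")[::2]
            myvars[name.strip()] = var.strip()

    # Expose the keys to boto3 through its standard environment variables.
    os.environ['AWS_ACCESS_KEY_ID'] = myvars['AWSAccessKeyId']
    os.environ['AWS_SECRET_ACCESS_KEY'] = myvars['AWSSecretKey']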

An index of the available filings is published as separate JSON and CSV files for each year. The CSV data contained form types other than 990, 990EZ, and 990PF, and the unique identifier of the filing did not seem to be right. I therefore used the JSON files. To get the details of the filings, I read each yearly index and merged them into one big list:

In [4]:
filings = []
for year in ['2011', '2012', '2013', '2014', '2015', '2016', '2017', '2018']:
    url_link = 'https://s3.amazonaws.com/irs-form-990/index_'+ year + '.json'
    with urllib.request.urlopen(url_link) as url:
        filings.extend(json.loads(url.read().decode())['Filings' + year])
    print(year, len(filings))

2011 203075
2012 464697
2013 726146
2014 1113675
2015 1374709
2016 1753129
2017 2242142
2018 2373310


In [None]:
df = pd.DataFrame.from_dict(filings)


In [121]:
df.head()

Unnamed: 0,DLN,EIN,FormType,LastUpdated,ObjectId,OrganizationName,SubmittedOn,TaxPeriod,URL,TaxYear,LatestSubmittedOn,LatestObjectId
0,93493316003251,591971002,990,2016-03-21T17:23:53,201103169349300325,ANGELUS INC,2011-11-30,201009,https://s3.amazonaws.com/irs-form-990/20110316...,2009,2011-11-30,201103169349300325
1,93493313012311,251713602,990,2016-03-21T17:23:53,201113139349301231,TOUCH-STONE SOLUTIONS INC,2011-11-30,201106,https://s3.amazonaws.com/irs-form-990/20111313...,2010,2011-11-30,201113139349301231
2,93493313013011,232705170,990,2016-03-21T17:23:53,201113139349301301,RONALD MCDONALD HOUSE CHARITIES- PHILADELPHIA ...,2011-11-30,201012,https://s3.amazonaws.com/irs-form-990/20111313...,2010,2011-11-30,201113139349301301
3,93493313013111,581805618,990,2016-03-21T17:23:53,201113139349301311,TORRINGTON VOA ELDERLY HOUSING INC BELL PARK T...,2011-11-30,201106,https://s3.amazonaws.com/irs-form-990/20111313...,2010,2011-11-30,201113139349301311
4,93493313013161,581876019,990,2016-03-21T17:23:53,201113139349301316,HOUSTON VOA INDEPENDENT HOUSING INC HEIGHTS MANOR,2011-11-30,201106,https://s3.amazonaws.com/irs-form-990/20111313...,2010,2011-11-30,201113139349301316


My observations:
- There are about 2.4 million filings available so far.
- There are 3 types of forms: 990, 990EZ, and 990PF. Each has a different XML structure, providing different kinds and levels of information that can be extracted.
- The names of certain XML tags changed over time (from 2012 to 2013), so the tags should be assessed for each year and each tax form.

The assignment requires using free AWS services to calculate the average Year-on-Year (YoY) revenue growth, both nationally and by state.

First, what is YoY growth? I calculated YoY% as the percentage change of the current year's revenue relative to the previous year's revenue, using the following formula:

<center>$\text{YoY} \% = 100\%\frac{\text{CurrentYearRevenue} - \text{PreviousYearRevenue}}{\text{PreviousYearRevenue}}$</center>

This has to be calculated for each company, and the per-filing values are then averaged per tax year, both nationally and per state. Four pieces of information are therefore needed:
- State of the company
- Tax year of the filing
- Current year revenue of the company
- Previous year revenue of the company
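
As a minimal sketch of the formula above, the per-filing calculation could look like the following. The function name `yoy_percent` is hypothetical, and returning `None` for a zero previous-year revenue is an assumption that mirrors the `PreviousYear_Revenue != 0` filter used in the aggregation cells later on.

```python
def yoy_percent(current_revenue, previous_revenue):
    """Percentage change of current year revenue relative to the previous year.

    Returns None when the previous year revenue is zero, because the
    ratio is undefined in that case (assumption: such filings are skipped).
    """
    if previous_revenue == 0:
        return None
    return 100.0 * (current_revenue - previous_revenue) / previous_revenue


# Revenue growing from 100 to 120 is a 20% increase.
print(yoy_percent(120, 100))
# Revenue shrinking from 100 to 50 is a 50% decrease.
print(yoy_percent(50, 100))
```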

Looking at the tags available in the 3 form types:
- Form 990 contains both current and previous year revenue in the same form.
- Form 990EZ contains only current year revenue.
- Form 990PF does not contain any revenue information.

The following AWS services are offered for free and could be useful for this assignment:
- AWS Lambda
- AWS EC2
- AWS S3
- AWS DynamoDB
- AWS RDS

However, the amount of data to be processed is large compared to the compute capacity the AWS Free Tier offers. For example, Lambda can be invoked 1 million times, whereas there are around 2.4 million files to process. I therefore had to limit the scope of the analysis to a single tax year. With more compute capacity, the same logic and code could be used to process everything. The rest of the work concentrates on one tax year.

The calculation needs both previous and current year revenue. Form 990 provides both, but Form 990EZ only provides the current year. The missing value could be derived by finding the company's previous-year filing, but that would increase the amount of data to be processed. I therefore limited the analysis to Form 990, which provides both values in the same file.

The form type is directly available in the index files, but the tax year of a filing has to be deduced from other fields in the index. The year of filing and the tax period are the relevant candidates. A simple guess would be that the tax year is one year before the year of filing. However, this does not hold because of corrected or late filings. The tax period therefore remains the only source from which to deduce the tax year and filter the filings of a given year. A tax period (YYYYMM) translates to a tax year (TY) as follows:
- if MM == 12, then TY is YYYY
- if MM != 12, then TY is YYYY-1

For example, the tax year for tax period 201512 is 2015, and for 201511 it is 2014.
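
The rule can also be written as a small standalone function, which makes it easy to check against the two examples above. This is only a sketch; the name `tax_year_from_period` is hypothetical, and the input is assumed to be a six-character string in YYYYMM form.

```python
def tax_year_from_period(tax_period):
    """Map a tax period string (YYYYMM) to a tax year.

    A period ending in December belongs to that calendar year;
    any other ending month is attributed to the previous year.
    """
    year = int(tax_period[:4])
    month = tax_period[4:6]
    return year if month == '12' else year - 1


print(tax_year_from_period('201512'))  # period ends in December 2015
print(tax_year_from_period('201511'))  # period ends in November 2015
```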

In [6]:
df['TaxYear'] = df['TaxPeriod'].map(lambda x: int(x[0:4]) if x[4:6] == '12' else int(x[0:4])-1)

In [10]:
pd.crosstab(df.FormType, df.TaxYear)

TaxYear,2009,2010,2011,2012,2013,2014,2015,2016
FormType,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
990,33360,123107,159539,179675,198738,218614,232760,193153
990EZ,15500,63253,82066,93769,104538,116461,124330,115624
990PF,2352,25275,34597,39936,45897,53443,58677,58646


The tax years range from 2009 to 2016, given that the filing years range from 2011 to 2018. As the breakdown shows, 2009 and 2016 are not complete yet.

Moreover, I expected each company to have only one filing per form type per tax year. However, this was not the case: sometimes there is more than one. The reason seems to be that corrections are resubmitted at a later time, and that the same form is sometimes filed more than once. I cleaned the data with these facts in mind:

In [7]:
# Latest submission date for each ObjectId (the same object can be listed more than once).
df['LatestSubmittedOn'] = df.groupby(['ObjectId'])['SubmittedOn'].transform('max')

In [8]:
# Latest ObjectId for each company, tax year, and form type (later corrections win).
df['LatestObjectId'] = df.groupby(['EIN', 'TaxYear', 'FormType'])['ObjectId'].transform('max')

In [None]:
df_updated = df.query("SubmittedOn == LatestSubmittedOn").query("ObjectId == LatestObjectId")

In [12]:
pd.crosstab(df_updated.FormType, df_updated.TaxYear)

TaxYear,2009,2010,2011,2012,2013,2014,2015,2016
FormType,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
990,33050,122077,158108,178128,197114,216584,230910,192303
990EZ,15465,63069,81813,93446,104173,116023,123885,115335
990PF,2346,25196,34481,39780,45713,53192,58422,58501


Now it is time to scrape the data from these URLs and save it to either S3 or DynamoDB. I first wrote the following code, which reads the XML at a URL, extracts the tags that hold the relevant information, and pushes the values to DynamoDB.

In [None]:
# This code goes into lambda function.

from xml.etree import ElementTree as ET
import urllib.request 
import boto3
import re

def lambda_handler(event, context):
    
    with urllib.request.urlopen(event['url_xml']) as url:
        document = url.read().decode()
        
    document = re.sub(' xmlns="[^"]+"', '', document, count=1)
    
    page = ET.fromstring(document)
    
    tax_yr_path = './ReturnHeader/TaxYear'
    ein_path    = './ReturnHeader/Filer/EIN'
    state_path  = './ReturnHeader/Filer/USAddress/State'
    py_rv_path  = './ReturnData/IRS990/TotalRevenuePriorYear'
    cy_rv_path  = './ReturnData/IRS990/TotalRevenueCurrentYear'
    bn_path     = './ReturnHeader/Filer/Name/BusinessNameLine1'
    
    objectID = event['url_xml'].split('/')[-1].split('_')[0]
    tax_yr = page.find(tax_yr_path).text
    state = page.find(state_path).text
    name = page.find(bn_path).text
    ein = page.find(ein_path).text
    
    py_rv = "0"
    py_rv_node = page.find(py_rv_path)
    if py_rv_node is not None:
        py_rv = py_rv_node.text
    
    cy_rv = "0"
    cy_rv_node = page.find(cy_rv_path)
    if cy_rv_node is not None:
        cy_rv = cy_rv_node.text
    
    dynamodb = boto3.resource('dynamodb', region_name = 'us-east-1')
    
    table = dynamodb.Table('revenues')
    table.put_item(
        Item={
            'ObjectID': objectID,
            'TaxYear': int(tax_yr),
            'EIN': int(ein),
            'State': state,
            'BusinessName': name,
            'PreviousYear_Revenue': int(py_rv),
            'CurrentYear_Revenue': int(cy_rv),
        }
    )
    
    return
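
The parsing step of the function above can be tried locally without AWS. The sketch below repeats the namespace stripping and the "default to 0 when the tag is missing" logic on a tiny, made-up XML document. The helper names `text_or_default` and `parse_filing`, as well as the sample document and its values, are hypothetical and only for illustration; real filings contain many more tags, and the tag names may differ between years.

```python
import re
from xml.etree import ElementTree as ET


def text_or_default(root, path, default=None):
    """Return the text of the node at `path`, or `default` if it is missing."""
    node = root.find(path)
    if node is None or node.text is None:
        return default
    return node.text


def parse_filing(document):
    """Extract state and revenues from a Form 990 XML string."""
    # Drop the default namespace so that plain tag paths can be used.
    document = re.sub(r' xmlns="[^"]+"', '', document, count=1)
    root = ET.fromstring(document)
    return {
        'State': text_or_default(root, './ReturnHeader/Filer/USAddress/State'),
        'PreviousYear_Revenue': int(text_or_default(
            root, './ReturnData/IRS990/TotalRevenuePriorYear', '0')),
        'CurrentYear_Revenue': int(text_or_default(
            root, './ReturnData/IRS990/TotalRevenueCurrentYear', '0')),
    }


# Made-up sample: the prior year revenue tag is deliberately missing.
SAMPLE_XML = (
    '<Return xmlns="http://www.irs.gov/efile">'
    '<ReturnHeader><Filer><USAddress><State>NY</State></USAddress></Filer></ReturnHeader>'
    '<ReturnData><IRS990>'
    '<TotalRevenueCurrentYear>150</TotalRevenueCurrentYear>'
    '</IRS990></ReturnData>'
    '</Return>'
)

print(parse_filing(SAMPLE_XML))
```

Returning a default for a missing node avoids an `AttributeError` on `.text`, which would otherwise abort the whole invocation for filings that lack a tag.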


The next cell loops over all URLs for a given tax year and invokes the Lambda function to scrape each one. One point is worth noting: I first ran a test on tax year 2009 (the smallest set) to check that everything works. The code works fine, but the whole run was very slow: it took about 8 hours to complete for approximately 50k files. I therefore ran some tests to find the bottleneck. It turns out that a single asynchronous function invocation takes around 500 ms to invoke and return, so the loop can only make around 125 calls per minute. I even tried calling a dummy Lambda function a couple of thousand times from inside another Lambda function, but I could not get the latency below 100 ms per call, which I think is still very slow. I checked all the configuration options of the Lambda service, but could not improve on this, even though the service allows 1000 concurrent function invocations. I therefore did not process a bigger batch such as tax year 2013 or 2014, as it would take more than 24 hours. Instead, I kept the results for 2009 and report YoY statistics based on 2009.

In [116]:
lamda_client = boto3.client('lambda', region_name = 'us-east-1')

In [158]:
for url_xml in list(df_updated.query("TaxYear == 2009").query("FormType == '990'").URL):
    data = {'url_xml': url_xml}
    response = lamda_client.invoke(
        FunctionName='scrapeXML',
        InvocationType='Event',
        Payload= json.dumps(data),
    )
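
One possible way to reduce the effect of the per-call latency is to issue the invocations from several threads instead of one sequential loop. The following is an untested sketch of that idea, not what was run for this notebook. The helper name `invoke_all` and the `max_workers` value are hypothetical, and it assumes that a single boto3 client can be shared between threads. The `invoke` argument is any callable that takes the JSON payload, so the helper can be tried without AWS.

```python
import json
from concurrent.futures import ThreadPoolExecutor


def invoke_all(urls, invoke, max_workers=32):
    """Call `invoke(payload)` once per URL from a pool of threads.

    Results are returned in the same order as `urls`.
    """
    payloads = [json.dumps({'url_xml': url}) for url in urls]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(invoke, payloads))


# Hypothetical usage with the client and function name from the cells above:
# responses = invoke_all(
#     urls,
#     lambda payload: lamda_client.invoke(
#         FunctionName='scrapeXML',
#         InvocationType='Event',
#         Payload=payload,
#     ),
# )
```

Whether this actually shortens the total run time would have to be measured, since it depends on where the latency comes from.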

The data now resides in a DynamoDB table. I could run the EMR service to read the data and do the aggregations directly on DynamoDB, but EMR is not available for free. I therefore decided to read all the data back into a pandas DataFrame and do the aggregations there.

In [111]:
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
    
table = dynamodb.Table('revenues')
print(table.creation_date_time)

2018-04-10 00:25:47.166000+02:00


DynamoDB does not return all the data in a single scan call; each call returns at most 1 MB of data. The data therefore has to be retrieved page by page in a loop and appended to a list.

In [112]:
items = []

response = table.scan()
items.extend(response['Items'])
print(len(items))

while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
    items.extend(response['Items'])
    print(len(items))

7508
14992
22494
29987
37477
44967
50825
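
The same paging loop can be wrapped in a small generator so that it can be reused and tested without a real table. This is a sketch only; the name `scan_all` is hypothetical, and `scan` is assumed to be a callable that behaves like `table.scan`, returning a dict with an `Items` list and, while more pages remain, a `LastEvaluatedKey`.

```python
def scan_all(scan, **kwargs):
    """Yield every item from a paginated scan, one page at a time."""
    response = scan(**kwargs)
    yield from response['Items']
    # Keep requesting pages until the response carries no continuation key.
    while 'LastEvaluatedKey' in response:
        response = scan(ExclusiveStartKey=response['LastEvaluatedKey'], **kwargs)
        yield from response['Items']


# Hypothetical usage with the table from the cells above:
# items = list(scan_all(table.scan))
```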


In [113]:
df_scraped_data = pd.DataFrame(items)

This part returns national average YoY growth:

In [114]:
(df_scraped_data
 .query("TaxYear == 2009 & PreviousYear_Revenue != 0")
 .merge(df_updated.query("TaxYear == 2009").query("FormType == '990'")[['ObjectId']], left_on='ObjectID', right_on='ObjectId')
 .assign(YoY = lambda r: pd.to_numeric(100*(r.CurrentYear_Revenue - r.PreviousYear_Revenue)/r.PreviousYear_Revenue))
 .groupby('TaxYear')['YoY'].mean()
)

TaxYear
2009    1565.916845
Name: YoY, dtype: float64

This part returns average YoY growth by state:

In [115]:
(df_scraped_data
 .query("TaxYear == 2009 & PreviousYear_Revenue != 0")
 .merge(df_updated.query("TaxYear == 2009").query("FormType == '990'")[['ObjectId']], left_on='ObjectID', right_on='ObjectId')
 .assign(YoY = lambda r: pd.to_numeric(100*(r.CurrentYear_Revenue - r.PreviousYear_Revenue)/r.PreviousYear_Revenue))
 .groupby(['TaxYear','State'])['YoY'].mean()
)

TaxYear  State
2009     AK         138.081261
         AL          19.060833
         AR           4.274604
         AZ          40.320089
         CA        9189.569603
         CO          30.946305
         CT         -52.225394
         DC          27.430759
         DE          16.152078
         FL         -27.896028
         GA          -2.992968
         HI           2.118341
         IA          61.748037
         ID           8.299954
         IL          11.917956
         IN          22.979566
         KS        1289.183364
         KY          58.489293
         LA          24.969474
         MA          69.111122
         MD         155.874883
         ME          24.819283
         MI          39.031880
         MN          17.964151
         MO          70.678746
         MS          20.137654
         MT          15.524128
         NC           7.862190
         ND          74.903984
         NE       81619.948393
         NH          31.721882
         NJ          35.

Future Work:
- Improve lambda function invocation to process a bigger and complete tax year.