In [8]:
#DEVELOPING DATASET FOR EACH YEAR
#Goal: 

#To link [EIN, URL] from IRS to [EIN, NTEE1] from NCCS files, and get Mission or Purpose statements for each EIN.
#To- do List: 

#[+] 1. Get [EIN, URL] from IRS. -> df_irs 
#[+] 2. Get [EIN, NTEE1] from NCCS. -> df_nccs 
#[+] 3. Intersect [df_irs, df_nccs] -> df_inter -> "link'year'.csv" 
#[+] 4. Visit each URL in df_inter and get data from relevant tabs. 


import pandas as pd
import re, requests, string, os, gzip, pickle, sys
from bs4 import BeautifulSoup as bs
from tqdm import tqdm_notebook as tqdm
from multiprocessing import Pool
#use regex instead of beautifulsoup, if possible.

year_list = [2015, 2014, 2013, 2012, 2011]
year = year_list[0]

#When we create a data frame with pandas ≤ 0.19.2 and pickle it (using pickle.dump), 
#it is not possible to unpickle it using pandas 0.20.1.
#https://github.com/pandas-dev/pandas/issues/16474


#Create the file if it does not exist, if the file exists: jump to step 4
if(os.path.isfile("/Users/Rushi/Documents/GRAFall2018/Data/intermediary/link"+str(year)+".pkl.gz")==False):
    #Step 1. Get [EIN, URL] from IRS
    irsfile = pd.read_json('https://s3.amazonaws.com/irs-form-990/index_'+str(year)+'.json')
    ein_url=list(map(list, zip(*[[s['EIN'] for s in irsfile['Filings'+str(year)]], [s['URL'] for s in irsfile['Filings'+str(year)]]])))
    df_irs = pd.DataFrame(ein_url, columns=['EIN', 'URL'])


    #Step 2. Get [EIN, NTEE1] from NCCS
    df_nccs1 = pd.read_csv('https://nccs-data.urban.org/data/core/'+str(year)+'/nccs.core'+str(year)+'pc.csv')
    df_nccs = df_nccs1[['EIN', 'NTEE1']]


    #Step 3. Get common URL from df_irs and df_nccs and make corresponding list of [EIN, URL, NTEE1]
    df_nccs['EIN'] = df_nccs['EIN'].apply(str)
    df_inter = pd.DataFrame(pd.merge(df_nccs, df_irs, how='outer', on=['EIN']), columns=['EIN','NTEE1','URL'])
    df_inter.columns = ['EIN', 'NTEE', 'IRS_URL']

    #file size limit on Github - 25 MB
    df_inter.to_pickle('/Users/Rushi/Documents/GRAFall2018/Data/intermediary/link'+str(year)+'.pkl.gz', 'gzip')
    
    del irsfile, ein_url, df_irs, df_nccs1, df_nccs, df_inter


#Step 4: Provide list of tags to search from:

#Year	MissionTags	PurposeTags
#2015, 2014, 2013	ActivityOrMissionDesc	OtherExemptPurposeExpendGrp, TotalExemptPurposeExpendGrp
#2012, 2011	ActivityOrMissionDescription	OtherExemptPurposeExpenditures, TotalExemptPurposeExpenditures
#FormType	Mission	Purpose
#990	Yes	No
#990EZ	No	Yes
#990PF	No	No


#The original tag names are converted into small letters while parsing, for e.g. 'ActivityOrMissionDesc' is parsed as 'activityormissiondesc'.
#So, we will provide original tags converted into small letters for comparision.
#https://github.com/lecy/Open-Data-for-Nonprofit-Research/blob/master/Build_IRS990_E-Filer_Datasets/Data_Dictionary.md
#year| tag| line#

#alltags = ['ActivityOrMissionDesc', 'ActivityOrMissionDescription',
#           'OtherExemptPurposeExpenGrp', 'OtherExemptPurposeExpenditures',
#           'TotalExemptPurposeExpendGrp', 'TotalExemptPurposeExpenditures'] 
    
    
alltags = ['activityormissiondesc', 'activityormissiondescription',
           'otherexemptpurposeexpengrp', 'otherexemptpurposeexpenditures',
           'totalexemptpurposeexpendgrp', 'totalexemptpurposeexpenditures']


df = pd.read_pickle("/Users/Rushi/Documents/GRAFall2018/Data/intermediary/link"+str(year)+".pkl.gz", 'gzip')
no_url = df[pd.isnull(df['IRS_URL'])]
df_inter = df[~ pd.isnull(df['IRS_URL'])]

masterdata=pd.DataFrame(no_url, columns=['EIN', 'NTEE', 'IRS_URL', 'TEXT', 'TEXTTYPE', 'YEAR'])
#masterdata.to_pickle("/Users/Rushi/Documents/GRAFall2018/Data/Masterdata_"+str(year)+".pkl.gz", 'gzip')

#to overcome RecursionError: maximum recursion depth exceeded
sys.setrecursionlimit(10000000)

def build_coredata(r):
    
    print("inside: ",r)
    process_id = os.getpid()
    dest_file = '/Users/Rushi/Documents/GRAFall2018/Data/intermediary/'+str(year)+'/Masterdata_'+str(process_id)+'.pkl.gz'
    
    #assign local dataframe
    masterdata=pd.DataFrame(columns=['EIN', 'NTEE', 'IRS_URL', 'TEXT', 'TEXTTYPE', 'YEAR'])
    for i in range(r[0], r[1]):
        row = df_inter.values[int(i)]
        flag = 0

        
        page = requests.get(str(row[2]), timeout=5)
        
        bss = bs(page.text, 'html.parser')
        
        #Add record with tag name and string for each record tag matched in list "alltags"
        for tag in bss.find_all():
            if tag.name in alltags:
                masterdata.loc[len(masterdata)] = [str(row[0]), row[1], row[2], tag.string, tag.name, str(year)]
                flag = 1
       
        #Add record with EIN, NTEE and URL if no tags are found.
        if(flag == 0):
            masterdata.loc[len(masterdata)] = [str(row[0]), row[1], row[2],'','', str(year)]  
        
        #Data storage: to_pickle overwrites the existing file.
        #Here "Dataframe_2015.pkl.gz" file will not be appended by to_pickle but only existing values will appear.
        #First, we have to read the .pkl.gz, concat the dataframe, and write new dataframe as pkl.gz
        #However, in multiprocessing, doing the same may lead to loss of data
        #Hence, each thread will write in it's seperate .pkl.gz, and at the end all will be combined.
        
        #For every 200 records, move the data from dataframe to .csv file on disk.
        if(i%200==0):
            print("reached: ",i, " on process: ", process_id)
            #Since, pickle doesn't append file but rewrites it, we will first take the exisiting data and concat it with 
            #new data before storing into .pkl.gz
            if(os.path.isfile(dest_file)==True):
                masterdata = pd.concat([masterdata, pd.read_pickle(dest_file, 'gzip')])
                
            masterdata.to_pickle(dest_file, 'gzip')
            del masterdata
            masterdata=pd.DataFrame(columns=['EIN', 'NTEE', 'IRS_URL', 'TEXT', 'TEXTTYPE', 'YEAR'])
            
    #move remainig data
    if(os.path.isfile(dest_file)==True):
                masterdata = pd.concat([masterdata, pd.read_pickle(dest_file, 'gzip')])
                
    masterdata.to_pickle(dest_file, 'gzip')
    del masterdata
    
    
def writedata(masterdata):
    masterdata = pd.concat([masterdata, pd.read_pickle('Masterdata_2015.pkl.gz')])           
    masterdata.to_pickle('Masterdata_2015.pkl.gz')
    return "Added to pickle!"
    
    
def build_coredataV2(r):
    
    process_id = os.getpid()
    print("inside: ",r, process_id)
    #filedir = '/Users/Rushi/Documents/GRAFall2018/Data/intermediary/'+str(year)+'/Masterdata_'+str(process_id)+'.pkl.gz'
    #dest_file = os.path.join(filedir, ('%s_annots.pkl' % 'imagesetfile'))
    #dest_file = filedir
    #print(filedir)
    
    #assign local dataframe
    masterdata=pd.DataFrame(columns=['EIN', 'NTEE', 'IRS_URL', 'TEXT', 'TEXTTYPE', 'YEAR'])
    #for i in tqdm(range(r[0], r[1])):
    for i in range(r[0], r[1]):
        row = df_inter.values[int(i)]
        flag = 0

        page = requests.get(str(row[2]), timeout=5)
        bss = bs(page.text, 'html.parser')
        
        #Add record with tag name and string for each record tag matched in list "alltags"
        for tag in bss.find_all():
            if tag.name in alltags:
                masterdata.loc[len(masterdata)] = [str(row[0]), row[1], row[2], tag.string, tag.name, str(year)]
                flag = 1
       
        #Add record with EIN, NTEE and URL if no tags are found.
        if(flag == 0):
            masterdata.loc[len(masterdata)] = [str(row[0]), row[1], row[2],'','', str(year)]
        
        
        #Data storage: to_pickle overwrites the existing file.
        #Here "Dataframe_2015.pkl.gz" file will not be appended by to_pickle but only existing values will appear.
        #First, we have to read the .pkl.gz, concat the dataframe, and write new dataframe as pkl.gz
        #However, in multiprocessing, doing the same may lead to loss of data
        #Hence, each thread will write in it's seperate .pkl.gz, and at the end all will be combined.
        
    #Since, pickle doesn't append file but rewrites it, we will first take the exisiting data and concat it with 
    #new data before storing into .pkl.gz
    
    written = writedata(masterdata, str(year)+'/Masterdata_'+str(process_id)+'.pkl.gz')
    
    #if(os.path.isfile(str(year)+'/Masterdata_'+str(process_id)+'.pkl.gz')):
    #    masterdata = pd.concat([masterdata, pd.read_pickle(str(year)+'/Masterdata_'+str(process_id)+'.pkl.gz')])
    #    print("length already: ",len(pd.read_pickle(str(year)+'/Masterdata_'+str(process_id)+'.pkl.gz')))
        
    #print(len(masterdata), os.getpid())
    
    #print("Here!!")
                
    #masterdata.to_pickle(str(year)+'/Masterdata_'+str(process_id)+'.pkl.gz')
    
    masterdata=pd.DataFrame(columns=['EIN', 'NTEE', 'IRS_URL', 'TEXT', 'TEXTTYPE', 'YEAR'])
    
    print(r, written, process_id)
    

def writefile(masterdata, r):
    
    #totalmasterdata = 
    if(os.path.isfile(path)):
        masterdata = pd.concat([masterdata, pd.read_pickle(str(year)+'/Masterdata_'+str(process_id)+'.pkl.gz')])
                
    masterdata.to_pickle(path)
    return "Added to Dataset!"
    
    
def build_coredataV3(r):
    
    process_id = os.getpid()
    print("inside: ",r, process_id)
    masterdata=pd.DataFrame(columns=['EIN', 'NTEE', 'IRS_URL', 'TEXT', 'TEXTTYPE', 'YEAR'])
    #for i in tqdm(range(r[0], r[1])):
    for i in range(r[0], r[1]):
        row = df_inter.values[int(i)]
        flag = 0

        page = requests.get(str(row[2]), timeout=5)
        bss = bs(page.text, 'html.parser')
        
        #Add record with tag name and string for each record tag matched in list "alltags"
        for tag in bss.find_all():
            if tag.name in alltags:
                masterdata.loc[len(masterdata)] = [str(row[0]), row[1], row[2], tag.string, tag.name, str(year)]
                flag = 1
       
        #Add record with EIN, NTEE and URL if no tags are found.
        if(flag == 0):
            masterdata.loc[len(masterdata)] = [str(row[0]), row[1], row[2],'','', str(year)]
    
    written = writedata(masterdata)
    print(r, written, process_id)
    

#numbers of URLs accesed by each thread: must be greater than 1000 for this code
#no_urls = 5

#agents: # of cores, chunksize: total values from records to be accesed by a thread at one time
agents = 4
chunksize = 100

#initialize list to be assigned to threads
#records = [[i, i+no_urls] for i in range(0, len(df_inter)+1, no_urls)]
records = [[i, i+chunksize] for i in range(18000, 18800, chunksize)]

#implement parallel processing
with Pool(processes=agents) as pool:
    #print(multiprocessing.current_process())
    pool.map(build_coredataV3, records)
    
#df=pd.read_pickle("/Users/Rushi/Documents/GRAFall2018/Data/Masterdata_"+str(year)+".pkl.gz", 'gzip')
#print(df)

inside:  [18100, 18200] 1520
inside:  [18200, 18300] 1521
inside:  [18300, 18400] 1522
inside:  [18000, 18100] 1519
inside:  [18400, 18500] 1521
inside:  [18500, 18600] 1520
inside:  [18600, 18700] 1522
inside:  [18700, 18800] 1519


EOFError: 