In [17]:
#Question 1: Installing pyspark and Running the code provided

import pyspark
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.4.1 pyspark-shell'

sc = pyspark.SparkContext.getOrCreate()

import random

rdd = sc.parallelize([random.randint(0,10000) for i in range(10000)])
rdd.takeOrdered(10, key=lambda x: -x)

[10000, 10000, 9998, 9998, 9997, 9996, 9995, 9995, 9995, 9995]

In [18]:
#Question 2: Reading the CSV File

sqlContext = pyspark.SQLContext(sc)
df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("index_2016.csv")
df.show()
#df.printSchema()


#Ref: https://spark.apache.org/docs/1.6.1/sql-programming-guide.html#starting-point-sqlcontext
#Ref: https://docs.databricks.com/spark/latest/data-sources/read-csv.html

+---------+-----------+---------+----------+----------+--------------------+-----------+--------------+------------------+
|RETURN_ID|FILING_TYPE|      EIN|TAX_PERIOD|  SUB_DATE|       TAXPAYER_NAME|RETURN_TYPE|           DLN|         OBJECT_ID|
+---------+-----------+---------+----------+----------+--------------------+-----------+--------------+------------------+
| 13190365|      EFILE|742661023|    201412|02/09/2016|HARRIET AND HARMO...|      990PF|93491315003445|201543159349100344|
| 13189948|      EFILE|562629114|    201412|02/09/2016|BROWN COMMUNITY D...|      990EZ|93492310002195|201543109349200219|
| 13191270|      EFILE|270678774|    201509|02/09/2016|KIWANIS CLUB OF G...|      990EZ|93492308002265|201513089349200226|
| 13191272|      EFILE|464114252|    201412|02/09/2016| CONFETTI FOUNDATION|      990EZ|93492308002365|201513089349200236|
| 13192838|      EFILE|510311790|    201506|02/10/2016|  SHEPHERD PLACE INC|        990|93493322003275|201523229349300327|
| 13193634|     

In [19]:
#Question 2A: Compare Return Types

#sql = pyspark.SQLContext.sc()
df.groupBy("RETURN_TYPE").count().show()


+-----------+------+
|RETURN_TYPE| count|
+-----------+------+
|      990EO| 28537|
|      990EZ| 84652|
|       990O| 50987|
|      990PF| 53694|
|        990|160550|
+-----------+------+



In [20]:
#Question 2B: Calculate popular dates to file (List the top ten dates)

popular_dates = df.groupBy("SUB_DATE").count().orderBy("count", ascending = False).show(10)

#Ref: http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html


+----------+-----+
|  SUB_DATE|count|
+----------+-----+
|08/30/2016| 6687|
|01/21/2016| 5994|
|01/20/2016| 5560|
|01/26/2016| 5432|
|10/13/2016| 5054|
|01/22/2016| 4885|
|02/18/2016| 4877|
|08/16/2016| 4845|
|01/27/2016| 4647|
|01/14/2016| 4469|
+----------+-----+
only showing top 10 rows



Reason 2b: The top ten popular months when the taxes were filed were: 
January (30,987 taxes filed), 
August (11,532 taxes filed)
October(5054 taxes filed)
February(4877 taxes filed)

Form 990 is the form used by the United States Internal Revenue Service to gather financial information about nonprofit organizations. A highest number of taxes were filed in the month of January. 
Form 990, 990-EZ, or 990-PF must be filed by the 15th day of the 5th month after the end of the organization's accounting period. 
This means that most of the nonprofit organizations had their start date 5 month's before January(15-30th) i.e. Aug(15-30th). Apart from this, there is a 3 month extension offered which ia either applied automic or can be filled by the organization.
The next popular dates were in Auguest, which means organizations were formed 5 months before August and so on and so forth. 

In [21]:
#Question 2C: Calculate most popular months to file
from pyspark.sql.functions import *
#from pyspark.sql.functions import month
# Converting it to the unix time stamp as the month function was not taking the normal column
#Ref: https://stackoverflow.com/questions/37038014/pyspark-replace-strings-in-spark-dataframe-column
#Ref: https://docs-snaplogic.atlassian.net/wiki/spaces/SD/pages/2458071/Date+Functions+and+Properties+Spark+SQL

k = df.withColumn('SUB_DATE', from_unixtime(unix_timestamp(df['SUB_DATE'], 'MM/dd/yyyy'), "y-MM-dd'T'hh:mm:ssZ"))

monthpattern = k.select(month('SUB_DATE').alias('month'))
#monthpattern
monthcount = monthpattern.groupBy('month').count().orderBy("count", ascending = False).show()


+-----+-----+
|month|count|
+-----+-----+
|    8|60929|
|    1|60470|
|    2|57674|
|   12|47433|
|   10|38635|
|    9|38189|
|    7|35288|
|    3|17974|
|   11|17832|
|    4| 3996|
+-----+-----+



In [22]:
#Question 3A: Loading XML Files

#There is a method in SparkContext to add files and we can use SparkFiles.get(filename) in load to load the file.

#sc.addFile("https://irs-form-990.s3.amazonaws.com/201611339349202661_public.xml")
#df1 = sqlContext.read.format("xml").options(rowTag="IRS990EZ").load(pyspark.SparkFiles.get("*_public.xml")) 

df2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("index_2016_sample.csv")
#df2.show()

df3 = df2.filter(df2.RETURN_TYPE == "990EZ")
#df3.count()
#rdd = df3.rdd.map(list)
#df3.show()
#toString = df3.withColumn("OBJECT_ID", df3["OBJECT_ID"].cast(String()))

a = df3.select("OBJECT_ID").rdd.map(lambda x:"https://irs-form-990.s3.amazonaws.com/"+ x[0]+"_public.xml").collect()
#a
for x in range(0, len(a)):
    #print (a[x])
    sc.addFile(a[x])

df4= sqlContext.read.format("xml").options(rowTag="IRS990EZ").load(pyspark.SparkFiles.get("*_public.xml")).coalesce(10).cache()
df4.collect()

[Row(ActivitiesNotPreviouslyRpt=None, ActivitiesNotPreviouslyRptInd='false', AddressChange=None, AddressChangeInd='X', AmendedReturn=None, AmendedReturnInd=None, AmountOfTaxImposedOnOrgMgr=None, AmountOfTaxReimbursedByOrg=None, BenefitsPaidToOrForMembers=None, BenefitsPaidToOrForMembersAmt=None, BooksInCareOfDetail=Row(BusinessName=None, ForeignAddress=None, PersonNm='TERA PAGEL HOLZSCHUH', PhoneNum=3039292308, USAddress=Row(AddressLine1=None, AddressLine1Txt='1566 Quebec St', AddressLine2Txt=None, City=None, CityNm='Denver', State=None, StateAbbreviationCd='CO', ZIPCd=80220, ZIPCode=None)), CashSavingsAndInvestments=None, CashSavingsAndInvestmentsGrp=Row(BOYAmt=41650, EOYAmt=49165), ChgMadeToOrgnzngDocNotRptInd='false', CntrctRcvdGreaterThan100KCnt=None, CompensationOfHghstPdCntrctGrp=None, ContributionsGiftsGrantsEtc=None, ContributionsGiftsGrantsEtcAmt=4565, CostOfGoodsSold=None, CostOfGoodsSoldAmt=0, CostOrOtherBasisExpenseSaleAmt=0, CostOtherBasisAndSalesExpenses=None, DidOrgFileF

In [23]:
#Question 3B: Gross Receipts Amount

import pyspark.sql.functions

max = df4.groupBy().max('GrossReceiptsAmt').collect()
min = df4.groupBy().min('GrossReceiptsAmt').collect()
mean = df4.groupBy().mean('GrossReceiptsAmt').collect()

print("Maximum Gross Receipt Amount: ", max)
print("Minimum Gross Receipt Amount: ", min)
print("Mean Gross Receipt Amount: ", mean)

#df4.max(['GrossReceiptsAmt']).show()
#df4.maximum('GrossReceiptsAmt')

Maximum Gross Receipt Amount:  [Row(max(GrossReceiptsAmt)=199952)]
Minimum Gross Receipt Amount:  [Row(min(GrossReceiptsAmt)=0)]
Mean Gross Receipt Amount:  [Row(avg(GrossReceiptsAmt)=69823.25)]


Reason 3b: Gross Receipt Amount, if less than  10 Million dollar, then such companies are termed as Small companies. However, The amount Maximum Gross Receipt Amount obtained is: 200,000 million dollar. Hence, from the data provided, we could conclude that all the companies are Small listed Companies

In [31]:
#Question 3C: Change in Assets Histogram

df5 = df4.withColumn('Difference', df4.NetAssetsOrFundBalancesEOYAmt - df4.NetAssetsOrFundBalancesBOYAmt)

#df5 = sqlContext.sql('SELECT *, NetAssetsOrFundBalancesEOYAmt - NetAssetsOrFundBalancesBOYAmt AS Difference FROM df4')
#df5.select('Difference').show()

rdd = df5.select('Difference').rdd.flatMap(lambda x: x).histogram(20)
rdd

([-249428.0,
  -226387.5,
  -203347.0,
  -180306.5,
  -157266.0,
  -134225.5,
  -111185.0,
  -88144.5,
  -65104.0,
  -42063.5,
  -19023.0,
  4017.5,
  27058.0,
  50098.5,
  73139.0,
  96179.5,
  119220.0,
  142260.5,
  165301.0,
  188341.5,
  211382],
 [1, 1, 1, 0, 1, 3, 3, 5, 13, 55, 494, 277, 53, 17, 10, 1, 2, 0, 1, 1])