In [1]:
import pandas as pd
import numpy as np
import os
from pyspark import SparkConf #, SparkContext 
from pyspark.sql import SparkSession #, SQLContext https://spark.apache.org/docs/1.6.1/sql-programming-guide.html
from pyspark.sql import functions as F # access to the sql functions https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions
from IPython.display import HTML

In [2]:
# Resolve the Spark warehouse directory relative to the current working directory.
# Make sure it resolves to '/home/jovyan/data/spark-warehouse' (checked in the next cell).
_relative_warehouse = '../../../data/spark-warehouse'
warehouse_location = os.path.abspath(_relative_warehouse)
print(warehouse_location)

/home/jovyan/data/spark-warehouse


In [3]:
# Warn (white text on a red background via ANSI escapes) when the resolved
# warehouse directory is not the expected location.
expected_warehouse = os.path.normpath("/home/jovyan/data/spark-warehouse")
path_is_wrong = warehouse_location != expected_warehouse
if path_is_wrong:
    print('\x1b[6;37;41m' + 'Your path is not correct' + '\x1b[0m')

In [4]:
# Assemble the Spark configuration one setting at a time.
conf = SparkConf()
conf.set("spark.ui.port", "4041")
conf.set('spark.jars', '/home/jovyan/scratch/postgresql-42.2.18.jar')  # PostgreSQL JDBC driver jar
conf.set("spark.sql.inMemoryColumnarStorage.compressed", True)  # default
conf.set("spark.sql.inMemoryColumnarStorage.batchSize", 10000)  # default
conf.set("spark.sql.warehouse.dir", warehouse_location)  # resolved in an earlier cell
conf.set("spark.driver.memory", "7g")

# Start (or reuse) the session -- the modern replacement for the old context objects.
# "local[3]" runs locally with 3 worker threads; raise or lower the number to fit
# your memory and processors, or use "local[*]" to use all of them.
spark = (
    SparkSession.builder
    .master("local[3]")
    .appName('test')
    .config(conf=conf)
    .getOrCreate()
)

In [58]:
# spark.stop()

In [5]:
# JDBC connection settings for the irs990 PostgreSQL database.
# Credentials can be supplied through the PGUSER / PGPASSWORD environment
# variables so they do not have to live in the notebook; the literals below are
# only the fallback defaults that match the original hard-coded values.
properties = {
    'driver': 'org.postgresql.Driver',
    'url': 'jdbc:postgresql://c451_db_1:5432/irs990',
    'user': os.environ.get('PGUSER', 'postgres'),
    'password': os.environ.get('PGPASSWORD', 'postgres1234'),
}


In [6]:
# schema table
# Load the database's information_schema.tables view over JDBC so the available
# tables can be listed.
schema = (
    spark.read
    .format('jdbc')
    .option('driver', properties['driver'])
    .option('url', properties['url'])
    .option('dbtable', "information_schema.tables")
    .option('user', properties['user'])
    # Send the configured password too; without it the connection only succeeds
    # when the server does not require password authentication.
    .option('password', properties['password'])
    .load()
)



In [7]:
# Pull the catalogue into pandas, keep the identifying columns, and restrict
# the listing to tables in the "public" schema.
catalog_columns = ['table_catalog', 'table_schema', 'table_name']
schema_pdf = schema.toPandas()
tables = schema_pdf.filter(catalog_columns).query('table_schema == "public"')

In [8]:
# Render the table listing as an HTML table in the cell output.
tables_markup = tables.to_html()
HTML(tables_markup)

Unnamed: 0,table_catalog,table_schema,table_name
0,irs990,public,address_table
1,irs990,public,cmsid_eins
2,irs990,public,django_migrations
3,irs990,public,excess_benefits
4,irs990,public,excess_benefits_types
5,irs990,public,filing_filing
6,irs990,public,fl_loans_from
7,irs990,public,insider_assistance
8,irs990,public,insider_assistance_types
9,irs990,public,insider_transactions


In [11]:
import shutil

# Start from a clean slate: remove any files left on disk for the irs990
# database, drop it from the catalog, then recreate it empty.
# The directory is derived from warehouse_location (the value given to
# spark.sql.warehouse.dir) rather than repeating the absolute path, and
# ignore_errors=True keeps the cell from failing when the directory does not
# exist yet (e.g. on the very first run).
shutil.rmtree(os.path.join(warehouse_location, 'irs990.db'), ignore_errors=True)
spark.sql('DROP DATABASE IF EXISTS irs990 CASCADE;')
spark.sql("create database irs990")

# shutil.rmtree('/home/jovyan/data/spark-warehouse/irs990.db/address_table')
# shutil.rmtree('/home/jovyan/data/spark-warehouse/irs990.db/return_EZOffcrDrctrTrstEmpl')
# shutil.rmtree('/home/jovyan/data/spark-warehouse/irs990.db/tmp_990ez_employees')

DataFrame[]

In [38]:

# spark.sql('DROP TABLE IF EXISTS return_EZOffcrDrctrTrstEmpl')
# spark.sql('DROP TABLE IF EXISTS address_table')
# spark.sql('DROP TABLE IF EXISTS tmp_990ez_employees')


DataFrame[]

In [12]:
# Make irs990 the current database so unqualified table names resolve against it.
use_statement = "USE irs990"
spark.sql(use_statement)

DataFrame[]

In [13]:
# Load the return_EZOffcrDrctrTrstEmpl table from PostgreSQL over JDBC.
return_EZOffcrDrctrTrstEmpl = (
    spark.read
    .format('jdbc')
    .option('driver', properties['driver'])
    .option('url', properties['url'])
    .option('dbtable', "return_EZOffcrDrctrTrstEmpl")
    .option('user', properties['user'])
    # Send the configured password too; without it the connection only succeeds
    # when the server does not require password authentication.
    .option('password', properties['password'])
    .load()
)

In [14]:
# Load the address_table table from PostgreSQL over JDBC.
address = (
    spark.read
    .format('jdbc')
    .option('driver', properties['driver'])
    .option('url', properties['url'])
    .option('dbtable', 'address_table')
    .option('user', properties['user'])
    # Send the configured password too; without it the connection only succeeds
    # when the server does not require password authentication.
    .option('password', properties['password'])
    .load()
)

In [15]:
# Row counts of the two JDBC-backed frames, address first.
for jdbc_frame in (address, return_EZOffcrDrctrTrstEmpl):
    print(jdbc_frame.count())

2243560
4698799


In [16]:
# Report how many partitions each frame currently has, address first.
for jdbc_frame in (address, return_EZOffcrDrctrTrstEmpl):
    partition_total = jdbc_frame.rdd.getNumPartitions()
    print('Number of partitions: {}'.format(partition_total))

Number of partitions: 1
Number of partitions: 1


In [17]:
# Redistribute the address frame across 50 partitions.
address_partitions = 50
address = address.repartition(address_partitions)

In [18]:
# Redistribute the officers/directors frame across 50 partitions.
officer_partitions = 50
return_EZOffcrDrctrTrstEmpl = return_EZOffcrDrctrTrstEmpl.repartition(officer_partitions)

In [19]:
# Confirm the partition counts after repartitioning, address first.
for repartitioned_frame in (address, return_EZOffcrDrctrTrstEmpl):
    partition_total = repartitioned_frame.rdd.getNumPartitions()
    print('Number of partitions: {}'.format(partition_total))

Number of partitions: 50
Number of partitions: 50


In [20]:
# Persist the address frame as the table "address_table" in the current database.
# mode('overwrite') makes the cell re-runnable: the default save mode raises
# an error if the table already exists.
address.write.mode('overwrite').saveAsTable("address_table")

In [40]:
# List the tables currently registered in the irs990 database.
tables_in_irs990 = spark.sql('SHOW TABLES IN irs990')
tables_in_irs990.show()


+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|  irs990|       address_table|      false|
|  irs990|return_ezoffcrdrc...|      false|
+--------+--------------------+-----------+



In [22]:
# Persist the officers/directors frame as the table "return_EZOffcrDrctrTrstEmpl".
# mode('overwrite') makes the cell re-runnable: the default save mode raises
# an error if the table already exists.
return_EZOffcrDrctrTrstEmpl.write.mode('overwrite').saveAsTable('return_EZOffcrDrctrTrstEmpl')

In [41]:
%%time
sql_query_join = """
SELECT addt.*, rez.PrsnNm, rez.TtlTxt, rez.CmpnstnAmt
FROM return_EZOffcrDrctrTrstEmpl as rez
    LEFT JOIN address_table as addt
    ON rez.ein = addt.ein
    AND rez.object_id = addt.object_id
    ORDER BY addt.ein DESC, addt.object_id DESC;
"""

tmp_990ez_employees = spark.sql(sql_query_join)

tmp_990ez_employees.write.saveAsTable('tmp_990ez_employees')


CPU times: user 5.64 ms, sys: 2.3 ms, total: 7.94 ms
Wall time: 36.2 s


In [42]:
%%time
dat = spark.read.parquet("../../../data/spark-warehouse/irs990.db/tmp_990ez_employees/")
dat.count()


CPU times: user 3.33 ms, sys: 597 µs, total: 3.93 ms
Wall time: 806 ms


4698887

In [None]:
# https://github.com/jsfenfen/990-xml-database/blob/master/directors.sh

# DROP TABLE IF EXISTS tmp_990ez_employees;
# SELECT address_table.*,
# 	'/IRS990EZ' as form,
#    "PrsnNm",
#    "TtlTxt",
#    "CmpnstnAmt" 
#    INTO temporary table tmp_990EZ_employees
#    FROM return_EZOffcrDrctrTrstEmpl
# 	LEFT JOIN address_table ON return_EZOffcrDrctrTrstEmpl.ein = address_table.ein
# 	AND return_EZOffcrDrctrTrstEmpl.object_id= address_table.object_id;

In [39]:
# Bring the first 100 rows of the join result into pandas for a quick look.
preview_rows = tmp_990ez_employees.limit(100)
temp = preview_rows.toPandas()
temp

Unnamed: 0,ein,object_id,RtrnHdr_TxPrdEndDt,RtrnHdr_TxYr,BsnssNm_BsnssNmLn1Txt,BsnssNm_BsnssNmLn2Txt,BsnssOffcr_PrsnNm,BsnssOffcr_PrsnTtlTxt,BsnssOffcr_PhnNm,BsnssOffcr_EmlAddrssTxt,...,USAddrss_SttAbbrvtnCd,USAddrss_ZIPCd,FrgnAddrss_AddrssLn1Txt,FrgnAddrss_AddrssLn2Txt,FrgnAddrss_CtyNm,FrgnAddrss_PrvncOrSttNm,FrgnAddrss_CntryCd,PrsnNm,TtlTxt,CmpnstnAmt
0,996078202,201921349349202282,2018-12-31,2018,JOSEPH CAMPBELL FOUNDATION,ENDOWMENT TRUST,EDWARD C HORTON,TRUSTEE,9732180500,,...,NY,10014,,,,,,EDWARD C HORTON,TRUSTEE,0
1,996078202,201921349349202282,2018-12-31,2018,JOSEPH CAMPBELL FOUNDATION,ENDOWMENT TRUST,EDWARD C HORTON,TRUSTEE,9732180500,,...,NY,10014,,,,,,ROBERT WALTER,TRUSTEE,0
2,996078202,201921349349202282,2018-12-31,2018,JOSEPH CAMPBELL FOUNDATION,ENDOWMENT TRUST,EDWARD C HORTON,TRUSTEE,9732180500,,...,NY,10014,,,,,,THEODORE DILLINGHAM,TRUSTEE,0
3,996078202,201801289349200760,2017-12-31,2017,JOSEPH CAMPBELL FOUNDATION,ENDOWMENT TRUST,EDWARD C HORTON,TRUSTEE,9732180500,,...,NY,10014,,,,,,ROBERT WALTER,TRUSTEE,0
4,996078202,201801289349200760,2017-12-31,2017,JOSEPH CAMPBELL FOUNDATION,ENDOWMENT TRUST,EDWARD C HORTON,TRUSTEE,9732180500,,...,NY,10014,,,,,,THEODORE DILLINGHAM,TRUSTEE,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,996022758,201413029349200641,2014-06-30,2013,PUNAHOU ALUMNI ASSOCIATION,,JENNIFER HONG,TREASURER,8089445740,,...,HI,96822,,,,,,BONNIE-LEE PANG,PAST PRESIDENT,0
96,996022758,201413029349200641,2014-06-30,2013,PUNAHOU ALUMNI ASSOCIATION,,JENNIFER HONG,TREASURER,8089445740,,...,HI,96822,,,,,,RICHARD ASATO,PRESIDENT,0
97,996022758,201413029349200641,2014-06-30,2013,PUNAHOU ALUMNI ASSOCIATION,,JENNIFER HONG,TREASURER,8089445740,,...,HI,96822,,,,,,TREVER ASAM,BOARD MEMBER,0
98,996022758,201413029349200641,2014-06-30,2013,PUNAHOU ALUMNI ASSOCIATION,,JENNIFER HONG,TREASURER,8089445740,,...,HI,96822,,,,,,RANDALL KAM,SECOND VICE PRESIDENT,0
