# Connecting to our database and retrieving data
    In this particular case, the database is named irs990 and the rows are filtered by the credit unions' EINs

## Always start with your imports

In [1]:
#imports
import pandas as pd
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql import functions as f
from pyspark import SparkContext, SparkConf
import psycopg2
import os

## Read in our credit unions dataframe from a csv using pandas 

In [2]:
# Load the credit union records from the CSV file into a pandas DataFrame
dat = pd.read_csv(filepath_or_buffer='../data/credit_unions.csv')

## Pull the list of eins from our dataframe

In [3]:
# Collect the `ein` column of the credit union DataFrame as a plain Python list
eins = dat['ein'].tolist()

## Making the Connection and Session to our Database and Pulling from the Database

#### Order:  
    Connection -> Session -> Pulling Table

#### Make the connection

In [4]:
# Open a psycopg2 connection to the irs990 database.
# NOTE(review): none of the cells below appear to use `connection` (Spark
# connects on its own over JDBC) and it is never closed -- confirm whether
# it is still needed, and call connection.close() once finished with it.
connection_settings = {
    'host': 'noahcook_db_1',     # Name of database docker container
    'database': 'irs990',        # Name of database
    'user': 'postgres',          # Username for our database
    'password': 'postgres1234',  # Password for our database
    'port': '5432',              # Port to be used for our connection
}
connection = psycopg2.connect(**connection_settings)

#### Configuration Settings for the session

In [5]:
# Spark configuration used when the session is created
conf = SparkConf().setAll([
    # Port on which the Spark web UI is served
    ("spark.ui.port", "4041"),
    # Memory given to each executor process
    ('spark.executor.memory', '4G'),
    # Memory given to the driver process (this is RAM, not disk space)
    ('spark.driver.memory', '45G'),
    # Upper limit on the total size of results sent back to the driver
    ('spark.driver.maxResultSize', '10G'),
    # Extra jar(s) put on the driver/executor classpath; this one is the
    # PostgreSQL JDBC driver that Spark needs in order to read from Postgres
    ('spark.jars', '/home/jovyan/scratch/postgresql-42.2.18.jar'),
])

#### Actually creating the session and starting it

In [6]:
# Build the Spark session from the configuration above, or reuse a session
# that is already running
spark = (
    SparkSession.builder
    .appName('test')
    .config(conf=conf)
    .getOrCreate()
)

#### Creating the table settings

In [7]:
# Settings for the JDBC read of the table we want to pull from the database
table_properties = dict(
    # Class name of the JDBC driver; the PostgreSQL one, since the url below
    # is a jdbc:postgresql address
    driver='org.postgresql.Driver',
    # JDBC address of the database: host noahcook_db_1, port 5432, db irs990
    url='jdbc:postgresql://noahcook_db_1:5432/irs990',
    # Username for our database
    user='postgres',
    # Password for our database
    password='postgres1234',
    # Name of the table in our database that we want to pull
    dbtable='return_ezfrgnoffccntrycd',
)

#### Pull table from database and store it in a PySpark DataFrame

In [8]:
# Read the table named in table_properties['dbtable'] over JDBC into a
# PySpark DataFrame.
# The password option was previously left out even though it is defined in
# table_properties; without it the JDBC login fails on any server that
# requires password authentication.
table = (
    spark.read
    .format('jdbc')
    .option('driver', table_properties['driver'])
    .option('url', table_properties['url'])
    .option('dbtable', table_properties['dbtable'])
    .option('user', table_properties['user'])
    .option('password', table_properties['password'])
    .load()
)

#### Filter data from the dataframe to only include values where the eins match the eins pulled from the credit union Pandas DataFrame earlier.

In [9]:
# Keep only the rows whose `ein` value appears in the credit union EIN list
cu_table = table.filter(
    f.col("ein").isin(eins)
)

#### Optional: 
    Convert the PySpark DataFrame to a Pandas DataFrame

In [10]:
# Convert the filtered PySpark DataFrame into a pandas DataFrame.
# toPandas() loads every row into the driver's memory, so the filtered
# result has to be small enough to fit there.
cu_pandas = cu_table.toPandas()

#### Optional:
    If you are done with Spark you can stop the session

In [11]:
# Shut down the Spark session; `spark` cannot be used for further queries
# after this call.
spark.stop()