In [None]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

# Start Spark session
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import *

conf = SparkConf().set("spark.files.overwrite", "true").set('spark.driver.extraClassPath', '/content/postgresql-42.2.9.jar')
spark = SparkSession.builder.appName("ETL2").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar",conf=conf).getOrCreate()

--2020-08-04 19:38:36--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-08-04 19:38:36 (8.97 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [None]:
#Create Function

def etl_load(table_name,  gcs_bucket, file_name, year):
  from pyspark import SparkFiles 
  
  #Define Schema
  schema = StructType([
    StructField("CMTE_ID",StringType(),True),
    StructField("AMNDT_IND",StringType(),True),
    StructField("RPT_TP",StringType(),True),
    StructField("TRANSACTION_PGI",StringType(),True),
    StructField("IMAGE_NUM",StringType(),True),
    StructField("TRANSACTION_TP",StringType(),True),
    StructField("ENTITY_TP",StringType(),True),
    StructField("NAME",StringType(),True),
    StructField("CITY",StringType(),True),
    StructField("STATE",StringType(),True),
    StructField("ZIP",StringType(),True),
    StructField("EMPLOYER",StringType(),True),
    StructField("OCCUPATION",StringType(),True),
    StructField("TRANSACTION_DT",StringType(),True),
    StructField("TRANSACTION_AMT",IntegerType(),True),
    StructField("OTHER_ID",StringType(),True),
    StructField("TRAN_ID",StringType(),True),
    StructField("FILE_NUM",StringType(),True),
    StructField("MEMO_CD",StringType(),True),
    StructField("MEMO_TEXT",StringType(),True),
    StructField("SUB_ID",StringType(),True)])
      
  #Unzip file and overwrite for the job
  url=gcs_bucket + file_name
  print(url)
  !wget $url
  !unzip -o  $file_name  
  new_file_name=year+file_name
  !mv -f itcont.txt $new_file_name

  spark.sparkContext.addFile(new_file_name)
  df = spark.read.csv(SparkFiles.get(new_file_name), sep="|", header=False, inferSchema=True, schema=schema)

  df.show()

  mode = "append"
  jdbc_url="jdbc:postgresql://34.67.52.115/team5k"
  postgres_config = {"user":"postgres", 
            "password": "team5kteam5k", 
            "driver":"org.postgresql.Driver"}
  
  #Limit ETL to 6 States
  #six_state_df=df.filter(("STATE='NC'" | "STATE='AZ'" | "STATE='FL'" | "STATE='MI'" | "STATE='WI'" | "STATE='PA'"))
  six_state_df=df.filter("STATE in ('NC','AZ','FL','MI','WI','PA')")
  six_state_df.show()
  print(six_state_df.count())

  print("Starting " +year)
  # Read in data to dataframe
  #df = pd.read_csv(year+"/itcont.txt", sep="|", low_memory=False)
  #df.head()
  # Write file
  six_state_df.write.jdbc(url=jdbc_url, table=table_name, mode=mode, properties=postgres_config)

  #Clean up file
  !rm itcont.txt

  print(year + " Complete")

In [None]:
# Run the ETL for the 2000 through 2008 archives, in order.
donations_bucket = 'https://storage.googleapis.com/team5k/donations/'
for archive_name, label in [
    ('indiv00.zip', '2000'),
    ('indiv02.zip', '2002'),
    ('indiv04.zip', '2004'),
    ('indiv06.zip', '2006'),
    ('indiv08.zip', '2008'),
]:
  etl_load('donations', donations_bucket, archive_name, label)
print("2008 Complete")


https://storage.googleapis.com/team5k/donations/indiv00.zip
--2020-08-04 19:47:29--  https://storage.googleapis.com/team5k/donations/indiv00.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 172.217.212.128, 172.217.214.128, 2607:f8b0:4001:c1f::80, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|172.217.212.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 45751800 (44M) [application/x-zip-compressed]
Saving to: ‘indiv00.zip.3’


2020-08-04 19:47:29 (287 MB/s) - ‘indiv00.zip.3’ saved [45751800/45751800]

Archive:  indiv00.zip
  inflating: itcont.txt              
+---------+---------+------+---------------+-----------+--------------+---------+------------------+---------+-----+-----+--------------------+----------+--------------+---------------+--------+-------+--------+-------+---------+-------------------+
|  CMTE_ID|AMNDT_IND|RPT_TP|TRANSACTION_PGI|  IMAGE_NUM|TRANSACTION_TP|ENTITY_TP|              NAME|     CITY|STATE|  Z

In [None]:

# Run the ETL for the 2010 through 2016 archives, in order.
donations_bucket = 'https://storage.googleapis.com/team5k/donations/'
for archive_name, label in [
    ('indiv10.zip', '2010'),
    ('indiv12.zip', '2012'),
    ('indiv14.zip', '2014'),
    ('indiv16.zip', '2016'),
]:
  etl_load('donations', donations_bucket, archive_name, label)

https://storage.googleapis.com/team5k/donations/indiv10.zip
--2020-08-04 19:50:16--  https://storage.googleapis.com/team5k/donations/indiv10.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 172.217.212.128, 172.217.214.128, 108.177.111.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|172.217.212.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 75717456 (72M) [application/x-zip-compressed]
Saving to: ‘indiv10.zip’


2020-08-04 19:50:18 (84.0 MB/s) - ‘indiv10.zip’ saved [75717456/75717456]

Archive:  indiv10.zip
  inflating: itcont.txt              
+---------+---------+------+---------------+-----------+--------------+---------+--------------------+-------------+-----+-----+--------------------+--------------------+--------------+---------------+--------+--------------+--------+-------+---------+-------------------+
|  CMTE_ID|AMNDT_IND|RPT_TP|TRANSACTION_PGI|  IMAGE_NUM|TRANSACTION_TP|ENTITY_TP|                NAME|    

In [None]:
# Run the ETL for the 2018 archive.
etl_load(
    table_name='donations',
    gcs_bucket='https://storage.googleapis.com/team5k/donations/',
    file_name='indiv18.zip',
    year='2018',
)

https://storage.googleapis.com/team5k/donations/indiv18.zip
--2020-08-04 20:03:09--  https://storage.googleapis.com/team5k/donations/indiv18.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 209.85.234.128, 108.177.112.128, 172.217.212.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|209.85.234.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1480281430 (1.4G) [application/x-zip-compressed]
Saving to: ‘indiv18.zip’


2020-08-04 20:03:23 (104 MB/s) - ‘indiv18.zip’ saved [1480281430/1480281430]

Archive:  indiv18.zip
  inflating: itcont.txt              
  inflating: by_date/itcont_2018_20020411_20170529.txt  
  inflating: by_date/itcont_2018_20170530_20170824.txt  
  inflating: by_date/itcont_2018_20170825_20171101.txt  
  inflating: by_date/itcont_2018_20171102_20171228.txt  
  inflating: by_date/itcont_2018_20171229_20180426.txt  
  inflating: by_date/itcont_2018_20180427_20180705.txt  
  inflating: by_date/itcont_2018_

In [None]:
# Run the ETL for the 2020 archive, then report that every year has been loaded.
etl_load(
    table_name='donations',
    gcs_bucket='https://storage.googleapis.com/team5k/donations/',
    file_name='indiv20.zip',
    year='2020',
)
print("All Years Complete")

https://storage.googleapis.com/team5k/donations/indiv20.zip
--2020-08-04 20:16:00--  https://storage.googleapis.com/team5k/donations/indiv20.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 209.85.145.128, 172.217.212.128, 172.217.214.128, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|209.85.145.128|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1993105940 (1.9G) [application/x-zip-compressed]
Saving to: ‘indiv20.zip’


2020-08-04 20:16:17 (117 MB/s) - ‘indiv20.zip’ saved [1993105940/1993105940]

Archive:  indiv20.zip
  inflating: itcont.txt              
  inflating: by_date/itcont_2020_20010425_20190426.txt  
  inflating: by_date/itcont_2020_20190427_20190629.txt  
  inflating: by_date/itcont_2020_20190630_20190810.txt  
  inflating: by_date/itcont_2020_20190811_20190915.txt  
  inflating: by_date/itcont_2020_20190916_20191014.txt  
  inflating: by_date/itcont_2020_20191015_20191112.txt  
  inflating: by_date/itcont_2020_