In [0]:
import pandas as pd
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-05-14 22:34:06--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-05-14 22:34:08 (1.05 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [0]:
# Create (or reuse) the "CloudETL" SparkSession, with the PostgreSQL JDBC jar
# at /content/postgresql-42.2.9.jar on the driver classpath.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [4]:
# Register the cleaned credit-default CSV hosted on S3 with Spark's file
# distribution mechanism, then read the local copy with a header row and
# inferred column types.
from pyspark import SparkFiles

url = "https://qos-bucket.s3.us-east-2.amazonaws.com/credit_default_clean.csv"
spark.sparkContext.addFile(url)

csv_path = SparkFiles.get("credit_default_clean.csv")
df = spark.read.csv(csv_path, sep=",", header=True, inferSchema=True)
df.show()

+---+---+---------+------+-----------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+
|_c0| ID|LIMIT_BAL|   SEX|  EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|default payment next month|
+---+---+---------+------+-----------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+
|  0|  1|    20000|Female| University| married| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|                       yes|
|  1|  2|   120000|Female| University|  single| 26|   -1|    2|    0|    0|    0|   

In [5]:
# Collect the Spark DataFrame to the driver as a pandas DataFrame.
# This cell rebinds `df` from Spark to pandas, so a second run used to call
# `.select("*")` on a pandas frame and fail; convert only while `df` is still a
# Spark DataFrame so the cell is safe to re-run. (`.select("*")` was a no-op
# projection and is dropped.)
from pyspark.sql import DataFrame as SparkDataFrame

if isinstance(df, SparkDataFrame):
    df = df.toPandas()
df.head()

Unnamed: 0,_c0,ID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,default payment next month
0,0,1,20000,Female,University,married,24,2,2,-1,-1,-2,-2,3913,3102,689,0,0,0,0,689,0,0,0,0,yes
1,1,2,120000,Female,University,single,26,-1,2,0,0,0,2,2682,1725,2682,3272,3455,3261,0,1000,1000,1000,0,2000,yes
2,2,3,90000,Female,University,single,34,0,0,0,0,0,0,29239,14027,13559,14331,14948,15549,1518,1500,1000,1000,1000,5000,no
3,3,4,50000,Female,University,married,37,0,0,0,0,0,0,46990,48233,49291,28314,28959,29547,2000,2019,1200,1100,1069,1000,no
4,4,5,50000,Male,University,married,57,-1,0,-1,0,0,0,8617,5670,35835,20940,19146,19131,2000,36681,10000,9000,689,679,no


In [6]:
# Demographic attributes, credit limit and the target label, keyed by ID.
user_demo = df.loc[:, [
    'ID',
    'SEX',
    'EDUCATION',
    'MARRIAGE',
    'AGE',
    'LIMIT_BAL',
    'default payment next month',
]]
user_demo.head()

Unnamed: 0,ID,SEX,EDUCATION,MARRIAGE,AGE,LIMIT_BAL,default payment next month
0,1,Female,University,married,24,20000,yes
1,2,Female,University,single,26,120000,yes
2,3,Female,University,single,34,90000,no
3,4,Female,University,married,37,50000,no
4,5,Male,University,married,57,50000,no


In [7]:
# Monthly repayment-status columns keyed by ID. Note the source data has no
# PAY_1: the series is PAY_0 followed by PAY_2..PAY_6.
payment_status = df[['ID', 'PAY_0'] + [f'PAY_{month}' for month in range(2, 7)]]
payment_status.head()

Unnamed: 0,ID,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6
0,1,2,2,-1,-1,-2,-2
1,2,-1,2,0,0,0,2
2,3,0,0,0,0,0,0
3,4,0,0,0,0,0,0
4,5,-1,0,-1,0,0,0


In [8]:
# Bill-statement amounts BILL_AMT1..BILL_AMT6, keyed by ID.
bill_columns = ['ID'] + [f'BILL_AMT{month}' for month in range(1, 7)]
bill_amount = df[bill_columns]
bill_amount.head()

Unnamed: 0,ID,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6
0,1,3913,3102,689,0,0,0
1,2,2682,1725,2682,3272,3455,3261
2,3,29239,14027,13559,14331,14948,15549
3,4,46990,48233,49291,28314,28959,29547
4,5,8617,5670,35835,20940,19146,19131


In [9]:
# Payment amounts PAY_AMT1..PAY_AMT6, keyed by ID.
payment_amount = df[['ID', 'PAY_AMT1', 'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6']]

# Never hardcode the database password in the notebook (it ends up in every
# saved copy). Read it from the environment, prompting when it is not set.
# `password` keeps its name because the SQLAlchemy engine cell uses it.
# NOTE(review): the password previously committed here should be rotated.
import getpass
import os
password = os.environ.get('POSTGRES_PASSWORD') or getpass.getpass('PostgreSQL password: ')

# Keep the preview as the last expression so Jupyter actually renders it.
payment_amount.head()

Unnamed: 0,ID,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6
0,1,0,689,0,0,0,0
1,2,0,1000,1000,1000,0,2000
2,3,1518,1500,1000,1000,1000,5000
3,4,2000,2019,1200,1100,1069,1000
4,5,2000,36681,10000,9000,689,679


In [0]:
from sqlalchemy import create_engine

In [11]:
# Connect to the RDS PostgreSQL instance. The password is URL-escaped with
# quote_plus so characters such as '@', ':' or '/' cannot corrupt the URL.
from urllib.parse import quote_plus

DB_USER = 'root'
DB_HOST = 'mypostgresdb.c1ugovqjchx6.us-east-2.rds.amazonaws.com'
DB_PORT = 5432
DB_NAME = 'my_data_class_db'

engine = create_engine(
    f'postgresql://{DB_USER}:{quote_plus(password)}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
)

  """)


In [12]:
# List the tables currently present in the target database.
# Engine.table_names() is deprecated since SQLAlchemy 1.4 and removed in 2.0;
# the Inspector API works on both older and newer releases.
import sqlalchemy
sqlalchemy.inspect(engine).get_table_names()

['bill_amount', 'user_demo', 'payment_status', 'payment_amount']

In [13]:
# Write each normalized table to PostgreSQL, replacing any existing copy.
# pandas' default insert path issues one INSERT per row, which is very slow
# against a remote RDS instance (the saved run of this cell ended in a
# KeyboardInterrupt). method='multi' batches rows into multi-VALUES INSERTs;
# chunksize caps rows (and bind parameters) per statement.
tables_to_load = {
    'bill_amount': bill_amount,
    'user_demo': user_demo,
    'payment_status': payment_status,
    'payment_amount': payment_amount,
}
for table_name, table_frame in tables_to_load.items():
    table_frame.to_sql(
        name=table_name,
        con=engine,
        if_exists='replace',
        index=False,
        method='multi',
        chunksize=1000,
    )

KeyboardInterrupt: ignored