In [None]:
# Install Java, Spark, and Findspark
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q -c http://www-us.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.39)] [Co                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Get:4 http://ppa.launchpad.net/marutter/c2d4u3.5/ubuntu bionic InRelease [15.4 kB]
0% [3 InRelease 15.6 kB/88.7 kB 18%] [Connecting to security.ubuntu.com (91.1890% [1 InRelease gpgv 21.3 kB] [3 InRelease 15.6 kB/88.7 kB 18%] [Connecting to                                                                                Get:5 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease [3,626 B]
0% [1 InRelease gpgv 21.3 kB] [3 

In [None]:
# Install Postgresql
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

# Add Postgres to this current SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("GhostMode").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

--2020-08-08 17:59:27--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2020-08-08 17:59:28 (1.43 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [None]:
from pyspark import SparkFiles

# Public S3 location of the credit-card transactions CSV.
url = "https://ghostmode.s3.us-east-2.amazonaws.com/creditcard.csv"

# Have Spark fetch the remote file, then resolve the local copy by file name
# and parse it with a header row and inferred column types.
spark.sparkContext.addFile(url)
local_csv = SparkFiles.get("creditcard.csv")
df = spark.read.csv(local_csv, header=True, sep=",", inferSchema=True)

# Preview the first rows of the DataFrame.
df.show()

+----+------------------+-------------------+------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------+-----+
|Time|                V1|                 V2|                V3|                 V4|                 V5|                 V6|                  V7|                 V8|                V9|                V10|               V11|               V12|                V13|                V14|                V15|                V16|                 V17|                V18|                V19|                V20|                 V

In [None]:
# Print the column names and types Spark inferred from the CSV (it was read
# with inferSchema=True). The captured output shows Time came back as
# decimal(10,0) while the V* columns are double.
df.printSchema()

root
 |-- Time: decimal(10,0) (nullable = true)
 |-- V1: double (nullable = true)
 |-- V2: double (nullable = true)
 |-- V3: double (nullable = true)
 |-- V4: double (nullable = true)
 |-- V5: double (nullable = true)
 |-- V6: double (nullable = true)
 |-- V7: double (nullable = true)
 |-- V8: double (nullable = true)
 |-- V9: double (nullable = true)
 |-- V10: double (nullable = true)
 |-- V11: double (nullable = true)
 |-- V12: double (nullable = true)
 |-- V13: double (nullable = true)
 |-- V14: double (nullable = true)
 |-- V15: double (nullable = true)
 |-- V16: double (nullable = true)
 |-- V17: double (nullable = true)
 |-- V18: double (nullable = true)
 |-- V19: double (nullable = true)
 |-- V20: double (nullable = true)
 |-- V21: double (nullable = true)
 |-- V22: double (nullable = true)
 |-- V23: double (nullable = true)
 |-- V24: double (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double

In [None]:
# Report the dataset size: count() runs a Spark job over all rows, while
# the column count comes straight from the schema.
n_rows = df.count()
n_cols = len(df.columns)
print(f"There are {n_rows} rows of transactions.")
print(f"Totaling {n_cols} columns.")

There are 284807 rows of transactions.
Totaling 31 columns.


In [None]:
# Remove any row with a null/NaN in any column (dropna uses how="any" by
# default) and report how many rows survive.
df1 = df.dropna()
remaining = df1.count()
print(f"After dropping NaN, there are {remaining} rows of data.")

After dropping NaN, there are 284807 rows of data.


In [None]:
# Change all column names to lower case.
# toDF(*names) renames positionally in a single projection, instead of
# stacking one withColumnRenamed projection per column. It also avoids leaving
# a global loop variable named `col`, which would shadow
# pyspark.sql.functions.col if that is imported later.
df1 = df1.toDF(*[name.lower() for name in df1.columns])

df1.columns

['time',
 'v1',
 'v2',
 'v3',
 'v4',
 'v5',
 'v6',
 'v7',
 'v8',
 'v9',
 'v10',
 'v11',
 'v12',
 'v13',
 'v14',
 'v15',
 'v16',
 'v17',
 'v18',
 'v19',
 'v20',
 'v21',
 'v22',
 'v23',
 'v24',
 'v25',
 'v26',
 'v27',
 'v28',
 'amount',
 'class']

In [None]:
# Add a surrogate key column named "id".
# monotonically_increasing_id() produces unique, increasing 64-bit values that
# are NOT consecutive: the partition index lives in the upper bits, so ids
# jump between partitions. Use it as a unique key, not as a row number.
from pyspark.sql.functions import monotonically_increasing_id

row_id = monotonically_increasing_id()
df1 = df1.withColumn("id", row_id)  # new DataFrame: all existing columns + id

df1.show()

+----+------------------+-------------------+------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------------------+-------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+--------------------+-------------------+------+-----+---+
|time|                v1|                 v2|                v3|                 v4|                 v5|                 v6|                  v7|                 v8|                v9|                v10|               v11|               v12|                v13|                v14|                v15|                v16|                 v17|                v18|                v19|                v20|              

In [None]:
# Load into AWS RDS Postgres
# Configure settings for RDS.
# Credentials are taken from the environment (RDS_USER / RDS_PASSWORD), with
# an interactive prompt for the password, instead of being hard-coded in the
# notebook where they would be saved and shared with it.
import os
from getpass import getpass

# "append" adds rows to the existing table: re-running the write cell inserts
# the same transactions again unless the table has constraints preventing it.
mode = "append"
jdbc_url = os.environ.get(
    "RDS_JDBC_URL",
    "jdbc:postgresql://db-creditcard.c0cdrlhmo9ac.us-east-2.rds.amazonaws.com:5432/creditcard_db",
)
config = {
    "user": os.environ.get("RDS_USER", "root"),
    "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
    "driver": "org.postgresql.Driver",
}

In [None]:
# Write the DataFrame to the "creditcard" table on RDS over JDBC, using the
# save mode, URL and connection properties configured above.
writer = df1.write.mode(mode)
writer.jdbc(url=jdbc_url, table="creditcard", properties=config)