# Completed Mapping Logic

In [16]:
#import spark as sp
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import*

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DoubleType

In [17]:
# Create (or reuse) the Spark session and load the raw credit-card transactions.
import os

# Location of the MySQL JDBC driver jar. Set MYSQL_CONNECTOR_JAR to override the
# machine-specific default below when running on another machine.
mysql_connector_jar = os.environ.get(
    "MYSQL_CONNECTOR_JAR",
    "/Users/roy/Downloads/mysql-connector-j-8.0.32/mysql-connector-j-8.0.32.jar",
)

spark = (
    SparkSession.builder
    .appName("capstone")
    .config("spark.jars", mysql_connector_jar)
    .getOrCreate()
)

# "header" is a CSV reader option and has no effect on the JSON source,
# so it is not passed here.
CC_DF = spark.read.format("json").load("cdw_sapp_credit.json")
CC_DF.show(5)

+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|
|         35|4210653349028689|123459988| 20|    3|             2|   Entertainment|            14.24|2018|
|        160|4210653349028689|123459988|  8|    7|             3|         Grocery|             56.7|2018|
|        114|4210653349028689|123459988| 19|    4|             4|   Entertainment|            59.73|2018|
|         93|4210653349028689|123459988| 10|   10|             5|             Gas|             3.59|2018|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+
only showing top 5 rows



In [18]:
# Print the column names, types and nullability of CC_DF as a tree
CC_DF.printSchema()

root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)



## Mapping Logic

In [19]:
# Build a TIMEID column (YYYYMMDD) from the YEAR, MONTH and DAY columns
from pyspark.sql.functions import concat_ws, lpad

# left-pad MONTH and DAY with "0" to two characters (e.g. 2 -> "02")
month_padded = lpad("MONTH", 2, "0")
day_padded = lpad("DAY", 2, "0")

# join YEAR, padded MONTH and padded DAY with no separator
timeid_expr = concat_ws("", "YEAR", month_padded, day_padded)
CC_DF = CC_DF.withColumn("TIMEID", timeid_expr)

# show one row to check the new TIMEID column; DAY, MONTH and YEAR are still present
CC_DF.show(1)


+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+--------+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|  TIMEID|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+--------+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|20180214|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+--------+
only showing top 1 row



In [20]:
# Write CC_DF to the MySQL table creditcard_capstone.CDW_SAPP_CREDIT_CARD over JDBC,
# replacing the existing table contents (mode "overwrite").
import os
from getpass import getpass

# Credentials are never stored in the notebook: they come from the environment,
# and the password falls back to an interactive prompt when MYSQL_PASSWORD is unset.
mysql_user = os.environ.get("MYSQL_USER", "root")
mysql_password = os.environ.get("MYSQL_PASSWORD") or getpass("MySQL password: ")

(
    CC_DF.write.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/creditcard_capstone")
    .option("dbtable", "creditcard_capstone.CDW_SAPP_CREDIT_CARD")
    .option("user", mysql_user)
    .option("password", mysql_password)
    .mode("overwrite")
    .save()
)

# show one row of the DataFrame that was just written
CC_DF.show(1)



+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+--------+
|BRANCH_CODE|  CREDIT_CARD_NO| CUST_SSN|DAY|MONTH|TRANSACTION_ID|TRANSACTION_TYPE|TRANSACTION_VALUE|YEAR|  TIMEID|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+--------+
|        114|4210653349028689|123459988| 14|    2|             1|       Education|             78.9|2018|20180214|
+-----------+----------------+---------+---+-----+--------------+----------------+-----------------+----+--------+
only showing top 1 row



                                                                                

In [21]:
# Export the credit-card data to a CSV file by way of pandas
import pandas as pd

# materialise the Spark DataFrame as a pandas DataFrame
cc_pd = CC_DF.toPandas()

# write the CSV without the pandas index column
csv_path = 'creditcard.csv'
cc_pd.to_csv(csv_path, index=False)

# preview the first two rows
cc_pd.head(2)

Unnamed: 0,BRANCH_CODE,CREDIT_CARD_NO,CUST_SSN,DAY,MONTH,TRANSACTION_ID,TRANSACTION_TYPE,TRANSACTION_VALUE,YEAR,TIMEID
0,114,4210653349028689,123459988,14,2,1,Education,78.9,2018,20180214
1,35,4210653349028689,123459988,20,3,2,Entertainment,14.24,2018,20180320
