In [2]:
import os

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import*

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DoubleType

In [3]:
# 2. Initializing SparkSession
# getOrCreate() hands back the already-running session when there is one,
# so executing this cell more than once does not start a second session.
spark = (
    SparkSession.builder
    .appName('creditinfo')
    .getOrCreate()
)

In [4]:
# Read the raw customer records from JSON; the schema is inferred by Spark.
# NOTE(review): the file name is spelled "custmer" — presumably it matches the file on disk; verify.
df_customer = spark.read.format("json").load("cdw_sapp_custmer.json")


In [5]:
# Print the inferred column names and types of the raw customer data.
df_customer.printSchema()

root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)



In [6]:
# Read the raw credit-card transaction records from JSON; the schema is inferred by Spark.
df_credit = spark.read.format("json").load("cdw_sapp_credit.json")


In [7]:
# Print the inferred column names and types of the raw transaction data.
df_credit.printSchema()

root
 |-- BRANCH_CODE: long (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_SSN: long (nullable = true)
 |-- DAY: long (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- TRANSACTION_ID: long (nullable = true)
 |-- TRANSACTION_TYPE: string (nullable = true)
 |-- TRANSACTION_VALUE: double (nullable = true)
 |-- YEAR: long (nullable = true)



In [8]:
# Read the raw branch records from JSON; the schema is inferred by Spark.
df_branch = spark.read.format("json").load("cdw_sapp_branch.json")


In [9]:
# Print the inferred column names and types of the raw branch data.
df_branch.printSchema()

root
 |-- BRANCH_CITY: string (nullable = true)
 |-- BRANCH_CODE: long (nullable = true)
 |-- BRANCH_NAME: string (nullable = true)
 |-- BRANCH_PHONE: string (nullable = true)
 |-- BRANCH_STATE: string (nullable = true)
 |-- BRANCH_STREET: string (nullable = true)
 |-- BRANCH_ZIP: long (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)



In [10]:
# Preview the first five branch rows.
df_branch.show(5)

+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|      BRANCH_CITY|BRANCH_CODE| BRANCH_NAME|BRANCH_PHONE|BRANCH_STATE|    BRANCH_STREET|BRANCH_ZIP|        LAST_UPDATED|
+-----------------+-----------+------------+------------+------------+-----------------+----------+--------------------+
|        Lakeville|          1|Example Bank|  1234565276|          MN|     Bridle Court|     55044|2018-04-18T16:51:...|
|          Huntley|          2|Example Bank|  1234618993|          IL|Washington Street|     60142|2018-04-18T16:51:...|
|SouthRichmondHill|          3|Example Bank|  1234985926|          NY|    Warren Street|     11419|2018-04-18T16:51:...|
|       Middleburg|          4|Example Bank|  1234663064|          FL| Cleveland Street|     32068|2018-04-18T16:51:...|
|    KingOfPrussia|          5|Example Bank|  1234849701|          PA|      14th Street|     19406|2018-04-18T16:51:...|
+-----------------+-----------+-

df.withColumn("species", initcap(col('species')))
df = spark.createDataFrame([('100-200',)], ['str'])
df.select(regexp_replace('str', r'(\d+)', '--').alias('d')).collect()
[Row(d='-----')]


In [11]:
# Normalise name casing: title-case first and last names, lower-case middle names.
# All three transforms are chained on df_customer and assigned back to it,
# so none of them is bound to a different variable and lost.
df_customer = (
    df_customer
    .withColumn('FIRST_NAME', initcap(col('FIRST_NAME')))
    .withColumn('LAST_NAME', initcap(col('LAST_NAME')))
    .withColumn('MIDDLE_NAME', lower(col('MIDDLE_NAME')))
)
df_customer.show(5)

+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States|AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States|EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States|WDunham@example.co

In [12]:
# Build a single address column of the form "<APT_NO>, <STREET_NAME>".
full_street_address = concat(col('APT_NO'), lit(', '), col('STREET_NAME'))
df_customer = df_customer.withColumn('FULL_STREET_ADDRESS', full_street_address)

# Show the new column next to the two columns it was built from.
df_customer.select('FULL_STREET_ADDRESS', 'APT_NO', 'STREET_NAME').show(5)


+--------------------+------+-----------------+
| FULL_STREET_ADDRESS|APT_NO|      STREET_NAME|
+--------------------+------+-----------------+
|656, Main Street ...|   656|Main Street North|
|  829, Redwood Drive|   829|    Redwood Drive|
|683, 12th Street ...|   683| 12th Street East|
|253, Country Club...|   253|Country Club Road|
| 301, Madison Street|   301|   Madison Street|
+--------------------+------+-----------------+
only showing top 5 rows



In [13]:
# Remove the apartment-number and street-name columns from the customer frame.
address_part_columns = ['APT_NO', 'STREET_NAME']
df_customer = df_customer.drop(*address_part_columns)
df_customer.show(5)

+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+--------------------+
|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|         CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN| FULL_STREET_ADDRESS|
+----------------+------------+-------------+-------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+--------------------+
|4210653310061055|     Natchez|United States|AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         wm|123456100|656, Main Street ...|
|4210653310102868|Wethersfield|United States|EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    brendan|123453023|  829, Redwood Drive|
|4210653310116272|     Huntley|United States|WDunham@example.com|   1243018|        IL|   

In [14]:
# Convert CUST_PHONE to a string column, then print the schema to confirm the new type.
phone_as_text = col('CUST_PHONE').cast(StringType())
df_customer = df_customer.withColumn('CUST_PHONE', phone_as_text)
df_customer.printSchema()


root
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)



In [15]:
# Rewrite CUST_PHONE as "(224)" + characters 1-3 + "-" + characters 4-7 of the current value.
# NOTE: this reads and overwrites the same column, so running the cell a second
# time re-slices the already formatted text instead of the original digits.
phone_text = col('CUST_PHONE')
formatted_phone = concat(
    lit('(224)'),
    phone_text.substr(1, 3),
    lit('-'),
    phone_text.substr(4, 4),
)
df_customer = df_customer.withColumn('CUST_PHONE', formatted_phone)
df_customer.show()

+----------------+------------+-------------+--------------------+-------------+----------+--------+----------+---------+--------------------+-----------+---------+--------------------+
|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|   CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN| FULL_STREET_ADDRESS|
+----------------+------------+-------------+--------------------+-------------+----------+--------+----------+---------+--------------------+-----------+---------+--------------------+
|4210653310061055|     Natchez|United States| AHooper@example.com|(224)123-7818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         wm|123456100|656, Main Street ...|
|4210653310102868|Wethersfield|United States| EHolman@example.com|(224)123-8933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    brendan|123453023|  829, Redwood Drive|
|4210653310116272|     Huntley|United States| WDunham@example.com|(224

In [16]:
# Convert CUST_ZIP from string to integer.
# NOTE: an integer cannot keep leading zeros, so a ZIP such as "06109" is stored as 6109.
zip_as_int = col('CUST_ZIP').cast(IntegerType())
df_customer = df_customer.withColumn('CUST_ZIP', zip_as_int)


In [17]:
# Print the schema again to check the column types after the casts above.
df_customer.printSchema()

root
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)



In [18]:
# Convert the LAST_UPDATED string column to a timestamp column.
last_updated_ts = col('LAST_UPDATED').cast('timestamp')
df_customer = df_customer.withColumn('LAST_UPDATED', last_updated_ts)


In [19]:
# Display only the two columns that were just re-typed.
df_customer.select('LAST_UPDATED', 'CUST_ZIP').show()

+-------------------+--------+
|       LAST_UPDATED|CUST_ZIP|
+-------------------+--------+
|2018-04-21 12:49:02|   39120|
|2018-04-21 12:49:02|    6109|
|2018-04-21 12:49:02|   60142|
|2018-04-21 12:49:02|   53151|
|2018-04-21 12:49:02|   79930|
|2018-04-21 12:49:02|   44070|
|2018-04-21 12:49:02|   22180|
|2018-04-21 12:49:02|   91010|
|2018-04-21 12:49:02|   48867|
|2018-04-21 12:49:02|   60099|
|2018-04-21 12:49:02|   44512|
|2018-04-21 12:49:02|   29483|
|2018-04-21 12:49:02|   79930|
|2018-04-21 12:49:02|   48430|
|2018-04-21 12:49:02|   49418|
|2018-04-21 12:49:02|   95993|
|2018-04-21 12:49:02|   33904|
|2018-04-21 12:49:02|   53045|
|2018-04-21 12:49:02|   23223|
|2018-04-21 12:49:02|   19380|
+-------------------+--------+
only showing top 20 rows



In [20]:
# Write the cleaned customer frame to the CDW_SAPP_CUSTOMER table over JDBC.
# Connection settings are taken from environment variables so that real
# credentials do not have to be stored in the notebook; the fallback values
# are the local-development settings this notebook has always used.
jdbc_url = os.environ.get("CREDITCARD_DB_URL", "jdbc:mysql://localhost:3306/creditcard_capstone")
jdbc_user = os.environ.get("CREDITCARD_DB_USER", "root")
jdbc_password = os.environ.get("CREDITCARD_DB_PASSWORD", "password")

# NOTE: "append" adds rows to whatever the table already holds, so executing
# this cell again inserts the same customers a second time.
(
    df_customer.write.format("jdbc")
    .mode("append")
    .option("url", jdbc_url)
    .option("dbtable", "CDW_SAPP_CUSTOMER")
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .save()
)

In [21]:
# Read the CDW_SAPP_CUSTOMER table back over JDBC to check what was stored.
# Connection settings are taken from environment variables so that real
# credentials do not have to be stored in the notebook; the fallback values
# are the local-development settings this notebook has always used.
jdbc_url = os.environ.get("CREDITCARD_DB_URL", "jdbc:mysql://localhost:3306/creditcard_capstone")
jdbc_user = os.environ.get("CREDITCARD_DB_USER", "root")
jdbc_password = os.environ.get("CREDITCARD_DB_PASSWORD", "password")

df_new = (
    spark.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "CDW_SAPP_CUSTOMER")
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .load()
)

df_new.show()


+----------------+------------+-------------+--------------------+-------------+----------+--------+----------+---------+-------------------+-----------+---------+--------------------+
|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|   CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|       LAST_UPDATED|MIDDLE_NAME|      SSN| FULL_STREET_ADDRESS|
+----------------+------------+-------------+--------------------+-------------+----------+--------+----------+---------+-------------------+-----------+---------+--------------------+
|4210653310061055|     Natchez|United States| AHooper@example.com|(224)123-7818|        MS|   39120|      Alec|   Hooper|2018-04-21 12:49:02|         wm|123456100|656, Main Street ...|
|4210653310102868|Wethersfield|United States| EHolman@example.com|(224)123-8933|        CT|    6109|      Etta|   Holman|2018-04-21 12:49:02|    brendan|123453023|  829, Redwood Drive|
|4210653310116272|     Huntley|United States| WDunham@example.com|(224)124-