In [1]:
import requests
import findspark
import pandas as pd
import numpy as np
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import*
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DoubleType
from pyspark import SparkContext
from pyspark.sql.functions import monotonically_increasing_id

In [2]:
spark = SparkSession.builder.appName("Customer_App").getOrCreate()

In [3]:
df_customer = spark.read.json(r'C:\suneetha\Python_labs\Capstone_project\CreditCard_dataset\cdw_sapp_custmer.json')

In [4]:
df_customer.printSchema()

root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)



In [5]:
df_customer.show()

+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States| AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         Wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States| EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    Brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States| WDunham@exam

In [6]:
#Convert First_Name to Title Case
df_customer.withColumn('FIRST_NAME', initcap(df_customer.FIRST_NAME))

df_customer.withColumn('MIDDLE_NAME', lower(df_customer.MIDDLE_NAME))
df_customer.withColumn('LAST_NAME', initcap(df_customer.LAST_NAME))
df_customer.show()

+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN|      STREET_NAME|
+------+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+-----------------+
|   656|4210653310061055|     Natchez|United States| AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         Wm|123456100|Main Street North|
|   829|4210653310102868|Wethersfield|United States| EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    Brendan|123453023|    Redwood Drive|
|   683|4210653310116272|     Huntley|United States| WDunham@exam

In [7]:
df_customer = df_customer.withColumn('FULL_STREET_ADDRESS', concat(df_customer['APT_NO'], lit(', '), df_customer['STREET_NAME']))
df_customer['FULL_STREET_ADDRESS', 'APT_NO', 'STREET_NAME'].show()

+--------------------+------+-----------------+
| FULL_STREET_ADDRESS|APT_NO|      STREET_NAME|
+--------------------+------+-----------------+
|656, Main Street ...|   656|Main Street North|
|  829, Redwood Drive|   829|    Redwood Drive|
|683, 12th Street ...|   683| 12th Street East|
|253, Country Club...|   253|Country Club Road|
| 301, Madison Street|   301|   Madison Street|
|   3, Colonial Drive|     3|   Colonial Drive|
|  84, Belmont Avenue|    84|   Belmont Avenue|
|   728, Oxford Court|   728|     Oxford Court|
|   81, Forest Street|    81|    Forest Street|
|   561, Court Street|   561|     Court Street|
|  622, Cypress Court|   622|    Cypress Court|
|924, 8th Street West|   924|  8th Street West|
|    611, East Avenue|   611|      East Avenue|
|       680, Route 44|   680|         Route 44|
|   71, Warren Street|    71|    Warren Street|
|   195, Jones Street|   195|     Jones Street|
|     500, New Street|   500|       New Street|
|989, Division Street|   989|  Division 

In [8]:
df_customer = df_customer.drop('APT_NO', 'STREET_NAME')
df_customer.show()

+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+--------------------+
|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|CUST_ZIP|FIRST_NAME|LAST_NAME|        LAST_UPDATED|MIDDLE_NAME|      SSN| FULL_STREET_ADDRESS|
+----------------+------------+-------------+--------------------+----------+----------+--------+----------+---------+--------------------+-----------+---------+--------------------+
|4210653310061055|     Natchez|United States| AHooper@example.com|   1237818|        MS|   39120|      Alec|   Hooper|2018-04-21T12:49:...|         Wm|123456100|656, Main Street ...|
|4210653310102868|Wethersfield|United States| EHolman@example.com|   1238933|        CT|   06109|      Etta|   Holman|2018-04-21T12:49:...|    Brendan|123453023|  829, Redwood Drive|
|4210653310116272|     Huntley|United States| WDunham@example.com|   1243018|        

In [9]:
df_customer = df_customer.withColumn('CUST_PHONE', df_customer['CUST_PHONE'].cast('string'))
# df_cust = df_cust.select(df_cust['CUST_PHONE'].cast('string'))
df_customer.printSchema()

root
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = true)



In [10]:
df_customer = df_customer.withColumn('CUST_PHONE', concat(lit('(610)'), df_customer['CUST_PHONE'].substr(1,3), lit('-'), df_customer['CUST_PHONE'].substr(4,4)))
df_customer.select('CUST_PHONE').show()

+-------------+
|   CUST_PHONE|
+-------------+
|(610)123-7818|
|(610)123-8933|
|(610)124-3018|
|(610)124-3215|
|(610)124-2074|
|(610)124-2570|
|(610)123-9685|
|(610)123-8213|
|(610)124-0689|
|(610)123-5222|
|(610)124-1363|
|(610)123-6228|
|(610)123-8165|
|(610)123-4730|
|(610)124-2113|
|(610)123-9888|
|(610)124-0158|
|(610)124-1408|
|(610)123-8390|
|(610)123-5067|
+-------------+
only showing top 20 rows



In [11]:
df_customer.write.format("jdbc") \
  .mode("append") \
  .option("url", "jdbc:mysql://localhost:3306/classicmodels") \
  .option("dbtable", "capstone_project.cust1") \
  .option("user", "root") \
  .option("password", "password") \
  .save()

In [12]:
spark.stop()