In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField,StringType,IntegerType,StructType,TimestampType)
from pyspark.sql.functions import *
import pandas as pd


In [2]:
spark = SparkSession.builder.appName('SparkByExample').getOrCreate()

In [3]:
df = spark.read.json("cdw_sapp_custmer.json")
df.printSchema()
#df.show()

root
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: long (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- CUST_ZIP: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- LAST_UPDATED: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- SSN: long (nullable = true)
 |-- STREET_NAME: string (nullable = true)



Changing the datatypes

In [4]:
data_schema = [StructField('ssn', IntegerType(), True),
               StructField('CUST_ZIP',IntegerType(),True),
               StructField('APT_NO', StringType(),True),
               StructField('CREDIT_CARD_NO',StringType(),True),
               StructField('CUST_CITY',StringType(),True),
               StructField('CUST_COUNTRY',StringType(),True),
               StructField('CUST_EMAIL',StringType(),True),
               StructField('CUST_PHONE', StringType(),True),
               StructField('CUST_STATE',StringType(),True),
               StructField('FIRST_NAME',StringType(),True),
               StructField('LAST_NAME',StringType(),True),
               StructField('MIDDLE_NAME',StringType(),True),
               StructField('STREET_NAME',StringType(),True),
               StructField('LAST_UPDATED',TimestampType(),True)]


In [5]:
final_struc = StructType(fields=data_schema)

In [6]:
df = spark.read.json('cdw_sapp_custmer.json',schema=final_struc)

In [7]:
df.printSchema()

root
 |-- ssn: integer (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- STREET_NAME: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)



In [8]:
df.head(5)


[Row(ssn=None, CUST_ZIP=None, APT_NO='656', CREDIT_CARD_NO='4210653310061055', CUST_CITY='Natchez', CUST_COUNTRY='United States', CUST_EMAIL='AHooper@example.com', CUST_PHONE='1237818', CUST_STATE='MS', FIRST_NAME='Alec', LAST_NAME='Hooper', MIDDLE_NAME='Wm', STREET_NAME='Main Street North', LAST_UPDATED=datetime.datetime(2018, 4, 21, 12, 49, 2)),
 Row(ssn=None, CUST_ZIP=None, APT_NO='829', CREDIT_CARD_NO='4210653310102868', CUST_CITY='Wethersfield', CUST_COUNTRY='United States', CUST_EMAIL='EHolman@example.com', CUST_PHONE='1238933', CUST_STATE='CT', FIRST_NAME='Etta', LAST_NAME='Holman', MIDDLE_NAME='Brendan', STREET_NAME='Redwood Drive', LAST_UPDATED=datetime.datetime(2018, 4, 21, 12, 49, 2)),
 Row(ssn=None, CUST_ZIP=None, APT_NO='683', CREDIT_CARD_NO='4210653310116272', CUST_CITY='Huntley', CUST_COUNTRY='United States', CUST_EMAIL='WDunham@example.com', CUST_PHONE='1243018', CUST_STATE='IL', FIRST_NAME='Wilber', LAST_NAME='Dunham', MIDDLE_NAME='Ezequiel', STREET_NAME='12th Street E

Convert the first_name to Title case and lastname

In [9]:
df = df.withColumn("FIRST_NAME", initcap(col('FIRST_NAME'))).withColumn("LAST_NAME",initcap(col('LAST_NAME')))

change the middle name to lowercase

In [10]:
df = df.withColumn('MIDDLE_NAME',lower(col('MIDDLE_NAME')))

In [None]:
df.show()

Concatenate aptno and streetname with comma and give column name full_street_address

In [11]:
df = df.withColumn("FULL_STREET_ADDRESS", concat_ws(",",col('APT_NO'),col('STREET_NAME'))).drop("FIRST_NAME").drop("STREET_NAME")


In [12]:
df.show()

+----+--------+------+----------------+------------+-------------+--------------------+----------+----------+---------+-----------+-------------------+--------------------+
| ssn|CUST_ZIP|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|CUST_PHONE|CUST_STATE|LAST_NAME|MIDDLE_NAME|       LAST_UPDATED| FULL_STREET_ADDRESS|
+----+--------+------+----------------+------------+-------------+--------------------+----------+----------+---------+-----------+-------------------+--------------------+
|null|    null|   656|4210653310061055|     Natchez|United States| AHooper@example.com|   1237818|        MS|   Hooper|         wm|2018-04-21 12:49:02|656,Main Street N...|
|null|    null|   829|4210653310102868|Wethersfield|United States| EHolman@example.com|   1238933|        CT|   Holman|    brendan|2018-04-21 12:49:02|   829,Redwood Drive|
|null|    null|   683|4210653310116272|     Huntley|United States| WDunham@example.com|   1243018|        IL|   Dunham|   ezequiel|2018

In [41]:
df.printSchema()

root
 |-- ssn: integer (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = false)



In [36]:
df = df.withColumn("CUST_PHONE", regexp_replace(df.CUST_PHONE, "(\d{3})(\d{3})(\d{1})", "($1) $2-$3"))

In [37]:
df.show()

+----+--------+------+----------------+------------+-------------+--------------------+-----------+----------+---------+-----------+-------------------+--------------------+
| ssn|CUST_ZIP|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL| CUST_PHONE|CUST_STATE|LAST_NAME|MIDDLE_NAME|       LAST_UPDATED| FULL_STREET_ADDRESS|
+----+--------+------+----------------+------------+-------------+--------------------+-----------+----------+---------+-----------+-------------------+--------------------+
|null|    null|   656|4210653310061055|     Natchez|United States| AHooper@example.com|(123) 781-8|        MS|   Hooper|         wm|2018-04-21 12:49:02|656,Main Street N...|
|null|    null|   829|4210653310102868|Wethersfield|United States| EHolman@example.com|(123) 893-3|        CT|   Holman|    brendan|2018-04-21 12:49:02|   829,Redwood Drive|
|null|    null|   683|4210653310116272|     Huntley|United States| WDunham@example.com|(124) 301-8|        IL|   Dunham|   ezequie

In [13]:
pandasDF = df.toPandas()
pandasDF

  series = series.astype(t, copy=False)


Unnamed: 0,ssn,CUST_ZIP,APT_NO,CREDIT_CARD_NO,CUST_CITY,CUST_COUNTRY,CUST_EMAIL,CUST_PHONE,CUST_STATE,LAST_NAME,MIDDLE_NAME,LAST_UPDATED,FULL_STREET_ADDRESS
0,,,656,4210653310061055,Natchez,United States,AHooper@example.com,1237818,MS,Hooper,wm,2018-04-21 12:49:02,"656,Main Street North"
1,,,829,4210653310102868,Wethersfield,United States,EHolman@example.com,1238933,CT,Holman,brendan,2018-04-21 12:49:02,"829,Redwood Drive"
2,,,683,4210653310116272,Huntley,United States,WDunham@example.com,1243018,IL,Dunham,ezequiel,2018-04-21 12:49:02,"683,12th Street East"
3,,,253,4210653310195948,NewBerlin,United States,EHardy@example.com,1243215,WI,Hardy,trina,2018-04-21 12:49:02,"253,Country Club Road"
4,,,301,4210653310356919,ElPaso,United States,WAyers@example.com,1242074,TX,Ayers,may,2018-04-21 12:49:02,"301,Madison Street"
...,...,...,...,...,...,...,...,...,...,...,...,...,...
947,,,882,4210653399559239,SiouxCity,United States,FCastle@example.com,1238344,IA,Castle,jonah,2018-04-21 12:49:02,"882,Main Street South"
948,,,470,4210653399650358,Summerville,United States,FBlock@example.com,1240420,SC,Block,aron,2018-04-21 12:49:02,"470,Glenwood Avenue"
949,,,405,4210653399732638,Duluth,United States,DPruitt@example.com,1236149,GA,Pruitt,lucas,2018-04-21 12:49:02,"405,Country Lane"
950,,,15,4210653399859149,Rowlett,United States,EBeatty@example.com,1236886,TX,Beatty,susanna,2018-04-21 12:49:02,"15,8th Street"


In [15]:
df.printSchema()

root
 |-- ssn: integer (nullable = true)
 |-- CUST_ZIP: integer (nullable = true)
 |-- APT_NO: string (nullable = true)
 |-- CREDIT_CARD_NO: string (nullable = true)
 |-- CUST_CITY: string (nullable = true)
 |-- CUST_COUNTRY: string (nullable = true)
 |-- CUST_EMAIL: string (nullable = true)
 |-- CUST_PHONE: string (nullable = true)
 |-- CUST_STATE: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- MIDDLE_NAME: string (nullable = true)
 |-- LAST_UPDATED: timestamp (nullable = true)
 |-- FULL_STREET_ADDRESS: string (nullable = false)



In [16]:
pandasDF.dtypes

ssn                           float64
CUST_ZIP                      float64
APT_NO                         object
CREDIT_CARD_NO                 object
CUST_CITY                      object
CUST_COUNTRY                   object
CUST_EMAIL                     object
CUST_PHONE                     object
CUST_STATE                     object
LAST_NAME                      object
MIDDLE_NAME                    object
LAST_UPDATED           datetime64[ns]
FULL_STREET_ADDRESS            object
dtype: object

In [31]:
from dataprep.clean import clean_phone
pandasDF = pandasDF['CUST_PHONE']
clean_phone(pandasDF, df['CUST_PHONE'], output_format='national')



KeyError: 'CUST_PHONE'

In [27]:
s = pandasDF['CUST_PHONE']
phone_nos = '(' + s.str[:3] + ')' + s.str[3:6] + '-' + s.str[6:]


change the phone number format

In [83]:
def format_phone_number(phone_number):
    return "({}) {}-{}".format(phone_number[0:3],phone_number[3:6],phone_number[6:])
    #return phone_number

formatted_number = udf(format_phone_number)
df.select(col("CUST_PHONE"),
    formatted_number(col("CUST_PHONE")))

#df = df.withColumn("CUST_PHONE",format_phone_number(df["CUST_PHONE"]))
#df['PHONE_NUM'] = df.apply(lambda row: formatted_number(row['CUST_PHONE']), axis=1)



DataFrame[CUST_PHONE: string, format_phone_number(CUST_PHONE): string]

In [86]:
df.head()

Row(ssn=None, CUST_ZIP=None, APT_NO='656', CREDIT_CARD_NO='4210653310061055', CUST_CITY='Natchez', CUST_COUNTRY='United States', CUST_EMAIL='AHooper@example.com', CUST_PHONE='1237818', CUST_STATE='MS', FIRST_NAME='Alec', LAST_NAME='Hooper', MIDDLE_NAME='wm', STREET_NAME='Main Street North', LAST_UPDATED=datetime.datetime(2018, 4, 21, 12, 49, 2))

In [85]:
panda_df.head()

Unnamed: 0,ssn,CUST_ZIP,APT_NO,CREDIT_CARD_NO,CUST_CITY,CUST_COUNTRY,CUST_EMAIL,CUST_PHONE,CUST_STATE,LAST_NAME,MIDDLE_NAME,LAST_UPDATED,FULL_STREET_ADDRESS,test
0,,,656,4210653310061055,Natchez,United States,AHooper@example.com,"({}) {}-{}.format(phone_number[0:3],phone_numb...",MS,Hooper,wm,2018-04-21 12:49:02,"656,Main Street North","(({}) ) {-}-{}.format(phone_number[0:3],phone_..."
1,,,829,4210653310102868,Wethersfield,United States,EHolman@example.com,"({}) {}-{}.format(phone_number[0:3],phone_numb...",CT,Holman,brendan,2018-04-21 12:49:02,"829,Redwood Drive","(({}) ) {-}-{}.format(phone_number[0:3],phone_..."
2,,,683,4210653310116272,Huntley,United States,WDunham@example.com,"({}) {}-{}.format(phone_number[0:3],phone_numb...",IL,Dunham,ezequiel,2018-04-21 12:49:02,"683,12th Street East","(({}) ) {-}-{}.format(phone_number[0:3],phone_..."
3,,,253,4210653310195948,NewBerlin,United States,EHardy@example.com,"({}) {}-{}.format(phone_number[0:3],phone_numb...",WI,Hardy,trina,2018-04-21 12:49:02,"253,Country Club Road","(({}) ) {-}-{}.format(phone_number[0:3],phone_..."
4,,,301,4210653310356919,ElPaso,United States,WAyers@example.com,"({}) {}-{}.format(phone_number[0:3],phone_numb...",TX,Ayers,may,2018-04-21 12:49:02,"301,Madison Street","(({}) ) {-}-{}.format(phone_number[0:3],phone_..."


In [25]:
#df = df.withColumn(col("CUST_PHONE"), regexp_replace(("CUST_PHONE", "(\\ d{3})(\\ d{3})(\\ d{4})", "($1) $2-$3")))
#df = df.select(format_string(col("CUST_PHONE"),df["CUST_PHONE"][:3],df['CUST_PHONE'][3:6],df['CUST_PHONE'][6:]))

df = df.withColumn("CUST_PHONE",df["CUST_PHONE"].format_string("(%s-%s-%s)",df["CUST_PHONE"][:3],df['CUST_PHONE'][3:6],df['CUST_PHONE'][6:]))

TypeError: startPos and length must be the same type. Got <class 'int'> and <class 'NoneType'>, respectively.

In [63]:
df = df.select (regexp_replace(("CUST_PHONE", "(d{3})(d{3})(d{4})", "($1) $2-$3")))

TypeError: regexp_replace() missing 2 required positional arguments: 'pattern' and 'replacement'

In [43]:
df.show()

+----+--------+------+----------------+------------+-------------+--------------------+--------------------+----------+---------+-----------+-------------------+--------------------+
| ssn|CUST_ZIP|APT_NO|  CREDIT_CARD_NO|   CUST_CITY| CUST_COUNTRY|          CUST_EMAIL|          CUST_PHONE|CUST_STATE|LAST_NAME|MIDDLE_NAME|       LAST_UPDATED| FULL_STREET_ADDRESS|
+----+--------+------+----------------+------------+-------------+--------------------+--------------------+----------+---------+-----------+-------------------+--------------------+
|null|    null|   656|4210653310061055|     Natchez|United States| AHooper@example.com|({}) {}-{}.format...|        MS|   Hooper|         wm|2018-04-21 12:49:02|656,Main Street N...|
|null|    null|   829|4210653310102868|Wethersfield|United States| EHolman@example.com|({}) {}-{}.format...|        CT|   Holman|    brendan|2018-04-21 12:49:02|   829,Redwood Drive|
|null|    null|   683|4210653310116272|     Huntley|United States| WDunham@example.co