### Load the customer CSV data into the SQL catalog

In [0]:

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession

# Explicit schema definition: every column is read as a string, and customerID is
# declared non-nullable.
schema_customer_csv = StructType([
    StructField("customerID", StringType(), False),
    StructField("companyName", StringType(), True),
    StructField("contactName", StringType(), True),
    StructField("contactTitle", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("region", StringType(), True),
    StructField("postalCode", StringType(), True),
    StructField("country", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("fax", StringType(), True),
])

spark = SparkSession.builder.appName("Spark DataFrames for Customer Segmentation").getOrCreate()

df_cust = spark.read.csv(
    "/Volumes/dbckwork/aibiwork/datafiles/csv/customers.csv",
    header=True,
    schema=schema_customer_csv,
)

# Apply data transformations and data cleaning.
#
# Some rows are read with their values shifted one column to the right: the
# address occupies both `address` and `city`, and every later field sits one
# column further along (the `country` column holds the postal code, `phone`
# holds the country, `fax` holds the phone number). Presumably an unquoted
# comma inside the address split it in two -- TODO confirm against the raw file.
# Shifted rows are recognised by a very short `address` fragment or by digits in
# the `country` column. The condition is built once so every output column
# agrees on which rows are shifted.
is_shifted = (length(col("address")) <= 5) | col("country").rlike("[0-9]")

# Characters removed from postal codes: dash and space.
POSTAL_CODE_STRIP = r"[- ]"
# Characters removed from phone and fax numbers: parentheses, dot, dash and space.
PHONE_STRIP = r"[().\- ]"

df_cust_valid = df_cust.select(
    col("customerID"),
    col("companyName"),
    col("contactName"),
    col("contactTitle"),
    # Shifted rows: rejoin the two address fragments. Note that concat inserts
    # no separator and yields NULL if either fragment is NULL.
    when(is_shifted, concat(col("address"), col("city")))
        .otherwise(col("address")).alias("address_T"),
    when(is_shifted, col("region")).otherwise(col("city")).alias("city_t"),
    # Region names are kept verbatim: removing dashes/spaces is postal-code
    # normalisation and would corrupt multi-word region names
    # (e.g. "Isle of Wight" -> "IsleofWight").
    when(is_shifted, col("postalCode")).otherwise(col("region")).alias("region_t"),
    regexp_replace(
        when(is_shifted, col("country")).otherwise(col("postalCode")),
        POSTAL_CODE_STRIP,
        "",
    ).alias("postalCode_t"),
    when(is_shifted, col("phone")).otherwise(col("country")).alias("country_t"),
    regexp_replace(
        when(is_shifted, col("fax")).otherwise(col("phone")),
        PHONE_STRIP,
        "",
    ).alias("phone_t"),
    # In shifted rows the `fax` column already holds the phone number, so no
    # fax value is available and it is set to NULL.
    regexp_replace(
        when(is_shifted, lit(None)).otherwise(col("fax")),
        PHONE_STRIP,
        "",
    ).alias("fax_t"),
).drop_duplicates()


In [0]:
# Rename the cleaned working columns back to the source column names and
# persist the result, replacing any existing contents of the target table.
passthrough_columns = ["customerID", "companyName", "contactName", "contactTitle"]
cleaned_to_target = {
    "address_T": "address",
    "city_t": "city",
    "region_t": "region",
    "postalCode_t": "postalCode",
    "country_t": "country",
    "phone_t": "phone",
    "fax_t": "fax",
}

df_save_to_sql = df_cust_valid.select(
    *(col(name) for name in passthrough_columns),
    *(col(source).alias(target) for source, target in cleaned_to_target.items()),
)

(
    df_save_to_sql.write
    .mode("overwrite")
    .saveAsTable("dbckwork.dwh_sql.customer")
)