## Overview

This notebook shows how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) (the Databricks File System) lets you store data for querying inside of Databricks. This notebook assumes that the file you would like to read from already exists inside of DBFS. Note that the cells below read an Excel workbook from the S3 path set in `file_location`; change that path to point at your own file.

This notebook is written in **Python**, so the default cell type is Python. However, you can use a different language in a cell by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [None]:
from pyspark.sql.functions import (
    col,
    current_timestamp,
    lit,
    monotonically_increasing_id,
    row_number,
    split,
    udf,
)
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.window import Window

In [None]:
# Location and reader settings for the raw borrower workbook.
# Later cells reuse file_location, file_type, infer_schema and first_row_is_header.
file_location = "s3://data-engineering-aspen-capital/data_engineer_raw_data.xlsx"
file_type = "com.crealytics.spark.excel"
infer_schema = "true"
first_row_is_header = "true"

# No sheet is named here, so the reader uses its default sheet
# (the original author notes this is sheet1 of the workbook).
df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("inferSchema", infer_schema)
    .load(file_location)
)

# Render the loaded rows for a quick visual check.
display(df)

id,full_name,street,city,state,zip_code,phone_home,phone_cell,email
a20d3e1c-d1fb-45a8-b89d-0373f7a29d9d,Kimball Bruyns,,Waterbury,Connecticut,6705,,,
0c53b401-def5-4c5e-9981-fc7ec92ac230,Claudette MacColgan,2 Roth Alley,Memphis,Tennessee,38104,615-897-8189,407-427-8986,cmaccolgan1@amazonaws.com
6a91cf5a-6166-4e01-bdda-00953de327a3,Vanny Fanshaw,,Huntington,West Virginia,25721,,757-599-0522,
560cea0d-a266-4c40-b7d1-1688db8d86b8,West Kalinsky,,San Bernardino,California,92410,,281-593-3658,
1cd44b01-06ce-44dc-ad12-27bf24bc3b2e,Vivie Filpo,13 Golf View Hill,Baltimore,Maryland,21290,410-622-0404,801-297-6131,vfilpo4@networkadvertising.org
ef56846c-d942-45ea-8a2d-5852fb2b51ec,Syman Pauleau,,Crawfordsville,Indiana,47937,,512-116-2045,
746c01a7-a1cb-4fa4-856d-1d1bf48e13b8,Martita Rudinger,,Wilkes Barre,Pennsylvania,18768,,754-177-0322,
c298d352-6d0d-4d64-aa3d-5c77202b509e,Martica Joynson,56009 Weeping Birch Hill,Long Beach,California,90810,562-743-9355,520-889-5766,mjoynson7@deviantart.com
db374708-8a1e-436f-a04f-0e10adeede41,Perla Besant,43776 Calypso Junction,Montpelier,Vermont,5609,802-175-4804,352-363-9436,pbesant8@wp.com
7ba0730f-c29c-45c8-a56a-cda9c159bbdf,Lemuel Colloff,,Fresno,California,93704,,,


In [None]:
# Read the "role_profile" sheet of the same workbook, starting at cell A1.
# Both "sheetName" and "dataAddress" are passed, exactly as before.
# NOTE(review): the two options look redundant - confirm which one the installed
# spark-excel version actually honours.
df_role_profile = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("inferSchema", infer_schema)
    .option("sheetName", "role_profile")
    .option("dataAddress", "'role_profile'!A1")
    .load(file_location)
)

# Render the borrower_id / role_profile rows (sheet2, per the original author's note).
display(df_role_profile)

borrower_id,role_profile
a20d3e1c-d1fb-45a8-b89d-0373f7a29d9d,co-borrower
0c53b401-def5-4c5e-9981-fc7ec92ac230,co-borrower
6a91cf5a-6166-4e01-bdda-00953de327a3,co-borrower
560cea0d-a266-4c40-b7d1-1688db8d86b8,co-borrower
1cd44b01-06ce-44dc-ad12-27bf24bc3b2e,co-borrower
ef56846c-d942-45ea-8a2d-5852fb2b51ec,borrower
746c01a7-a1cb-4fa4-856d-1d1bf48e13b8,co-borrower
c298d352-6d0d-4d64-aa3d-5c77202b509e,borrower
db374708-8a1e-436f-a04f-0e10adeede41,borrower
7ba0730f-c29c-45c8-a56a-cda9c159bbdf,co-borrower


In [None]:
# Table: user_profile
# PK: user_profile_id (the source "id" column)
# first_name / last_name are the first and second space-separated tokens of full_name.
# Audit columns: created_date, updated_date, created_by, updated_by.

# Tokenise the full name once. Only indexes 0 and 1 are used, so a name with more
# than two tokens loses the rest, and a single-token name has no element at index 1.
name_tokens = split(col("full_name"), " ")

user_profile = (
    df.select(
        col("id").alias("user_profile_id"),
        name_tokens[0].alias("first_name"),
        name_tokens[1].alias("last_name"),
    )
    .withColumn("created_date", current_timestamp())
    .withColumn("updated_date", col("created_date"))  # same instant as created_date
    .withColumn("created_by", lit("admin"))
    .withColumn("updated_by", col("created_by"))
)

user_profile.show()

+--------------------+----------+----------+--------------------+--------------------+----------+----------+
|     user_profile_id|first_name| last_name|        created_date|        updated_date|created_by|updated_by|
+--------------------+----------+----------+--------------------+--------------------+----------+----------+
|a20d3e1c-d1fb-45a...|   Kimball|    Bruyns|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|
|0c53b401-def5-4c5...| Claudette| MacColgan|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|
|6a91cf5a-6166-4e0...|     Vanny|   Fanshaw|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|
|560cea0d-a266-4c4...|      West|  Kalinsky|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|
|1cd44b01-06ce-44d...|     Vivie|     Filpo|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|
|ef56846c-d942-45e...|     Syman|   Pauleau|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|
|746c01a7-a1cb-4fa.

In [None]:
# Table: user
# PK: user_id, generated as monotonically_increasing_id() + 1
# Carries user_profile_id and the four audit columns over from user_profile.
# NOTE(review): Spark documents monotonically_increasing_id as unique and increasing
# but not consecutive, so user_id is gap-free only while the data sits in one partition.
user_columns = ["user_profile_id", "created_date", "updated_date", "created_by", "updated_by"]

user = user_profile.select(*user_columns).withColumn(
    "user_id", monotonically_increasing_id() + 1
)

user.show()

+--------------------+--------------------+--------------------+----------+----------+-------+
|     user_profile_id|        created_date|        updated_date|created_by|updated_by|user_id|
+--------------------+--------------------+--------------------+----------+----------+-------+
|a20d3e1c-d1fb-45a...|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|      1|
|0c53b401-def5-4c5...|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|      2|
|6a91cf5a-6166-4e0...|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|      3|
|560cea0d-a266-4c4...|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|      4|
|1cd44b01-06ce-44d...|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|      5|
|ef56846c-d942-45e...|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|      6|
|746c01a7-a1cb-4fa...|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|      7|
|c298d352-6d0d-4d6...|2023-04-15 20:26:...|2023-04

In [None]:
# Table: role_profile_type
# PK: role_profile_type_id, generated as monotonically_increasing_id() + 1
# One row per distinct value of the sheet's role_profile column, exposed as "type".
# Audit columns: created_date, updated_date, created_by, updated_by.

# Distinct role names found in the role_profile sheet.
distinct_role_types = df_role_profile.select(col("role_profile").alias("type")).distinct()

role_profile_type = (
    distinct_role_types
    .withColumn("created_date", current_timestamp())
    .withColumn("updated_date", col("created_date"))
    .withColumn("created_by", lit("admin"))
    .withColumn("updated_by", col("created_by"))
    # The id is added last so it ends up as the final column.
    .withColumn("role_profile_type_id", monotonically_increasing_id() + 1)
)

role_profile_type.show()

+-----------+--------------------+--------------------+----------+----------+--------------------+
|       type|        created_date|        updated_date|created_by|updated_by|role_profile_type_id|
+-----------+--------------------+--------------------+----------+----------+--------------------+
|co-borrower|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|                   1|
|   borrower|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|                   2|
+-----------+--------------------+--------------------+----------+----------+--------------------+



In [None]:
# Table: role_profile
# PK: role_profile_id - this is the sheet's borrower_id, carried over unchanged
# FKs: role_profile_type_id (looked up by role name) and user_id (looked up by profile id)
# Audit columns: created_date, updated_date, created_by, updated_by.

# Role name to generated type id.
role_type_lookup = role_profile_type.select("type", "role_profile_type_id")
# Profile id to generated user id, renamed so it can be joined on role_profile_id.
user_lookup = user.select("user_id", col("user_profile_id").alias("role_profile_id"))

role_profile = (
    df_role_profile.select(
        col("role_profile").alias("type"),
        col("borrower_id").alias("role_profile_id"),
    )
    .withColumn("created_date", current_timestamp())
    .withColumn("updated_date", col("created_date"))
    .withColumn("created_by", lit("admin"))
    .withColumn("updated_by", col("created_by"))
    # Inner join: swap the role name for its id, then discard the name.
    .join(role_type_lookup, "type")
    .drop("type")
    # Inner join: attach the user_id of the matching user row.
    .join(user_lookup, "role_profile_id")
)

role_profile.show()

+--------------------+--------------------+--------------------+----------+----------+--------------------+-------+
|     role_profile_id|        created_date|        updated_date|created_by|updated_by|role_profile_type_id|user_id|
+--------------------+--------------------+--------------------+----------+----------+--------------------+-------+
|a20d3e1c-d1fb-45a...|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|                   1|      1|
|0c53b401-def5-4c5...|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|                   1|      2|
|6a91cf5a-6166-4e0...|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|                   1|      3|
|560cea0d-a266-4c4...|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|                   1|      4|
|1cd44b01-06ce-44d...|2023-04-15 20:26:...|2023-04-15 20:26:...|     admin|     admin|                   1|      5|
|746c01a7-a1cb-4fa...|2023-04-15 20:26:...|2023-04-15 20:26:...|     adm

In [None]:
# Table: address
# PK: address_id, generated as monotonically_increasing_id() + 1
# Takes street, city, state and zip_code from the borrower sheet; the borrower "id"
# is exposed as role_profile_id.
# Audit columns: created_date, updated_date, created_by, updated_by.

address_source = df.select(
    "street", "city", "state", "zip_code", col("id").alias("role_profile_id")
)
# Only the key column is needed from role_profile for the join below.
known_role_profiles = role_profile.select("role_profile_id")

address = (
    address_source
    .withColumn("address_id", monotonically_increasing_id() + 1)
    .withColumn("created_date", current_timestamp())
    .withColumn("updated_date", col("created_date"))
    .withColumn("created_by", lit("admin"))
    .withColumn("updated_by", col("created_by"))
    # Inner join: keeps only addresses whose role_profile_id exists in role_profile.
    .join(known_role_profiles, "role_profile_id")
)

address.show()

+--------------------+--------------------+--------------+--------------+--------+----------+--------------------+--------------------+----------+----------+
|     role_profile_id|              street|          city|         state|zip_code|address_id|        created_date|        updated_date|created_by|updated_by|
+--------------------+--------------------+--------------+--------------+--------+----------+--------------------+--------------------+----------+----------+
|a20d3e1c-d1fb-45a...|                null|     Waterbury|   Connecticut|   06705|         1|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|
|0c53b401-def5-4c5...|        2 Roth Alley|       Memphis|     Tennessee|   38104|         2|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|
|6a91cf5a-6166-4e0...|                null|    Huntington| West Virginia|   25721|         3|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|
|560cea0d-a266-4c4...|                null|San Berna

In [None]:
# Table: phone_number
# PK: phone_number_id (consecutive integers starting at 1)
# Stacks the home and cell numbers of every borrower into one "value" column,
# tagged with phone_number_type_id (1 = home, 2 = cell), and drops missing numbers.
# Joins with role_profile so only numbers with a matching role_profile_id remain.
# Audit columns: created_date, updated_date, created_by, updated_by.

phone_home = (
    df.select(col("phone_home").alias("value"), col("id").alias("role_profile_id"))
    .withColumn("phone_number_type_id", lit(1))
)
phone_cell = (
    df.select(col("phone_cell").alias("value"), col("id").alias("role_profile_id"))
    .withColumn("phone_number_type_id", lit(2))
)

# monotonically_increasing_id() encodes the partition number in the id, so on the
# multi-partition union it produced values such as 68719476737. row_number() over a
# fixed ordering gives 1, 2, 3, ... regardless of partitioning.
# The window has no partitionBy, so Spark gathers all rows into a single partition
# to number them - acceptable for a table of this size.
phone_id_order = Window.orderBy("role_profile_id", "phone_number_type_id")

phone_number = (
    phone_home.unionAll(phone_cell)
    .dropna()
    # Filter to known role profiles first so the ids are assigned to the final rows.
    .join(role_profile.select("role_profile_id"), "role_profile_id")
    # Cast keeps the id column a long, as it was with monotonically_increasing_id().
    .withColumn("phone_number_id", row_number().over(phone_id_order).cast("long"))
    .withColumn("created_date", current_timestamp())
    .withColumn("updated_date", col("created_date"))
    .withColumn("created_by", lit("admin"))
    .withColumn("updated_by", col("created_by"))
)

phone_number.show()

+--------------------+------------+--------------------+---------------+--------------------+--------------------+----------+----------+
|     role_profile_id|       value|phone_number_type_id|phone_number_id|        created_date|        updated_date|created_by|updated_by|
+--------------------+------------+--------------------+---------------+--------------------+--------------------+----------+----------+
|0c53b401-def5-4c5...|407-427-8986|                   2|    68719476737|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|
|0c53b401-def5-4c5...|615-897-8189|                   1|              1|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|
|6a91cf5a-6166-4e0...|757-599-0522|                   2|    68719476738|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|
|560cea0d-a266-4c4...|281-593-3658|                   2|    68719476739|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|
|1cd44b01-06ce-44d...|801-297-6131|      

In [None]:
# Table: phone_number_type
# PK: phone_number_type_id
# Fixed lookup of the two phone kinds; the ids match the literals 1 (home) and
# 2 (cell) used when the phone_number rows are tagged.
# Audit columns: created_date, updated_date, created_by, updated_by.

phone_types = [("home", 1), ("cell", 2)]
phone_type_columns = ["type", "phone_number_type_id"]

phone_number_type = (
    spark.createDataFrame(phone_types, phone_type_columns)
    .withColumn("created_date", current_timestamp())
    .withColumn("updated_date", col("created_date"))
    .withColumn("created_by", lit("admin"))
    .withColumn("updated_by", col("created_by"))
)

phone_number_type.show()

+----+--------------------+--------------------+--------------------+----------+----------+
|type|phone_number_type_id|        created_date|        updated_date|created_by|updated_by|
+----+--------------------+--------------------+--------------------+----------+----------+
|home|                   1|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|
|cell|                   2|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|
+----+--------------------+--------------------+--------------------+----------+----------+



In [None]:
# Table: email_type
# PK: email_type_id, generated as monotonically_increasing_id() + 1
# The type is the second '.'-separated piece of the email address
# (for "name@example.com" that is "com").
# NOTE(review): an address with a dot before the '@', or a multi-label domain such
# as ".co.uk", yields a piece that is not the final suffix - confirm this is intended.
# Audit columns: created_date, updated_date, created_by, updated_by.

# Raw string: the pattern is a regex, and "\." in a normal string literal is an
# invalid escape sequence that newer Python versions warn about.
email = (
    df.select(col("email").alias("value"), col("id").alias("role_profile_id"))
    .withColumn("type", split(col("value"), r"\.")[1])
)

email_type = (
    email.select("type")
    # Borrowers without an email have a null type; keep it out of the lookup table.
    .dropna()
    .distinct()
    .withColumn("created_date", current_timestamp())
    .withColumn("updated_date", col("created_date"))
    .withColumn("created_by", lit("admin"))
    .withColumn("updated_by", col("created_by"))
    .withColumn("email_type_id", monotonically_increasing_id() + 1)
)

email_type.show()

+----+--------------------+--------------------+----------+----------+-------------+
|type|        created_date|        updated_date|created_by|updated_by|email_type_id|
+----+--------------------+--------------------+----------+----------+-------------+
|  fm|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|            1|
| com|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|            2|
|  io|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|            3|
|null|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|            4|
| gov|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|            5|
| edu|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|            6|
|  co|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|            7|
|  nl|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     admin|            8|
|  ru|2023-04-15 20:27:...|2023-04-15 20:27:...|     admin|     a

In [None]:
# Table: email
# PK: email_id, generated as monotonically_increasing_id() + 1
# Starts from the email DataFrame built in the email_type cell (value, role_profile_id, type).
# Swaps the "type" text for its email_type_id via an inner join with email_type.
# Audit columns: created_date, updated_date, created_by, updated_by.
# NOTE(review): this cell rebinds `email` and drops its "type" column, so running it
# a second time without re-running the email_type cell fails at the join.

# Type text to generated type id.
email_type_lookup = email_type.select("type", "email_type_id")

email = (
    email
    .withColumn("created_date", current_timestamp())
    .withColumn("updated_date", col("created_date"))
    .withColumn("created_by", lit("admin"))
    .withColumn("updated_by", col("created_by"))
    # Rows whose type has no match in email_type are dropped by the inner join.
    .join(email_type_lookup, "type")
    .drop("type")
    .withColumn("email_id", monotonically_increasing_id() + 1)
)

email.show()

+--------------------+--------------------+--------------------+--------------------+----------+----------+-------------+--------+
|               value|     role_profile_id|        created_date|        updated_date|created_by|updated_by|email_type_id|email_id|
+--------------------+--------------------+--------------------+--------------------+----------+----------+-------------+--------+
|   crapper1x@last.fm|d29e37f0-2314-483...|2023-04-15 20:17:...|2023-04-15 20:17:...|     admin|     admin|            1|       1|
|cmaccolgan1@amazo...|0c53b401-def5-4c5...|2023-04-15 20:17:...|2023-04-15 20:17:...|     admin|     admin|            2|       2|
|mjoynson7@deviant...|c298d352-6d0d-4d6...|2023-04-15 20:17:...|2023-04-15 20:17:...|     admin|     admin|            2|       3|
|     pbesant8@wp.com|db374708-8a1e-436...|2023-04-15 20:17:...|2023-04-15 20:17:...|     admin|     admin|            2|       4|
|    zmeakesb@cnn.com|c86c89f9-afe0-4b6...|2023-04-15 20:17:...|2023-04-15 20:17:..

In [None]:
# Save every output table back to S3 as CSV with a header row.
# The original calls used save(s"..."): the s prefix is Scala's string interpolator
# and is a SyntaxError in Python, so plain Python strings are used instead.
# Spark writes each target as a directory of part files named "<table>.csv".
# The default save mode is kept, so a write fails if its target path already exists.
output_root = "s3://data-engineering-aspen-capital"

# Mapping of table name to DataFrame; tables are written in this order.
tables_to_save = {
    "user": user,
    "user_profile": user_profile,
    "role_profile": role_profile,
    "role_profile_type": role_profile_type,
    "phone_number": phone_number,
    "phone_number_type": phone_number_type,
    "address": address,
    "email": email,
    "email_type": email_type,
}

for table_name, table_df in tables_to_save.items():
    table_df.write.format("csv").option("header", "true").save(f"{output_root}/{table_name}.csv")