
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

# DataFrames and Transformations Review
## De-Duping Data Lab

In this exercise, we're doing ETL on a file we've received from a customer. That file contains data about people, including:

* first, middle and last names
* gender
* birth date
* Social Security number
* salary

But, as is unfortunately common in data we get from this customer, the file contains some duplicate records. Worse:

* In some of the records, the names are mixed case (e.g., "Carol"), while in others, they are uppercase (e.g., "CAROL").
* The Social Security numbers aren't consistent either. Some of them are hyphenated (e.g., "992-83-4829"), while others are missing hyphens ("992834829").

If all of the name fields match once you disregard character case, then the birth dates and salaries are guaranteed to match as well,
and the Social Security numbers *would* match if they were put in the same format.

Your job is to remove the duplicate records. The specific requirements of your job are:

* Remove duplicates. It doesn't matter which record you keep; it only matters that you keep one of them.
* Preserve the data format of the columns. For example, if you write the first name column in all lowercase, you haven't met this requirement.
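The two requirements above can be sketched in plain Python, outside Spark: compare records on a normalized key (lowercased names, SSN with hyphens stripped) but keep the original record untouched. This is only an illustration of the idea, not the lab solution; the helper names `dedup_key` and `drop_duplicates` and the sample records are made up for this sketch.

```python
def dedup_key(record):
    """Build a comparison key: lowercased names plus the SSN without hyphens."""
    return (
        record["firstName"].lower(),
        record["middleName"].lower(),
        record["lastName"].lower(),
        record["ssn"].replace("-", ""),
    )


def drop_duplicates(records):
    """Keep the first record seen for each key, with its original formatting."""
    seen = set()
    kept = []
    for record in records:
        key = dedup_key(record)
        if key not in seen:
            seen.add(key)
            kept.append(record)  # the record itself is never modified
    return kept


# Hypothetical sample: the first two rows differ only in case and SSN hyphenation.
people = [
    {"firstName": "Carol", "middleName": "Ann", "lastName": "Smith", "ssn": "992-83-4829"},
    {"firstName": "CAROL", "middleName": "ANN", "lastName": "SMITH", "ssn": "992834829"},
    {"firstName": "Luciano", "middleName": "Norbert", "lastName": "Sarcone", "ssn": "909-96-1669"},
]

unique_people = drop_duplicates(people)
```

The key is used only for comparison, so the kept record still has its mixed-case name and hyphenated SSN. In a DataFrame, the same idea maps to adding temporary normalized columns, de-duplicating on them, and then dropping them.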

<img src="https://files.training.databricks.com/images/icon_hint_32.png" alt="Hint"> The initial dataset contains 103,000 records.
The de-duplicated result has 100,000 records.

Next, write the results in **Delta** format as a **single data file** to the directory given by the variable *deltaDestDir*.

<img src="https://files.training.databricks.com/images/icon_hint_32.png" alt="Hint"> Remember the relationship between the number of partitions in a DataFrame and the number of files written.

##### Methods
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#input-and-output" target="_blank">DataFrameReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html" target="_blank">DataFrame</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html?#functions" target="_blank">Built-In Functions</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#input-and-output" target="_blank">DataFrameWriter</a>

In [0]:
%run ./Includes/Classroom-Setup

It's helpful to look at the file first, so you can check the format. `dbutils.fs.head()` (or just `%fs head`) is a big help here.
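Once you have the first few lines of the file as text, one way to check the format is to see which candidate delimiter appears the same number of times on every line. The snippet below is a rough sketch in plain Python; `guess_delimiter` is a hypothetical helper, and `sample_head` is a stand-in assembled from rows shown later in this notebook, not the actual output of `%fs head`.

```python
def guess_delimiter(head_text, candidates=":,;|\t"):
    """Return the first candidate that occurs equally often (and at least once) on every line."""
    lines = [line for line in head_text.splitlines() if line]
    for candidate in candidates:
        counts = {line.count(candidate) for line in lines}
        if len(counts) == 1 and counts != {0}:
            return candidate
    return None


# Stand-in for the text returned by looking at the head of the file (assumed layout).
sample_head = (
    "firstName:middleName:lastName:gender:birthDate:salary:ssn\n"
    "Emanuel:Wallace:Panton:M:1988-03-04:101255:935-90-7627\n"
    "Eloisa:Rubye:Cayouette:F:2000-06-20:204031:935-89-9009\n"
)

delimiter = guess_delimiter(sample_head)
header = sample_head.splitlines()[0].split(delimiter)
print(delimiter, header)
```

A heuristic like this can be fooled by a truncated final line or by delimiters inside quoted fields, so treat the result as a hint and confirm it by eye.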

In [0]:
%fs head dbfs:/mnt/training/dataframes/people-with-dups.txt

In [0]:
# TODO

sourceFile = "dbfs:/mnt/training/dataframes/people-with-dups.txt"
destFile = workingDir + "/people.parquet"

# In case it already exists
dbutils.fs.rm(destFile, True)

# Complete your work here...
# The source file is colon-delimited with a header row; let Spark infer the column types.
peopleDf = spark.read.csv(sourceFile, sep=':', header=True, inferSchema=True)
display(peopleDf)

firstName,middleName,lastName,gender,birthDate,salary,ssn
Emanuel,Wallace,Panton,M,1988-03-04,101255,935-90-7627
Eloisa,Rubye,Cayouette,F,2000-06-20,204031,935-89-9009
Cathi,Svetlana,Prins,F,2012-12-22,35895,959-30-7957
Mitchel,Andres,Mozdzierz,M,1966-05-06,55108,989-27-8093
Angla,Melba,Hartzheim,F,1938-07-26,13199,935-27-4276
Rachel,Marlin,Borremans,F,1923-02-23,67070,996-41-8616
Catarina,Phylicia,Dominic,F,1969-09-29,201021,999-84-8888
Antione,Randy,Hamacher,M,2004-03-05,271486,917-96-3554
Madaline,Shawanda,Piszczek,F,1996-03-17,183944,963-87-9974
Luciano,Norbert,Sarcone,M,1962-12-14,73069,909-96-1669


In [0]:
peopleDf.count()

Out[9]: 103000

In [0]:
peopleDf.printSchema()

root
 |-- firstName: string (nullable = true)
 |-- middleName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthDate: date (nullable = true)
 |-- salary: integer (nullable = true)
 |-- ssn: string (nullable = true)



In [0]:
from pyspark.sql.functions import col, translate, lower

# Add normalized helper columns (lowercased names, SSN without hyphens),
# de-duplicate on them, then drop the helpers so the original formatting is preserved.
finalDf = (peopleDf
           .select(col('*'),
                   lower(col('firstName')).alias('lcFName'),
                   lower(col('middleName')).alias('lcMName'),
                   lower(col('lastName')).alias('lcLName'),
                   translate(col('ssn'), '-', '').alias('new_ssn'))
           .dropDuplicates(['lcFName', 'lcMName', 'lcLName', 'new_ssn', 'gender', 'birthDate', 'salary'])
           .drop('lcFName', 'lcMName', 'lcLName', 'new_ssn')
)

display(finalDf)

firstName,middleName,lastName,gender,birthDate,salary,ssn
Rudolf,Alphonse,Aalbers,M,2006-03-05,261602,945-64-4223
Michel,Jerald,Aalund,M,1987-04-02,195228,994-99-7756
Kristina,Ronni,Aalund,F,1937-07-20,231045,956-65-2871
Aiko,Yuonne,Aamodt,F,2007-03-24,280105,998-15-4939
Adele,Latonya,Aanderud,F,1990-06-27,120551,966-44-1518
Pamelia,Kaitlyn,Aanenson,F,2002-09-12,116613,917-32-8738
Drucilla,Merry,Aanerud,F,1929-09-19,66446,937-81-3613
Elza,Darci,Aarhus,F,1915-01-09,53931,983-28-4817
Kym,Krystal,Abad,F,1987-06-01,293915,930-27-8909
Cristobal,Kent,Abajian,M,1986-06-19,134374,974-24-3648


In [0]:
finalDf.count()

Out[24]: 100000

In [0]:
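# Note: despite the ".csv" suffix, this path is a directory that holds Delta (Parquet) files.
deltaDestDir = workingDir + "/users.csv"

# A single partition yields a single data file.
(finalDf
  .repartition(1)
  .write
  .mode('overwrite')
  .format('delta')
  .save(deltaDestDir)
)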

display(dbutils.fs.ls(deltaDestDir))

path,name,size,modificationTime
dbfs:/user/sandhyarani.chinnala@celebaltech.com/dbacademy/spark_programming/asp_3_4_review/users.csv/_delta_log/,_delta_log/,0,1688456369000
dbfs:/user/sandhyarani.chinnala@celebaltech.com/dbacademy/spark_programming/asp_3_4_review/users.csv/part-00000-36d05c8e-8f61-4b06-b689-bf7a58027c9a-c000.snappy.parquet,part-00000-36d05c8e-8f61-4b06-b689-bf7a58027c9a-c000.snappy.parquet,2689207,1688456372000


**CHECK YOUR WORK**

In [0]:
verify_files = dbutils.fs.ls(deltaDestDir)
verify_delta_format = False
verify_num_data_files = 0
for f in verify_files:
    if f.name == '_delta_log/':
        verify_delta_format = True
    elif f.name.endswith('.parquet'):
        verify_num_data_files += 1

assert verify_delta_format, "Data not written in Delta format"
assert verify_num_data_files == 1, "Expected 1 data file written"

verify_record_count = spark.read.format("delta").load(deltaDestDir).count()
assert verify_record_count == 100000, "Expected 100000 records in final result"

del verify_files, verify_delta_format, verify_num_data_files, verify_record_count

## Clean up classroom
Run the cell below to clean up resources.

In [0]:
%run "./Includes/Classroom-Cleanup"

&copy; 2022 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>