# Lab Exercise
## De-Duping Data

-sandbox
##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Instructions

In this exercise, we're doing ETL on a file we've received from some customer. That file contains data about people, including:

* first, middle and last names
* gender
* birth date
* Social Security number
* salary

But, as is unfortunately common in data we get from this customer, the file contains some duplicate records. Worse:

* In some of the records, the names are mixed case (e.g., "Carol"), while in others, they are uppercase (e.g., "CAROL"). 
* The Social Security numbers aren't consistent, either. Some of them are hyphenated (e.g., "992-83-4829"), while others are missing hyphens ("992834829").

The name fields are guaranteed to match, if you disregard character case, and the birth dates will also match. (The salaries will match, as well,
and the Social Security Numbers *would* match, if they were somehow put in the same format).

Your job is to remove the duplicate records. The specific requirements of your job are:

* Remove duplicates. It doesn't matter which record you keep; it only matters that you keep one of them.
* Preserve the data format of the columns. For example, if you write the first name column in all lower-case, you haven't met this requirement.
* Write the result as a Parquet file, as designated by *destFile*.
* The final Parquet "file" must contain 8 part files (8 files ending in ".parquet").

<img alt="Hint" title="Hint" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-light-bulb.svg"/>&nbsp;**Hint:** The initial dataset contains 103,000 records.<br/>
The de-duplicated result haves 100,000 records.

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Getting Started

Run the following cell to configure our "classroom."

In [0]:
%run "./Includes/Classroom-Setup"

In [0]:
%run "./Includes/Initialize-Labs"

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Hints

* Use the <a href="http://spark.apache.org/docs/latest/api/python/index.html" target="_blank">API docs</a>. Specifically, you might find 
  <a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame" target="_blank">DataFrame</a> and
  <a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions" target="_blank">functions</a> to be helpful.
* It's helpful to look at the file first, so you can check the format. `dbutils.fs.head()` (or just `%fs head`) is a big help here.

In [0]:
# TODO

(source, sasEntity, sasToken) = getAzureDataSource()
spark.conf.set(sasEntity, sasToken)

sourceFile = "wasbs://training@dbtraineastus.blob.core.windows.net/dataframes/people-with-dups.csv"
destFile = "dbfs:/user/vasu.v.2022@outlook.com/people.parquet"

# In case it already exists
dbutils.fs.rm(destFile, True)

In [0]:
dbutils.fs.ls('wasbs://training@dbtraineastus.blob.core.windows.net/dataframes/')

In [0]:
initialDF = (spark.read
  .option("inferSchema", "true") # The default, but not costly w/Parquet
  .csv(sourceFile,header=True)               # Read the data in
  .repartition(8)                # From 7 >>> 8 partitions
  .cache()                       # Cache the expensive operation
)
# materialize the cache
initialDF.count()

In [0]:
display(initialDF)

id,firstName,middleName,lastName,gender,birthDate,salary,ssn
18744,Melda,Lashaunda,Steifle,F,2004-07-30T00:00:00.000+0000,139469,906-88-3466
12129,Herbert,Carson,Ellsworth,M,1937-07-19T00:00:00.000+0000,67486,937-33-3241
45340,Mervin,Troy,Peine,M,1963-07-01T00:00:00.000+0000,216244,976-66-5446
53487,Mable,Sherita,Bergesen,F,1941-04-07T00:00:00.000+0000,242251,928-26-6609
52171,Keneth,Eric,Champany,M,1975-10-18T00:00:00.000+0000,70306,953-74-5082
52629,Cassandra,Cherie,Dellavalle,F,1923-04-01T00:00:00.000+0000,226569,965-44-8814
17840,Hoyt,Alva,Ritari,M,1966-01-31T00:00:00.000+0000,112016,938-82-2810
24260,Jerrold,Shayne,Sedlachek,M,1926-07-01T00:00:00.000+0000,118720,973-32-1707
2121,Marcellus,Jonathon,Beninato,M,1989-07-21T00:00:00.000+0000,160591,965-40-1817
19362,Conchita,Gwenn,Giesing,F,1979-06-16T00:00:00.000+0000,209248,987-20-5249


In [0]:
finalDF.head(5)

In [0]:
finalDF = (initialDF
     .select(col("*"),
      lower(col("firstName")).alias("lcFirstName"),
      lower(col("lastName")).alias("lcLastName"),
      lower(col("middleName")).alias("lcMiddleName"),
      translate(col("ssn"), "-", "").alias("ssnNums")
      # regexp_replace(col("ssn"), "-", "").alias("ssnNums")
      # regexp_replace(col("ssn"), """^(\d{3})(\d{2})(\d{4})$""", "$1-$2-$3").alias("ssnNums")
   )
  .dropDuplicates(["lcFirstName", "lcMiddleName", "lcLastName", "ssnNums", "gender", "birthDate", "salary"])
  .drop("lcFirstName", "lcMiddleName", "lcLastName", "ssnNums")
)
#display(finalDF)
finalDF.count()

In [0]:
finalDF.write.mode("overwrite").parquet(destFile)

##![Spark Logo Tiny](https://s3-us-west-2.amazonaws.com/curriculum-release/images/105/logo_spark_tiny.png) Validate Your Answer

At the bare minimum, we can verify that you wrote the parquet file out to **destFile** and that you have the right number of records.

Running the following cell to confirm your result:

In [0]:
finalDF = spark.read.parquet(destFile)
finalCount = finalDF.count()

clearYourResults()
validateYourAnswer("01 Expected 100000 Records", 972882115, finalCount)
summarizeYourResults()


0,1,2
01 Expected 100000 Records:,passed,100000
