-sandbox
# DataFrames and Transformations Review
##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) De-Duping Data Lab

In this exercise, we're doing ETL on a file we've received from a customer. That file contains data about people, including:

* first, middle and last names
* gender
* birth date
* Social Security number
* salary

But, as is unfortunately common in data we get from this customer, the file contains some duplicate records. Worse:

* In some of the records, the names are mixed case (e.g., "Carol"), while in others, they are uppercase (e.g., "CAROL").
* The Social Security numbers aren't consistent either. Some of them are hyphenated (e.g., "992-83-4829"), while others are missing hyphens ("992834829").

The name fields are guaranteed to match, if you disregard character case, and the birth dates will also match. The salaries will match as well,
and the Social Security Numbers *would* match if they were somehow put in the same format.

Your job is to remove the duplicate records. The specific requirements of your job are:

* Remove duplicates. It doesn't matter which record you keep; it only matters that you keep one of them.
* Preserve the data format of the columns. For example, if you write the first name column in all lowercase, you haven't met this requirement.

Next, you will write the results using two methods:

* First, write the result as a Parquet file, as designated by *destFile*. It must contain 8 part files (8 files ending in ".parquet").
* Next, as is the best practice, write the result to Delta as designated by the filepath *deltaDestFile*.

<img alt="Hint" title="Hint" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-light-bulb.svg"/>&nbsp;**Hint:** The initial dataset contains 103,000 records.<br/>
The de-duplicated result has 100,000 records.

##### Methods
- DataFrameReader (<a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#input-and-output" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/DataFrameReader.html" target="_blank">Scala</a>)
- DataFrame (<a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html" target="_blank">Scala</a>)
- Built-In Functions (<a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html?#functions" target="_blank">Python</a>/<a href="http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/functions$.html" target="_blank">Scala</a>)
- DataFrameWriter (<a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#input-and-output" target="_blank">Python</a>/<a href="https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/DataFrameWriter.html" target="_blank">Scala</a>)

In [0]:
%run ./Includes/Classroom-Setup

It's helpful to look at the file first, so you can check the format. `dbutils.fs.head()` (or just `%fs head`) is a big help here.

In [0]:
%fs head dbfs:/mnt/training/dataframes/people-with-dups.txt

In [0]:
# TODO

sourceFile = "dbfs:/mnt/training/dataframes/people-with-dups.txt"
destFile = workingDir + "/people.parquet"

# In case it already exists
dbutils.fs.rm(destFile, True)

# Complete your work here...


In [0]:
df = (spark.read
  .option("sep", ":")
  .option("header", True)
  .option("inferSchema", True)
  .csv(sourceFile))

df.printSchema()

In [0]:
display(df)

firstName,middleName,lastName,gender,birthDate,salary,ssn
Emanuel,Wallace,Panton,M,1988-03-04T00:00:00.000+0000,101255,935-90-7627
Eloisa,Rubye,Cayouette,F,2000-06-20T00:00:00.000+0000,204031,935-89-9009
Cathi,Svetlana,Prins,F,2012-12-22T00:00:00.000+0000,35895,959-30-7957
Mitchel,Andres,Mozdzierz,M,1966-05-06T00:00:00.000+0000,55108,989-27-8093
Angla,Melba,Hartzheim,F,1938-07-26T00:00:00.000+0000,13199,935-27-4276
Rachel,Marlin,Borremans,F,1923-02-23T00:00:00.000+0000,67070,996-41-8616
Catarina,Phylicia,Dominic,F,1969-09-29T00:00:00.000+0000,201021,999-84-8888
Antione,Randy,Hamacher,M,2004-03-05T00:00:00.000+0000,271486,917-96-3554
Madaline,Shawanda,Piszczek,F,1996-03-17T00:00:00.000+0000,183944,963-87-9974
Luciano,Norbert,Sarcone,M,1962-12-14T00:00:00.000+0000,73069,909-96-1669


In [0]:

from pyspark.sql.functions import initcap, col, regexp_replace, concat_ws
df_final = (df.withColumn('firstName',initcap(col('firstName')))    # Preserve the data format of names
                  .withColumn('middleName',initcap(col('middleName')))
                  .withColumn('lastName',initcap(col('lastName')))
                  .withColumn('ssn_nohyp',regexp_replace(col('ssn'),'-',''))    # preserve the data format of ssn
                  .withColumn('ssn',concat_ws('-',col('ssn_nohyp').substr(0,3), col('ssn_nohyp').substr(4,2), col('ssn_nohyp').substr(6,4)))
                  .dropDuplicates(['firstName','middleName','lastName','ssn'])
                  .drop('ssn_nohyp')
           )
display(df_final)

firstName,middleName,lastName,gender,birthDate,salary,ssn
Abigail,Ingrid,Garrahan,F,2000-01-11T00:00:00.000+0000,45189,900-55-5967
Abraham,Bertram,Laperouse,M,1922-08-26T00:00:00.000+0000,55988,993-76-6776
Adena,Valrie,Hormell,F,1957-03-10T00:00:00.000+0000,47573,935-93-7329
Adolph,Rod,Bajek,M,1974-01-14T00:00:00.000+0000,242130,909-47-8978
Agripina,Leann,Lecuyer,F,1995-05-28T00:00:00.000+0000,52239,963-55-6670
Ahmad,Kent,Francolino,M,1943-12-07T00:00:00.000+0000,140547,989-56-4105
Aleen,Gisela,Comfort,F,1981-07-09T00:00:00.000+0000,170651,926-39-5615
Alvin,Antonio,Ehleiter,M,1936-09-14T00:00:00.000+0000,261496,994-64-9864
Andre,Avery,Mirra,M,1995-03-09T00:00:00.000+0000,49223,993-78-2883
Abbie,Evelin,Nichol,F,1985-01-10T00:00:00.000+0000,95861,919-95-6712


In [0]:
# partition the data and output to parquet files
df_remove.repartition(8).write.parquet(destFile)

-sandbox
##### <img alt="Best Practice" title="Best Practice" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-blue-ribbon.svg"/> Check your work

Verify that you wrote the parquet file out to **destFile** and that you have the right number of records.

In [0]:
partFiles = len(list(filter(lambda f: f.path.endswith(".parquet"), dbutils.fs.ls(destFile))))

finalDF = spark.read.parquet(destFile)
finalCount = finalDF.count()

assert partFiles == 8, "expected 8 parquet files located in destFile"
assert finalCount == 100000, "expected 100000 records in finalDF"

## Clean up classroom
Run the cell below to clean up resources.

In [0]:
%run "./Includes/Classroom-Cleanup"
