# Lab Exercise
## De-Duping Data

-sandbox
##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Instructions

In this exercise, we're doing ETL on a file we've received from some customer. That file contains data about people, including:

* first, middle and last names
* gender
* birth date
* Social Security number
* salary

But, as is unfortunately common in data we get from this customer, the file contains some duplicate records. Worse:

* In some of the records, the names are mixed case (e.g., "Carol"), while in others, they are uppercase (e.g., "CAROL"). 
* The Social Security numbers aren't consistent, either. Some of them are hyphenated (e.g., "992-83-4829"), while others are missing hyphens ("992834829").

The name fields are guaranteed to match, if you disregard character case, and the birth dates will also match. (The salaries will match, as well,
and the Social Security Numbers *would* match, if they were somehow put in the same format).

Your job is to remove the duplicate records. The specific requirements of your job are:

* Remove duplicates. It doesn't matter which record you keep; it only matters that you keep one of them.
* Preserve the data format of the columns. For example, if you write the first name column in all lower-case, you haven't met this requirement.
* Write the result as a Parquet file, as designated by *destFile*.
* The final Parquet "file" must contain 8 part files (8 files ending in ".parquet").

<img alt="Hint" title="Hint" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-light-bulb.svg"/>&nbsp;**Hint:** The initial dataset contains 103,000 records.<br/>
The de-duplicated result haves 100,000 records.

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Getting Started

Run the following cell to configure our "classroom."

In [0]:
%run "./Includes/Classroom-Setup"

In [0]:
%run "./Includes/Initialize-Labs"

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Hints

* Use the <a href="http://spark.apache.org/docs/latest/api/python/index.html" target="_blank">API docs</a>. Specifically, you might find 
  <a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame" target="_blank">DataFrame</a> and
  <a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions" target="_blank">functions</a> to be helpful.
* It's helpful to look at the file first, so you can check the format. `dbutils.fs.head()` (or just `%fs head`) is a big help here.

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
partitions = 8

# Make sure wide operations don't repartition to 200
spark.conf.set("spark.sql.shuffle.partitions", str(partitions))

(source, sasEntity, sasToken) = getAzureDataSource()
spark.conf.set(sasEntity, sasToken)

sourceFile = source + "/dataframes/people-with-dups.txt"
destFile = userhome + "/people.parquet"
dbutils.fs.ls(sourceFile)
print(dbutils.fs.head(sourceFile))

# In case it already exists
#dbutils.fs.rm(destFile, True)

In [0]:
initialDF = (spark.read
   .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ":")
    .csv(sourceFile)
            )
# materialize the cache
initialDF.count()
print(initialDF)

In [0]:
modifiedDF = (initialDF
  .select(
      upper(col("firstName")).alias("modFirstName"),
      upper(col("middleName")).alias("modMiddleName"),
      upper(col("lastName")).alias("modLastName"),
      col("gender"),col("birthDate"),col("salary"),
      translate(col("ssn"), "-", "").alias("ssnNumber")
    
   )
  .dropDuplicates(["modFirstName","modMiddleName","modLastName","gender","birthDate","salary","ssnNumber"])
 
  
)
print(modifiedDF.count())


In [0]:
print(modifiedDF)
display(modifiedDF)


modFirstName,modMiddleName,modLastName,gender,birthDate,salary,ssnNumber
ELOISA,RUBYE,CAYOUETTE,F,2000-06-20T00:00:00.000+0000,204031,935899009
ANTIONE,RANDY,HAMACHER,M,2004-03-05T00:00:00.000+0000,271486,917963554
MALENA,APRYL,KINGS,F,1980-01-08T00:00:00.000+0000,98204,934482334
FERNANDO,LOWELL,ZEBELL,M,1955-07-03T00:00:00.000+0000,246516,951576196
MYLES,STUART,MISKELL,M,1928-05-17T00:00:00.000+0000,280726,992892715
JULIANE,HALLIE,VUILLEMOT,F,1972-01-11T00:00:00.000+0000,203415,950172657
ALEX,GILBERT,TRIGUEROS,M,1990-12-30T00:00:00.000+0000,271591,999603232
MICHEL,LOREN,SANDOZ,M,1925-05-04T00:00:00.000+0000,270046,948564284
SHAUNNA,KATHLENE,AMUNRUD,F,1965-07-01T00:00:00.000+0000,234090,916536216
LEEANN,MADELEINE,CLATER,F,1976-10-26T00:00:00.000+0000,100705,946417003


In [0]:
finalDF= (modifiedDF.toDF("firstName","middleName","lastName","gender","birthDate","salary","ssn")

         )



In [0]:
finalDF.printSchema()
display(finalDF)
print(finalDF.count())



firstName,middleName,lastName,gender,birthDate,salary,ssn
ELOISA,RUBYE,CAYOUETTE,F,2000-06-20T00:00:00.000+0000,204031,935899009
ANTIONE,RANDY,HAMACHER,M,2004-03-05T00:00:00.000+0000,271486,917963554
MALENA,APRYL,KINGS,F,1980-01-08T00:00:00.000+0000,98204,934482334
FERNANDO,LOWELL,ZEBELL,M,1955-07-03T00:00:00.000+0000,246516,951576196
MYLES,STUART,MISKELL,M,1928-05-17T00:00:00.000+0000,280726,992892715
JULIANE,HALLIE,VUILLEMOT,F,1972-01-11T00:00:00.000+0000,203415,950172657
ALEX,GILBERT,TRIGUEROS,M,1990-12-30T00:00:00.000+0000,271591,999603232
MICHEL,LOREN,SANDOZ,M,1925-05-04T00:00:00.000+0000,270046,948564284
SHAUNNA,KATHLENE,AMUNRUD,F,1965-07-01T00:00:00.000+0000,234090,916536216
LEEANN,MADELEINE,CLATER,F,1976-10-26T00:00:00.000+0000,100705,946417003


In [0]:
(finalDF.write
   .mode("overwrite")
   .parquet(destFile)
)
finalDF = spark.read.parquet(destFile)
print("Total Records: {0:,}".format( finalDF.count() ))

In [0]:
display( dbutils.fs.ls(destFile) )

path,name,size,modificationTime
dbfs:/user/farzanam24@gmail.com/people.parquet/_SUCCESS,_SUCCESS,0,1662699283000
dbfs:/user/farzanam24@gmail.com/people.parquet/_committed_6453941352764611838,_committed_6453941352764611838,424,1662699282000
dbfs:/user/farzanam24@gmail.com/people.parquet/_started_6453941352764611838,_started_6453941352764611838,0,1662699282000
dbfs:/user/farzanam24@gmail.com/people.parquet/part-00000-tid-6453941352764611838-a213f959-2405-4290-91ea-66dc4e0f861d-925-1-c000.snappy.parquet,part-00000-tid-6453941352764611838-a213f959-2405-4290-91ea-66dc4e0f861d-925-1-c000.snappy.parquet,771223,1662699282000
dbfs:/user/farzanam24@gmail.com/people.parquet/part-00001-tid-6453941352764611838-a213f959-2405-4290-91ea-66dc4e0f861d-926-1-c000.snappy.parquet,part-00001-tid-6453941352764611838-a213f959-2405-4290-91ea-66dc4e0f861d-926-1-c000.snappy.parquet,771487,1662699282000
dbfs:/user/farzanam24@gmail.com/people.parquet/part-00002-tid-6453941352764611838-a213f959-2405-4290-91ea-66dc4e0f861d-927-1-c000.snappy.parquet,part-00002-tid-6453941352764611838-a213f959-2405-4290-91ea-66dc4e0f861d-927-1-c000.snappy.parquet,768272,1662699282000
dbfs:/user/farzanam24@gmail.com/people.parquet/part-00003-tid-6453941352764611838-a213f959-2405-4290-91ea-66dc4e0f861d-928-1-c000.snappy.parquet,part-00003-tid-6453941352764611838-a213f959-2405-4290-91ea-66dc4e0f861d-928-1-c000.snappy.parquet,764372,1662699282000


##![Spark Logo Tiny](https://s3-us-west-2.amazonaws.com/curriculum-release/images/105/logo_spark_tiny.png) Validate Your Answer

At the bare minimum, we can verify that you wrote the parquet file out to **destFile** and that you have the right number of records.

Running the following cell to confirm your result:

In [0]:
finalDF = spark.read.parquet(destFile)
finalCount = finalDF.count()

clearYourResults()
validateYourAnswer("01 Expected 100000 Records", 972882115, finalCount)
summarizeYourResults()
