# Lab Exercise
## De-Duping Data

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Instructions

In this exercise, we're doing ETL on a file we've received from some customer. That file contains data about people, including:

* first, middle and last names
* gender
* birth date
* Social Security number
* salary

But, as is unfortunately common in data we get from this customer, the file contains some duplicate records. Worse:

* In some of the records, the names are mixed case (e.g., "Carol"), while in others, they are uppercase (e.g., "CAROL"). 
* The Social Security numbers aren't consistent, either. Some of them are hyphenated (e.g., "992-83-4829"), while others are missing hyphens ("992834829").

The name fields are guaranteed to match if you disregard character case, and the birth dates will also match. The salaries will match as well,
and the Social Security numbers *would* match if they were put in the same format.
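
The matching rule described above can be sketched in plain Python, independent of Spark. This is only an illustration of the idea: the helper name `dedup_key` and the two sample records are hypothetical (only "Carol"/"CAROL" and the two SSN spellings come from the text above), and the field names are assumed to match the file's columns.

```python
def dedup_key(record):
    """Build a comparison key that ignores name case and SSN hyphens."""
    return (
        record["firstName"].lower(),
        record["middleName"].lower(),
        record["lastName"].lower(),
        record["ssn"].replace("-", ""),
        record["gender"],
        record["birthDate"],
        record["salary"],
    )

# Hypothetical records, invented for illustration (not taken from the file).
a = {"firstName": "Carol", "middleName": "Ann", "lastName": "Smith",
     "gender": "F", "birthDate": "1980-01-01", "salary": 50000,
     "ssn": "992-83-4829"}
b = {"firstName": "CAROL", "middleName": "ANN", "lastName": "SMITH",
     "gender": "F", "birthDate": "1980-01-01", "salary": 50000,
     "ssn": "992834829"}

# The raw records differ, but their normalized keys are equal.
print(a == b, dedup_key(a) == dedup_key(b))
```

Two records count as duplicates exactly when their keys are equal, even though the raw records differ.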

Your job is to remove the duplicate records. The specific requirements of your job are:

* Remove duplicates. It doesn't matter which record you keep; it only matters that you keep one of them.
* Preserve the data format of the columns. For example, if you write the first name column in all lower-case, you haven't met this requirement.
* Write the result as a Parquet file, as designated by *destFile*.
* The final Parquet "file" must contain 8 part files (8 files ending in ".parquet").
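
The first two requirements pull in different directions: duplicates have to be detected on normalized values, but the record that is kept must retain its original formatting. A minimal plain-Python sketch of that idea follows. The function name `drop_duplicates`, the tuple layout, and the "Dave" row are hypothetical and exist only for illustration.

```python
def drop_duplicates(records, key_func):
    """Keep the first record seen for each key, without modifying it."""
    seen = set()
    kept = []
    for record in records:
        key = key_func(record)
        if key not in seen:
            seen.add(key)
            kept.append(record)  # the original, unmodified record
    return kept

# Hypothetical (firstName, ssn) pairs, invented for illustration.
people = [
    ("Carol", "992-83-4829"),
    ("CAROL", "992834829"),
    ("Dave", "911-22-3333"),
]

# Compare on lower-cased name and hyphen-free SSN only.
result = drop_duplicates(people, lambda p: (p[0].lower(), p[1].replace("-", "")))
print(result)
```

The normalized values are used only as the comparison key, so whichever record survives is written out exactly as it was read.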

<img alt="Hint" title="Hint" style="vertical-align: text-bottom; position: relative; height:1.75em; top:0.3em" src="https://files.training.databricks.com/static/images/icon-light-bulb.svg"/>&nbsp;**Hint:** The initial dataset contains 103,000 records.<br/>
The de-duplicated result has 100,000 records.

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Getting Started

Run the following cell to configure our "classroom."

In [0]:
%run "./Includes/Classroom-Setup"

In [0]:
%run "./Includes/Initialize-Labs"

In [0]:
partitions = 8

# Make sure wide operations don't repartition to 200
spark.conf.set("spark.sql.shuffle.partitions", str(partitions))

(source, sasEntity, sasToken) = getAzureDataSource()
spark.conf.set(sasEntity, sasToken)

sourceFile = source + "/dataframes/people-with-dups.txt/"
destFile = userhome + "/people.parquet"
print(sourceFile)
print(destFile)

##![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Hints

* Use the <a href="http://spark.apache.org/docs/latest/api/python/index.html" target="_blank">API docs</a>. Specifically, you might find 
  <a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame" target="_blank">DataFrame</a> and
  <a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions" target="_blank">functions</a> to be helpful.
* It's helpful to look at the file first, so you can check the format. `dbutils.fs.head()` (or just `%fs head`) is a big help here.
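
Once the first line of the file is visible, checking the format mostly means identifying the field separator. The sketch below shows one simple way to reason about it in plain Python. The helper `guess_delimiter` is hypothetical, and the header string is an assumed example of what the first line might look like, not output copied from the file.

```python
def guess_delimiter(header_line, candidates=(",", ":", "\t", "|")):
    """Return the candidate that occurs most often in the header line."""
    return max(candidates, key=lambda sep: header_line.count(sep))

# Assumed header line, for illustration only.
header = "firstName:middleName:lastName:gender:birthDate:salary:ssn"

sep = guess_delimiter(header)
fields = header.split(sep)
print(repr(sep), fields)
```

A separator found this way is what would be passed to the reader's `sep` option.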

In [0]:
# TODO

initialDF = (spark.read
  .option("inferSchema", "true") # Not the default for CSV; costs an extra pass over the data
  .option("sep", ":")            # Fields are colon-delimited
  .csv(sourceFile, header=True)  # Read the data in
  .repartition(partitions)       # From 7 >>> 8 partitions
  .cache()                       # Cache the expensive operation
)
# materialize the cache
initialDF.count()

print(initialDF)

# In case it already exists
#dbutils.fs.rm(destFile, True)

In [0]:
from pyspark.sql.functions import col, lower, translate

# Add normalized helper columns to compare on; the original columns are left untouched
pageviewsDF = (initialDF.select(col("*"),
      lower(col("firstName")).alias("lcFirstName"),
      lower(col("lastName")).alias("lcLastName"),
      lower(col("middleName")).alias("lcMiddleName"),
      translate(col("ssn"), "-", "").alias("ssnNums")))
# Alternative that also strips hyphens:
#   regexp_replace(col("ssn"), "-", "").alias("ssnNums")
# Alternative that adds hyphens instead:
#   regexp_replace(col("ssn"), """^(\d{3})(\d{2})(\d{4})$""", "$1-$2-$3").alias("ssnNums")

display(pageviewsDF)

firstName,middleName,lastName,gender,birthDate,salary,ssn,lcFirstName,lcLastName,lcMiddleName,ssnNums
Jaime,Dante,Spelts,M,2001-11-05T00:00:00.000+0000,182484,986-89-9716,jaime,spelts,dante,986899716
Lee,Cyndi,Haroutunian,F,1979-08-04T00:00:00.000+0000,286899,923-20-6320,lee,haroutunian,cyndi,923206320
Anibal,Kurtis,Loehlein,M,2004-12-24T00:00:00.000+0000,108289,914-80-9430,anibal,loehlein,kurtis,914809430
Fay,Yasmine,Rosenthal,F,1980-06-27T00:00:00.000+0000,82763,903-44-2299,fay,rosenthal,yasmine,903442299
Mack,Reed,Mckinney,M,1936-07-31T00:00:00.000+0000,90363,973-26-9458,mack,mckinney,reed,973269458
Jannie,Christinia,Deloye,F,1993-02-08T00:00:00.000+0000,82213,974-63-8144,jannie,deloye,christinia,974638144
Reid,Vance,Augustyniak,M,1939-09-01T00:00:00.000+0000,294406,990-61-1413,reid,augustyniak,vance,990611413
Gordon,Nestor,Flanary,M,1966-06-20T00:00:00.000+0000,255444,930-74-5682,gordon,flanary,nestor,930745682
Kelly,Lynnette,Mcclurken,F,1930-07-19T00:00:00.000+0000,21108,964-30-8392,kelly,mcclurken,lynnette,964308392
Horacio,Maxwell,Spaur,M,2010-02-19T00:00:00.000+0000,57712,978-64-1059,horacio,spaur,maxwell,978641059
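
The SSN column can be brought into a single format in either direction: strip the hyphens, or insert them. The plain-Python sketch below mirrors both options from the cell above so the regular expression can be tried outside Spark. The function names are hypothetical. Note that Python's `re` module writes group references as `\1`, whereas Spark's `regexp_replace` uses Java-style `$1`.

```python
import re

def strip_hyphens(ssn):
    """Normalize to the 9-digit form, e.g. 992-83-4829 -> 992834829."""
    return ssn.replace("-", "")

def add_hyphens(ssn):
    """Normalize to the hyphenated form; input already hyphenated is unchanged."""
    return re.sub(r"^(\d{3})(\d{2})(\d{4})$", r"\1-\2-\3", ssn)

print(strip_hyphens("992-83-4829"))
print(add_hyphens("992834829"))
```

Either form works as a comparison key, as long as every row is normalized the same way.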


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
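# Compare on the normalized helper columns, then drop them so the output keeps the original columns
helperCols = ["lcFirstName", "lcLastName", "lcMiddleName", "ssnNums"]
DF = (pageviewsDF
  .dropDuplicates(helperCols + ["gender", "birthDate", "salary"])
  .drop(*helperCols)
)
print(DF.count())

# Overwrite so the cell can be rerun. With spark.sql.shuffle.partitions set to 8,
# the shuffle from dropDuplicates should produce 8 part files.
DF.write.mode("overwrite").parquet(destFile)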



In [0]:
finalDF = spark.read.parquet(destFile)
finalCount = finalDF.count()
print(finalCount)


##![Spark Logo Tiny](https://s3-us-west-2.amazonaws.com/curriculum-release/images/105/logo_spark_tiny.png) Validate Your Answer

At the bare minimum, we can verify that you wrote the Parquet file out to **destFile** and that you have the right number of records.

Run the following cell to confirm your result:

In [0]:
finalDF = spark.read.parquet(destFile)
finalCount = finalDF.count()

clearYourResults()
validateYourAnswer("01 Expected 100000 Records", 972882115, finalCount)
summarizeYourResults()
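
The validation above checks the record count but not the requirement for 8 part files. A rough way to check that as well is to count the directory entries ending in ".parquet". The sketch below does so on a plain list of names: the helper `count_parquet_parts` and the sample listing are hypothetical, and on Databricks the names could presumably be taken from `dbutils.fs.ls(destFile)`.

```python
def count_parquet_parts(file_names):
    """Count directory entries that are Parquet part files."""
    return sum(1 for name in file_names if name.endswith(".parquet"))

# Hypothetical directory listing, invented for illustration.
# On Databricks (assumption): names = [f.name for f in dbutils.fs.ls(destFile)]
names = ["_SUCCESS"] + [
    f"part-{i:05d}-example.snappy.parquet" for i in range(8)
]

part_count = count_parquet_parts(names)
print(part_count)
```

Marker files such as `_SUCCESS` do not end in ".parquet", so they are not counted.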
