
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://cct.ued.vnu.edu.vn/admin//lienketlinkphai/brtunv-7_87_anhlienket.png" alt="Databricks Learning" style="width: 600px">
</div>



Ở trong bài tập này, chúng ta sẽ thực hành tạo luồng ETL từ một file nhận được từ khách hàng. Trong file đó có chứa những thông tin về nhân viên, bao gồm:

| Tên trường | Ý nghĩa |
| --- | --- |
| firstName | Họ |
| middleName | Tên Đệm |
| lastName |  Tên |
| gender | Giới Tính |
| birthDate | Ngày Tháng Năm Sinh |
| salary | Thu Nhập |
| ssn | Mã Số Bảo Hiểm Xã Hội |

##### Link tham khảo
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/io.html" target="_blank">DataFrameReader</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html" target="_blank">DataFrame</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html" target="_blank">Built-In Functions</a>
- <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/io.html" target="_blank">DataFrameWriter</a>

In [0]:
val input = "dbfs:/FileStore/people/people_with_dups.txt"

In [0]:
import org.apache.spark.sql.functions._

Bài 1: Thực hiện dùng spark đọc file từ đường dẫn phía trên đã lưu vào `input` và tạo thành DataFrame `inputDF`.

**Chú ý định dạng file và phân cách trường**

In [0]:
val inputDF = spark.read.option("header", "true").option("sep", ":").option("inferSchema", "true").csv(input)
display(inputDF.limit(10))

firstName,middleName,lastName,gender,birthDate,salary,ssn
Emanuel,Wallace,Panton,M,1988-03-04,101255,935-90-7627
Eloisa,Rubye,Cayouette,F,2000-06-20,204031,935-89-9009
Cathi,Svetlana,Prins,F,2012-12-22,35895,959-30-7957
Mitchel,Andres,Mozdzierz,M,1966-05-06,55108,989-27-8093
Angla,Melba,Hartzheim,F,1938-07-26,13199,935-27-4276
Rachel,Marlin,Borremans,F,1923-02-23,67070,996-41-8616
Catarina,Phylicia,Dominic,F,1969-09-29,201021,999-84-8888
Antione,Randy,Hamacher,M,2004-03-05,271486,917-96-3554
Madaline,Shawanda,Piszczek,F,1996-03-17,183944,963-87-9974
Luciano,Norbert,Sarcone,M,1962-12-14,73069,909-96-1669


Bài 2: Thống kê độ tuổi của các nhân viên và số lượng nhân viên trong độ tuổi đó của công ty tính đến thời điểm hiện tại (`2025-04-17`). \
Sắp xếp dữ liệu theo độ tuổi của nhân viên theo thứ tự từ cao xuống thấp

Kiến trúc dữ liệu mong muốn:
| Tên trường | Kiểu dữ liệu |
| --- | --- |
| `age` | Int |
| `count` | Long |

Lưu kết quả vào đường dẫn:
`file:///dbfs/ThucHanhBuoi7/Bai2`

Với định dạng file là `csv` và phân cách cột là dấu `,`


In [0]:
val lesson1 = inputDF.withColumn("age", floor(months_between(to_date(lit("2025-04-17")) , $"birthDate") / 12))
                     .groupBy($"age").agg(count("*").as("count"))
display(lesson1)

age,count
29,1054
26,1039
65,974
54,1010
19,1037
22,1019
77,1066
34,1029
50,1029
94,1107


In [0]:
lesson1.write
       .mode("overwrite")
       .option("header", "true")
       .csv("dbfs:/ThucHanhBuoi7/Bai2")

In [0]:
//kiểm tra writefile
dbutils.fs.ls("dbfs:/ThucHanhBuoi7/Bai2")

Bài 3: Tạo thêm 1 cột `fullName`, ghép từ (`firstName`, `middleName`, `lastName`), thêm cột `age` tính từ thời điểm hiện tại (`2025-04-17`).

Lưu ý: Dữ liệu đang bị trùng lặp ở phần tên và không đúng chuẩn.

Ở một số bản ghi, tên có dạng Mixed Case (VD: `Carol`) và dạng UPPER CASE (VD: `CAROL`). Nhiệm vụ của bạn là loại bỏ trùng lặp theo tên, và chuyển toàn bộ dữ liệu thành dạng Mixed Case.

Định dạng kết quả mong muốn:
| Tên trường | Kiểu dữ liệu |
| --- | --- |
| `fullName` | Int |
| `age` | Int |
| `birthDay` | String |


Lưu kết quả vào đường dẫn:
`file:///dbfs/ThucHanhBuoi7/Bai3`

Với định dạng file là `parquet` và với định dạng nén là: `snappy`

In [0]:
val lesson2 = inputDF.withColumn("fullName", initcap(concat_ws(" ", $"firstName", $"middleName", $"lastName")))
                     .dropDuplicates("fullName")
                     .withColumn("age", floor(months_between(to_date(lit("2025-04-17")), $"birthDate") / 12))
                     .select($"fullName", $"age", $"birthDate".as("birthDay").cast("string"))
display(lesson2)


fullName,age,birthDay
Camilla Lorenza Jeremiah,72,1952-09-24
Vina Pearle Millener,76,1948-05-17
Glen Len Hsueh,55,1969-07-12
Arden Orlando Byler,47,1977-07-02
Erline Yasmine Artola,42,1983-01-07
Refugio Alan Doussan,99,1925-05-11
Carlota Adaline Bez,82,1942-11-15
Madaline Vanessa Couture,96,1928-12-28
Mitzie Tonita Erceg,64,1960-09-20
Lenard Rick Crout,47,1977-09-24


In [0]:
lesson2.write
       .mode("overwrite")
       .option("compression", "snappy")
       .parquet("dbfs:/ThucHanhBuoi7/Bai3")

In [0]:
dbutils.fs.ls("dbfs:/ThucHanhBuoi7/Bai3")

Bài 4: Tương tự bài 3. Nhưng dữ liệu thay cột `ssn` vào dữ liệu.

Chú ý: Dữ liệu `ssn` đang không đúng chuẩn. Một số bản ghi đang ở dạng dấu gạch (VD: 992-83-4829). Một số lại viết liền (VD: 992834829)

Nhiệm vụ của bạn phải chuẩn hóa dữ liệu thành dạng dấu gạch và loại bỏ các bản ghi trùng lặp.

Định dạng kết quả mong muốn:

| Tên trường | Kiểu dữ liệu |
| --- | --- |
| `ssn` | String |
| `age` | Int |
| `birthDay` | String |

Lưu kết quả vào đường dẫn:
`file:///dbfs/ThucHanhBuoi7/Bai4`

Với định dạng file là `json` và với định dạng nén là `gzip`

In [0]:
val lesson3 = inputDF.withColumn("ssn", concat_ws("-", substring(regexp_replace($"ssn","-", ""), 1, 3),
                                                       substring(regexp_replace($"ssn", "-", ""), 4, 2),
                                                       substring(regexp_replace($"ssn", "-", ""), 6, 4)))
                     .dropDuplicates("ssn")
                     .withColumn("age", months_between(to_date(lit("2025-04-17")), $"birthDate") /12)
                     .select($"ssn", $"age", $"birthDate".as("birthDay").cast("string"))
                     

display(lesson3)

ssn,age,birthDay
996-41-8616,102.15053763416668,1923-02-23
992-49-5448,89.163978495,1936-02-18
905-27-3648,105.3064516125,1919-12-27
981-19-5503,102.31182795666666,1922-12-25
911-58-2315,17.801075269166667,2007-06-29
980-94-9483,51.70161290333333,1973-08-04
949-81-5753,43.086021505,1982-03-16
963-51-3797,71.336021505,1953-12-16
964-42-4968,102.03494623666668,1923-04-04
944-45-8744,107.80376344083334,1917-06-28


In [0]:
lesson3.write
       .mode("overwrite")
       .option("compression", "gzip")
       .json("dbfs:/ThucHanhBuoi7/Bai4")

In [0]:
dbutils.fs.ls("dbfs:/ThucHanhBuoi7/Bai4")

Bài 5: Lấy dữ liệu kết quả của bài 3.


Lưu kết quả đó vào đường dẫn 
`file:///dbfs/ThucHanhBuoi7/Bai5`

Với định dạng file là `parquet` và với định dạng nén là: `snappy`. Trong đó, ghi dữ liệu thành 10 files.

In [0]:
lesson2.repartition(10)
       .write
       .mode("overwrite")
       .option("compression", "snappy")
       .parquet("dbfs:/ThucHanhBuoi7/Bai5")

In [0]:
dbutils.fs.ls("dbfs:/ThucHanhBuoi7/Bai5")

Bài 6: Bên Kế Toán đưa ra một yêu cầu đặc biệt cho bạn. Trong đó:

- Thống kê dữ liệu mức lương trung bình của công ty theo giới tính.
- Đưa ra dữ liệu của nhân viên có mức lương cao nhất theo từng giới tính.

Kết quả đưa ra có định dạng như sau:

| Tên trường | Kiểu dữ liệu | Chú ý |
| --- | --- | --- |
| `age` | String ||
| `avgSalary` | Double |Làm tròn đến số 2 sau dấu phẩy|
| `fullName` | String ||
| `ssn` | String ||
| `salary` | Long ||

Chú ý: Loại bỏ trùng lặp dữ liệu trước khi tính `avgSalary`

Lưu kết quả trên vào đường dẫn:

`file:///dbfs/ThucHanhBuoi7/Bai6`

Với định dạng file là `json`


In [0]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("gender")

val lesson4 = inputDF
  .dropDuplicates("salary")
  .withColumn("avg_salary", round(avg("salary").over(w),2))
  .withColumn("max_salary", max("salary").over(w))
  .withColumn("fullName", concat_ws(" ", initcap(concat_ws(" ", $"firstName", $"middleName", $"lastName"))))
  .dropDuplicates("fullName")
  .withColumn("age", floor(months_between(to_date(lit("2025-04-17")), $"birthDate")/12))
  .withColumn("ssn", concat_ws("-", substring(regexp_replace($"ssn", "-", ""),1,3),
                                    substring(regexp_replace($"ssn", "-", ""),4,2),
                                    substring(regexp_replace($"ssn", "-", ""),6,4)))
  .dropDuplicates("ssn")
  .filter(col("salary") === col("max_salary"))
  .select(
    col("age"),
    col("avg_salary").as("avgSalary"),
    col("fullName"),
    col("salary").as("max_salary"),
    col("ssn"),
    col("gender")
  )

display(resultDF)


age,avgSalary,fullName,max_salary,ssn,gender
29,155483.57,Rosanne Johnsie Compono,299988,912-83-8713,F
92,154961.55,Steve Mervin Poucher,299986,916-25-5536,M


In [0]:
lesson4.write
       .mode("overwrite")
       .json("dbfs:/ThucHanhBuoi7/Bai6")

In [0]:
dbutils.fs.ls("dbfs:/ThucHanhBuoi7/Bai6")