# Last.fm — Data Quality Notebook (Scala + Spark)

**Goal:** Manually verify (and document) data quality for the Last.fm 1K users dataset using **Scala + Spark**.

**What this notebook covers:**
1. Ingestion with **explicit schema** and **UTC timezone**.
2. **Robust timestamp parsing** and counting invalid rows.
3. **Key normalization**: prefer `track_id` (MBID), fallback to `artist_name — track_name`.
4. **String sanitization** (trim, remove control chars).
5. **Data quality metrics** (read/valid/dropped rows, % missing MBIDs).
6. **Policy for empty fields** (user_id / artist_name / track_name).
7. **Semantic rule checks** for session gaps: **= 20 min** vs **> 20 min**.
8. **Duplicate detection** (same user, same timestamp, same track).
9. *(Optional)* **Deequ** constraints (nullability, uniqueness).

**Tested/compatible with:** Scala **2.12** and Spark **3.5.x**. Adjust library coordinates accordingly if you add optional Deequ.

In [1]:
import $ivy.`org.apache.spark::spark-sql:3.5.1`
import $ivy.`org.plotly-scala::plotly-almond:0.8.0`
import plotly._, plotly.element._, plotly.layout._, plotly.Almond._
init()

import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.logging.log4j.{LogManager, Level => LogLevel}
import org.apache.logging.log4j.core.Logger

import org.apache.spark.sql.types._

// Suppress INFO logs
System.setProperty("log4j2.level", "WARN")

val spark = SparkSession.builder()
  .appName("LastFM-DataCleaning")
  .master("local[*]")
  .config("spark.sql.shuffle.partitions", "4")
  .getOrCreate()

spark.conf.set("spark.sql.session.timeZone", "UTC")

// Reduz log para ERROR em loggers Spark e Hadoop
Seq(
  "org.apache.spark",
  "org.apache.spark.sql.execution",
  "org.apache.spark.storage",
  "org.apache.hadoop",
  "org.spark_project"
).foreach { name =>
  LogManager.getLogger(name).asInstanceOf[Logger].setLevel(LogLevel.ERROR)
}

LogManager.getRootLogger.asInstanceOf[Logger].setLevel(LogLevel.ERROR)

import spark.implicits._

// Defaults
var INPUT_PATH: String = "/Users/Felipe/lastfm/data/lastfm/lastfm-dataset-1k/userid-timestamp-artid-artname-traid-traname.tsv" // main play logs TSV
var PROFILE_PATH: String = "/Users/Felipe/lastfm/data/lastfm/lastfm-dataset-1k/userid-profile.tsv"
val SAMPLE_ROWS = 20 // number of rows to show in samples

println(s"Using INPUT_PATH  = ${INPUT_PATH}")
println(s"Using PROFILE_PATH = ${PROFILE_PATH}")

09:57:10.726 [scala-interpreter-1] WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using INPUT_PATH  = /Users/Felipe/lastfm/data/lastfm/lastfm-dataset-1k/userid-timestamp-artid-artname-traid-traname.tsv
Using PROFILE_PATH = /Users/Felipe/lastfm/data/lastfm/lastfm-dataset-1k/userid-profile.tsv


[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36mplotly._, plotly.element._, plotly.layout._, plotly.Almond._
[39m
[32mimport [39m[36morg.apache.spark.sql.{SparkSession, DataFrame}[39m
[32mimport [39m[36morg.apache.spark.sql.functions._[39m
[32mimport [39m[36morg.apache.logging.log4j.{LogManager, Level => LogLevel}[39m
[32mimport [39m[36morg.apache.logging.log4j.core.Logger[39m
[32mimport [39m[36morg.apache.spark.sql.types._[39m
[36mres1_9[39m: [32mString[39m = [32mnull[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@1750c8c7
[32mimport [39m[36mspark.implicits._[39m
[36mINPUT_PATH[39m: [32mString[39m = [32m"/Users/Felipe/lastfm/data/lastfm/lastfm-dataset-1k/userid-timestamp-artid-artname-traid-traname.tsv"[39m
[36mPROFILE_PATH[39m: [32mString[39m = [32m"/Users/Felipe/lastfm/data/lastfm/lastfm-dataset-1k/userid-profile.tsv"[39m
[36mSAMPLE_ROWS[39m: [32mInt[39m = [32m2

## 1) Ingestion with explicit schema
**Purpose:** Avoid incorrect type inference and lock expected column order.

**Columns:**
 - `user_id` (String, not null)
 - `ts_str` (String, not null) — raw timestamp to be parsed later
 - `artist_id` (String, nullable) — MBID
 - `artist_name` (String, nullable)
 - `track_id` (String, nullable) — MBID
 - `track_name` (String, nullable)

In [2]:
val schema = StructType(Seq(
  StructField("user_id", StringType, nullable = false),
  StructField("ts_str", StringType, nullable = false),
  StructField("artist_id", StringType, nullable = true),
  StructField("artist_name", StringType, nullable = true),
  StructField("track_id", StringType, nullable = true),
  StructField("track_name", StringType, nullable = true)
))

val rawDf = spark.read
  .option("sep", "\t")
  .option("header", "false")
  .schema(schema)
  .csv(INPUT_PATH)

val rowsRead = rawDf.count()
println(s"Rows read (raw): ${rowsRead}")
rawDf.show(SAMPLE_ROWS, truncate = false)
rawDf.printSchema()

Rows read (raw): 19150868
+-----------+--------------------+------------------------------------+---------------+------------------------------------+------------------------------------------+
|user_id    |ts_str              |artist_id                           |artist_name    |track_id                            |track_name                                |
+-----------+--------------------+------------------------------------+---------------+------------------------------------+------------------------------------------+
|user_000001|2009-05-04T23:08:57Z|f1b1cf71-bd35-4e99-8624-24a6e15f133a|Deep Dish      |NULL                                |Fuck Me Im Famous (Pacha Ibiza)-09-28-2007|
|user_000001|2009-05-04T13:54:10Z|a7f7df4a-77d8-4f12-8acd-5c60c93f4de8|坂本龍一       |NULL                                |Composition 0919 (Live_2009_4_15)         |
|user_000001|2009-05-04T13:52:04Z|a7f7df4a-77d8-4f12-8acd-5c60c93f4de8|坂本龍一       |NULL                                |Mc2 (Live_2009_4_1

[36mschema[39m: [32mStructType[39m = [33mSeq[39m(
  [33mStructField[39m(
    name = [32m"user_id"[39m,
    dataType = StringType,
    nullable = [32mfalse[39m,
    metadata = {}
  ),
  [33mStructField[39m(
    name = [32m"ts_str"[39m,
    dataType = StringType,
    nullable = [32mfalse[39m,
    metadata = {}
  ),
  [33mStructField[39m(
    name = [32m"artist_id"[39m,
    dataType = StringType,
    nullable = [32mtrue[39m,
    metadata = {}
  ),
  [33mStructField[39m(
    name = [32m"artist_name"[39m,
    dataType = StringType,
    nullable = [32mtrue[39m,
    metadata = {}
  ),
  [33mStructField[39m(
    name = [32m"track_id"[39m,
    dataType = StringType,
    nullable = [32mtrue[39m,
    metadata = {}
  ),
  [33mStructField[39m(
    name = [32m"track_name"[39m,
    dataType = StringType,
    nullable = [32mtrue[39m,
    metadata = {}
  )
)
[36mrawDf[39m: [32mDataFrame[39m = [user_id: string, ts_str: string ... 4 more fields]
[36mrowsRead

## 2) Robust timestamp parsing & invalid row accounting
**Purpose:** Parse `ts_str` into a proper `timestamp` column (`ts`) and count invalid rows. Keep only rows with a valid timestamp.

**Format used:** `yyyy-MM-dd'T'HH:mm:ss` (UTC)

In [3]:
val withTsDf = rawDf
  .withColumn("ts", to_timestamp(col("ts_str")))  // No format specified - uses ISO 8601 parsing
  .drop("ts_str")

val invalidTsCount = withTsDf.filter(col("ts").isNull).count()
val validTsDf = withTsDf.filter(col("ts").isNotNull)

println(s"Invalid due to timestamp parse: ${invalidTsCount}")
println(s"Valid rows after ts parse: ${validTsDf.count()}")

validTsDf.show(SAMPLE_ROWS, truncate = false)

Invalid due to timestamp parse: 0
Valid rows after ts parse: 19150868
+-----------+------------------------------------+---------------+------------------------------------+------------------------------------------+-------------------+
|user_id    |artist_id                           |artist_name    |track_id                            |track_name                                |ts                 |
+-----------+------------------------------------+---------------+------------------------------------+------------------------------------------+-------------------+
|user_000001|f1b1cf71-bd35-4e99-8624-24a6e15f133a|Deep Dish      |NULL                                |Fuck Me Im Famous (Pacha Ibiza)-09-28-2007|2009-05-04 23:08:57|
|user_000001|a7f7df4a-77d8-4f12-8acd-5c60c93f4de8|坂本龍一       |NULL                                |Composition 0919 (Live_2009_4_15)         |2009-05-04 13:54:10|
|user_000001|a7f7df4a-77d8-4f12-8acd-5c60c93f4de8|坂本龍一       |NULL                                |

[36mwithTsDf[39m: [32mDataFrame[39m = [user_id: string, artist_id: string ... 4 more fields]
[36minvalidTsCount[39m: [32mLong[39m = [32m0L[39m
[36mvalidTsDf[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [user_id: string, artist_id: string ... 4 more fields]

## 3) Key normalization (MBID preferred; fallback to `artist_name — track_name`)
**Purpose:** Build a stable `track_key` used for counts/joins even when MBIDs are missing.

**Rule:** If `track_id` (MBID) is present & non-empty, use it; otherwise use `${artist_name} — ${track_name}` with nulls replaced by `?`.

In [4]:
val normalizedDf = validTsDf.withColumn(
  "track_key",
  when(col("track_id").isNotNull && length(col("track_id")) > 0, col("track_id"))
    .otherwise(concat_ws(" — ", coalesce(col("artist_name"), lit("?")), coalesce(col("track_name"), lit("?"))))
)

normalizedDf.select("user_id","ts","artist_id","artist_name","track_id","track_name","track_key")
  .show(SAMPLE_ROWS, truncate = false)

+-----------+-------------------+------------------------------------+---------------+------------------------------------+------------------------------------------+------------------------------------------------------+
|user_id    |ts                 |artist_id                           |artist_name    |track_id                            |track_name                                |track_key                                             |
+-----------+-------------------+------------------------------------+---------------+------------------------------------+------------------------------------------+------------------------------------------------------+
|user_000001|2009-05-04 23:08:57|f1b1cf71-bd35-4e99-8624-24a6e15f133a|Deep Dish      |NULL                                |Fuck Me Im Famous (Pacha Ibiza)-09-28-2007|Deep Dish — Fuck Me Im Famous (Pacha Ibiza)-09-28-2007|
|user_000001|2009-05-04 13:54:10|a7f7df4a-77d8-4f12-8acd-5c60c93f4de8|坂本龍一       |NULL                          

[36mnormalizedDf[39m: [32mDataFrame[39m = [user_id: string, artist_id: string ... 5 more fields]

## 4) String sanitization
**Purpose:** Remove control characters and trim whitespace from key string fields.

In [5]:
val sanitizeUdf = udf { s: String =>
  if (s == null) null
  else s.replaceAll("\\p{Cntrl}", "").trim
}

val cleanDf = normalizedDf
  .withColumn("artist_name", sanitizeUdf(col("artist_name")))
  .withColumn("track_name", sanitizeUdf(col("track_name")))
  .withColumn("user_id", sanitizeUdf(col("user_id")))

cleanDf.select("user_id","artist_name","track_name").show(SAMPLE_ROWS, truncate = false)

+-----------+---------------+------------------------------------------+
|user_id    |artist_name    |track_name                                |
+-----------+---------------+------------------------------------------+
|user_000001|Deep Dish      |Fuck Me Im Famous (Pacha Ibiza)-09-28-2007|
|user_000001|坂本龍一       |Composition 0919 (Live_2009_4_15)         |
|user_000001|坂本龍一       |Mc2 (Live_2009_4_15)                      |
|user_000001|坂本龍一       |Hibari (Live_2009_4_15)                   |
|user_000001|坂本龍一       |Mc1 (Live_2009_4_15)                      |
|user_000001|坂本龍一       |To Stanford (Live_2009_4_15)              |
|user_000001|坂本龍一       |Improvisation (Live_2009_4_15)            |
|user_000001|坂本龍一       |Glacier (Live_2009_4_15)                  |
|user_000001|坂本龍一       |Parolibre (Live_2009_4_15)                |
|user_000001|坂本龍一       |Bibo No Aozora (Live_2009_4_15)           |
|user_000001|坂本龍一       |The Last Emperor (Theme)                  |
|user_000001|坂本龍一 

[36msanitizeUdf[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mexpressions[39m.[32mUserDefinedFunction[39m = [33mSparkUserDefinedFunction[39m(
  f = ammonite.$sess.cmd5$Helper$$Lambda$7274/0x0000000a0205b040@36a354f6,
  dataType = StringType,
  inputEncoders = [33mList[39m(
    [33mSome[39m(
      value = [33mExpressionEncoder[39m(
        objSerializer = [33mStaticInvoke[39m(
          staticObject = class org.apache.spark.unsafe.types.UTF8String,
          dataType = StringType,
          functionName = [32m"fromString"[39m,
          arguments = [33mList[39m(
            [33mBoundReference[39m(
              ordinal = [32m0[39m,
              dataType = [33mObjectType[39m(cls = class java.lang.String),
              nullable = [32mtrue[39m
            )
          ),
          inputTypes = [33mList[39m(),
          propagateNull = [32mtrue[39m,
          returnNullable = [32mfalse[39m,
          isDeterministic = [32mtrue[39m

## 5) Data Quality metrics summary
**Purpose:** Summarize read/valid/dropped counts and percent of missing MBIDs.

In [6]:
def calculateDataQualityMetrics(originalCount: Long, finalDf: DataFrame): Map[String, Any] = {
  val finalCount = finalDf.count()
  val missingTrackId = finalDf.filter(col("track_id").isNull || length(col("track_id")) === 0).count()
  
  Map(
    "rows_read" -> originalCount,
    "rows_valid" -> finalCount,
    "rows_dropped" -> (originalCount - finalCount),
    "missing_track_id" -> missingTrackId,
    "pct_missing_track_id" -> (if (finalCount == 0) 0.0 else missingTrackId.toDouble / finalCount * 100.0)
  )
}

case class DataQualityReport(
  timestamp: String,
  dataset: String,
  metrics: Map[String, Any],
  warnings: List[String] = List.empty,
  recommendations: List[String] = List.empty
)

def generateQualityReport(originalCount: Long, cleanDf: DataFrame): DataQualityReport = {
  val metrics = calculateDataQualityMetrics(originalCount, cleanDf)
  val warnings = scala.collection.mutable.ListBuffer[String]()
  val recommendations = scala.collection.mutable.ListBuffer[String]()
  
  // Add business logic checks
  val pctMissing = metrics("pct_missing_track_id").asInstanceOf[Double]
  if (pctMissing > 50.0) {
    warnings += s"High percentage of missing track IDs: ${pctMissing}%"
    recommendations += "Consider improving track ID matching or using fallback keys"
  }
  
  DataQualityReport(
    timestamp = java.time.Instant.now().toString,
    dataset = "lastfm-1k",
    metrics = metrics,
    warnings = warnings.toList,
    recommendations = recommendations.toList
  )
}

defined [32mfunction[39m [36mcalculateDataQualityMetrics[39m
defined [32mclass[39m [36mDataQualityReport[39m
defined [32mfunction[39m [36mgenerateQualityReport[39m

In [7]:
// Place this after your data cleaning pipeline, before the final summary
println("\n" + "=" * 60)
println("GENERATING DATA QUALITY REPORT")
println("=" * 60)

try {
  val qualityReport = generateQualityReport(rowsRead, cleanDf)
  
  // Display report
  println(s"\n📊 Data Quality Report (${qualityReport.dataset})")
  println(s"🕐 Generated: ${qualityReport.timestamp}")
  
  println(s"\n📈 METRICS:")
  qualityReport.metrics.foreach { case (key, value) =>
    val formattedKey = key.replace("_", " ").capitalize
    println(f"   $formattedKey%-25s: $value")
  }
  
  if (qualityReport.warnings.nonEmpty) {
    println(s"\n⚠️  WARNINGS (${qualityReport.warnings.length}):")
    qualityReport.warnings.zipWithIndex.foreach { case (warning, idx) =>
      println(s"   ${idx + 1}. $warning")
    }
  }
  
  if (qualityReport.recommendations.nonEmpty) {
    println(s"\n💡 RECOMMENDATIONS (${qualityReport.recommendations.length}):")
    qualityReport.recommendations.zipWithIndex.foreach { case (rec, idx) =>
      println(s"   ${idx + 1}. $rec")
    }
  }
  
  println(s"\n✅ Data quality assessment completed successfully!")
  
} catch {
  case ex: Exception =>
    println(s"❌ Error generating quality report: ${ex.getMessage}")
    ex.printStackTrace()
}


GENERATING DATA QUALITY REPORT

📊 Data Quality Report (lastfm-1k)
🕐 Generated: 2025-09-11T06:00:29.963686Z

📈 METRICS:
   Rows valid               : 19150868
   Rows read                : 19150868
   Missing track id         : 2168588
   Pct missing track id     : 11.323706058649666
   Rows dropped             : 0

✅ Data quality assessment completed successfully!


In [17]:
def validateDataQuality(df: DataFrame): List[String] = {
  val issues = scala.collection.mutable.ListBuffer[String]()
  
  // Check for negative timestamps (if converted to epoch)
  val futureTimestamps = df.filter(col("ts") > current_timestamp()).count()
  if (futureTimestamps > 0) {
    issues += s"Found $futureTimestamps records with future timestamps"
  }
  
  // Check for extremely old timestamps (before music streaming era)
  val veryOldTimestamps = df.filter(col("ts") < lit("2000-01-01")).count()
  if (veryOldTimestamps > 0) {
    issues += s"Found $veryOldTimestamps records with timestamps before 2000"
  }
  
  // Check for users with unrealistic play counts
  val suspiciousUsers = df.groupBy("user_id")
    .count()
    .filter(col("count") > 100000) // Adjust threshold as needed
    .count()
  
  if (suspiciousUsers > 0) {
    issues += s"Found $suspiciousUsers users with >100k plays (potential data quality issue)"
  }
  
  issues.toList
}

defined [32mfunction[39m [36mvalidateDataQuality[39m

In [18]:
println(validateDataQuality(cleanDf))

List(Found 13 users with >100k plays (potential data quality issue))


## 6) Empty-field policies
**Purpose:** Inspect how many rows have empty `user_id`, `artist_name`, or `track_name` and decide whether to drop or impute.

**Recommendation:** Drop rows with empty `user_id` and either drop or mark unknown `artist/track` depending on downstream needs (document in README).


In [8]:
val emptyUser = cleanDf.filter(coalesce(col("user_id"), lit("")) === "").count()
val emptyArtist = cleanDf.filter(coalesce(col("artist_name"), lit("")) === "").count()
val emptyTrack = cleanDf.filter(coalesce(col("track_name"), lit("")) === "").count()

println(s"Rows with empty user_id     : $emptyUser")
println(s"Rows with empty artist_name : $emptyArtist")
println(s"Rows with empty track_name  : $emptyTrack")

val DROP_EMPTY = true // toggle this policy if needed
val dqDf = if (DROP_EMPTY) {
  cleanDf.filter(col("user_id") =!= "" && col("artist_name") =!= "" && col("track_name") =!= "")
} else cleanDf

println(s"Rows after empty-field policy: ${dqDf.count()}")

Rows with empty user_id     : 0
Rows with empty artist_name : 0
Rows with empty track_name  : 8
Rows after empty-field policy: 19150860


[36memptyUser[39m: [32mLong[39m = [32m0L[39m
[36memptyArtist[39m: [32mLong[39m = [32m0L[39m
[36memptyTrack[39m: [32mLong[39m = [32m8L[39m
[36mDROP_EMPTY[39m: [32mBoolean[39m = [32mtrue[39m
[36mdqDf[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [user_id: string, artist_id: string ... 5 more fields]

## 7) Semantic rule — session gap boundary (=20 vs >20 minutes)
**Purpose:** Build a tiny synthetic dataset to verify the session split rule at 20 minutes.

**Rule:**
- gap **≤ 20** minutes → same session
- gap **> 20** minutes → new session

In [9]:
import java.sql.Timestamp

def ts(s: String) = Timestamp.valueOf(s.replace("T", " "))

val sessionCheck = Seq(
  ("u1", ts("2023-01-01T10:00:00"), "A"),
  ("u1", ts("2023-01-01T10:20:00"), "B"), // exactly 20 min → SAME session
  ("u1", ts("2023-01-01T10:40:01"), "C"), // > 20 min → NEW session
  ("u2", ts("2023-01-01T09:00:00"), "X"),
  ("u2", ts("2023-01-01T09:15:00"), "Y")  // < 20 min → SAME session
).toDF("user_id","ts","track_key")

sessionCheck.orderBy("user_id","ts").show(truncate = false)

import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("user_id").orderBy(col("ts").asc)

val prevTs = lag(col("ts"), 1).over(w)
val gapSec = (col("ts").cast("long") - prevTs.cast("long"))
val gapMin = when(prevTs.isNull, lit(null).cast("double")).otherwise(gapSec / 60.0)

val withGaps = sessionCheck
  .withColumn("prev_ts", prevTs)
  .withColumn("gap_minutes", gapMin)
  .withColumn("is_new_session", when(prevTs.isNull, 1).when(col("gap_minutes") > 20.0, 1).otherwise(0))
  .withColumn("session_seq", sum(col("is_new_session")).over(w))

withGaps.orderBy("user_id","ts").show(truncate = false)

+-------+-------------------+---------+
|user_id|ts                 |track_key|
+-------+-------------------+---------+
|u1     |2023-01-01 06:00:00|A        |
|u1     |2023-01-01 06:20:00|B        |
|u1     |2023-01-01 06:40:01|C        |
|u2     |2023-01-01 05:00:00|X        |
|u2     |2023-01-01 05:15:00|Y        |
+-------+-------------------+---------+

+-------+-------------------+---------+-------------------+------------------+--------------+-----------+
|user_id|ts                 |track_key|prev_ts            |gap_minutes       |is_new_session|session_seq|
+-------+-------------------+---------+-------------------+------------------+--------------+-----------+
|u1     |2023-01-01 06:00:00|A        |NULL               |NULL              |1             |1          |
|u1     |2023-01-01 06:20:00|B        |2023-01-01 06:00:00|20.0              |0             |1          |
|u1     |2023-01-01 06:40:01|C        |2023-01-01 06:20:00|20.016666666666666|1             |2          |
|u2

[32mimport [39m[36mjava.sql.Timestamp[39m
defined [32mfunction[39m [36mts[39m
[36msessionCheck[39m: [32mDataFrame[39m = [user_id: string, ts: timestamp ... 1 more field]
[32mimport [39m[36morg.apache.spark.sql.expressions.Window[39m
[36mw[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mexpressions[39m.[32mWindowSpec[39m = org.apache.spark.sql.expressions.WindowSpec@34064b08
[36mprevTs[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mColumn[39m = lag(ts, 1, NULL) OVER (PARTITION BY user_id ORDER BY ts ASC NULLS FIRST unspecifiedframe$())
[36mgapSec[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mColumn[39m = (CAST(ts AS BIGINT) - CAST(lag(ts, 1, NULL) OVER (PARTITION BY user_id ORDER BY ts ASC NULLS FIRST unspecifiedframe$()) AS BIGINT))
[36mgapMin[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mColumn[39m = CASE WHEN (lag(ts, 1, NULL) OVER (PARTITION BY user_id ORDER

## 8) Duplicate detection
**Purpose:** Identify potential duplicates defined as **same user, same timestamp, same track**.

**Action:** Count duplicates and preview a few; decide whether to drop or keep (document in README).

In [11]:
val dupCols = Seq("user_id","ts","track_key")

val dupCounts = dqDf
  .groupBy(dupCols.map(col): _*)
  .agg(count(lit(1)).alias("cnt"))
  .filter(col("cnt") > 1)

val totalDupRows = if (dupCounts.head(1).isEmpty) 0L else dupCounts.select(sum("cnt")).first().getLong(0)

println(s"Distinct duplicate keys: ${dupCounts.count()}")
println(s"Total duplicated rows   : ${totalDupRows}")

dupCounts.orderBy(col("cnt").desc).show(20, truncate = false)

Distinct duplicate keys: 1
Total duplicated rows   : 2
+-----------+-------------------+------------------------------------+---+
|user_id    |ts                 |track_key                           |cnt|
+-----------+-------------------+------------------------------------+---+
|user_000274|2008-10-31 21:52:29|f8317357-7fea-47d4-9e2f-3fe91ef349c9|2  |
+-----------+-------------------+------------------------------------+---+



[36mdupCols[39m: [32mSeq[39m[[32mString[39m] = [33mList[39m([32m"user_id"[39m, [32m"ts"[39m, [32m"track_key"[39m)
[36mdupCounts[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mDataset[39m[[32morg[39m.[32mapache[39m.[32mspark[39m.[32msql[39m.[32mRow[39m] = [user_id: string, ts: timestamp ... 2 more fields]
[36mtotalDupRows[39m: [32mLong[39m = [32m2L[39m

## 9) (Optional) Deequ constraints
**Purpose:** Validate constraints like nullability and uniqueness using **AWS Deequ**. This section is optional and requires adding Deequ as a dependency.

**How to enable:**
- Add library: `"com.amazon.deequ" %% "deequ" % "2.0.7-spark-3.3"` (or a version compatible with your Spark).
- Then run checks such as: not-null on `user_id`, timestamp; uniqueness on `(user_id, ts)` if required.

**Note:** The exact coordinates vary by Spark/Scala versions; confirm compatibility.

## 10) Summary & Next steps
**What we verified:**
- Explicit schema & UTC timezone.
- Timestamp parsing with invalid-row accounting.
- Track key normalization (MBID preferred, fallback safe).
- String sanitization.
- DQ metrics (read/valid/dropped, % missing MBIDs).
- Empty-field policies and their impact.
- Semantic rule at the 20-minute boundary (session split).
- Duplicate detection and preview.
 
**Next steps (suggested):**
- Decide and enforce final policies (drop vs. impute) and document in README.
- Persist cleaned datasets to a curated zone (e.g., Parquet, partitioned).
- Integrate this DQ notebook in CI (smaller synthetic samples) to prevent regressions.
- (Optional) Add Deequ checks into automated pipelines for continuous monitoring.

## 11) Save curated data (example)
**Purpose:** Demonstrate how to persist the cleaned DataFrame into Parquet format with partitioning.

In [None]:
val OUTPUT_PATH = "/Users/Felipe/lastfm/output/curated"

dqDf.write
  .mode("overwrite")
  .partitionBy("user_id")
  .parquet(OUTPUT_PATH)

println(s"Curated dataset saved to ${OUTPUT_PATH}")

## 12) Export DQ metrics
**Purpose:** Persist the summary metrics into a CSV/Parquet for reporting.

In [None]:
val dqMetrics = Seq(
  ("rows_read", rowsRead.toString),
  ("rows_valid", totalValid.toString),
  ("rows_dropped", totalDropped.toString),
  ("missing_track_id", missingTrackId.toString),
  ("pct_missing_track_id", f"${pctMissingTrackId}%.2f%%")
).toDF("metric","value")

val METRICS_PATH = "/Users/Felipe/lastfm/output/metrics"

dqMetrics.write.mode("overwrite").option("header","true").csv(METRICS_PATH)

println(s"DQ metrics exported to ${METRICS_PATH}")

## 13) Join with profile data (optional)
**Purpose:** Combine plays with user profile info for enriched analysis.

In [12]:
val profileSchema = StructType(Seq(
  StructField("user_id", StringType, nullable = false),
  StructField("gender", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true),
  StructField("country", StringType, nullable = true),
  StructField("signup", StringType, nullable = true)
))

val profileDf = spark.read
  .option("sep", "	")
  .option("header", "false")
  .schema(profileSchema)
  .csv(PROFILE_PATH)

val enrichedDf = dqDf.join(profileDf, Seq("user_id"), "left")

enrichedDf.show(SAMPLE_ROWS, truncate = false)

+-----------+------------------------------------+---------------+------------------------------------+------------------------------------------+-------------------+------------------------------------------------------+------+----+-------+------------+
|user_id    |artist_id                           |artist_name    |track_id                            |track_name                                |ts                 |track_key                                             |gender|age |country|signup      |
+-----------+------------------------------------+---------------+------------------------------------+------------------------------------------+-------------------+------------------------------------------------------+------+----+-------+------------+
|user_000001|f1b1cf71-bd35-4e99-8624-24a6e15f133a|Deep Dish      |NULL                                |Fuck Me Im Famous (Pacha Ibiza)-09-28-2007|2009-05-04 23:08:57|Deep Dish — Fuck Me Im Famous (Pacha Ibiza)-09-28-2007|m     |NULL|Ja

[36mprofileSchema[39m: [32mStructType[39m = [33mSeq[39m(
  [33mStructField[39m(
    name = [32m"user_id"[39m,
    dataType = StringType,
    nullable = [32mfalse[39m,
    metadata = {}
  ),
  [33mStructField[39m(
    name = [32m"gender"[39m,
    dataType = StringType,
    nullable = [32mtrue[39m,
    metadata = {}
  ),
  [33mStructField[39m(
    name = [32m"age"[39m,
    dataType = IntegerType,
    nullable = [32mtrue[39m,
    metadata = {}
  ),
  [33mStructField[39m(
    name = [32m"country"[39m,
    dataType = StringType,
    nullable = [32mtrue[39m,
    metadata = {}
  ),
  [33mStructField[39m(
    name = [32m"signup"[39m,
    dataType = StringType,
    nullable = [32mtrue[39m,
    metadata = {}
  )
)
[36mprofileDf[39m: [32mDataFrame[39m = [user_id: string, gender: string ... 3 more fields]
[36menrichedDf[39m: [32mDataFrame[39m = [user_id: string, artist_id: string ... 9 more fields]

## 14) Wrap-up
**Key takeaways:**
- Data quality checks caught invalid timestamps, missing IDs, and empty fields.
- Clear policies and documented assumptions make the pipeline reproducible.
- Outputs (curated plays + metrics + profiles) are ready for downstream use in sessionization and top-track analysis.

**End of Notebook**