# CSCIGA 2437 Project: Data Ingestion
## Chenmeinian Guo (cg3972)

## Data Profiling

In [2]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._ // col, trim, year, etc. are used in the cells below

val spark = SparkSession.builder()
  .appName("NYC Property Sales Analysis")
  .getOrCreate()

val filePath = "./nyc-property-sales.csv"
val rawData = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(filePath)

rawData.printSchema()
z.show(rawData)

In [3]:
val totalEntries = rawData.count().toDouble

val nullCounts = rawData.columns.map { colName =>
  val nullCount = rawData.filter(col(colName).isNull).count()
  val nullPercentage = (nullCount / totalEntries) * 100
  (colName, nullCount, nullPercentage)
}

println("Column-wise null value analysis:")
nullCounts.foreach { case (colName, nullCount, nullPercentage) =>
  println(f"$colName%-30s $nullCount%-10d ($nullPercentage%.2f%%)")
}
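The loop above launches one Spark job per column. As a minimal sketch of an alternative, the null counts for all columns can be gathered in a single aggregation. The helper name `nullCountsSinglePass`, the session `sketchSpark`, and the toy DataFrame are hypothetical and only illustrate the idea; they are not part of the original notebook.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, count, when}

// Hypothetical helper: count nulls for every column in one Spark job.
// count() ignores nulls, and when(...) without otherwise() yields null
// for non-matching rows, so only null cells are counted.
def nullCountsSinglePass(df: DataFrame): Map[String, Long] = {
  val aggs = df.columns.map(c => count(when(col(c).isNull, 1)).alias(c))
  val row = df.select(aggs: _*).first()
  df.columns.zipWithIndex.map { case (c, i) => c -> row.getLong(i) }.toMap
}

// Toy data (assumption, not the NYC dataset): three rows, two columns.
val sketchSpark = SparkSession.builder()
  .master("local[1]")
  .appName("null-count-sketch")
  .getOrCreate()
import sketchSpark.implicits._

val toy = Seq[(Option[Int], Option[String])](
  (Some(1), Some("a")),
  (None, Some("b")),
  (None, None)
).toDF("UNITS", "EASE-MENT")

val toyNullCounts = nullCountsSinglePass(toy)
toyNullCounts.foreach { case (name, n) => println(f"$name%-30s $n%-10d") }
```

In the notebook itself, `nullCountsSinglePass(rawData)` would be the equivalent call; percentages can then be derived by dividing each count by `totalEntries`.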

## Data Cleaning

In [5]:
// verify that the column "EASE-MENT" is not useful: it is almost always null or empty,
// and only 14 entries hold valid strings.
// this check runs on rawData because "EASE-MENT" is dropped from the cleaned dataset
import org.apache.spark.sql.functions._

rawData.filter(col("EASE-MENT").isNotNull).select("EASE-MENT").distinct().show(5) // unique values of EASE-MENT

val countNullOrEmpty = rawData.filter(col("EASE-MENT").isNull || trim(col("EASE-MENT")) === "").count()
val countNonNullOrEmpty = rawData.filter(!(col("EASE-MENT").isNull || trim(col("EASE-MENT")) === "")).count()

println(s"Number of null and empty values in EASE-MENT: $countNullOrEmpty")
println(s"Number of string values in EASE-MENT: $countNonNullOrEmpty")


In [6]:
// a quick look at the columns that contain null values
// (rawData is used here because the cleaned dataset is only built in a later cell)
rawData.select("RESIDENTIAL UNITS").show(5)
rawData.select("COMMERCIAL UNITS").show(5)
rawData.select("TOTAL UNITS").show(5)
rawData.select("GROSS SQUARE FEET").show(5)
rawData.select("LAND SQUARE FEET").show(5)

In [7]:
/*
clean dataset:
    1. rename " ZIP CODE" (with an unexpected leading space) to "ZIP CODE"
    2. drop the few rows that have null values in important columns (e.g. "ZIP CODE")
    3. drop the columns "APARTMENT NUMBER" and "EASE-MENT", which are not useful for our analysis
*/
val cleanedData = rawData
  .withColumnRenamed(" ZIP CODE", "ZIP CODE")
  .na.drop(Seq(
    "TAX CLASS AT PRESENT", "BUILDING CLASS AT PRESENT",
    "ZIP CODE",
    "YEAR BUILT",
    "RESIDENTIAL UNITS", "COMMERCIAL UNITS"
  ))
  .drop("APARTMENT NUMBER")
  .drop("EASE-MENT")
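The rename above handles one known column with a stray leading space. As a hedged sketch, if other headers in a CSV might carry stray whitespace too, all column names could be trimmed in one step with `toDF`. The session `trimSketchSpark` and the toy DataFrame are assumptions made for illustration only.

```scala
import org.apache.spark.sql.SparkSession

// Toy session and data (assumptions, not the NYC dataset).
val trimSketchSpark = SparkSession.builder()
  .master("local[1]")
  .appName("trim-column-names-sketch")
  .getOrCreate()
import trimSketchSpark.implicits._

// Two columns whose names carry stray whitespace.
val untidy = Seq((10001, 1990), (11201, 2005)).toDF(" ZIP CODE", "YEAR BUILT ")

// toDF(names: _*) returns the same data with every column renamed, in order.
val tidy = untidy.toDF(untidy.columns.map(_.trim): _*)

println(tidy.columns.mkString(", "))
```

Applied to the notebook, `rawData.toDF(rawData.columns.map(_.trim): _*)` would replace the single `withColumnRenamed` call, at the cost of touching every column name.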

In [8]:
val nullCountsAfterCleansing = cleanedData.columns.map { colName =>
  val nullCount = cleanedData.filter(col(colName).isNull).count()
  val nullPercentage = (nullCount / totalEntries) * 100
  (colName, nullCount, nullPercentage)
}

println("Column-wise null value analysis:")
nullCountsAfterCleansing.foreach { case (colName, nullCount, nullPercentage) =>
  println(f"$colName%-30s $nullCount%-10d ($nullPercentage%.2f%%)")
}

In [9]:
/*
    the column "LAND SQUARE FEET" is very important, and the check above shows
    that it is the only column still containing a null value, in a single row.
    Therefore, we can safely drop this row without significantly affecting
    our dataset
*/
val fullyCleanedData = cleanedData
                    .filter(col("LAND SQUARE FEET").isNotNull)

// only successful sales (where "SALE PRICE" is above zero) should be considered,
// but a large share of rows have a sale price of zero, so we quantify them first
val totalCleanedEntries = fullyCleanedData.count()
val cleanedDataRatio = totalCleanedEntries / totalEntries
val zeroSaleRatio = fullyCleanedData.filter(col("SALE PRICE") === 0).count() / totalEntries

println(f"Percentage of data retained after cleaning: ${cleanedDataRatio * 100}%.2f%%")
println(f"Percentage of data with a zero sale price: ${zeroSaleRatio * 100}%.2f%%")
println(s"Remaining records after cleaning: $totalCleanedEntries")

Initially, I hypothesized that `SALE PRICE = 0` entries would come mostly from the earlier years, such as 2003, because of incomplete records or less robust data collection practices at the time. However, the analysis shows that zero-price sales occur consistently across all years, including 19,643 instances in 2021, although the counts follow a declining trend. **(see below)**

In [11]:
val dataWithYear = fullyCleanedData.withColumn(
  "YEAR",
  year(to_date(col("SALE DATE"), "yyyy-MM-dd HH:mm:ss"))
)

val zeroSaleByYear = dataWithYear
  .filter(col("SALE PRICE") === 0)
  .groupBy("YEAR")
  .agg(count("*").alias("ZERO_SALE_COUNT"))
  .orderBy(desc("ZERO_SALE_COUNT"))

val zeroSalePercentageByYear = dataWithYear
  .filter(col("SALE PRICE") === 0)
  .groupBy("YEAR")
  .agg(
    count("*").alias("ZERO_SALE_COUNT"),
    (count("*") / fullyCleanedData.count() * 100).alias("PERCENTAGE_OF_TOTAL")
  )
  .orderBy(desc("PERCENTAGE_OF_TOTAL"))

z.show(zeroSalePercentageByYear)
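The percentages above are relative to the whole dataset, so a year with more recorded sales naturally contributes more zero-price rows. As a minimal sketch, assuming the same `YEAR` and `SALE PRICE` column names, the share of zero-price sales within each year can be computed with a conditional aggregation. The helper `zeroSaleShareByYear`, the session `shareSketchSpark`, and the toy rows are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, count, lit, sum, when}

// Hypothetical helper: per-year share of rows whose sale price is zero.
def zeroSaleShareByYear(df: DataFrame): DataFrame =
  df.groupBy("YEAR")
    .agg(
      count(lit(1)).alias("TOTAL_SALES"),
      sum(when(col("SALE PRICE") === 0, 1).otherwise(0)).alias("ZERO_SALE_COUNT")
    )
    .withColumn("ZERO_SALE_SHARE_PCT", col("ZERO_SALE_COUNT") / col("TOTAL_SALES") * 100)
    .orderBy("YEAR")

// Toy data (assumption, not the NYC dataset).
val shareSketchSpark = SparkSession.builder()
  .master("local[1]")
  .appName("zero-sale-share-sketch")
  .getOrCreate()
import shareSketchSpark.implicits._

val toySales = Seq(
  (2003, 0L), (2003, 100L), (2003, 0L), (2003, 200L),
  (2021, 0L), (2021, 300L), (2021, 400L), (2021, 500L)
).toDF("YEAR", "SALE PRICE")

val toyShares = zeroSaleShareByYear(toySales)
toyShares.show()

// Collected for inspection: year -> share of zero-price sales in percent.
val shareByYear = toyShares.collect()
  .map(r => r.getAs[Int]("YEAR") -> r.getAs[Double]("ZERO_SALE_SHARE_PCT"))
  .toMap
```

In the notebook, `zeroSaleShareByYear(dataWithYear)` would be the corresponding call. A within-year share makes it easier to judge whether the decline in zero-price sales reflects fewer such transfers or simply fewer recorded sales.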


After some online research, I found that entries with `SALE PRICE = 0` often represent non-standard transactions such as donations, inheritances, family transfers, government acquisitions, or incomplete sales. These do not reflect typical market transactions and may skew price trend analyses. To improve accuracy, I removed all entries with `SALE PRICE = 0`. Analyzing these cases separately could reveal specific property transfer patterns or anomalies, but the primary focus remains on valid sales for investment analysis.

In [13]:
// calculate the mean and standard deviation of the sale price
// (computed before the zero-price rows are removed)
val stats = fullyCleanedData.select(mean("SALE PRICE"), stddev("SALE PRICE")).first()
val meanPrice = stats.getDouble(0)
val stdDevPrice = stats.getDouble(1)

// remove all entries where the sale price is zero
// (=!= is Spark's inequality operator; !== is deprecated)
val filteredData = fullyCleanedData.filter(col("SALE PRICE") =!= 0)

println(s"Remaining records after filtering: ${filteredData.count()}")


In [14]:
z.show(filteredData)