In [None]:
// Path of the complaint CSV that feeds the rest of the notebook.
val filePath = "NYPD_Complaint_Data_Historic.csv"

// Reader configuration: first line is a header, quoted fields may span several
// lines, column types are inferred, and a double quote escapes an embedded quote.
val rawDF = spark.read
  .options(Map(
    "header" -> "true",
    "multiLine" -> "true",
    "inferSchema" -> "true",
    "escape" -> "\""
  ))
  .csv(filePath)

// Show the freshly loaded data in the notebook.
z.show(rawDF)

In [1]:
// Project the raw data down to the columns used by the rest of the notebook.
val baseDF = {
  // Kept columns, in the order they appear in the result.
  val keptColumns = Seq(
    "cmplnt_num",
    "cmplnt_fr_dt", "cmplnt_fr_tm",
    "cmplnt_to_dt", "cmplnt_to_tm",
    "rpt_dt",
    "ofns_desc", "law_cat_cd", "boro_nm",
    "susp_age_group", "susp_race", "susp_sex",
    "latitude", "longitude",
    "vic_age_group", "vic_race", "vic_sex"
  )
  rawDF.select(keptColumns.head, keptColumns.tail: _*)
}

// Mark the projection for caching and run an action so the cache is populated.
baseDF.cache().count

In [2]:
// Display the projected columns in the notebook.
z.show(baseDF)

In [3]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Drop every row that has a null in any column, then parse the three date
// columns with the pattern MM/dd/yyyy. Each column is parsed from ITS OWN
// values: the end date and report date must not be derived from
// cmplnt_fr_dt, otherwise all three columns end up holding the start date.
val baseDF1 = Seq("cmplnt_fr_dt", "cmplnt_to_dt", "rpt_dt")
  .foldLeft(baseDF.na.drop()) { (df, dateColumn) =>
    df.withColumn(dateColumn, to_date(col(dateColumn), "MM/dd/yyyy"))
  }


In [4]:
// Display the data after null rows were dropped and the date columns converted.
z.show(baseDF1)

In [5]:
// Bounds of the reporting window; both ends are inclusive.
val startDate = "2020-01-01"
val endDate = "2022-12-31"

// Keep only complaints whose report date lies inside the window.
val baseDF2 = baseDF1.where($"rpt_dt".between(startDate, endDate))

In [6]:
// Number of rows left after restricting rpt_dt to [startDate, endDate].
baseDF2.count()

In [7]:
// The only age-group labels that are kept; anything else is replaced by null.
val allowedAges = Seq("<18", "18-24", "25-44", "45-64", "65+")

// Apply the same whitelist to the suspect and the victim age-group columns.
val baseDF3 = Seq("susp_age_group", "vic_age_group").foldLeft(baseDF2) { (df, ageColumn) =>
  val ageGroup = col(ageColumn)
  df.withColumn(ageColumn, when(ageGroup.isin(allowedAges: _*), ageGroup).otherwise(null))
}

In [8]:
val columns = baseDF3.columns

// Placeholder tokens that stand for "no value" and are turned into real nulls.
val replacementValues = Seq("(null)", "UNKNOWN")

// Rewrite all columns in ONE projection instead of one withColumn call per
// column (repeated withColumn calls stack a projection each and bloat the plan).
// Only string columns can contain the placeholder tokens, so columns of any
// other type are passed through unchanged rather than being compared against
// string literals. Column order and names are preserved.
val baseDF4 = baseDF3.select(baseDF3.schema.fields.map { field =>
  val current = col(field.name)
  field.dataType match {
    case StringType =>
      when(current.isin(replacementValues: _*), lit(null)).otherwise(current).as(field.name)
    case _ =>
      current
  }
}: _*)

In [9]:
// Display the data after the "(null)" / "UNKNOWN" placeholders were nulled out.
z.show(baseDF4)

In [10]:
// Row count of baseDF4 (the placeholder replacement rewrites values, it does not filter rows).
baseDF4.count()

In [11]:
import org.apache.spark.sql.types._

// Give the coordinates a numeric type and normalise both time-of-day columns.
val baseDF5 = {
  // Coordinates become doubles.
  val withCoordinates = Seq("latitude", "longitude").foldLeft(baseDF4) { (df, name) =>
    df.withColumn(name, col(name).cast(DoubleType))
  }

  // Times are parsed with HH:mm:ss and formatted straight back to HH:mm:ss text.
  Seq("cmplnt_fr_tm", "cmplnt_to_tm").foldLeft(withCoordinates) { (df, name) =>
    val parsed = to_timestamp(col(name), "HH:mm:ss")
    df.withColumn(name, date_format(parsed, "HH:mm:ss"))
  }
}


In [12]:
// Display the data after the coordinate casts and time normalisation.
z.show(baseDF5)

In [13]:
// Print the column names and types of baseDF5 to check the conversions above.
baseDF5.printSchema

In [14]:
// List the distinct values present in boro_nm.
z.show(baseDF5.select("boro_nm").distinct())

In [15]:
// List the distinct values present in vic_age_group.
z.show(baseDF5.select("vic_age_group").distinct())

In [16]:
// List the distinct values present in susp_race.
z.show(baseDF5.select("susp_race").distinct())

In [17]:
// List the distinct values present in ofns_desc.
z.show(baseDF5.select("ofns_desc").distinct())

In [18]:
// Path of the ZIP code lookup CSV (re-binds filePath for this cell).
val filePath = "zipcodes.csv"

// Same reader settings as for the complaint data, except that schema inference
// is switched off, so every column is read as a string.
val zipcodes = spark.read
  .options(Map(
    "header" -> "true",
    "multiLine" -> "true",
    "inferSchema" -> "false",
    "escape" -> "\""
  ))
  .csv(filePath)

// Show the loaded ZIP code table in the notebook.
z.show(zipcodes)

In [19]:
// The ZIP table was read as strings; turn both coordinate columns into doubles.
val zipcodesDF = Seq("LAT", "LNG").foldLeft(zipcodes) { (df, coordinate) =>
  df.withColumn(coordinate, col(coordinate).cast(DoubleType))
}

In [20]:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.functions.udf

/**
 * Great-circle distance between two points, computed with the haversine
 * formula on a sphere of radius 6371 (kilometres).
 *
 * The result keeps its fractional part: truncating to whole units would make
 * every pair of points closer than 1 km compare as distance 0, which makes
 * "nearest" comparisons between nearby candidates meaningless.
 *
 * @param lat1 latitude of the first point, in decimal degrees
 * @param lon1 longitude of the first point, in decimal degrees
 * @param lat2 latitude of the second point, in decimal degrees
 * @param lon2 longitude of the second point, in decimal degrees
 * @return the distance between the two points, in the unit of the radius
 */
def haversine(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    val earthRadius = 6371.0
    val sinHalfLat = Math.sin(Math.toRadians(lat1 - lat2) / 2)
    val sinHalfLng = Math.sin(Math.toRadians(lon1 - lon2) / 2)
    val a = sinHalfLat * sinHalfLat +
        Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2)) * sinHalfLng * sinHalfLng
    // Floating-point rounding can push `a` marginally outside [0, 1]; clamp it
    // so neither square root below is taken of a negative number.
    val clamped = Math.min(1.0, Math.max(0.0, a))
    earthRadius * 2 * Math.atan2(Math.sqrt(clamped), Math.sqrt(1 - clamped))
}

// Spark UDF wrapper around `haversine`, so it can be applied to DataFrame columns.
val haversineUDF = udf(haversine _)

// Collect the ZIP table and broadcast it for use inside the UDF below.
// Rows with a missing ZIP, LAT or LNG are dropped first: a null coordinate read
// through getAs[Double] would silently turn into 0.0 and take part in the
// search as if the ZIP code were located at latitude 0, longitude 0.
val broadcastUSZipDf = sc.broadcast(zipcodesDF.na.drop(Seq("ZIP", "LAT", "LNG")).collect())

// UDF returning the ZIP of the broadcast row closest (by `haversine`) to the
// given point. Scans every broadcast row; on equal distances the row seen first
// wins, and an empty table yields "".
val getNearestZipCode = udf((lat1: Double, lon1: Double) => {
    val noMatch = ("", Double.MaxValue)
    val (nearestZipCode, _) = broadcastUSZipDf.value.foldLeft(noMatch) {
        case (best @ (_, bestDistance), row) =>
            val distance = haversine(lat1, lon1, row.getAs[Double]("LAT"), row.getAs[Double]("LNG"))
            // Strict comparison keeps the earliest row when distances tie.
            if (distance < bestDistance) (row.getAs[String]("ZIP"), distance) else best
    }
    nearestZipCode
})

// Attach the nearest ZIP code to every complaint as column "zipcode".
val baseDF6 = baseDF5.withColumn("zipcode", getNearestZipCode(col("latitude"), col("longitude")))

In [21]:
// Display the complaints together with the derived "zipcode" column.
z.show(baseDF6)