In [None]:
import $ivy.`org.apache.spark::spark-sql:2.4.7`

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql._

// Mute every logger in the "org" hierarchy (org.apache.spark, org.apache.hadoop, ...) so cell output
// is not flooded with framework logging. Note: Level.OFF hides warnings and errors as well.
Logger.getLogger("org").setLevel(Level.OFF)

// Start (or reuse) a Spark session running locally with one worker thread per available core.
// NotebookSparkSession is the Almond kernel's notebook-aware SparkSession builder.
val spark = NotebookSparkSession.builder().master("local[*]").getOrCreate()

In [None]:
// Shorthand for the session's SparkContext, the entry point of the RDD API.
def sc = spark.sparkContext

// The integers 1..100000000 spread across 100 partitions.
val rdd = sc.parallelize(1 to 100000000, 100)

// Sum of (x + 1) for every x whose successor exceeds 100, i.e. 101 + 102 + ... + 100000001.
// The values are widened to Long *before* summing: the true total is 5000000149994951, far beyond
// Int.MaxValue, so an Int reduce silently wraps around and returns a wrong number.
val n = rdd
    .map(x => x.toLong + 1)
    .filter(x => x > 100)
    .reduce((x, y) => x + y)

// How reduce(f) combines values, illustrated on a tiny input:
//   input:       0 1 2 3 4
//   after map:   1 2 3 4 5
//   reduce(f):   f(1, 2)  f(3, 4)  5
//                3        7        5
//                f(3, 7)  5
//                10       5
//                f(10, 5)
//                15
// The grouping of applications depends on how the data is partitioned, so f must be associative
// and commutative for the result to be well defined.

In [None]:
// Lazily describe the crime CSV as an RDD of text lines; nothing is read until an action runs.
val lines = sc.textFile("data/crime.csv")

// Character count of each line.
val lineLengths = lines.map(s => s.length)

// Total characters across all lines.
// fold(0L) instead of reduce: reduce throws on an empty RDD (e.g. an empty file), whereas fold
// returns the zero value. Summing as Long also keeps very large inputs from overflowing Int.
val totalLength = lineLengths.map(_.toLong).fold(0L)(_ + _)

In [None]:
// The lines are scanned by two separate actions below. cache() lets the second action reuse the
// in-memory copy (when it fits) instead of re-reading the file.
val lines = sc.textFile("data/crime.csv").cache()

// Total characters across all lines. fold(0L) returns 0 for an empty file, where reduce would throw,
// and the Long accumulator cannot overflow the way an Int sum can.
val totalLength = lines
    .map(s => s.length.toLong)
    .fold(0L)((a, b) => a + b)

// The ten most frequent comma-separated tokens with their counts, most frequent first.
// NOTE(review): split(",") is a naive tokenizer. Quoted fields containing commas are split too,
// and the header line's column names are counted like any other token.
// top(k) keeps only k candidates per partition instead of fully sorting every distinct token.
val counts = lines
    .flatMap(x => x.split(","))
    .map(s => (s, 1))     // RDD[(key, value)]
    .reduceByKey((a, b) => a + b)
    .top(10)(Ordering.by[(String, Int), Int](_._2))

In [None]:
// Load the crime CSV as a DataFrame. The first line supplies the column names, and column types are
// inferred from the data (inferSchema costs one extra pass over the file).
val crimeFacts = spark.read
    .options(Map("header" -> "true", "inferSchema" -> "true"))
    .csv("data/crime.csv")

// Print the leading rows of the DataFrame.
crimeFacts.show()

In [None]:
// Print the inferred column names, types and nullability as a tree.
crimeFacts.printSchema()

In [None]:
import spark.implicits._

// With spark.implicits._ in scope, $"..." builds a Spark Column, so `+ 1` yields a Column expression
// meaning "the OFFENSE_CODE column plus one".
$"OFFENSE_CODE" + 1
// Without the $ prefix this is an ordinary String, so `+ 1` is plain string concatenation and yields
// the String "OFFENSE_CODE1". No Column is involved.
"OFFENSE_CODE" + 1
// Selects the computed expression OFFENSE_CODE + 1.
// crimeFacts.select($"OFFENSE_CODE" + 1)
// Looks like the same thing, but passes the String "OFFENSE_CODE1" to select, which tries to resolve
// a column literally named OFFENSE_CODE1.
// crimeFacts.select("OFFENSE_CODE" + 1)

In [None]:
// Register the DataFrame under the name "crimes" so it can be queried with spark.sql,
// replacing any earlier view of the same name.
crimeFacts.createOrReplaceTempView(viewName = "crimes")

In [None]:
// Query the "crimes" temp view with plain SQL and print the (at most ten) resulting rows.
spark
    .sql("select INCIDENT_NUMBER, DISTRICT from crimes limit 10")
    .show()

In [None]:
import spark.implicits._

// Almond workaround: a case class declared in a notebook cell is nested inside the cell's wrapper
// object, so that outer instance must be registered before Spark's encoders can build Crime values.
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)

/**
 * Typed view of one row of data/crime.csv.
 *
 * Field names mirror the CSV header columns because `as[Crime]` maps columns to fields by name.
 * Every field is an Option so that null cells decode to None.
 */
case class Crime(
    INCIDENT_NUMBER: Option[String],
    OFFENSE_CODE: Option[Int],
    OFFENSE_CODE_GROUP: Option[String],
    OFFENSE_DESCRIPTION: Option[String],
    DISTRICT: Option[String],
    REPORTING_AREA: Option[String],
    SHOOTING: Option[String],
    OCCURRED_ON_DATE: Option[String],
    YEAR: Option[Int],
    MONTH: Option[Int],
    DAY_OF_WEEK: Option[String],
    HOUR: Option[Int],
    UCR_PART: Option[String],
    STREET: Option[String],
    Lat: Option[Double],
    Long: Option[Double],
    Location: Option[String]
) {

    /** True when the SHOOTING column held any value for this incident. */
    def wasShooting: Boolean = SHOOTING.isDefined
}

val crimeDs = crimeFacts.as[Crime]

// Incidents whose description is exactly "VANDALISM".
crimeDs.filter(crime => crime.OFFENSE_DESCRIPTION.contains("VANDALISM")).show()
// Incidents with a SHOOTING value present.
crimeDs.filter(crime => crime.wasShooting).show()
// Raw SHOOTING values (as Options) of the first ten rows.
crimeDs.take(10).map(crime => crime.SHOOTING).foreach(println)

In [None]:
// Offense-code lookup table, read with a header row and inferred column types.
// NOTE(review): joins on CODE assume each code appears only once here. Duplicate codes would
// multiply the matching crime rows, so verify uniqueness.
val offenseCodes = spark.read
    .options(Map("header" -> "true", "inferSchema" -> "true"))
    .csv("data/offense_codes.csv")

// Print without truncating long cell values.
offenseCodes.show(false)

In [None]:
import spark.implicits._

// Robbery incidents per offense name. Each crime is matched to its offense name through the code
// lookup, only names starting with "ROBBERY" are kept, and the result is listed by descending count.
val robberyCountsByName = crimeFacts
    .join(offenseCodes, $"OFFENSE_CODE" === $"CODE")
    .filter($"NAME".startsWith("ROBBERY"))
    .groupBy($"NAME")
    .count()
    .sort($"count".desc)

// Print without truncating the offense names.
robberyCountsByName.show(false)

In [None]:
import org.apache.spark.sql.functions.broadcast

// Mark the offense-code table as small enough for a broadcast join. Spark can then ship it to every
// executor instead of shuffling both sides of the join.
val offenseCodesBroadcast = broadcast(offenseCodes)

// Robbery incidents per offense name, most frequent first, using the broadcast-hinted lookup table.
val robberyStatsWithBroadcast = crimeFacts
    .join(offenseCodesBroadcast, $"OFFENSE_CODE" === $"CODE")
    .where($"NAME".startsWith("ROBBERY"))
    .groupBy($"NAME")
    .count()
    .sort($"count".desc)

robberyStatsWithBroadcast.show()

In [None]:
// udf