# Job 1 optimized version

In [None]:
import org.apache.spark

### Schema definitions

In [None]:
import org.apache.spark.sql.types._
import java.sql.Timestamp

// Explicit schema for the reviews NDJSON file, so Spark does not have to infer it.
// `time` and `resp.time` are read as raw longs here; they are turned into
// timestamps after loading.
val reviewSchema = new StructType()
  .add("user_id", StringType, nullable = true)
  .add("name", StringType, nullable = true)
  .add("time", LongType, nullable = false)
  .add("rating", DoubleType, nullable = true)
  .add("text", StringType, nullable = true)
  .add("pics", ArrayType(StringType), nullable = true)
  .add(
    "resp",
    // Optional nested record describing the reply to the review.
    new StructType()
      .add("time", LongType, nullable = false)
      .add("text", StringType, nullable = true),
    nullable = true
  )
  .add("gmap_id", StringType, nullable = false)

/** Reply attached to a review.
  *
  * @param time when the reply was posted
  * @param text reply body, if any
  */
case class Response(time: Timestamp, text: Option[String])

/** Typed view of one row of the reviews dataset.
  *
  * Field names mirror the column names of `reviewSchema`, which is what allows
  * the DataFrame to be converted with `.as[Review]`.
  *
  * @param user_id identifier of the reviewer, if present
  * @param name    name column of the review record, if present (presumably the reviewer name — TODO confirm)
  * @param time    when the review was posted
  * @param rating  rating given by the review, if present
  * @param text    review body, if present
  * @param pics    pictures attached to the review (element meaning not visible here)
  * @param resp    reply to the review, if any
  * @param gmap_id identifier of the reviewed business; used as the join key with the metadata
  */
case class Review(
  user_id: Option[String],
  name: Option[String],
  time: Timestamp,
  rating: Option[Double],
  text: Option[String],
  pics: Seq[String],
  resp: Option[Response],
  gmap_id: String
)

In [None]:
// Explicit schema for the business metadata NDJSON file, so Spark does not have
// to infer it from the data.
val metadataSchema = new StructType()
  .add("name", StringType, nullable = true)
  .add("address", StringType, nullable = true)
  .add("gmap_id", StringType, nullable = false)
  .add("description", StringType, nullable = true)
  .add("latitude", DoubleType, nullable = false)
  .add("longitude", DoubleType, nullable = false)
  .add("category", ArrayType(StringType), nullable = true)
  .add("avg_rating", DoubleType, nullable = false)
  .add("num_of_reviews", IntegerType, nullable = false)
  .add("price", StringType, nullable = false)
  .add("hours", ArrayType(ArrayType(StringType)), nullable = true)
  .add("MISC", MapType(StringType, ArrayType(StringType)), nullable = false)
  .add("state", StringType, nullable = true)
  .add("relative_results", ArrayType(StringType), nullable = true)
  .add("url", StringType, nullable = false)

/** Typed view of one row of the business metadata dataset.
  *
  * Field names mirror the column names of `metadataSchema`, which is what allows
  * the DataFrame to be converted with `.as[Metadata]`.
  *
  * @param name             business name, if present
  * @param address          business address, if present; the state abbreviation is extracted from it
  * @param gmap_id          identifier of the business; used as the join key with the reviews
  * @param description      business description, if present
  * @param latitude         latitude of the business
  * @param longitude        longitude of the business
  * @param category         categories of the business
  * @param avg_rating       average rating as stored in the dataset
  * @param num_of_reviews   number of reviews as stored in the dataset
  * @param price            price column as a string (format not visible here)
  * @param hours            opening hours as nested string lists (inner layout not visible here)
  * @param MISC             miscellaneous attributes, keyed by name with a list of values each
  * @param state            state column of the dataset, if present (semantics not visible here)
  * @param relative_results related results (presumably identifiers of related businesses — TODO confirm)
  * @param url              URL of the business
  */
case class Metadata(
  name: Option[String],
  address: Option[String],
  gmap_id: String,
  description: Option[String],
  latitude: Double,
  longitude: Double,
  category: Seq[String],
  avg_rating: Double,
  num_of_reviews: Int,
  price: String,
  hours: Seq[Seq[String]],
  MISC: Map[String, Seq[String]],
  state: Option[String],
  relative_results: Seq[String],
  url: String
)

### Dataset load and parse

In [None]:
import java.nio.file.Paths
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_unixtime

// The project root is three directory levels above the JVM working directory.
val projectDir = Paths.get(System.getProperty("user.dir")).getParent.getParent.getParent.toString
val reviewsPath = s"$projectDir/dataset/sample-reviews.ndjson"
val metadataPath = s"$projectDir/dataset/metadata.ndjson"

val spark = SparkSession.builder()
  .appName("NDJSON Reader")
  .master("local[*]") // Needed in local mode
  .getOrCreate()

// Reviews: null `pics` become empty arrays, and the epoch-millisecond `time`
// fields (top level and inside `resp`) are converted to timestamps so that the
// rows match the `Review` case class.
val reviewsDf = spark.read
  .schema(reviewSchema)
  .json(reviewsPath)
  .withColumn("pics", when(col("pics").isNull, array()).otherwise(col("pics")))
  .withColumn("time", from_unixtime(col("time") / 1000).cast("timestamp"))
  .withColumn(
    "resp",
    when(
      col("resp").isNotNull,
      struct(
        from_unixtime(col("resp.time") / 1000).cast("timestamp").alias("time"),
        col("resp.text").cast(StringType).alias("text")
      )
    ).otherwise(lit(null))
  )
  .as[Review]

// Metadata: null array columns become empty arrays so they fit the non-optional
// `Seq` fields of the `Metadata` case class.
val metadataDf = spark.read
  .schema(metadataSchema)
  .json(metadataPath)
  .withColumn("category", when(col("category").isNull, array()).otherwise(col("category")))
  .withColumn("hours", when(col("hours").isNull, array()).otherwise(col("hours")))
  .withColumn(
    "relative_results",
    when(col("relative_results").isNull, array()).otherwise(col("relative_results"))
  )
  .as[Metadata]

reviewsDf.printSchema()
metadataDf.printSchema()

// Unfortunately, it seems that Spark does not support case classes in RDDs: it throws
// ArrayStoreException when trying to collect the RDD
// (see also https://github.com/adtech-labs/spylon-kernel/issues/40).
// As a workaround every record is flattened into a plain tuple, including the nested Response.
val reviewsRdd = reviewsDf.rdd.map { review =>
  val fields = Review.unapply(review).get
  fields.copy(_7 = fields._7.map(response => Response.unapply(response).get))
}
val metaRdd = metadataDf.rdd.map(business => Metadata.unapply(business).get)

### Optimized version

L'obiettivo di questo job è capire, anno per anno, se una maggiore frequenza nella risposta alle recensioni ha un impatto sulla valutazione media ricevuta.
In particolare:
- Per ogni anno e business si calcola la media delle recensioni, il rate e il tempo medio di risposta;
- Sulla base del rate e del tempo medio di risposta viene calcolato un attributo aggiuntivo “response strategy” che categorizza il business in un particolare anno in 4 categorie (“Rapid and frequent”, “Slow but frequent”, “Occasional” o “Rare or none”);
- Aggregazione in base alla "response strategy", l'anno e lo stato per ottenere il rate medio e il numero di business all'interno della categoria.

---

The goal of this job is to understand, year by year, whether greater frequency in responding to reviews has an impact on the average rating received.

Specifically:

- For each year and business, the average rating, the response rate, and the average response time are calculated;
- Based on the rate and average response time, an additional attribute "response strategy" is calculated that categorizes the business in a particular year into four categories ("Rapid and frequent," "Slow but frequent," "Occasional," or "Rare or none");
- Aggregation based on the "response strategy," year, and state to get the average rating and number of businesses within the category.

---

**Metadata**: (name, address, <ins>gmap_id</ins>, description, latitude, longitude, category, avg_rating, num_of_reviews, price, hours, misc, state, relative_results, url)

**Review**: (user_id, name, time, rating, text, pics, responses, <ins>gmap_id</ins>)

---

In [None]:
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.SaveMode
import org.apache.spark.storage.StorageLevel

In [None]:
// Drop every RDD that is still marked as persistent in this Spark context,
// so the job starts without leftovers from earlier runs.
sc.getPersistentRDDs.values.foreach(_.unpersist())

In [None]:
/** Classifies how a business replies to its reviews.
  *
  * @param avgResponseRate fraction of reviews that received a reply
  * @param avgResponseTime average delay before a reply; the caller passes hours,
  *                        so the `4 * 24` threshold corresponds to four days
  * @return one of "Rapid and frequent", "Slow but frequent", "Occasional" or "Rare or none"
  */
def responseStrategy(avgResponseRate: Double, avgResponseTime: Double): String = {
  val repliesFrequently = avgResponseRate >= 0.5
  val repliesRapidly = avgResponseTime <= 4 * 24

  if (repliesFrequently && repliesRapidly) "Rapid and frequent"
  else if (repliesFrequently) "Slow but frequent"
  else if (avgResponseRate >= 0.15) "Occasional"
  else "Rare or none"
}

In [None]:

/** Extracts the two-letter state abbreviation from an address.
  *
  * @param address address to inspect, if any
  * @return the first two-uppercase-letter code found between a comma and a
  *         five-digit ZIP code, or "Unknown" when the address is missing or
  *         contains no such pattern
  */
def toState(address: Option[String]): String = {
  // Comma, optional spaces, the state code (captured), spaces, then the ZIP code.
  val stateBeforeZip = """,\s*([A-Z]{2})\s+\d{5}""".r

  val state = for {
    addr  <- address
    found <- stateBeforeZip.findFirstMatchIn(addr)
  } yield found.group(1)

  state.getOrElse("Unknown")
}

The filtered metadata RDD is still too large (~42.6 MiB), so using broadcast variables is not worth it.

Reading all the metadata (only 292.6 MiB) creates just 3 partitions and takes 37 seconds.
Moreover, partitioning seems unbalanced:

| Index | Duration | Input size / Records | Shuffle Write / Record |
|-------|----------|----------------------|------------------------|
|0      |13 s      | 128 MiB / 129954	  | 4 MiB / 129954	       |
|1  	|6 s	   | 128 MiB / 132020	  | 4 MiB / 132020	       |
|2      |1 s       | 36.5 MiB / 30914	  | 965 KiB / 30914        |  


In [None]:
// Shared partitioner: both sides of the later join are hash-partitioned the same way.
val partitioner = new org.apache.spark.HashPartitioner(12)

// [(gmap_id, state)*] for every business that has an address.
val businessesStates = metaRdd
  .filter { business => business._2.nonEmpty } // _2 is the address
  .map { business => (business._3, toState(business._2)) } // _3 is the gmap_id
  .partitionBy(partitioner)

In [None]:
// Per (year, business) statistics:
// [((year, gmap_id), (avg_rating, response_rate, avg_response_time_hours, response_strategy))*]
val reviewsInfo = reviewsRdd
  // Keep only reviews that carry a rating, and key them by (year of the review, gmap_id).
  .collect { case (_, _, time, Some(rating), _, _, resp, id) =>
    (time.toLocalDateTime.getYear, id) -> (time, rating, resp)
  }
  // Accumulator: (ratings sum, #responses, sum of response delays in ms, #reviews)
  .aggregateByKey((0.0, 0, 0L, 0))(
    { case ((sumRatings, numResponses, sumResponseTimes, totalReviews), (time, rating, response)) =>
      response match {
        case Some((respondedAt, _)) =>
          (
            sumRatings + rating,
            numResponses + 1,
            sumResponseTimes + (respondedAt.getTime - time.getTime),
            totalReviews + 1
          )
        case None =>
          (sumRatings + rating, numResponses, sumResponseTimes, totalReviews + 1)
      }
    },
    (r1, r2) => (r1._1 + r2._1, r1._2 + r2._2, r1._3 + r2._3, r1._4 + r2._4)
  )
  .mapValues { case (sumRatings, numResponses, sumResponseTimes, totalReviews) =>
    val avgResponseRate = numResponses.toDouble / totalReviews
    // Average delay in fractional hours. Dividing as Long and then calling
    // TimeUnit.toHours truncated twice, so e.g. 96 h 59 min became 96 and was
    // still classified as answered within four days.
    val avgResponseTime =
      if (numResponses > 0) sumResponseTimes.toDouble / numResponses / TimeUnit.HOURS.toMillis(1)
      else Double.PositiveInfinity // no reply at all: never counts as "rapid"
    (
      sumRatings / totalReviews,
      avgResponseRate,
      avgResponseTime,
      responseStrategy(avgResponseRate, avgResponseTime)
    )
  }

In [None]:
// Average, per (year, state, response strategy), of the yearly average ratings
// of the businesses that fall in that group.
val outcome = reviewsInfo
  // Re-key by gmap_id so the records can be joined with the business states.
  .map { case ((year, gmapId), (businessAvgRating, _, _, strategy)) =>
    (gmapId, (year, strategy, businessAvgRating))
  }
  .partitionBy(partitioner)
  .join(businessesStates) // [(gmap_id, ((year, response_strategy, avg_rating), state))*]
  .map { case (_, ((year, strategy, businessAvgRating), state)) =>
    ((year, state, strategy), (businessAvgRating, 1))
  }
  // Sum the ratings and count the businesses of each group.
  .reduceByKey { case ((ratingsA, countA), (ratingsB, countB)) =>
    (ratingsA + ratingsB, countA + countB)
  }
  .mapValues { case (ratingsSum, businessCount) => ratingsSum / businessCount }

In [None]:
// Directory, under the project root, where the job results are written.
val outputDirPath = s"$projectDir/output"

In [None]:
// Flatten the keyed result into rows and write them as one CSV file with a
// header, replacing any previous output of this job.
outcome
  .map { case ((year, state, strategy), groupAvgRating) => (year, state, strategy, groupAvgRating) }
  .repartition(1) // shuffle everything into a single partition => a single part file
  .toDF("year", "state", "response_strategy", "avg_rating")
  .write
  .format("csv")
  .option("header", "true")
  .mode(SaveMode.Overwrite)
  .save(s"file://$outputDirPath/job1-optimized-output")