In [3]:
// Vars
// Size the job from the executor layout recorded in the Spark conf. Each
// setting falls back to 1 when absent. The target is four partitions per
// available core.
val sparkConf = sc.getConf
val executors = sparkConf.getInt("spark.executor.instances", 1)
val corePerExecutor = sparkConf.getInt("spark.executor.cores", 1)
val partitionsPerCore = 4
val partitions = partitionsPerCore * executors * corePerExecutor
// How many copies of every cleaned row the preprocessing step generates.
val replicationFactor = 100

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

executors: Int = 1
corePerExecutor: Int = 2
partitions: Int = 8
replicationFactor: Int = 100


In [4]:
// Read DF
// Load the source catalog table as a DataFrame. spark.table resolves the name
// through the session catalog directly, instead of building a SQL string by
// concatenation.
val sqlContext = spark.sqlContext // kept so any later cell referencing it still works
val table = "usedcars.gg6_bigdata_used_cars"
val rawDF = spark.table(table)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@7ed718ab
table: String = usedcars.gg6_bigdata_used_cars
rawDF: org.apache.spark.sql.DataFrame = [id: bigint, url: string ... 23 more fields]


In [6]:
// 1.Preprocessing
// Project the five columns the analysis uses and rename "manufacturer" to
// "brand". Then discard rows with missing values (na.drop with its default
// settings).
val cleanDF = rawDF.select("region", "price", "manufacturer", "fuel", "odometer").withColumnRenamed("manufacturer", "brand").na.drop()

import org.apache.spark.sql.functions._
// Replicate each clean row `replicationFactor` times:
//   1. attach an array of `replicationFactor` literals;
//   2. explode it (one output row per element);
//   3. remove the helper column again;
//   4. spread the result over `partitions` partitions.
val copyMarkers = array((0 until replicationFactor).map(i => lit(i)): _*)
val preprocessedDF = cleanDF.withColumn("dummy", explode(copyMarkers)).drop("dummy").repartition(partitions)

// Cache the replicated data; both the 2a and 2b aggregations read it.
preprocessedDF.cache()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

cleanDF: org.apache.spark.sql.DataFrame = [region: string, price: bigint ... 3 more fields]
import org.apache.spark.sql.functions._
preprocessedDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [region: string, price: bigint ... 3 more fields]
res11: preprocessedDF.type = [region: string, price: bigint ... 3 more fields]


In [7]:
// 2a.Opi
// Average odometer-to-price ratio ("opi") per brand. Output columns: [brand, opi].
// A zero price is turned into null before dividing:
//   - In the default (non-ANSI) mode Spark already returns null for x / 0, and
//     avg skips nulls, so results are identical.
//   - With spark.sql.ansi.enabled=true, the unguarded division would abort the
//     job with a divide-by-zero error.
// The explicit alias avoids depending on the generated "avg(opi)" column name.
val df2a = preprocessedDF.groupBy("brand").agg(avg(when($"price" =!= 0, $"odometer" / $"price")).as("opi"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

df2a: org.apache.spark.sql.DataFrame = [brand: string, opi: double]


In [8]:

// 2b.Region
// Most frequent brand per region. Output columns: [region, brand].
val brandCount = preprocessedDF.select("region", "brand").groupBy("region", "brand").count
brandCount.cache()
// One row per region holding that region's highest brand count.
val brandMax = brandCount.groupBy("region").max("count")
// Keep the (region, brand) rows whose count equals the region maximum.
// - Joining on ("region", "count") already enforces that equality, so no
//   separate filter is needed.
// - brandMax is the small side (one row per region), so it is the one broadcast.
// - Ties are broken deterministically by taking the alphabetically smallest
//   brand. first() would return an arbitrary row after the shuffle, so the
//   reported brand could change between runs.
val df2b = brandCount.join(
  broadcast(brandMax.withColumnRenamed("max(count)", "count")),
  Seq("region", "count")
).groupBy("region").agg(min("brand").as("brand"))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

brandCount: org.apache.spark.sql.DataFrame = [region: string, brand: string ... 1 more field]
res14: brandCount.type = [region: string, brand: string ... 1 more field]
brandMax: org.apache.spark.sql.DataFrame = [region: string, max(count): bigint]
df2b: org.apache.spark.sql.DataFrame = [region: string, brand: string]


In [9]:
// 3.Join
// Pair each region's most frequent brand (df2b, broadcast) with that brand's
// average opi (df2a). This is an inner join on "brand", with the output
// columns ordered region, brand, opi.
val joined = df2a.join(broadcast(df2b), "brand")
val res = joined.select($"region", $"brand", $"opi")
// res.collect  // uncomment to pull the joined rows back to the driver for inspection

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res: org.apache.spark.sql.DataFrame = [region: string, brand: string ... 1 more field]
res16: Array[org.apache.spark.sql.Row] = Array([auburn,toyota,109.84899295984363], [los angeles,toyota,109.84899295984363], [sierra vista,toyota,109.84899295984363], [santa fe / taos,toyota,109.84899295984363], [boulder,toyota,109.84899295984363], [hawaii,toyota,109.84899295984363], [owensboro,toyota,109.84899295984363], [sacramento,toyota,109.84899295984363], [san diego,toyota,109.84899295984363], [hanford-corcoran,toyota,109.84899295984363], [cape cod / islands,toyota,109.84899295984363], [clovis / portales,toyota,109.84899295984363], [stockton,toyota,109.84899295984363], [logan,jeep,110.58977788127979], [st louis,dodge,302.8080474828895], [tyler / east TX,ford,133.02811289726213], [athens,ford,133.02811289726213], [santa barbara,ford,133.02811289726213], [norfolk / hampton roads,ford...

In [15]:
// Write results
// Save `res` as the catalog table "Results", with its files stored at the S3
// location below. No save mode is set, so saveAsTable uses its default
// (error if the table exists).
// NOTE(review): re-running this cell requires dropping "Results" first or
// choosing an explicit mode.
val resultsLocation = "s3://gg6-used-cars-results/res"
res.write.option("path", resultsLocation).saveAsTable("Results")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…