In [1]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val sqlC = new org.apache.spark.sql.SQLContext(sc)
import sqlC.implicits._

val rr = spark.read.format("csv").
    option("header", "true").
    option("inferSchema", "true").
    load("Documents/kaggle/russian_realestate/train.csv").
    select("id","timestamp","full_sq","build_year","num_room","price_doc", "state")

In [2]:
//view the record count by state

  
rr.where($"state" =!= "NA").
    groupBy($"state").
    agg(count("id")).
    sort($"count(id)".desc).
    show

+-----+---------+
|state|count(id)|
+-----+---------+
|    2|     5844|
|    3|     5790|
|    1|     4855|
|    4|      422|
|   33|        1|
+-----+---------+



In [5]:
//change timestamp type to date


val rr_date = rr.withColumn("timestamp", 
                            'timestamp.cast("date"))
                            
rr_date.createOrReplaceTempView("rr_date")

In [14]:
/*WINDOW EXAMPLE #1
  compute the "price_doc" percentile ranking
  for each "state" and "timestamp" combination
  */

import org.apache.spark.sql.expressions.Window

val windowSpec = Window.
    partitionBy("state", "timestamp").
    orderBy($"price_doc".desc).
    rowsBetween(Window.unboundedPreceding, Window.currentRow)
    
rr_date.
    withColumn("rank", percent_rank.over(windowSpec)).show()

                                                                                +-----+----------+-------+----------+--------+---------+-----+-------------------+
|   id| timestamp|full_sq|build_year|num_room|price_doc|state|               rank|
+-----+----------+-------+----------+--------+---------+-----+-------------------+
| 9672|2013-08-06|     78|      2012|       2|  7074000|    1|                0.0|
|13588|2014-01-09|    102|      2014|       3| 16861350|    1|                0.0|
|13601|2014-01-09|    121|      2012|       3|  9007500|    1|0.16666666666666666|
|13581|2014-01-09|     86|        NA|       3|  8500800|    1| 0.3333333333333333|
|13587|2014-01-09|     79|        NA|       3|  7528752|    1|                0.5|
|13586|2014-01-09|     52|        NA|       2|  5326755|    1| 0.6666666666666666|
|13579|2014-01-09|     45|        NA|       1|  2937917|    1| 0.8333333333333334|
|13585|2014-01-09|     29|      2014|       1|  2027818|    1|                1.0|
|2806

In [8]:
/*WINDOW EXAMPLE #2
  compute the 3 month moving average by state
  */

val rr_monthly = rr_date.
    where($"state" =!= "NA").
    withColumn("month", month($"timestamp")).
    withColumn("year", year($"timestamp")).
    groupBy($"month", $"year", $"state").
    agg(expr("sum(price_doc) as monthly_sales"),
        expr("count(price_doc) as sale_count")).
    sort($"state", $"year", $"month")

val windowSpec2 = Window.
    partitionBy("state").
    orderBy($"year", $"month").
    rowsBetween(-2, 0)
    
rr_monthly.
    withColumn("moving_avg", avg($"monthly_sales").
    over(windowSpec2)).
    withColumn("moving_avg", 'moving_avg.cast("Int")).
    show()

+-----+----+-----+-------------+----------+----------+
|month|year|state|monthly_sales|sale_count|moving_avg|
+-----+----+-----+-------------+----------+----------+
|    5|2013|    3|     30950000|         4|  30950000|
|    6|2013|    3|    217846600|        32| 124398300|
|    7|2013|    3|    521778056|        70| 256858218|
|    8|2013|    3|    923726417|       126| 554450357|
|    9|2013|    3|   1357393641|       184| 934299371|
|   10|2013|    3|   1994553122|       276|1425224393|
|   11|2013|    3|   2123674708|       276|1825207157|
|   12|2013|    3|   2000393830|       255|2039540553|
|    1|2014|    3|   1770753850|       235|1964940796|
|    2|2014|    3|   2450430643|       327|2073859441|
|    3|2014|    3|   2587131989|       324|2147483647|
|    4|2014|    3|   2851532969|       355|2147483647|
|    5|2014|    3|   2639660342|       310|2147483647|
|    6|2014|    3|   2663189543|       346|2147483647|
|    7|2014|    3|   1697161681|       223|2147483647|
|    8|201

In [13]:
/*WINDOW EXAMPLE #3
  count successive months of sales by state
 */
     
val r2 = rr_monthly.
     withColumn("month_id", concat($"year", lit("-"), $"month", lit("-01")))
     
val r3 = r2.withColumn("month_id", 'month_id.cast("date")) 

val windowSpec3 = Window.
    partitionBy("state").
    orderBy($"month_id")
    
r3.
    withColumn("prev_month",
                lag($"month_id", 1).over(windowSpec3)).
    withColumn("month_diff", months_between($"month_id", $"prev_month")).
    show(20)




+-----+----+-----+-------------+----------+----------+----------+----------+
|month|year|state|monthly_sales|sale_count|  month_id|prev_month|month_diff|
+-----+----+-----+-------------+----------+----------+----------+----------+
|    5|2013|    3|     30950000|         4|2013-05-01|      null|      null|
|    6|2013|    3|    217846600|        32|2013-06-01|2013-05-01|       1.0|
|    7|2013|    3|    521778056|        70|2013-07-01|2013-06-01|       1.0|
|    8|2013|    3|    923726417|       126|2013-08-01|2013-07-01|       1.0|
|    9|2013|    3|   1357393641|       184|2013-09-01|2013-08-01|       1.0|
|   10|2013|    3|   1994553122|       276|2013-10-01|2013-09-01|       1.0|
|   11|2013|    3|   2123674708|       276|2013-11-01|2013-10-01|       1.0|
|   12|2013|    3|   2000393830|       255|2013-12-01|2013-11-01|       1.0|
|    1|2014|    3|   1770753850|       235|2014-01-01|2013-12-01|       1.0|
|    2|2014|    3|   2450430643|       327|2014-02-01|2014-01-01|       1.0|

In [9]:
/*ROLLUP EXAMPLE
  show total sales, avg monthly sales and total sale count 
  rolled up by total, by year, by year + state
 */
 
val rr_rolledup = rr_monthly.rollup("year", "state").
  agg(expr("sum(monthly_sales) as total_sales"), 
      expr("avg(monthly_sales) as avg_sales"),
      expr("sum(sale_count) as sale_count")).
  select("year", "state", "total_sales", "sale_count", "avg_sales").
  sort($"year", $"state").
  withColumn("avg_sales", 'avg_sales.cast("Int"))
  
rr_rolledup.
    withColumn("year", 'year.cast("String")).
    withColumn("state", 'state.cast("String")).
    na.fill(Map("year" -> "total", 
                "state" -> "total")).
    show()

+-----+-----+------------+----------+----------+
| year|state| total_sales|sale_count| avg_sales|
+-----+-----+------------+----------+----------+
|total|total|129189711517|     16912|1266565799|
| 2013|total| 21826233730|      3095| 727541124|
| 2013|    1|  4704118611|       717| 588014826|
| 2013|    2|  6956268549|      1065| 993752649|
| 2013|    3|  9170316374|      1223|1146289546|
| 2013|   33|     9000000|         1|   9000000|
| 2013|    4|   986530196|        89| 164421699|
| 2014|total| 84002476237|     11040|1750051588|
| 2014|    1| 21631815551|      3003|1802651295|
| 2014|    2| 28112799931|      3981|2147483647|
| 2014|    3| 30355558335|      3773|2147483647|
| 2014|    4|  3902302420|       283| 325191868|
| 2015|total| 23361001550|      2777| 973375064|
| 2015|    1|  9180526281|      1135|1530087713|
| 2015|    2|  6189946115|       798|1031657685|
| 2015|    3|  7247573933|       794|1207928988|
| 2015|    4|   742955221|        50| 123825870|
+-----+-----+-------

In [11]:
/*CUBED EXAMPLE
  show total sales, avg monthly sales and total sale count 
  cubeded up by total, by year, by year + state
  the difference between cube and rollup is that cube totals 
  for the first sort column by all other columns
 */
 
val rr_cubed = rr_monthly.cube("year", "state").
  agg(expr("sum(monthly_sales) as total_sales"), 
      expr("avg(monthly_sales) as avg_sales"),
      expr("sum(sale_count) as sale_count")).
  select("year", "state", "total_sales", "sale_count", "avg_sales").
  sort($"year", $"state").
  withColumn("avg_sales", 'avg_sales.cast("Int"))
  
rr_cubed.
    withColumn("year", 'year.cast("String")).
    withColumn("state", 'state.cast("String")).
    na.fill(Map("year" -> "total", 
                "state" -> "total")).
    show()

                                                                                +-----+-----+------------+----------+----------+
| year|state| total_sales|sale_count| avg_sales|
+-----+-----+------------+----------+----------+
|total|total|129189711517|     16912|1266565799|
|total|    1| 35516460443|      4855|1366017709|
|total|    2| 41259014595|      5844|1650360583|
|total|    3| 46773448642|      5790|1798978793|
|total|   33|     9000000|         1|   9000000|
|total|    4|  5631787837|       422| 234657826|
| 2013|total| 21826233730|      3095| 727541124|
| 2013|    1|  4704118611|       717| 588014826|
| 2013|    2|  6956268549|      1065| 993752649|
| 2013|    3|  9170316374|      1223|1146289546|
| 2013|   33|     9000000|         1|   9000000|
| 2013|    4|   986530196|        89| 164421699|
| 2014|total| 84002476237|     11040|1750051588|
| 2014|    1| 21631815551|      3003|1802651295|
| 2014|    2| 28112799931|      3981|2147483647|
| 2014|    3| 30355558335|      3773

In [12]:
/*PIVOT EXAMPLE
  equivelant to R's dcast
 */

val rr_pivot = rr_monthly.
  groupBy("year").
  pivot("state").
  agg(expr("sum(monthly_sales) as total_sales"))
  
rr_pivot.sort("year").na.fill(0).show()

+----+-----------+-----------+-----------+-------+----------+
|year|          1|          2|          3|     33|         4|
+----+-----------+-----------+-----------+-------+----------+
|2013| 4704118611| 6956268549| 9170316374|9000000| 986530196|
|2014|21631815551|28112799931|30355558335|      0|3902302420|
|2015| 9180526281| 6189946115| 7247573933|      0| 742955221|
+----+-----------+-----------+-----------+-------+----------+

