In [2]:
import org.apache.spark.sql.functions.udf
import org.joda.time.{DateTime, Days}

import org.apache.spark.sql.functions.udf
import org.joda.time.{DateTime, Days}


In [3]:
// Root directory holding the tutorial CSV inputs. Defaults to the original author's local
// checkout; set the TRIVAGO_DATA_PATH environment variable to run the notebook elsewhere.
val BASE_PATH = sys.env.getOrElse(
    "TRIVAGO_DATA_PATH",
    "file:///Users/ytu-egemen-zeytinci/Lessons/spark_scala_tutorial/data"
)

// Both files have a header row. No schema is given and inferSchema is off, so every column
// is read as a string; later cells parse/cast the values they need.
val mapping = spark.read.option("header", "true").csv(s"${BASE_PATH}/trivago_mapping.csv")
val prices = spark.read.option("header", "true").csv(s"${BASE_PATH}/trivago_prices.csv")

BASE_PATH: String = file:///Users/ytu-egemen-zeytinci/Lessons/spark_scala_tutorial/data
mapping: org.apache.spark.sql.DataFrame = [trivago_code: string, cosmos_code: string]
prices: org.apache.spark.sql.DataFrame = [code: string, check_in: string ... 5 more fields]


In [4]:
// Preview the first five mapping rows (long cells truncated, as show() does by default).
mapping.show(5, truncate = true)

+------------+-----------+
|trivago_code|cosmos_code|
+------------+-----------+
|      100000|     1133fc|
|     1000001|     13239e|
|    10000022|     1e6c78|
|     1000005|     13e9d2|
|     1000005|     1a6005|
+------------+-----------+
only showing top 5 rows



In [5]:
// Preview the first five price rows (long cells truncated, as show() does by default).
prices.show(5, truncate = true)

+-------+----------+----------+---+---+------+-----------+
|   code|  check_in| check_out|pos|pax| price|create_date|
+-------+----------+----------+---+---+------+-----------+
|1000005|2019-10-23|2019-10-27| ph|  2| 59.16| 2019-10-21|
|1000005|2019-11-12|2019-11-15| us|2,4|160.27| 2019-10-21|
|1000005|2020-02-28|2020-03-06| ca|  1|    72| 2019-11-06|
|1000005|2020-02-28|2020-03-06| ca|  2|    72| 2019-11-06|
|1000005|2020-04-09|2020-04-12| ph|3,4|187.56| 2019-11-01|
+-------+----------+----------+---+---+------+-----------+
only showing top 5 rows



In [7]:
// Output columns, in order: the mapped cosmos_code followed by the price fields.
val columns = List(
    "cosmos_code",
    "check_in",
    "check_out",
    "pos",
    "pax",
    "price",
    "create_date"
).map(name => col(name))

// Inner join of prices onto the code mapping. The mapping is not one-to-one (e.g.
// trivago_code 1000005 has two cosmos_codes), so such price rows appear once per mapped
// cosmos_code, and prices whose code has no mapping are dropped.
val merged = mapping.as("m")
    .join(prices.as("p"), col("m.trivago_code") === col("p.code"))
    .select(columns: _*)

columns: List[org.apache.spark.sql.Column] = List(cosmos_code, check_in, check_out, pos, pax, price, create_date)
merged: org.apache.spark.sql.DataFrame = [cosmos_code: string, check_in: string ... 5 more fields]


In [84]:
// Deduplication key: every column of `merged` except `price`.
val uniqueColumns: List[String] = List(
    "cosmos_code",
    "check_in",
    "check_out",
    "pos",
    "pax",
    "create_date"
)

// NOTE(review): dropDuplicates keeps one row per key but Spark does not specify which one,
// so when duplicates differ only in price, the price that survives is not deterministic.
val dropped = merged.dropDuplicates(uniqueColumns)

uniqueColumns: List[String] = List(cosmos_code, check_in, check_out, pos, pax, create_date)
dropped: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cosmos_code: string, check_in: string ... 5 more fields]


In [85]:
// Preview five deduplicated rows (long cells truncated, as show() does by default).
dropped.show(5, truncate = true)

+-----------+----------+----------+---+---+------+-----------+
|cosmos_code|  check_in| check_out|pos|pax| price|create_date|
+-----------+----------+----------+---+---+------+-----------+
|     14ba0e|2019-10-24|2019-10-26| ES|  2| 63.89| 2019-10-10|
|     1e4fdf|2020-06-27|2020-07-09| ES|  2|149.82| 2019-10-10|
|     192ddc|2019-10-20|2019-10-21| FR|  2|122.49| 2019-10-10|
|     193cda|2019-10-24|2019-10-27| FR|  2|111.16| 2019-10-10|
|     13705c|2019-11-09|2019-11-10| IT|2,2|109.88| 2019-10-10|
+-----------+----------+----------+---+---+------+-----------+
only showing top 5 rows



In [86]:
// Inspect the schema: all columns are still strings at this point.
dropped.schema: org.apache.spark.sql.types.StructType

res19: org.apache.spark.sql.types.StructType = StructType(StructField(cosmos_code,StringType,true), StructField(check_in,StringType,true), StructField(check_out,StringType,true), StructField(pos,StringType,true), StructField(pax,StringType,true), StructField(price,StringType,true), StructField(create_date,StringType,true))


In [14]:
/** Total cost of a stay: `price` treated as a per-night rate, times the number of nights.
  *
  * The dates are parsed as plain calendar dates with no time zone. The previous version
  * parsed them as `DateTime` at midnight in the JVM default zone. That could throw in zones
  * whose DST transition skips midnight, and it made the result depend on each Spark
  * executor's time zone.
  *
  * @param price    amount multiplied by the night count
  * @param checkIn  check-in date, ISO-8601 `yyyy-MM-dd`
  * @param checkOut check-out date, ISO-8601 `yyyy-MM-dd`
  * @return `price * nights`; 0 when the dates are equal, negative if `checkOut` precedes `checkIn`
  * @throws java.time.format.DateTimeParseException if a date is not `yyyy-MM-dd`
  * @throws NullPointerException if either date is null
  */
def findTotal(price: Double, checkIn: String, checkOut: String): Double = {
    import java.time.LocalDate
    import java.time.temporal.ChronoUnit

    // Whole calendar days between the two dates; independent of any time zone.
    val nights = ChronoUnit.DAYS.between(LocalDate.parse(checkIn), LocalDate.parse(checkOut))
    price * nights
}

findTotal: (price: Double, checkIn: String, checkOut: String)Double


In [12]:
// Wrap findTotal as a typed Spark UDF; the (Double, String, String) => Double signature is
// inferred from the method itself.
val totalUDF = udf(findTotal _)

totalUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function3>,DoubleType,Some(List(DoubleType, StringType, StringType)))


In [95]:
// Add each row's total stay price computed by totalUDF.
// `price` was read from CSV as a string. It is cast explicitly to the Double the UDF
// declares rather than relying on Spark to coerce the string input. Values that do not
// parse as a number become null after the cast.
val newDF = dropped.withColumn(
    "totalPrice",
    totalUDF(col("price").cast("double"), col("check_in"), col("check_out"))
)

newDF: org.apache.spark.sql.DataFrame = [cosmos_code: string, check_in: string ... 6 more fields]


In [13]:
/** Lower-cases a string with locale-independent rules; a null input is returned as null.
  *
  * `Locale.ROOT` is used because the no-argument `toLowerCase` follows the JVM default
  * locale. Under a Turkish locale, "IT".toLowerCase would yield "ıt" (dotless i). Null is
  * passed through so a SQL NULL reaching the UDF does not throw a NullPointerException.
  */
val lower: String => String =
    s => Option(s).map(_.toLowerCase(java.util.Locale.ROOT)).orNull

lower: String => String = <function1>


In [97]:
// Spark UDF form of `lower`, with its String => String types spelled out.
val lowerUDF = udf[String, String](lower)

lowerUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))


In [98]:
// Replace the `pos` column with its lower-cased value via lowerUDF.
val lastDF = newDF.withColumn("pos", lowerUDF(newDF("pos")))

lastDF: org.apache.spark.sql.DataFrame = [cosmos_code: string, check_in: string ... 6 more fields]


In [99]:
// Preview five final rows (long cells truncated, as show() does by default).
lastDF.show(5, truncate = true)

+-----------+----------+----------+---+---+------+-----------+----------+
|cosmos_code|  check_in| check_out|pos|pax| price|create_date|totalPrice|
+-----------+----------+----------+---+---+------+-----------+----------+
|     14ba0e|2019-10-24|2019-10-26| es|  2| 63.89| 2019-10-10|    127.78|
|     1e4fdf|2020-06-27|2020-07-09| es|  2|149.82| 2019-10-10|   1797.84|
|     192ddc|2019-10-20|2019-10-21| fr|  2|122.49| 2019-10-10|    122.49|
|     193cda|2019-10-24|2019-10-27| fr|  2|111.16| 2019-10-10|    333.48|
|     13705c|2019-11-09|2019-11-10| it|2,2|109.88| 2019-10-10|    109.88|
+-----------+----------+----------+---+---+------+-----------+----------+
only showing top 5 rows

