In [144]:
import org.apache.spark.sql.SparkSession
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, IntegerType, StringType, DateType, DoubleType}

In [1]:
// Obtain the notebook's SparkSession (an existing one is reused by getOrCreate),
// configured with application name "myapp" and master "local[*]".
val spark = {
  val sessionBuilder = SparkSession.builder()
    .master("local[*]")
    .appName("myapp")
  sessionBuilder.getOrCreate()
}

spark = org.apache.spark.sql.SparkSession@25db74cb


org.apache.spark.sql.SparkSession@25db74cb

### Load Data

In [135]:
// Explicit schema for customer.csv; the two-argument `add` creates nullable fields.
val schema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("email", StringType)
  .add("joinDate", DateType)
  .add("status", StringType)

// Read customer.csv with the schema above, using "\\t" as the field delimiter.
val df_customer = spark.read
  .format("csv")
  .option("delimiter", "\\t")
  .schema(schema)
  .load("customer.csv")

schema = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(email,StringType,true), StructField(joinDate,DateType,true), StructField(status,StringType,true))
df_customer = [id: int, name: string ... 3 more fields]


[id: int, name: string ... 3 more fields]

In [136]:
// Explicit schema for product.csv; the two-argument `add` creates nullable fields.
val schema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("price", DoubleType)
  .add("numberOfProducts", IntegerType)

// Read product.csv with the schema above, using "\\t" as the field delimiter.
val df_product = spark.read
  .format("csv")
  .option("delimiter", "\\t")
  .schema(schema)
  .load("product.csv")

schema = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(price,DoubleType,true), StructField(numberOfProducts,IntegerType,true))
df_product = [id: int, name: string ... 2 more fields]


[id: int, name: string ... 2 more fields]

In [137]:
// Explicit schema for order.csv; the two-argument `add` creates nullable fields.
val schema = new StructType()
  .add("customerID", IntegerType)
  .add("orderID", IntegerType)
  .add("productID", IntegerType)
  .add("numberOfProduct", IntegerType)
  .add("orderDate", DateType)
  .add("orderStatus", StringType)

// Read order.csv with the schema above, using "\\t" as the field delimiter.
val df_order = spark.read
  .format("csv")
  .option("delimiter", "\\t")
  .schema(schema)
  .load("order.csv")

schema = StructType(StructField(customerID,IntegerType,true), StructField(orderID,IntegerType,true), StructField(productID,IntegerType,true), StructField(numberOfProduct,IntegerType,true), StructField(orderDate,DateType,true), StructField(orderStatus,StringType,true))
df_order = [customerID: int, orderID: int ... 4 more fields]


[customerID: int, orderID: int ... 4 more fields]

In [138]:
// Pair every order with the customer who placed it (inner join on
// customer id = order customerID) and keep only the columns used downstream.
val df_joined = {
  val sameCustomer = df_customer("id") === df_order("customerID")
  val keptColumns = Seq(
    "customerID",
    "name",
    "orderDate",
    "status",
    "orderID",
    "productID",
    "numberOfProduct",
    "orderStatus"
  )

  df_customer
    .join(df_order, sameCustomer, "inner")
    .select(keptColumns.map(col): _*)
}

df_joined = [customerID: int, name: string ... 6 more fields]


[customerID: int, name: string ... 6 more fields]

In [139]:
// Print up to 100 rows of the customer/order join for a visual check.
df_joined.show(numRows = 100)

+----------+---------+----------+--------+-------+---------+---------------+-----------+
|customerID|     name| orderDate|  status|orderID|productID|numberOfProduct|orderStatus|
+----------+---------+----------+--------+-------+---------+---------------+-----------+
|         1|     John|2018-02-23|  active|     21|        3|            500|  delivered|
|         1|     John|2018-02-23|  active|     22|        1|            300|  delivered|
|         1|     John|2018-02-23|  active|     23|        2|            300|  delivered|
|         1|     John|2018-03-23|  active|     24|        1|            500|  delivered|
|         1|     John|2018-03-23|  active|     25|        2|            300|  delivered|
|         1|     John|2018-03-23|  active|     26|        3|            300|  delivered|
|         1|     John|2018-04-23|  active|     27|        1|            400|  delivered|
|         1|     John|2018-04-23|  active|     28|        2|            200|  delivered|
|         1|     John

In [66]:
// df_joined.groupBy("customerID", "productID").sum("numberOfProduct").as("sum_salary").show()

Name: Syntax Error.
Message: 
StackTrace: 

In [140]:
// Expose df_joined to Spark SQL under the view name "EMP"; the spark.sql
// query in this notebook selects FROM EMP.
df_joined.createOrReplaceTempView("EMP")

In [141]:
// For every customer, find the product(s) with the highest delivered quantity.
//
// Steps, innermost query first:
//   totals - one row per (customer, product) with the sum of numberOfProduct
//            over orders whose orderStatus is 'delivered' (other orders count as 0).
//   ranked - adds each customer's maximum total as `popular`.
//   outer  - keeps the products whose total equals that maximum; ties are all kept.
//
// Aggregation is keyed by customerID rather than by name alone, so two different
// customers who happen to share a name are not merged into one. The result still
// exposes only (customer_name, productID), de-duplicated and ordered by name.
// Declared as `val` because the DataFrame is never reassigned.
val df_client_product = spark.sql("""
    SELECT DISTINCT ranked.name AS customer_name, ranked.productID
    FROM
        (SELECT
            totals.customerID,
            totals.name,
            totals.productID,
            totals.summary,
            MAX(totals.summary) OVER (PARTITION BY totals.customerID) AS popular
        FROM
            (SELECT
                customerID,
                name,
                productID,
                SUM(
                    CASE
                        WHEN orderStatus = 'delivered' THEN numberOfProduct
                        ELSE 0
                    END
                ) AS summary
            FROM EMP
            GROUP BY customerID, name, productID
            ) totals
        ) ranked
    WHERE ranked.summary = ranked.popular
    ORDER BY customer_name
""")

df_client_product = [customer_name: string, productID: int]


[customer_name: string, productID: int]

In [142]:
// Print the (customer_name, productID) pairs produced by the SQL query.
df_client_product.show()

+-------------+---------+
|customer_name|productID|
+-------------+---------+
|    Anastasia|        1|
|    Anastasia|        2|
|         John|        1|
|       Philip|        1|
|       Philip|        2|
|       Robert|        4|
|         Sara|        8|
|       Vasili|        1|
+-------------+---------+



In [146]:
// Replace each productID with the product's name by joining against df_product,
// and publish the two columns as "customer.name" and "product.name".
val df_joined_final = {
  val productMatches = df_client_product("productID") === df_product("id")
  val customerColumn = col("customer_name").alias("customer.name")
  val productColumn = col("name").alias("product.name")

  df_client_product
    .join(df_product, productMatches, "inner")
    .select(customerColumn, productColumn)
}

df_joined_final = [customer.name: string, product.name: string]


[customer.name: string, product.name: string]

In [147]:
// Print the final customer-name / product-name pairs.
df_joined_final.show()

+-------------+-----------------+
|customer.name|     product.name|
+-------------+-----------------+
|    Anastasia|   Apple iPhone 7|
|    Anastasia|   Apple iPhone 8|
|         John|   Apple iPhone 7|
|       Philip|   Apple iPhone 7|
|       Philip|   Apple iPhone 8|
|       Robert|Apple iPad mini 4|
|         Sara|    Apple AirPods|
|       Vasili|   Apple iPhone 7|
+-------------+-----------------+



Resources

https://sparkbyexamples.com/spark/spark-read-csv-file-into-dataframe/