In [None]:
/**
 * A single line item of an order.
 *
 * Instances are the elements of `Order.OrderLines_`.
 *
 * @param sku            SKU of the product on this line
 * @param productName    product name carried on the line
 * @param thumbnailImage thumbnail image reference (format not visible here —
 *                       presumably a URL or path; TODO confirm)
 * @param quantity       quantity ordered (stored as a Double)
 * @param unitPrice      price per unit
 * @param totalPrice     total for the line (presumably quantity * unitPrice;
 *                       nothing here enforces that)
 */
case class OrderLine(
    sku: String,
    productName: String,
    thumbnailImage: String,
    quantity: Double,
    unitPrice: Double,
    totalPrice: Double
)

/**
 * One customer order; used as the row type when reading the
 * `retail_ks.orders` table via `sc.cassandraTable[Order]`.
 *
 * Field names are left exactly as declared (including the unusual
 * `OrderLines_`) because they presumably have to line up with the table's
 * column names — TODO confirm against the table schema.
 *
 * @param customerId  id of the customer who placed the order
 * @param orderId     id of the order
 * @param date        order date
 * @param OrderLines_ line items belonging to the order
 * @param totalPrice  total price of the order
 */
case class Order(
    customerId: java.util.UUID,
    orderId: java.util.UUID,
    date: java.util.Date,
    OrderLines_ : List[OrderLine],
    totalPrice: Double
)

/**
 * A product recommended alongside another product; the element type of
 * `ProductRecommendations.recommended_products`.
 *
 * The snake_case field names are kept as declared; they presumably mirror
 * the column/field names of the target Cassandra table — TODO confirm.
 *
 * @param sku             SKU of the recommended product
 * @param product_name    name of the recommended product
 * @param regular_price   price of the recommended product
 * @param thumbnail_image thumbnail image reference of the recommended product
 */
case class RecommendedProduct(
    sku: String,
    product_name: String,
    regular_price: Double,
    thumbnail_image: String
)

/**
 * The recommendations computed for one product; the row type written to
 * the `retail_ks.product_recommendations` table.
 *
 * The snake_case field names are kept as declared; they presumably mirror
 * the table's column names — TODO confirm.
 *
 * @param sku                  SKU of the product the recommendations are for
 * @param product_name         name of that product
 * @param recommended_products products recommended together with it
 */
case class ProductRecommendations(
    sku: String,
    product_name: String,
    recommended_products: List[RecommendedProduct]
)

/**
 * Aggregated sales figures for one product; the row type written to the
 * `retail_ks.top50_selling_products` table.
 *
 * @param sku            SKU of the product
 * @param productName    product name
 * @param saleCount      summed quantity sold
 * @param saleValue      summed line totals
 * @param thumbnailImage thumbnail image reference
 */
case class Top50SellingProducts(
    sku: String,
    productName: String,
    saleCount: Double,
    saleValue: Double,
    thumbnailImage: String
)



In [None]:
// Read the `orders` table of keyspace `retail_ks` as Order rows and persist
// the result (memory first, spilling to disk), since several later cells
// start their computation from `orders`.
val orders = sc.
    cassandraTable[Order]("retail_ks", "orders").
    persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)

In [None]:
// Count the order rows; as the first action on `orders` in this notebook it
// also triggers the actual read.
orders.count()

In [None]:
// Flatten every order into its line items and key each item by SKU:
// (sku, (productName, thumbnailImage, quantity, unitPrice, totalPrice)).
val orderlines = orders.
    flatMap(_.OrderLines_).
    map { line =>
        val details = (line.productName, line.thumbnailImage, line.quantity, line.unitPrice, line.totalPrice)
        (line.sku, details)
    }

In [None]:
// Fetch the first 10 (sku, details) pairs of `orderlines` for inspection.
orderlines.take(10)

In [None]:
// Aggregate the order lines per SKU: quantity and line total are summed,
// while name, thumbnail and unit price are taken from the left-hand operand
// of each merge. The result is reshaped into Top50SellingProducts rows.
val soldproducts = orderlines.
    reduceByKey { case ((name, image, qtyA, price, totalA), (_, _, qtyB, _, totalB)) =>
        (name, image, qtyA + qtyB, price, totalA + totalB)
    }.
    map { case (sku, (productName, thumbnailImage, count, _, value)) =>
        Top50SellingProducts(sku, productName, count, value, thumbnailImage)
    }

In [None]:
// Keep the 50 products with the highest saleValue: sort descending (by
// negating the key), attach each element's position, keep positions 0..49
// and drop the position again.
// NOTE(review): the name says "Count" but the sort key is saleValue, not
// saleCount — confirm which ranking is intended.
val Top50CountSellingProducts = soldproducts.
    sortBy(product => -product.saleValue).
    zipWithIndex.
    filter { case (_, rank) => rank < 50 }.
    map { case (product, _) => product }

In [None]:
// Fetch one element of the top-50 result for inspection.
Top50CountSellingProducts.take(1)

In [None]:
// Write the selected products to table `top50_selling_products` in keyspace `retail_ks`.
Top50CountSellingProducts.saveToCassandra("retail_ks","top50_selling_products" )

In [None]:
// Product co-occurrence: for every SKU, collect the products that appear in
// the same order under a different SKU, sum their quantity and line total
// across orders, and keep the 50 with the highest summed line total as
// RecommendedProduct entries.
//
// NOTE(review): product_name of the resulting ProductRecommendations is
// always the empty string — confirm that this is intended.
// NOTE(review): entries sharing a SKU are collapsed only inside the
// reduceByKey merge function, so a key whose values never need merging
// presumably keeps its list as emitted — confirm whether that matters.
val ProductCoOccurance = orders.
    flatMap { order =>
        val lines = order.OrderLines_
        // One (sku, companions) pair per order line; companions are the
        // other lines of the same order whose SKU differs.
        lines.map { anchor =>
            val companions = lines.
                map(other => (other.sku, (other.productName, other.thumbnailImage, other.quantity, other.unitPrice, other.totalPrice))).
                filter { case (otherSku, _) => otherSku != anchor.sku }
            (anchor.sku, companions)
        }
    }.
    reduceByKey { (left, right) =>
        // Concatenate both companion lists, then collapse the entries that
        // share a SKU: quantity and line total are summed, the remaining
        // fields come from the first entry of each group.
        (left ++ right).
            groupBy { case (sku, _) => sku }.
            values.
            toList.
            map { sameSku =>
                sameSku.reduce { (x, y) =>
                    val (sku, (name, image, qtyX, price, totalX)) = x
                    val (_, (_, _, qtyY, _, totalY)) = y
                    (sku, (name, image, qtyX + qtyY, price, totalX + totalY))
                }
            }
    }.
    mapValues { companions =>
        // Highest summed line total first; keep at most 50 and reshape to
        // (sku, product_name, regular_price = unit price, thumbnail_image).
        companions.
            sortBy { case (_, (_, _, _, _, total)) => -total }.
            take(50).
            map { case (sku, (name, image, _, price, _)) =>
                RecommendedProduct(sku, name, price, image)
            }
    }.
    // Wrap each (sku, recommendations) pair in the row type of the target table.
    map { case (sku, recommendations) => ProductRecommendations(sku, "", recommendations) }

In [None]:
// Fetch one ProductRecommendations element for inspection.
ProductCoOccurance.take(1)

In [None]:
// SQL entry point: a HiveContext built on the existing SparkContext `sc`.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
// Bring the context's implicits into scope (presumably what makes `.toDF`
// available on RDDs in the following cells — confirm).
import sqlContext.implicits._

In [None]:
// Convert the recommendations RDD into a DataFrame and print its schema.
val pcodf = ProductCoOccurance.toDF()
pcodf.printSchema()

In [None]:
// Write the recommendations DataFrame to table `product_recommendations` in
// keyspace `retail_ks` through the Cassandra data source.
// NOTE(review): save mode "overwrite" presumably replaces the table's
// existing contents — confirm against the connector version in use.
pcodf.write.
    format("org.apache.spark.sql.cassandra").
    option("table", "product_recommendations").
    option("keyspace", "retail_ks").
    mode("overwrite").
    save()


In [None]:
// Write the recommendations RDD to table `product_recommendations` in
// keyspace `retail_ks` — the same target table as the DataFrame-based write.
ProductCoOccurance.saveToCassandra("retail_ks","product_recommendations")

In [None]:
// Load table `product_recommendations` of keyspace `retail_ks` back as a
// DataFrame through the Cassandra data source.
val pcodf_read = sqlContext.read.
    format("org.apache.spark.sql.cassandra").
    option("table", "product_recommendations").
    option("keyspace", "retail_ks").
    load()


In [None]:
// Count the rows read back from `product_recommendations`.
pcodf_read.count()

In [None]:
// Count the rows of table `product_catalog` in keyspace `retail_ks`.
sc.cassandraTable("retail_ks", "product_catalog").count()

In [None]:
// Count the rows of table `product_accessories` in keyspace `retail_ks`.
sc.cassandraTable("retail_ks", "product_accessories").count()