Pour les clients qui ont cliqué ou achetés sur le site de Netvin, on recommandera à eux des produits personnalisés

# Importer des librairies

In [ ]:
// Importer des librairies
import com.mongodb.spark._
import com.mongodb.spark.sql._
import com.mongodb.spark.config._
import org.bson._

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext, HashPartitioner}

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.joda.time.DateTime;

import com.typesafe.config._

# Paramètres

In [ ]:
// Declaration des paramètres
val customer = "netvin"
val nbProductALS = 20
val nbProductReco = 5
val nbIterations = 20
val rank = 10
val lambda = 0.5
val mongoUrl = "mongodb://netvin:Ap-knoj6@mongo2.pro.hupi.loc,mongo3.pro.hupi.loc,mongo1.pro.hupi.loc"

In [ ]:
val today = DateTime.now
val days3MonthsAgo = today.minusMonths(6)

val ts_today = today.getMillis()/1000
val ts_3MonthsAgo = days3MonthsAgo.getMillis()/1000

# Checkpointing

In [ ]:
// Ajouter checkpointing pour éviter la récursion utilisée quand on crée modèle ALS 
// https://stackoverflow.com/questions/31484460/spark-gives-a-stackoverflowerror-when-training-using-als
sc.setCheckpointDir("target")

# Lecture des données dans MongoDB

On lit la collection 
- product_mapping(liste des produits en stock)
- click_data (pour voir les scores de produits pour chaque client) 

### Chercher des produits en stock

In [ ]:
// Lire le product_mapping dans Mongo
val read_productMapping = ReadConfig(Map("uri" -> s"$mongoUrl/$customer.product_mapping", "format" -> "com.mongodb.spark.sql"))

val mongo_productMapping = MongoSpark.load(sparkSession, read_productMapping)

val productMaps = mongo_productMapping.filter((mongo_productMapping("idsite") =!= "99"))
.map(l => (l(2).asInstanceOf[Long], l(3).asInstanceOf[String]))
.map{case (index, pid) => (index, pid)}
.rdd

In [ ]:
val productMapsCollect = productMaps.collect()

In [ ]:
// Liste des produits en stock (pid dans product_mapping)
val prodInStock = productMapsCollect.map { case (index, pid) => "'" + pid + "'"}.mkString("[", ",", "]")

### Chercher score des produits 

In [ ]:
val read_clickData = ReadConfig(Map("uri" -> s"$mongoUrl/$customer.click_data", "format" -> "com.mongodb.spark.sql"))
// On filtre des lignes avec idsite == 99 et on garde que des evenements de 2 derniers mois avec des produits actifs et limite nb d'éléments
val dataMongo = MongoSpark.load(sc, read_clickData)

// On filtre dans dataMongo les vts_last >= ts_1MonthsAgo et pid appartenant dans la liste prodInStock
val aggregatedRdd = dataMongo.withPipeline(Seq(Document.parse("""
{ 
  $match: 
 { $and: 
  [ 
    { vts_last: { $gt:""" + s"'$ts_3MonthsAgo'" + """ } },
    { pid: {$in : """ + s"$prodInStock" + """}} 

  ] 
 } 
},
{
  $sort : {vts_last: -1}
}"""
)))

In [ ]:
val clientsWithUid = dataMongo.withPipeline(Seq(Document.parse("""
{ 
  $match: 
 { $and: 
  [ 
    { uid: { $ne:""" + "'0'" + """ } }
  ] 
 } 
},
{
  $sort : {vts_last: -1}
}"""
))).map(l => (l.getString("vid"), (l.getString("uid")))).distinct

val getUid = clientsWithUid.collectAsMap()
val listClientsWithUid = getUid.keys.toList

In [ ]:
// On choisit les evenements dont le idsite != 99 et on prend en compte que les clients ayant uid (pour retrouver son email après)
// create rdd from aggregatedRdd
val partitioner = new HashPartitioner(100)

// On prend visitor_id, produit_id et score et filtre les vid qui n'ont pas de uid 
val listScoresRDD = aggregatedRdd.map(l => (l.getString("vid"), (l.getString("pid"), l.getInteger("score")))).partitionBy(partitioner)
          .map(l => (l._1, l._2._1, l._2._2))
          .filter(l => listClientsWithUid.contains(l._1))
          .rdd.map(l => (getUid(l._1), l._2, l._3)).cache()

# Créer map des index de idprod pour retrouver id_produits

In [ ]:
val listScoresRDD_collect = listScoresRDD.collect()

val productMap = productMaps.map { case (index, pid) => (pid, index)}.distinct

val listScorePropre = listScoresRDD.map(l => (l._2, (l._1, l._3))).join(productMap)
    .map(l => (l._2._1._1.toInt, l._2._2.toInt, l._2._1._2.toDouble)).cache()

In [ ]:
listScorePropre.take(5)

In [ ]:
listScorePropre.count()

# Créer modèle ALS

Les étapes :

1/ Diviser la base en 3 base : training_data (70%), validation_data (15%) et test_data (15%).
Attention : les users dans validation_data et dans test_data doivent apparaitre deja dans training_data
2/ Créer modèle d'abord avec training_data

# Préparer input pour créer modèle ALS

In [ ]:
val table_ALS = listScorePropre.map( l => Rating( l._1, l._2 , l._3 ) )

# Construire modèle ALS

In [ ]:
// Construction du modèle ALS
val model = new ALS().setSeed(123).setIterations(nbIterations).setRank(rank).setLambda(lambda).run(table_ALS)

# Préparer input pour sauvegarder dans MongoDB

In [ ]:
// On cherche les emails des clients
val getEmail = sc.broadcast(sparkSession.read.option("header","true")
              .csv(s"hdfs://$customer.node1.pro.hupi.loc:8020/user/$customer/input_data/sites/$customer/customers/*")
             .select("id", "email").map(l => (l(0).asInstanceOf[String], l(1).asInstanceOf[String])).rdd.collectAsMap())

In [ ]:
// On cherche liste des produits achetés des clients (score = 4)
val listPurchase = listScorePropre.map( l => ( l._1 , l._2.toString, l._3 ) ).filter(l => l._3 == 4)
                           .map(l => (l._1, List(l._2))).reduceByKey(_++_).map(l => (l._1.toInt, l._2.toList))

In [ ]:
// On cherche les produits recommandés pour chaque client, filtre les clients sans email, filtre des produtis recommandés déjà achetés 
// et prend nbProductReco parmi les recommandations

val final_predict = model.recommendProductsForUsers( nbProductALS ).map( l=> ( l._1 , l._2.map( w => w.product ) ) )
// on cherche les emails et filtre des client n'ayant pas email
.map(l => (getEmail.value.getOrElse(l._1.toString, ""), l._1, l._2.map(j => j))).filter(l => l._1 != "")
// on fait jointure avec listPurchase pour filtrer des produits dejà achetés
.map(l => (l._2, (l._1, l._3))).leftOuterJoin(listPurchase)
.map(l => (l._2._1._1, l._1, l._2._1._2, l._2._2.getOrElse(List(""))))
// on filtre des produits déjà achetés et prend nbProductReco
.map(l => (l._1, l._2, l._3.filter(s => !l._4.contains(s.toString)).take(nbProductReco).toList))
.map(l => l._3.map(s => (l._1, l._2, s))).flatMap(l => l)
// retrouver bon id_produit grace a productMap
.map(l => (l._3, (l._1, l._2))).join(productMap.map(l => (l._2.toInt, l._1)))
.map(l => (l._2._1._1, l._2._1._2, l._2._2))
.toDF("email", "uid", "idR")

In [ ]:
final_predict.take(10)

# Sauvegarder résultat dans MongoDB

In [ ]:
/*
// sauvegarder en Mongo 
final_predict.write.format("com.mongodb.spark.sql").option("uri", s"$mongoUrl/$customer.newsletters")
.mode("overwrite").save()
*/