# Feature Engineering and Tagging for Customer Churn Demo
We use the Spark execution engine to do the feature engineering and tagging for both the training data and the scoring data.

----------
## Notebook setup

When using Spark kernel notebooks on HDInsight, there is no need to create a SparkContext or a HiveContext; those are all created for you automatically when you run the first code cell, and you'll be able to see the progress printed. The contexts are created with the following variable names:
- SparkContext (sc)
- HiveContext (sqlContext)

To run the cells below, place the cursor in the cell and then press **SHIFT + ENTER**.

----------
## Variable setup

We need to set up three variables:

ChurnPeriod (`churnPeriodVal`): The period used to define customer churn. The default value is 21 days.

ChurnThreshold (`churnThresholdVal`): The threshold used to define customer churn, expressed as the number of transactions a customer has during the churn period. The default value is 0, which means a customer has churned if he/she doesn’t have any transactions during the churn period.

DataDir (`dataDir`): The storage path. Replace `$datacontainer` and `$storagename` with the real values.

In [None]:
val churnPeriodVal=21;
val churnThresholdVal=0;
val dataDir="wasb://$datacontainer@$storagename.blob.core.windows.net";
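
As a minimal sketch of what the finished storage path might look like once the placeholders are filled in, the snippet below builds it with Scala string interpolation. The container name `mycontainer` and the storage account name `mystorageaccount` are hypothetical, and the object name is purely illustrative.

```scala
object DataDirSketch {
  // Hypothetical values; replace them with your own container and storage account.
  val dataContainer = "mycontainer"
  val storageName = "mystorageaccount"

  // Builds a wasb:// URI in the same shape as the dataDir value in the notebook.
  val dataDir: String = s"wasb://${dataContainer}@${storageName}.blob.core.windows.net"
}
```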


We create a dataframe from the churn period and churn threshold so that we can join it with other dataframes later.

In [None]:
val ChurnVarsDF = sqlContext.createDataFrame(Seq((churnPeriodVal, churnThresholdVal)))

-----
## Reading data from Hive

To start with, let's see what we have in our Hive store. The Hive database for the demo is `customerchurn`.

In [None]:
%%sql
use customerchurn;
SHOW TABLES

We then create a dataframe for user transaction activities from the partitioned Hive table `activities`. The snippet below creates a dataframe that you can perform any dataframe operation on. This dataframe contains all the data in `activities`.

In [None]:
val activityTableDF = sqlContext.sql("select * from customerchurn.activities")

Get the maximum `TransactionTime` across all user transaction activities and assign it to a variable.

In [None]:
val maxTimeDF = activityTableDF.select(max($"TransactionTime").alias("maxAllTransDate"))
val maxAllTransDateVal = (maxTimeDF.rdd.first())(0).toString()

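We then create a dataframe for user demographic data from the partitioned Hive table `users`. The joins have no join condition: because `ChurnVarsDF` and `maxTimeDF` each contain a single row, they simply attach `ChurnPeriod`, `ChurnThreshold`, and `maxAllTransDate` as columns to every user row.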

In [None]:
val userTableDF = sqlContext.sql("select * from customerchurn.users").join(ChurnVarsDF.withColumnRenamed("_1", "ChurnPeriod").withColumnRenamed("_2", "ChurnThreshold")).join(maxTimeDF)

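Use a window (analytic) function to get the lag of `TransactionTime` for each user, and from it the number of days since that user's previous transaction (`TransactionInterval`).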

In [None]:
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("UserId").orderBy("TransactionTime")
val activityLagTableDF = activityTableDF.select($"*", datediff($"TransactionTime", lag($"TransactionTime", 1).over(w)).alias("TransactionInterval"))
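
The lag computation above can be pictured without Spark. The following is a minimal sketch in plain Scala, assuming a simplified, hypothetical `Activity` record that holds only a user ID and a transaction date. It is not the notebook's implementation; it only illustrates what a per-user lag of one row followed by a day difference produces.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object LagSketch {
  // Hypothetical, simplified activity record: one row per transaction.
  final case class Activity(userId: String, transactionTime: LocalDate)

  // For each user, sort by time and pair every transaction with the number of days
  // since that user's previous transaction (None for the first one, like a null lag).
  def withIntervals(rows: Seq[Activity]): Seq[(Activity, Option[Long])] =
    rows.groupBy(_.userId).values.toSeq.flatMap { userRows =>
      val sorted = userRows.sortBy(_.transactionTime.toEpochDay)
      val laterGaps: Seq[Option[Long]] = sorted.sliding(2).collect {
        case Seq(prev, cur) =>
          Some(ChronoUnit.DAYS.between(prev.transactionTime, cur.transactionTime))
      }.toSeq
      val gaps: Seq[Option[Long]] = None +: laterGaps
      sorted.zip(gaps)
    }
}
```

The first transaction of each user has no predecessor, so its interval is empty, which corresponds to the null that `lag` returns for the first row of each window partition.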

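Produce a pre-churn flag: `preChurnPeriodTransFlag` is 1 for transactions made on or before the start of the churn period (the latest transaction date minus the churn period) and 0 otherwise.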

In [None]:
val exprStr = "case when datediff(TransactionTime, date_add('" + maxAllTransDateVal + "', -1*" + churnPeriodVal + ")) <= 0 then 1 else 0 end"
val activityFlagTableDF = activityLagTableDF.withColumn("preChurnPeriodTransFlag", expr(exprStr))
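
The flag expression above boils down to a date comparison against a cutoff. Here is a minimal sketch in plain Scala (no Spark), assuming dates without a time-of-day component; the object and function names are illustrative and do not appear in the notebook.

```scala
import java.time.LocalDate

object PreChurnFlagSketch {
  // Returns 1 when the transaction falls on or before (maxDate - churnPeriod days),
  // that is, before the churn period starts; returns 0 otherwise.
  def preChurnFlag(transactionDate: LocalDate, maxDate: LocalDate, churnPeriod: Int): Int = {
    val cutoff = maxDate.minusDays(churnPeriod.toLong)
    if (!transactionDate.isAfter(cutoff)) 1 else 0
  }
}
```

For example, with a latest transaction date of 2016-03-31 and a churn period of 21 days, the cutoff is 2016-03-10, so a transaction on that day is flagged 1 and a transaction one day later is flagged 0.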

Produce all the features by joining the two dataframes (the aggregated activity features and the user data), then tag each user as churn or non-churn for the training data. For the scoring data, we also tag it so that it can be compared with the prediction.

In [None]:
val featuredDF = (
         activityFlagTableDF
         .groupBy($"UserId")
         .agg(  sum(expr("case when preChurnPeriodTransFlag = 1 then 1 else 0 end")).alias("PrechurnProductsPurchased"), 
                count($"TransactionId").alias("OverallProductsPurchased"),
                sum(expr("case when preChurnPeriodTransFlag = 1 then Quantity else 0 end")).alias("TotalQuantity"), 
                sum(expr("case when preChurnPeriodTransFlag = 1 then Value else 0 end")).alias("TotalValue"),
                stddev_samp(expr("case when preChurnPeriodTransFlag = 1 then Quantity else null end")).alias("StDevQuantity"), 
                stddev_samp(expr("case when preChurnPeriodTransFlag = 1 then Value else null end")).alias("StDevValue"),             
                avg(expr("case when preChurnPeriodTransFlag = 1 then TransactionInterval else null end")).alias("AvgTimeDelta"),
                (max(expr("case when preChurnPeriodTransFlag = 1 then TransactionTime else null end"))).alias("RecencyDate"),
                // Distinct counts within the pre-churn window: countDistinct also counts the '-1' placeholder
                // once if the user has any row outside the window, and sumDistinct(...) is 1 in exactly
                // that case, so the difference is the number of distinct in-window values.
                (countDistinct(expr("case when preChurnPeriodTransFlag = 1 then TransactionId else '-1' end")) 
                 - sumDistinct(expr("case when (case when preChurnPeriodTransFlag = 1 then TransactionId else null end) is null then 1 else 0 end"))).alias("UniqueTransactionId"),
                (countDistinct(expr("case when preChurnPeriodTransFlag = 1 then ItemId else '-1' end")) 
                 - sumDistinct(expr("case when (case when preChurnPeriodTransFlag = 1 then ItemId else null end) is null then 1 else 0 end"))).alias("UniqueItemId"),
                (countDistinct(expr("case when preChurnPeriodTransFlag = 1 then Location else '-1' end")) 
                 - sumDistinct(expr("case when (case when preChurnPeriodTransFlag = 1 then Location else null end) is null then 1 else 0 end"))).alias("UniqueLocation"),
                (countDistinct(expr("case when preChurnPeriodTransFlag = 1 then ProductCategory else '-1' end")) 
                 - sumDistinct(expr("case when (case when preChurnPeriodTransFlag = 1 then ProductCategory else null end) is null then 1 else 0 end"))).alias("UniqueProductCategory")
          )
          .join(userTableDF.withColumnRenamed("UserID", "UId"), $"UId"===activityFlagTableDF("UserId"))
          .select($"UserId", 
          $"TotalQuantity", 
          $"TotalValue", 
          $"StDevQuantity", 
          $"StDevValue", 
          $"AvgTimeDelta", 
                   (datediff($"maxAllTransDate", $"RecencyDate") - $"ChurnPeriod").alias("Recency"), 
                   $"UniqueTransactionId", $"UniqueItemId", $"UniqueLocation", $"UniqueProductCategory", 
                   ($"TotalQuantity" /($"UniqueTransactionId"+1)).alias("TotalQuantityperUniqueTransactionId"), 
                   ($"TotalQuantity" /($"UniqueItemId"+1)).alias("TotalQuantityperUniqueItemId"), 
                   ($"TotalQuantity" /($"UniqueLocation"+1)).alias("TotalQuantityperUniqueLocation"), 
                   ($"TotalQuantity" /($"UniqueProductCategory"+1)).alias("TotalQuantityperUniqueProductCategory"), 
                   ($"TotalValue" /($"UniqueTransactionId"+1)).alias("TotalValueperUniqueTransactionId"), 
                   ($"TotalValue" /($"UniqueItemId"+1)).alias("TotalValueperUniqueItemId"), 
                   ($"TotalValue" /($"UniqueLocation"+1)).alias("TotalValueperUniqueLocation"), 
                   ($"TotalValue" /($"UniqueProductCategory"+1)).alias("TotalValueperUniqueProductCategory"),
                   $"Age",
                   $"Address",
                   $"Gender",
                   $"UserType",
                    expr("case when PrechurnProductsPurchased = 0 then 0 when PrechurnProductsPurchased >=0 and (( OverallProductsPurchased- PrechurnProductsPurchased)<= ChurnThreshold)  then 1 else 0 end").alias("churn"),
                    $"PrechurnProductsPurchased",
                    $"OverallProductsPurchased"                   
                  )
)
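
The `churn` expression at the end of the select can be read as a small decision rule. The sketch below restates it in plain Scala, on the assumption that the difference between the overall and the pre-churn purchase counts is the number of transactions made during the churn period. The function name is illustrative only.

```scala
object ChurnTagSketch {
  // Mirrors the case expression: users with no activity before the churn period
  // are tagged 0; otherwise a user is tagged 1 (churned) when the number of
  // transactions during the churn period is at or below the threshold.
  def churnTag(prechurnPurchased: Long, overallPurchased: Long, churnThreshold: Long): Int =
    if (prechurnPurchased == 0) 0
    else if (prechurnPurchased >= 0 && (overallPurchased - prechurnPurchased) <= churnThreshold) 1
    else 0
}
```

With the default threshold of 0, a user with 3 purchases overall, all of them before the churn period, is tagged 1, while a user with 5 purchases overall and 3 before the churn period is tagged 0.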

-------------
## Save the featured/tagged data back to storage

First prepare the blob storage location. For a Hive external table, Scala's `write.mode(SaveMode.Overwrite)` could not produce the data properly, so we write with `SaveMode.Append` instead. That means we first have to remove any old data that may already exist there.

In [None]:
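import sys.process._
val filePath= dataDir + "/customerchurn/data/traindatauserfeatured/"
// Create the directory first so that the remove below does not fail when it is missing
Seq("hadoop","fs","-mkdir", "-p",filePath).!!
// Remove the directory together with any old data in it
Seq("hadoop","fs","-rm", "-r",filePath).!!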

If you have a dataframe that was created with a HiveContext and you want to persist that data to Hive, you can create a table and then insert the dataframe into the table: 

In [None]:
sqlContext.sql("use customerchurn")
sqlContext.sql("drop table if exists traindata_user_Featured")

val sqlStr = """
    CREATE EXTERNAL TABLE traindata_user_Featured(
        UserId varchar(50) ,
        TotalQuantity bigint ,
        TotalValue float ,
        StDevQuantity float ,
        StDevValue float ,
        AvgTimeDelta float ,
        Recency int ,
        UniqueTransactionId bigint ,
        UniqueItemId bigint ,
        UniqueLocation bigint ,
        UniqueProductCategory bigint ,
        TotalQuantityperUniqueTransactionId float ,
        TotalQuantityperUniqueItemId float ,
        TotalQuantityperUniqueLocation float ,
        TotalQuantityperUniqueProductCategory float ,
        TotalValueperUniqueTransactionId float ,
        TotalValueperUniqueItemId float ,
        TotalValueperUniqueLocation float ,
        TotalValueperUniqueProductCategory float ,
        Age varchar(50) ,
        Address varchar(50) ,
        Gender varchar(50),
        UserType varchar(50),
        tag   varchar(10),
        PrechurnProductsPurchased bigint ,
        OverallProductsPurchased bigint 
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
    LINES TERMINATED BY '10' 
    STORED AS TEXTFILE LOCATION 
    """ + "'" + dataDir + "/customerchurn/data/traindatauserfeatured/'"

// Run the statement to create the external table
sqlContext.sql(sqlStr)

We sample roughly 70% of the data as training data and save it as a Hive table backed by blob storage.

In [None]:
import org.apache.spark.sql.SaveMode
featuredDF.sample(false, 0.7, 123).coalesce(1).write.mode(SaveMode.Append).saveAsTable("traindata_user_Featured");
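
Sampling without replacement at a given fraction keeps each row independently with that probability, so the result is about 70% of the rows rather than exactly 70%. The sketch below illustrates the idea in plain Scala with a seeded generator. It is not Spark's sampler and will not select the same rows as `sample(false, 0.7, 123)`; the function name is illustrative.

```scala
import scala.util.Random

object SampleSketch {
  // Keeps each row independently with probability `fraction`, using a seeded
  // generator so that the same input and seed always give the same selection.
  def bernoulliSample[A](rows: Seq[A], fraction: Double, seed: Long): Seq[A] = {
    val rng = new Random(seed)
    rows.filter(_ => rng.nextDouble() < fraction)
  }
}
```

Because every row is an independent draw, the sample size varies around 70% of the input, while the fixed seed makes the selection in this sketch repeatable.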

Delete the temporary staging files (`.hive-staging_hive*`) produced by Hive, because MRS could not recognize them.

In [None]:
val lsFilePath= (Seq("hadoop","fs","-ls",filePath).!!).replace("\n", " ")
val tempFileList= lsFilePath.split(" ").filter(x => (x.contains(".hive-staging_hive")))

for (tempFilePath <- tempFileList) {
    Seq("hadoop","fs","-rm", "-r",tempFilePath).!!
}
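
To see what the filtering step above selects, the following sketch applies the same string handling to a made-up directory listing. The listing text and paths are hypothetical and only imitate the general shape of `hadoop fs -ls` output; no Hadoop command is run.

```scala
object StagingFilterSketch {
  // Hypothetical listing text, used here only for illustration.
  val sampleListing: String =
    "Found 2 items\n" +
    "-rw-r--r--   1 user group  1024 2016-01-01 00:00 /data/part-00000\n" +
    "drwxr-xr-x   - user group     0 2016-01-01 00:00 /data/.hive-staging_hive_2016-01-01_00-00-00_000_1-1"

  // Flattens the listing to one line, splits it on spaces, and keeps only the
  // tokens that contain the Hive staging marker.
  def stagingPaths(lsOutput: String): List[String] =
    lsOutput.replace("\n", " ").split(" ").filter(_.contains(".hive-staging_hive")).toList
}
```

Splitting on spaces works here because the path is the last space-separated token on each line; a path that itself contains spaces would be broken into pieces by this approach.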