In [10]:
import org.apache.spark.SparkConf
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.functions._
import org.graphframes.GraphFrame
import scala.io.Source
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler}
import org.apache.spark.ml.Pipeline
import org.apache.spark.graphx._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{SparkSession, DataFrame, Row}
val spark = SparkSession.builder.appName("GraphExample").getOrCreate()
import spark.implicits._

import org.apache.spark.SparkConf
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.functions._
import org.graphframes.GraphFrame
import scala.io.Source
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler}
import org.apache.spark.ml.Pipeline
import org.apache.spark.graphx._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{SparkSession, DataFrame, Row}
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@5772abbb
import spark.implicits._


# **Dataset Definition**

In [3]:
val definition = "data/Xente_Variable_Definitions.csv"
val fileContent = Source.fromFile(definition).getLines().mkString("\n")

definition: String = data/Xente_Variable_Definitions.csv
fileContent: String =
Column Name,Definition
TransactionId,Unique �transaction identifier on platform
BatchId,Unique number assigned to a batch of transactions for processing
AccountId,Unique number identifying the customer on platform
SubscriptionId,Unique number identifying the customer subscription
CustomerId,Unique identifier attached to Account
CurrencyCode,Country currency
CountryCode,Numerical geographical code of country
ProviderId,Source provider of Item �bought.
ProductId,Item name being bought.
ProductCategory,ProductIds are organized into these broader product categories.
ChannelId,"Identifies if customer used web,Android, IOS, pay later or checkout."
Amount,Value of the transaction. Positive for debits f...


# **Read Data**

In [4]:
val train: DataFrame = spark.read
  .option("header", "true") 
  .option("inferSchema", "true") 
  .csv("data/training.csv")
val test: DataFrame = spark.read
  .option("header", "true") 
  .option("inferSchema", "true") 
  .csv("data/test.csv")

train: org.apache.spark.sql.DataFrame = [TransactionId: string, BatchId: string ... 14 more fields]
test: org.apache.spark.sql.DataFrame = [TransactionId: string, BatchId: string ... 13 more fields]


In [5]:
val df_train = train.withColumn("dataset", lit("train"))
val temp = test.withColumn("FraudResult", lit(null: Integer))
val df_test = temp.withColumn("dataset", lit("test"))

df_train: org.apache.spark.sql.DataFrame = [TransactionId: string, BatchId: string ... 15 more fields]
temp: org.apache.spark.sql.DataFrame = [TransactionId: string, BatchId: string ... 14 more fields]
df_test: org.apache.spark.sql.DataFrame = [TransactionId: string, BatchId: string ... 15 more fields]


In [6]:
val df: DataFrame = df_train.union(df_test)

df: org.apache.spark.sql.DataFrame = [TransactionId: string, BatchId: string ... 15 more fields]


In [7]:
df.printSchema()

root
 |-- TransactionId: string (nullable = true)
 |-- BatchId: string (nullable = true)
 |-- AccountId: string (nullable = true)
 |-- SubscriptionId: string (nullable = true)
 |-- CustomerId: string (nullable = true)
 |-- CurrencyCode: string (nullable = true)
 |-- CountryCode: integer (nullable = true)
 |-- ProviderId: string (nullable = true)
 |-- ProductId: string (nullable = true)
 |-- ProductCategory: string (nullable = true)
 |-- ChannelId: string (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Value: integer (nullable = true)
 |-- TransactionStartTime: timestamp (nullable = true)
 |-- PricingStrategy: integer (nullable = true)
 |-- FraudResult: integer (nullable = true)
 |-- dataset: string (nullable = false)



In [8]:
df.groupBy("dataset").count().show()

+-------+-----+
|dataset|count|
+-------+-----+
|  train|95662|
|   test|45019|
+-------+-----+



# **Features Indexer**

In [8]:
val IndexerCols = Seq("ProviderId", "ProductId", "ProductCategory", "ChannelId", "PricingStrategy", "FraudResult")
var indexerStages = List[StringIndexer]()
for (colName <- IndexerCols) {
  val indexer = new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(s"${colName}_idx")
    .setHandleInvalid("keep")
  indexerStages = indexerStages :+ indexer
}
var indexedDF = df
for (indexer <- indexerStages) {
  indexedDF = indexer.fit(indexedDF).transform(indexedDF)
}
indexedDF = indexedDF.drop(IndexerCols: _*)
indexedDF = indexedDF.withColumn("Amount_encoded", when(col("Amount") > 0, 1).otherwise(0))
indexedDF = indexedDF.withColumn("Value_log", log(col("Value") + 1))

IndexerCols: Seq[String] = List(ProviderId, ProductId, ProductCategory, ChannelId, PricingStrategy, FraudResult)
indexerStages: List[org.apache.spark.ml.feature.StringIndexer] = List(strIdx_4660e9a22251, strIdx_490758806c7c, strIdx_68094e20a67e, strIdx_7617bf1afbe0, strIdx_d7359504e53e, strIdx_8ffaac0e5a66)
indexedDF: org.apache.spark.sql.DataFrame = [TransactionId: string, BatchId: string ... 17 more fields]
indexedDF: org.apache.spark.sql.DataFrame = [TransactionId: string, BatchId: string ... 17 more fields]
indexedDF: org.apache.spark.sql.DataFrame = [TransactionId: string, BatchId: string ... 17 more fields]
indexedDF: org.apache.spark.sql.DataFrame = [TransactionId: string, BatchId: string ... 17 more fields]


In [9]:
val nonNegativeValues = indexedDF.select("FraudResult_idx").filter(col("FraudResult_idx") <= 1).as[Double].collect()
val baseline = if (nonNegativeValues.nonEmpty) nonNegativeValues.sum / nonNegativeValues.length.toDouble else 0.0
baseline

nonNegativeValues: Array[Double] = Array(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0...


In [10]:
val temp = BigDecimal(baseline).setScale(4, BigDecimal.RoundingMode.HALF_UP).toDouble

temp: Double = 0.002


In [11]:
indexedDF = indexedDF.withColumn(
  "FraudResult_idx",
  when(col("FraudResult_idx") === 2, temp).otherwise(col("FraudResult_idx"))
)

indexedDF: org.apache.spark.sql.DataFrame = [TransactionId: string, BatchId: string ... 17 more fields]


In [12]:
indexedDF.groupBy("FraudResult_idx").count().show()

+---------------+-----+
|FraudResult_idx|count|
+---------------+-----+
|            0.0|95469|
|            1.0|  193|
|          0.002|45019|
+---------------+-----+



# **Construct Nodes**

In [13]:
val transactionNodes = df
  .select("TransactionId")
  .distinct()
  .withColumnRenamed("TransactionId", "id")
  .withColumn("node_type", lit("Transaction"))

val subscriptionNodes = df
  .select("SubscriptionId")
  .distinct()
  .withColumnRenamed("SubscriptionId", "id")
  .withColumn("node_type", lit("Subscription"))

val accountNodes = df
  .select("AccountId")
  .distinct()
  .withColumnRenamed("AccountId", "id")
  .withColumn("node_type", lit("Account"))

var allNodes = transactionNodes.union(subscriptionNodes).union(accountNodes)

transactionNodes: org.apache.spark.sql.DataFrame = [id: string, node_type: string]
subscriptionNodes: org.apache.spark.sql.DataFrame = [id: string, node_type: string]
accountNodes: org.apache.spark.sql.DataFrame = [id: string, node_type: string]
allNodes: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, node_type: string]


In [14]:
allNodes.groupBy("node_type").count().show()

+------------+------+
|   node_type| count|
+------------+------+
| Transaction|140681|
|Subscription|  4836|
|     Account|  4841|
+------------+------+



In [15]:
indexedDF.count()

res5: Long = 140681


# Node Features

In [16]:
val TransactionNodes = allNodes
  .join(indexedDF, allNodes("id") === indexedDF("TransactionId"), "right_outer")
  .select("id", "node_type", "ProviderId_idx", "ProductId_idx", "ProductCategory_idx", "ChannelId_idx", "PricingStrategy_idx", "Amount_encoded", "Value_log", "FraudResult_idx")

TransactionNodes: org.apache.spark.sql.DataFrame = [id: string, node_type: string ... 8 more fields]


In [17]:
TransactionNodes.count()

res6: Long = 140681


In [18]:
val columnsToMean = List("Amount_encoded", "Value_log")
val colummsToMode = List("ProviderId_idx", "ProductId_idx", "ProductCategory_idx", "ChannelId_idx", "PricingStrategy_idx")
val meanAggregations = columnsToMean.map(colName =>
  mean(col(colName)).alias(s"$colName")
)
val modeAggregations = colummsToMode.map(colName =>
  mode(col(colName)).alias(s"$colName")
)
val allAggregations = modeAggregations ++ meanAggregations
var SubscriptionNodes = indexedDF
  .groupBy("SubscriptionId")
  .agg(allAggregations.head, allAggregations.tail: _*)
var AccountNodes = indexedDF
  .groupBy("AccountId")
  .agg(allAggregations.head, allAggregations.tail: _*)

SubscriptionNodes = allNodes
  .join(SubscriptionNodes, allNodes("id") === SubscriptionNodes("SubscriptionId"), "right_outer")
  .select("id", "node_type", "ProviderId_idx", "ProductId_idx", "ProductCategory_idx", "ChannelId_idx", "PricingStrategy_idx", "Amount_encoded", "Value_log")

SubscriptionNodes = SubscriptionNodes.withColumn("FraudResult_idx",lit(temp))

AccountNodes = allNodes
  .join(AccountNodes, allNodes("id") === AccountNodes("AccountId"), "right_outer")
  .select("id", "node_type", "ProviderId_idx", "ProductId_idx", "ProductCategory_idx", "ChannelId_idx", "PricingStrategy_idx", "Amount_encoded", "Value_log")

AccountNodes = AccountNodes.withColumn("FraudResult_idx",lit(temp))

columnsToMean: List[String] = List(Amount_encoded, Value_log)
colummsToMode: List[String] = List(ProviderId_idx, ProductId_idx, ProductCategory_idx, ChannelId_idx, PricingStrategy_idx)
meanAggregations: List[org.apache.spark.sql.Column] = List(avg(Amount_encoded) AS Amount_encoded, avg(Value_log) AS Value_log)
modeAggregations: List[org.apache.spark.sql.Column] = List(mode(ProviderId_idx) AS ProviderId_idx, mode(ProductId_idx) AS ProductId_idx, mode(ProductCategory_idx) AS ProductCategory_idx, mode(ChannelId_idx) AS ChannelId_idx, mode(PricingStrategy_idx) AS PricingStrategy_idx)
allAggregations: List[org.apache.spark.sql.Column] = List(mode(ProviderId_idx) AS ProviderId_idx, mode(ProductId_idx) AS ProductId_idx, mode(ProductCategory_idx) AS ProductCategory_idx, mode(ChannelId_idx) ...


In [19]:
allNodes = TransactionNodes.union(SubscriptionNodes).union(AccountNodes)

allNodes: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, node_type: string ... 8 more fields]


# **Encoder + Assembler**

In [20]:
val Featurescolumns = Seq("ProviderId_idx", "ProductId_idx", "ProductCategory_idx", "ChannelId_idx", "PricingStrategy_idx")

Featurescolumns: Seq[String] = List(ProviderId_idx, ProductId_idx, ProductCategory_idx, ChannelId_idx, PricingStrategy_idx)


In [21]:
val encoders = Featurescolumns.map { colName =>
  new OneHotEncoder()
    .setInputCol(s"${colName}")
    .setOutputCol(s"${colName}_encoded")
}

encoders: Seq[org.apache.spark.ml.feature.OneHotEncoder] = List(oneHotEncoder_c786d99415c4, oneHotEncoder_8695692b81cd, oneHotEncoder_b1049acb6aca, oneHotEncoder_479b1b983fbb, oneHotEncoder_b86d5d4411fa)


In [22]:
val featureCols = Featurescolumns.flatMap(colName => Array(s"${colName}_encoded"))
val assembler = new VectorAssembler()
  .setInputCols(featureCols.toArray)
  .setOutputCol("features")

featureCols: Seq[String] = List(ProviderId_idx_encoded, ProductId_idx_encoded, ProductCategory_idx_encoded, ChannelId_idx_encoded, PricingStrategy_idx_encoded)
assembler: org.apache.spark.ml.feature.VectorAssembler = VectorAssembler: uid=vecAssembler_f48958abd8d5, handleInvalid=error, numInputCols=5


In [23]:
val pipeline = new org.apache.spark.ml.Pipeline().setStages((encoders :+ assembler).toArray)

pipeline: org.apache.spark.ml.Pipeline = pipeline_b75eee300fdd


In [24]:
allNodes = pipeline.fit(allNodes).transform(allNodes)

allNodes: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, node_type: string ... 14 more fields]


In [25]:
allNodes.show()

+--------------------+-----------+--------------+-------------+-------------------+-------------+-------------------+--------------+------------------+---------------+----------------------+---------------------+---------------------------+---------------------+---------------------------+--------------------+
|                  id|  node_type|ProviderId_idx|ProductId_idx|ProductCategory_idx|ChannelId_idx|PricingStrategy_idx|Amount_encoded|         Value_log|FraudResult_idx|ProviderId_idx_encoded|ProductId_idx_encoded|ProductCategory_idx_encoded|ChannelId_idx_encoded|PricingStrategy_idx_encoded|            features|
+--------------------+-----------+--------------+-------------+-------------------+-------------+-------------------+--------------+------------------+---------------+----------------------+---------------------+---------------------------+---------------------+---------------------------+--------------------+
| TransactionId_69578|Transaction|           0.0|          0.0| 

In [26]:
allNodes.count()

res8: Long = 150358


# **Construct edges**

In [27]:
val nodeCols = Seq("TransactionId", "SubscriptionId", "AccountId")
val nodePairs = nodeCols.combinations(2).toList.flatMap(pair => List((pair.head, pair.last), (pair.last, pair.head)))
val edgesDF = nodePairs.toDF("src_type", "dst_type")
val edges = edgesDF.selectExpr(
  "src_type as src",
  "dst_type as dst",
  "concat_ws('-', src_type, dst_type) as relation"
)
edges.show(truncate = false)

+--------------+--------------+----------------------------+
|src           |dst           |relation                    |
+--------------+--------------+----------------------------+
|TransactionId |SubscriptionId|TransactionId-SubscriptionId|
|SubscriptionId|TransactionId |SubscriptionId-TransactionId|
|TransactionId |AccountId     |TransactionId-AccountId     |
|AccountId     |TransactionId |AccountId-TransactionId     |
|SubscriptionId|AccountId     |SubscriptionId-AccountId    |
|AccountId     |SubscriptionId|AccountId-SubscriptionId    |
+--------------+--------------+----------------------------+



nodeCols: Seq[String] = List(TransactionId, SubscriptionId, AccountId)
nodePairs: List[(String, String)] = List((TransactionId,SubscriptionId), (SubscriptionId,TransactionId), (TransactionId,AccountId), (AccountId,TransactionId), (SubscriptionId,AccountId), (AccountId,SubscriptionId))
edgesDF: org.apache.spark.sql.DataFrame = [src_type: string, dst_type: string]
edges: org.apache.spark.sql.DataFrame = [src: string, dst: string ... 1 more field]


In [28]:
var edgesDF: DataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[(String, String, String)])
var tempDF: DataFrame = spark.createDataFrame(spark.sparkContext.emptyRDD[(String, String)])
def createEdgesAndUpdateDict(srcType: String, dstType: String): Unit = {
    val relation = (s"$srcType-$dstType")
    tempDF = df.select(srcType, dstType).distinct()
    tempDF = tempDF.withColumn("relation", lit(relation))
    edgesDF =  edgesDF.union(tempDF.select(
        col(srcType), 
        col(dstType),
        col("relation"),
    ))
}
for ((srcType, dstType) <- nodePairs) {
  createEdgesAndUpdateDict(srcType, dstType)
}

edgesDF: org.apache.spark.sql.DataFrame = [_1: string, _2: string ... 1 more field]
tempDF: org.apache.spark.sql.DataFrame = [AccountId: string, SubscriptionId: string ... 1 more field]
createEdgesAndUpdateDict: (srcType: String, dstType: String)Unit


In [29]:
edgesDF = edgesDF.withColumnRenamed("_1", "src").withColumnRenamed("_2", "dst").withColumnRenamed("_3", "relation")

edgesDF: org.apache.spark.sql.DataFrame = [src: string, dst: string ... 1 more field]


In [37]:
edgesDF.groupBy("relation").count().show()

+--------------------+------+
|            relation| count|
+--------------------+------+
|TransactionId-Sub...|140681|
|SubscriptionId-Tr...|140681|
|TransactionId-Acc...|140681|
|AccountId-Transac...|140681|
|SubscriptionId-Ac...|  4843|
|AccountId-Subscri...|  4843|
+--------------------+------+



In [31]:
var graph = GraphFrame(allNodes, edgesDF)

graph: org.graphframes.GraphFrame = GraphFrame(v:[id: string, node_type: string ... 14 more fields], e:[src: string, dst: string ... 1 more field])


In [32]:
allNodes.show()

+--------------------+-----------+--------------+-------------+-------------------+-------------+-------------------+--------------+------------------+---------------+----------------------+---------------------+---------------------------+---------------------+---------------------------+--------------------+
|                  id|  node_type|ProviderId_idx|ProductId_idx|ProductCategory_idx|ChannelId_idx|PricingStrategy_idx|Amount_encoded|         Value_log|FraudResult_idx|ProviderId_idx_encoded|ProductId_idx_encoded|ProductCategory_idx_encoded|ChannelId_idx_encoded|PricingStrategy_idx_encoded|            features|
+--------------------+-----------+--------------+-------------+-------------------+-------------+-------------------+--------------+------------------+---------------+----------------------+---------------------+---------------------------+---------------------+---------------------------+--------------------+
| TransactionId_69578|Transaction|           0.0|          0.0| 

In [33]:
// graph.vertices.write.mode("overwrite").parquet("vertices.parquet")
// graph.edges.write.mode("overwrite").parquet("edges.parquet")