In [1]:
import org.apache.spark.ml._
import org.apache.spark.ml.feature._
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

In [2]:
// Read the full credit-card CSV (first line is the header; malformed lines are dropped),
// then add a generated row id and cast Time and Class to Int.
// NOTE: monotonically_increasing_id() produces ids that are unique and increasing but NOT
// consecutive across partitions (the partition index lives in the upper bits), so only the
// rows of the first partition get ids 0, 1, 2, ...
val originalDf = spark.read
    .format("csv")
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("../data/creditcard.csv") // <- change to the location of your copy of the dataset
    .withColumn("id", monotonically_increasing_id())
    .withColumn("Time", col("Time").cast("Int"))
    .withColumn("Class", col("Class").cast("Int"))

originalDf = [Time: int, V1: string ... 30 more fields]




[Time: int, V1: string ... 30 more fields]

In [3]:
// Columns that are read as strings from the CSV but hold floating-point values:
// the components V1..V28 plus Amount. Match names exactly (V followed by digits, or
// "Amount") instead of by substring, so unrelated columns containing a "V" are not swept in.
val floatColumns = originalDf.columns.filter(name => name.matches("V\\d+") || name == "Amount")

floatColumns = Array(V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14, V15, V16, V17, V18, V19, V20, V21, V22, V23, V24, V25, V26, V27, V28, Amount)


[V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14, V15, V16, V17, V18, V19, V20, V21, V22, V23, V24, V25, V26, V27, V28, Amount]

In [5]:
// Cast every column listed in floatColumns to Float, leaving all other columns (and the
// original column order) unchanged. A single select is used instead of calling
// withColumn in a loop, because each withColumn adds a projection to the plan.
val casted_df = {
    val toFloat = floatColumns.toSet
    originalDf.select(originalDf.columns.map { name =>
        if (toFloat(name)) originalDf(name).cast("Float").as(name) else originalDf(name)
    }: _*)
}

casted_df = [Time: int, V1: float ... 30 more fields]


[Time: int, V1: float ... 30 more fields]

In [12]:
// Keep a single record: the row whose generated id equals 1.
// NOTE(review): ids come from monotonically_increasing_id(), which only numbers rows
// consecutively inside the first partition, so id 1 is presumably the second row of
// partition 0 — confirm if the input partitioning changes.
val casted_df_head = casted_df.filter(casted_df("id") === 1)

casted_df_head = [Time: int, V1: float ... 30 more fields]


[Time: int, V1: float ... 30 more fields]

In [1]:
// List the column names of the one-row frame that will be fed to the model.
casted_df_head.schema.fieldNames

[Time, V1, V2, V3, V4, V5, V6, V7, V8, V9, V10, V11, V12, V13, V14, V15, V16, V17, V18, V19, V20, V21, V22, V23, V24, V25, V26, V27, V28, Amount, Class, id]

In [25]:
// Load the fitted pipeline from disk (<- change the path to where the model was saved).
val model = PipelineModel.read.load("../data/logistic_regression_model")

model = pipeline_80e25ff27998


pipeline_80e25ff27998

In [26]:
// PipelineModel.transform scores a DataFrame, so even a single record has to be wrapped
// in a (one-row) DataFrame first — that is what casted_df_head is.
val predictions: DataFrame = model.transform(casted_df_head)

predictions = [Time: int, V1: float ... 36 more fields]


[Time: int, V1: float ... 36 more fields]

In [17]:
// Print the schema of the scored DataFrame (input columns plus the pipeline's output columns).
predictions.printSchema()

root
 |-- Time: integer (nullable = true)
 |-- V1: float (nullable = true)
 |-- V2: float (nullable = true)
 |-- V3: float (nullable = true)
 |-- V4: float (nullable = true)
 |-- V5: float (nullable = true)
 |-- V6: float (nullable = true)
 |-- V7: float (nullable = true)
 |-- V8: float (nullable = true)
 |-- V9: float (nullable = true)
 |-- V10: float (nullable = true)
 |-- V11: float (nullable = true)
 |-- V12: float (nullable = true)
 |-- V13: float (nullable = true)
 |-- V14: float (nullable = true)
 |-- V15: float (nullable = true)
 |-- V16: float (nullable = true)
 |-- V17: float (nullable = true)
 |-- V18: float (nullable = true)
 |-- V19: float (nullable = true)
 |-- V20: float (nullable = true)
 |-- V21: float (nullable = true)
 |-- V22: float (nullable = true)
 |-- V23: float (nullable = true)
 |-- V24: float (nullable = true)
 |-- V25: float (nullable = true)
 |-- V26: float (nullable = true)
 |-- V27: float (nullable = true)
 |-- V28: float (nullable = true)
 |-- Amount: fl

In [24]:
// Columns worth looking at in the scored output:
//  - "prediction":  the class predicted by the model
//  - "Class":       the ground-truth label from the dataset
//  - "probability": a vector of two values — the probability of class 0 and of class 1
//                   (safe to ignore for a quick check)
// Here the model is very confident the record is legitimate. According to the original
// author, this same record was used during training, so the model has (loosely speaking)
// already seen it.
predictions.select("id", "Time", "Amount", "Class", "prediction", "probability").show()

+---+----+------+-----+----------+--------------------+                         
| id|Time|Amount|Class|prediction|         probability|
+---+----+------+-----+----------+--------------------+
|  1|   0|  2.69|    0|       0.0|[0.99982720734412...|
+---+----+------+-----+----------+--------------------+



In [18]:


// Map every scored row to a confusion-matrix cell code:
//   0 = true positive  (predicted 1, actual 1)
//   1 = false negative (predicted 0, actual 1)
//   2 = false positive (predicted 1, actual 0)
//   3 = true negative  (predicted 0, actual 0)
// NOTE: collect() brings every row to the driver. That is fine for the one-row frame
// above, but aggregate in Spark first (e.g. groupBy) before collecting a large DataFrame.
val asd = predictions.select('Class, 'prediction).collect().map { row =>
    // Positional getters match the select order above; getInt throws on a null label
    // rather than silently turning it into 0.
    val pred = row.getDouble(1).toInt
    val theClass = row.getInt(0)
    (pred, theClass) match {
        case (1, 1) => 0 // tp
        case (0, 1) => 1 // fn
        case (1, 0) => 2 // fp
        case (0, 0) => 3 // tn
        case other => throw new IllegalArgumentException(
            s"Unexpected (prediction, Class) pair $other; expected binary 0/1 values")
    }
}

[Stage 74:>                                                         (0 + 4) / 4]

asd = Array(3)


[3]

In [19]:
// Confusion-matrix code of the first (here: only) scored row:
// 0 = TP, 1 = FN, 2 = FP, 3 = TN. Throws if asd is empty.
asd(0)

3

In [None]:
sqlC