In [None]:
val accessKey = ""
val secretKey = ""
val bucketName = "streamingdata"
val endpoint = "s3-api.us-geo.objectstorage.service.networklayer.com"

In [2]:
sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
sc.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
sc.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)
sc.hadoopConfiguration.set("fs.s3a.endpoint", endpoint)

val s3Url = s"s3a://${bucketName}/"

import org.apache.spark.sql.types._

val schema = (new StructType()
    .add("InvoiceNo", LongType)
    .add("StockCode", LongType)
    .add("Description", StringType)
    .add("Quantity", ShortType)
    .add("InvoiceDate", LongType)
    .add("UnitPrice", DoubleType)
    .add("CustomerID", IntegerType)
    .add("Country", StringType)
    .add("LineNo", ShortType)
    .add("InvoiceTime", StringType)
    .add("StoreID", ShortType)
    .add("TransactionID", StringType))

var df = spark.read.schema(schema).json(s"${s3Url}/data/*")

println(df.count())
df.cache()

Waiting for a Spark session to start...

Waiting for a Spark session to start...

s3Url = s3a://streamingdata/
schema = StructType(StructField(InvoiceNo,LongType,true), StructField(StockCode,LongType,true), StructField(Description,StringType,true), StructField(Quantity,ShortType,true), StructField(InvoiceDate,LongType,true), StructField(UnitPrice,DoubleType,true), StructField(CustomerID,IntegerType,true), StructField(Country,StringType,true), StructField(LineNo,ShortType,true), StructField(InvoiceTime,StringType,true), StructField(StoreID,ShortType,true), StructField(TransactionID,StringType,true))
df = [InvoiceNo: bigint, StockCode: bigint ... 10 more fields]


[InvoiceNo: bigint, StockCode: bigint ... 10 more fields]

In [3]:
df.take(5)

0,1,2,3,4,5,6,7,8,9,10,11
5373891.0,22197.0,SMALL POPCORN HOLDER,10.0,1535636460000.0,0.85,17757.0,United Kingdom,15.0,13:41:00,0.0,5373891150180830.0
5373891.0,20752.0,BLUE POLKADOT WASHING UP GLOVES,4.0,1535636460000.0,2.1,17757.0,United Kingdom,17.0,13:41:00,0.0,5373891170180830.0
5405261.0,22624.0,IVORY KITCHEN SCALES,1.0,1535636460000.0,8.5,14606.0,United Kingdom,3.0,13:41:00,0.0,540526130180830.0
5405261.0,22199.0,FRYING PAN RED RETROSPOT,1.0,1535636460000.0,4.25,14606.0,United Kingdom,8.0,13:41:00,0.0,540526180180830.0
,,,,,,,,,,,


In [4]:
import org.apache.spark.sql.functions._

In [5]:
df =  ( df
            .filter(col("InvoiceNo").isNotNull)
            .filter(col("CustomerID").isNotNull)
            // Create a column 'Cancelled' which has the values 1=Cancelled, 0=Not Cancelled
            .withColumn("Cancelled",
              when(col("InvoiceNO").startsWith("C"), lit(1)).otherwise(lit(0))
            )
            .withColumn("UnitPrice", abs($"UnitPrice"))
            .withColumn("Quantity", abs($"Quantity"))
 )

df = [InvoiceNo: bigint, StockCode: bigint ... 11 more fields]


[InvoiceNo: bigint, StockCode: bigint ... 11 more fields]

In [6]:
df.take(5)

0,1,2,3,4,5,6,7,8,9,10,11,12
5373891,22197,SMALL POPCORN HOLDER,10,1535636460000,0.85,17757,United Kingdom,15,13:41:00,0,5373891150180830,0
5373891,20752,BLUE POLKADOT WASHING UP GLOVES,4,1535636460000,2.1,17757,United Kingdom,17,13:41:00,0,5373891170180830,0
5405261,22624,IVORY KITCHEN SCALES,1,1535636460000,8.5,14606,United Kingdom,3,13:41:00,0,540526130180830,0
5405261,22199,FRYING PAN RED RETROSPOT,1,1535636460000,4.25,14606,United Kingdom,8,13:41:00,0,540526180180830,0
5412141,22431,WATERING CAN BLUE ELEPHANT,6,1535636460000,1.95,15570,United Kingdom,16,13:41:00,0,5412141160180830,0


In [7]:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer, VectorAssembler}

val countryIndexer = new StringIndexer()
  .setInputCol("Country")
  .setOutputCol("indexedCountry")
  .fit(df)

val assembler = new VectorAssembler()
  .setInputCols(Array("indexedCountry", "StockCode", "Quantity", "UnitPrice"))
  .setOutputCol("features")

val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))

val dtc = new DecisionTreeClassifier()
  .setLabelCol("Cancelled")
  .setFeaturesCol("features")

val pipeline = new Pipeline()
  .setStages(Array(countryIndexer, assembler, dtc))

val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

countryIndexer = strIdx_5b722d61fa70
assembler = vecAssembler_65c8cfdde2f5
trainingData = [InvoiceNo: bigint, StockCode: bigint ... 11 more fields]
testData = [InvoiceNo: bigint, StockCode: bigint ... 11 more fields]


[InvoiceNo: bigint, StockCode: bigint ... 11 more fields]

dtc: org.apache.spark.ml.class...


In [8]:
// Select example rows to display.
predictions.select("TransactionID", "Country", "StockCode", "Cancelled", "Prediction").show(5)

+----------------+--------------+---------+---------+----------+
|   TransactionID|       Country|StockCode|Cancelled|Prediction|
+----------------+--------------+---------+---------+----------+
| 536537160180830|United Kingdom|    22183|        0|       0.0|
| 536537120180830|United Kingdom|    22333|        0|       0.0|
| 536538110180830|United Kingdom|    21466|        0|       0.0|
| 536538120180830|United Kingdom|    21467|        0|       0.0|
|5365381130180830|United Kingdom|    21690|        0|       0.0|
+----------------+--------------+---------+---------+----------+
only showing top 5 rows



In [9]:
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("Cancelled")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))

Test Error = 0.0


evaluator = mcEval_7fa7a079f23e
accuracy = 1.0


1.0