### Initialize Spark session

In [12]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

// Build the Spark configuration and obtain (or reuse) the session for this notebook.
println("Initializing Spark context...")

// Application name plus the value for the "spark.cores.max" setting.
val conf = new SparkConf().setAppName("CSV to Kafka").set("spark.cores.max", "2")

// getOrCreate() hands back an already-active session when one exists.
// Once this has run, the Spark Master UI should list a running application.
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

Initializing Spark context...


conf = org.apache.spark.SparkConf@43334e76
spark = org.apache.spark.sql.SparkSession@1bdfc718


org.apache.spark.sql.SparkSession@1bdfc718

### Initialize HDFS client

In [13]:
// Base URL of the HDFS namenode, read from the HDFS_URL environment variable.
// sys.env(...) throws NoSuchElementException when the variable is not set.
val hdfsPrefix = sys.env("HDFS_URL")

// Hadoop client configuration whose default filesystem is that same URL.
val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", hdfsPrefix)

// FileSystem handle used by the later cells for delete operations.
val hdfs = FileSystem.get(hadoopConf)

hdfsPrefix = hdfs://namenode1:8020
hadoopConf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml
hdfs = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1584413033_43, ugi=root (auth:SIMPLE)]]


DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1584413033_43, ugi=root (auth:SIMPLE)]]

### Load partitioned CSV data from HDFS

In [14]:
// Read the CSV output directory from HDFS into a DataFrame and print summary statistics.
println("Load partitioned CSV from HDFS")
val df = spark.read
    .format("com.databricks.spark.csv")
    // The first line of each file is a header row; column types are inferred from the data.
    .options(Map("inferSchema" -> "true", "header" -> "true"))
    .load(s"$hdfsPrefix/output/obfuscated-samples.csv")

println(s"Partitions loaded: ${df.rdd.getNumPartitions}")
// describe() yields count/mean/stddev/min/max per column; show() prints the table.
df.describe().show()

Load partitioned CSV from HDFS
Partitions loaded: 2
+-------+--------------------+------------------+-------------------+----+
|summary|            personId|               lat|                lon|type|
+-------+--------------------+------------------+-------------------+----+
|  count|                   4|                 4|                  4|   4|
|   mean|                null|-5.966223979991914|  -49.1150839571495|null|
| stddev|                null|3.1309343545864228| 18.583792823102733|null|
|    min|f57ded61-bb16-4f5...|-8.824724236896516| -68.94285644612884| gps|
|    max|fe398302-eace-4ed...|-3.157759723087311|-32.287343468170164| gps|
+-------+--------------------+------------------+-------------------+----+



df = [time: timestamp, personId: string ... 3 more fields]


[time: timestamp, personId: string ... 3 more fields]

In [15]:
// Publish every row of `df` to the Kafka topic "locations" as a batch (non-streaming) write.
println("Writing dataframe locations to Kafka topic 'locations'")
df.selectExpr(
        "CAST(personId AS STRING) AS key",  // record key: the person id as a string
        "to_json(struct(*)) AS value")      // record value: the whole row serialized as JSON
    .write
    .format("kafka")
    .options(Map(
        "kafka.bootstrap.servers" -> "kafka:9092",
        "topic" -> "locations"))
    .save()
println("done")

Writing dataframe locations to Kafka topic 'locations'


Name: org.apache.spark.sql.AnalysisException
Message: 'writeStream' can be called only on streaming Dataset/DataFrame;
StackTrace:   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.Dataset.writeStream(Dataset.scala:3186)

### Save obfuscated file

In [22]:
// Write `rdd6` as text files under the HDFS output path, replacing any earlier output.
println("Saving to HDFS")
val fp = s"$hdfsPrefix/output/obfuscated-samples.csv"

// Recursively remove whatever is currently stored at the target path
// (presumably so the save below does not collide with a previous run's output).
hdfs.delete(new Path(fp), true)

rdd6.saveAsTextFile(fp)

Saving to HDFS


fp = hdfs://namenode1:8020/output/obfuscated-samples.csv


hdfs://namenode1:8020/output/obfuscated-samples.csv

In [23]:
import java.io.{BufferedWriter, FileWriter}
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import au.com.bytecode.opencsv.CSVWriter
import scala.util.Random

// Export the obfuscated records to a single local CSV file in the notebook workspace.
println("Save obfuscated file to workspace")

// collect() materializes the whole RDD on the driver.
// NOTE(review): assumes rdd6 is small enough to fit in driver memory — confirm.
val obr = rdd6.collect()

// Turn each 5-field record into a row of strings, in the column order of csvSchema below.
val obr2 = obr.toList.map(e => Array(e._1.toString, e._2.toString, e._3.toString, e._4.toString, e._5.toString))

val outputFile = new BufferedWriter(new FileWriter("/notebooks/obfuscate-geo-samples/obfuscated-samples.csv"))
// Comma-separated; NO_QUOTE_CHARACTER turns off field quoting.
val csvWriter = new CSVWriter(outputFile, ',', CSVWriter.NO_QUOTE_CHARACTER)
val csvSchema = Array("time", "person_id", "lat", "lon", "type")

println("Writing CSV file")
try {
  // Header row first, then one line per record. Rows are written one at a time so that
  // no implicit Scala-to-Java collection conversion is needed.
  csvWriter.writeNext(csvSchema)
  obr2.foreach(row => csvWriter.writeNext(row))
} finally {
  // Always release the file handle, even when a write fails part-way through.
  // Closing the CSVWriter flushes and closes the wrapped `outputFile` as well.
  csvWriter.close()
}

Save obfuscated file to workspace
Writing CSV file


obr = Array((2019-04-18 04:03:31.0,5a64a1e3-e0fa-464e-8f2d-93346abf590a,-3.401548862514496,-60.92761225243187,gps), (2019-04-18 04:09:27.0,200f9033-b734-4c69-aeed-2730018eff66,-8.545849880735396,-32.216972621183395,gps), (2019-04-18 04:24:11.0,5a64a1e3-e0fa-464e-8f2d-93346abf590a,-3.201559862514496,-68.92763725243186,gps), (2019-04-18 04:30:45.0,200f9033-b734-4c69-aeed-2730018eff66,-8.845910880735397,-34.216933621183394,gps))
obr2 = List(Array(2019-04-18 04:03:31.0, 5a64a1e3-e0fa-464e-8f2d-93346abf590a, -3.401548862514496, -60...


List(Array(2019-04-18 04:03:31.0, 5a64a1e3-e0fa-464e-8f2d-93346abf590a, -3.401548862514496, -60...

### Remove temp files

In [24]:
// Clean up temporary files: delete "/tmp" on the filesystem behind `hdfs`.
// NOTE(review): this removes the entire /tmp tree, not only files created by this
// notebook — confirm nothing else on the cluster relies on it.
val tmpPath: Path = new Path("/tmp")

// Second argument `true` = recursive delete; the call returns whether the delete succeeded.
hdfs.delete(tmpPath, true)

tmpPath = /tmp


true

### Stop application

In [25]:
// Shut down the Spark session created at the top of the notebook.
println("Stop Spark session")
// After stop() returns, the Spark Master UI should no longer show this application as running.
spark.stop()

Stop Spark session
