# Spark ETL to Transform Sensor Data on MapR

Before we can explore the data, we need to convert it to a format that is easier to work with. The sensor writes XML, which we'll convert to CSV. We'll launch a Spark session to run the ETL job at scale.

## Configure the Spark Session

We'll need to link to the `com.databricks:spark-xml` package to do the conversion. We can also set a few other options, such as driver memory and executor cores. With PySpark and Jupyter, session settings go in a `%%configure` cell, as shown.

In [1]:
%%configure -f
{"kind": "spark",
 "driverMemory": "2048M",
 "executorCores": 2,
 "conf": {"spark.jars.packages": "com.databricks:spark-xml_2.10:0.4.1"}
}
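
The value of `spark.jars.packages` is a Maven coordinate of the form `groupId:artifactId:version`. By convention, the `_2.10` suffix on the artifact name is the Scala version the package was built for, which generally needs to match the Scala build of Spark on the cluster. As a minimal sketch, the same settings can be assembled and inspected in plain Python; `build_session_config` and `split_maven_coordinate` are hypothetical helpers written for this illustration and are not part of sparkmagic or Spark.

```python
import json


def build_session_config(package, driver_memory="2048M", executor_cores=2):
    """Return a dict shaped like the %%configure body above (hypothetical helper)."""
    return {
        "kind": "spark",
        "driverMemory": driver_memory,
        "executorCores": executor_cores,
        "conf": {"spark.jars.packages": package},
    }


def split_maven_coordinate(coordinate):
    """Split 'groupId:artifactId:version' into its three parts."""
    group_id, artifact_id, version = coordinate.split(":")
    return group_id, artifact_id, version


config = build_session_config("com.databricks:spark-xml_2.10:0.4.1")

# Print the JSON that would follow the %%configure -f line.
print(json.dumps(config, indent=2))

# Show the group, artifact (with its Scala suffix), and version separately.
print(split_maven_coordinate(config["conf"]["spark.jars.packages"]))
```

Printing the coordinate parts makes it easier to spot a misspelled group ID or a mismatched Scala suffix before the session starts.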

## Starting the Spark Session

Because we're using a PySpark kernel, the code we run executes on the MapR Hadoop cluster automatically. Running the first cell initializes a Spark application to do our work.

In [2]:
# PySpark executable script
# Import libraries
print("Importing dependencies....")
import sys
import os
import time
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
# Import only the functions we need; a wildcard import of pyspark.sql.functions
# would shadow Python built-ins such as sum, min, and max.
from pyspark.sql.functions import date_format, col, desc, udf, from_unixtime, unix_timestamp, date_sub, date_add, last_day
from pyspark.sql.types import StringType, IntegerType, StructType, StructField, DoubleType, FloatType, DateType, TimestampType
print("Import complete.\n")

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 0 | application_1500400165929_0670 | pyspark | idle | Link | Link | ✔ |


SparkSession available as 'spark'.
Importing dependencies....
Import complete.

## Starting the ETL Job

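The code below reads roughly 2,000 XML files and converts them to a single CSV file. The data is stored in the Hadoop file system on our MapR cluster and is transformed there by Spark. Each record carries a timestamp, a tag name, and a tag value; the job pivots these so that each timestamp becomes one row with one column per tag.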

In [3]:
def xmlConvert(spark):
    etl_time = time.time()
    # Read every XML file in the directory; each <HistoricalTextData> element becomes one row.
    df = (spark.read.format('com.databricks.spark.xml')
          .options(rowTag='HistoricalTextData')
          .load('hdfs:///data/predictive-maintenance/rw_XML_train'))
    # Pivot from long to wide: one row per timestamp, one column per tag; missing readings become 0.
    df = (df.withColumn("TimeStamp", df["TimeStamp"].cast("timestamp"))
          .groupBy("TimeStamp").pivot("TagName").sum("TagValue").na.fill(0))
    # Collapse to one partition so Spark writes a single CSV part file.
    df.repartition(1).write.csv("hdfs:///data/predictive-maintenance/training-set/rw_etl_csv", header=True, sep=",")
    print("Time taken to do xml transformation: --- %s seconds ---" % (time.time() - etl_time))
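
To make the pivot step easier to follow, here is a small pure-Python sketch of the same reshaping on a few in-memory records. It is an illustration only, not the Spark job: the sample XML is hypothetical (the element names mirror the `rowTag` and columns used above, but the real sensor files may be laid out differently), and `pivot_readings` and `to_csv` are names invented for this example.

```python
import csv
import io
import xml.etree.ElementTree as ET
from collections import defaultdict

# Hypothetical sample data; the wrapping <Export> element is an assumption.
SAMPLE_XML = """
<Export>
  <HistoricalTextData>
    <TimeStamp>2018-02-02 06:00:00</TimeStamp><TagName>Temp</TagName><TagValue>20.5</TagValue>
  </HistoricalTextData>
  <HistoricalTextData>
    <TimeStamp>2018-02-02 06:00:00</TimeStamp><TagName>Pressure</TagName><TagValue>1.5</TagValue>
  </HistoricalTextData>
  <HistoricalTextData>
    <TimeStamp>2018-02-02 06:00:00</TimeStamp><TagName>Temp</TagName><TagValue>0.5</TagValue>
  </HistoricalTextData>
  <HistoricalTextData>
    <TimeStamp>2018-02-02 06:00:01</TimeStamp><TagName>Temp</TagName><TagValue>19.0</TagValue>
  </HistoricalTextData>
</Export>
"""


def pivot_readings(xml_text):
    """Group by timestamp, pivot on tag name, sum tag values, fill gaps with 0."""
    root = ET.fromstring(xml_text.strip())
    totals = defaultdict(lambda: defaultdict(float))
    tags = set()
    for row in root.iter("HistoricalTextData"):
        timestamp = row.findtext("TimeStamp")
        tag = row.findtext("TagName")
        totals[timestamp][tag] += float(row.findtext("TagValue"))
        tags.add(tag)
    columns = sorted(tags)
    # A tag with no reading at a given timestamp is filled with 0.0.
    rows = [[ts] + [totals[ts].get(tag, 0.0) for tag in columns]
            for ts in sorted(totals)]
    return columns, rows


def to_csv(columns, rows):
    """Render the pivoted rows as CSV text with a header line."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["TimeStamp"] + columns)
    writer.writerows(rows)
    return buffer.getvalue()


columns, rows = pivot_readings(SAMPLE_XML)
print(to_csv(columns, rows))
```

The two `Temp` readings that share a timestamp are summed into one cell, and the missing `Pressure` reading at the second timestamp is filled with zero, which is the same behavior the `sum("TagValue")` and `na.fill(0)` calls request from Spark.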

In [4]:
# getOrCreate() returns the session the kernel already started, or creates one if none exists.
spark = SparkSession.builder.appName('XML ETL').getOrCreate()
print('Session created')
xmlConvert(spark)

Session created
Time taken to do xml transformation: --- 121.611049891 seconds ---

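Finally, we list the output directory to confirm that the job wrote a single CSV part file. The `%%local` magic runs the cell on the notebook server rather than on the cluster.

In [5]: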
%%local
! hadoop fs -ls /data/predictive-maintenance/training-set/rw_etl_csv

Found 2 items
-rwxr-xr-x   3 5007 5007          0 2018-02-02 06:35 /data/predictive-maintenance/training-set/rw_etl_csv/_SUCCESS
-rwxr-xr-x   3 5007 5007   15043746 2018-02-02 06:35 /data/predictive-maintenance/training-set/rw_etl_csv/part-00000-6d90a100-9dc0-49e1-b2ea-d3b5b0df7140.csv
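
The listing shows the empty `_SUCCESS` marker that Hadoop writes when a job commits its output, plus one part file of about 15 MB. As a rough sketch, the same check can be automated by parsing the listing text; `summarize_listing` is a hypothetical helper for this illustration, and it assumes the eight-column layout shown above.

```python
# Listing text copied from the cell output above.
LISTING = """Found 2 items
-rwxr-xr-x   3 5007 5007          0 2018-02-02 06:35 /data/predictive-maintenance/training-set/rw_etl_csv/_SUCCESS
-rwxr-xr-x   3 5007 5007   15043746 2018-02-02 06:35 /data/predictive-maintenance/training-set/rw_etl_csv/part-00000-6d90a100-9dc0-49e1-b2ea-d3b5b0df7140.csv
"""


def summarize_listing(listing):
    """Report whether _SUCCESS is present and which part files were written."""
    entries = []
    for line in listing.splitlines():
        # Columns: permissions, replication, owner, group, size, date, time, path.
        fields = line.split(None, 7)
        if len(fields) != 8:
            continue  # skip the "Found N items" header and blank lines
        size, path = int(fields[4]), fields[7]
        entries.append((path.rsplit("/", 1)[-1], size))
    parts = [(name, size) for name, size in entries if name.startswith("part-")]
    return {
        "succeeded": any(name == "_SUCCESS" for name, _ in entries),
        "part_files": [name for name, _ in parts],
        "part_bytes": sum(size for _, size in parts),
    }


summary = summarize_listing(LISTING)
print(summary)
```

A single entry in `part_files` confirms that `repartition(1)` produced one CSV file rather than one file per partition.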
