## How to write results into HDFS

After submitting a job, we need to retrieve its result. The result can be stored in HDFS or elsewhere; whether HDFS is a convenient choice depends on the output size. If we do store it there, we have to write it in a format that we can read back afterwards.

This test was produced with Spark 1.5.1 (PySpark 1.5.1) and the ```spark-avro``` library, loaded like this:

```bash
spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 [...]
```

For Spark 1.3.0 use:

```bash
spark-submit --packages com.databricks:spark-avro_2.10:1.0.0 [...]
```
An example for Spark 1.3.0 is provided in a separate file.

Index:

* [How to store aggregation results](#Aggregation-examples)
  * [Example #1](#First-example)
  * [Example #2](#Second-example)

In [15]:
# is SparkContext already loaded?
sc

<pyspark.context.SparkContext at 0x7f52e406a690>

In [14]:
# Make sure you have a HiveContext
sqlContext

<pyspark.sql.context.HiveContext at 0x7f52d7922e90>

In [17]:
# Which Spark version is this?
sc.version

u'1.5.1'

In [29]:
# load a dataframe from Avro files
df = sqlContext.read.format("com.databricks.spark.avro").load("/cms/wmarchive/test/avro/2016/01/01/")

In [26]:
from pyspark.sql import functions as F

In [20]:
df.printSchema()

root
 |-- PFNArrayRef: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- task: string (nullable = false)
 |-- skippedFiles: array (nullable = false)
 |    |-- element: long (containsNull = false)
 |-- wmaid: string (nullable = false)
 |-- wmats: double (nullable = false)
 |-- fallbackFiles: array (nullable = false)
 |    |-- element: long (containsNull = false)
 |-- LFNArray: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- meta_data: struct (nullable = false)
 |    |-- agent_ver: string (nullable = false)
 |    |-- fwjr_id: string (nullable = false)
 |    |-- host: string (nullable = false)
 |    |-- ts: long (nullable = false)
 |-- steps: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- status: long (nullable = false)
 |    |    |-- errors: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- type: string (nullable = false)
 |    |

In [31]:
from pyspark import StorageLevel
df.persist(storageLevel=StorageLevel(True, True, False, False, 1))  # useDisk, useMemory, useOffHeap, deserialized, replication

DataFrame[PFNArrayRef: array<string>, task: string, skippedFiles: array<bigint>, wmaid: string, wmats: double, fallbackFiles: array<bigint>, LFNArray: array<string>, meta_data: struct<agent_ver:string,fwjr_id:string,host:string,ts:bigint>, steps: array<struct<status:bigint,errors:array<struct<type:string,details:string,exitCode:bigint>>,name:string,output:array<struct<branch_hash:string,guid:string,size:bigint,applicationName:string,acquisitionEra:string,applicationVersion:string,inputPFNs:array<bigint>,configURL:string,outputDataset:string,location:string,inputLFNs:array<bigint>,async_dest:string,events:bigint,merged:bigint,validStatus:string,adler32:string,ouput_module_class:string,globalTag:string,catalog:string,module_label:string,cksum:string,StageOutCommand:string,outputPFNs:array<bigint>,inputDataset:string,runs:array<struct<runNumber:bigint,lumis:array<bigint>>>,outputLFNs:array<bigint>,processingVer:bigint,processingStr:string,prep_id:string>>,stop:bigint,site:string,start:big

In [32]:
df.count()

200000

## Aggregation examples

### First example
1) Aggregated sum of all ```steps.performance.cpu``` values. In this case the result is a single line that can easily be stored back in HDFS, even in a plain-text format. (I'm not saying this is convenient for a one-line result; it's just an example.)

In [102]:
# Flatten the per-step CPU metrics into (name, value) pairs, then sum the values per metric name
aggregation1 = df.select("steps.performance.cpu") \
    .rdd \
    .flatMap(lambda cpuArrayRows: cpuArrayRows[0]) \
    .map(lambda row: row.asDict()) \
    .flatMap(lambda rowDict: [(k,v) for k,v in rowDict.iteritems()]) \
    .reduceByKey(lambda x,y: x+y)

In [104]:
aggregation1.collect()

[('TotalJobCPU', 152097347.4204235),
 ('TotalJobTime', 1652265.155986641),
 ('MinEventCPU', 1349112.0),
 ('MinEventTime', 300451.93912530516),
 ('AvgEventCPU', 299587.125072929),
 ('MaxEventTime', 1656177.8017510779),
 ('TotalEventCPU', 151599144),
 ('AvgEventTime', 300340.72760207055),
 ('MaxEventCPU', 1351100.0)]
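
To make the pipeline above easier to follow, here is a minimal plain-Python sketch of the same flatten-then-sum-by-key logic, with no Spark involved. The `sum_by_key` helper and the sample records are hypothetical and made up for illustration; only the metric names come from the output above.

```python
from collections import defaultdict

def sum_by_key(records):
    """Sum values per key across a list of records.

    Each record stands in for one row of df.select("steps.performance.cpu"):
    a list holding one dict of CPU metrics per step.
    """
    totals = defaultdict(float)
    for steps in records:                    # first flatMap: one list of steps per row
        for cpu in steps:                    # map: one dict of metrics per step
            for key, value in cpu.items():   # second flatMap: (key, value) pairs
                totals[key] += value         # reduceByKey(lambda x, y: x + y)
    return dict(totals)

# Hypothetical sample data, not taken from the real dataset
sample_records = [
    [{"TotalJobCPU": 10.0, "TotalJobTime": 2.0},
     {"TotalJobCPU": 5.0, "TotalJobTime": 1.0}],
    [{"TotalJobCPU": 1.5, "TotalJobTime": 0.5}],
]

totals = sum_by_key(sample_records)
print(totals)
```

Note that every metric is summed the same way, including the `Min`, `Max` and `Avg` ones, which is what the Spark pipeline does as well.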

In [105]:
# Store the result as a plain text file
aggregation1.saveAsTextFile("wmarchive/test-plaintext-aggregation1")

In [111]:
%%bash
hadoop fs -text wmarchive/test-plaintext-aggregation1/*

('TotalJobCPU', 152097347.4204235)
('TotalJobTime', 1652265.155986641)
('MinEventCPU', 1349112.0)
('MinEventTime', 300451.93912530516)
('AvgEventCPU', 299587.125072929)
('MaxEventTime', 1656177.8017510779)
('TotalEventCPU', 151599144)
('AvgEventTime', 300340.72760207055)
('MaxEventCPU', 1351100.0)
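
Since the point of storing the result is to read it back later, here is a small sketch of how these text lines could be parsed again. Each line is the Python representation of a `(key, value)` tuple, so `ast.literal_eval` can recover it. The `parse_tuple_lines` helper is a hypothetical name, and the sample lines are copied from the output above; in practice the lines would come from the files under the output directory.

```python
import ast

def parse_tuple_lines(lines):
    """Turn lines like "('TotalJobCPU', 152097347.4204235)" into a dict."""
    result = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue                          # skip empty lines between part files
        key, value = ast.literal_eval(line)   # safe parsing of a Python literal
        result[key] = value
    return result

# A few lines copied from the plain-text output shown above
sample_lines = [
    "('TotalJobCPU', 152097347.4204235)",
    "('TotalEventCPU', 151599144)",
    "('MaxEventCPU', 1351100.0)",
]

parsed = parse_tuple_lines(sample_lines)
print(parsed["TotalEventCPU"])
```

This works, but it ties the stored file to Python's tuple syntax, which is one reason a format such as JSON or Avro may be preferable.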


In [161]:
aggregated1DF = sqlContext.createDataFrame([{v[0]:v[1] for v in aggregation1.collect()}])

In [163]:
# Save in JSON format
aggregated1DF.toJSON().saveAsTextFile("wmarchive/test-json-aggregation1")

In [164]:
%%bash
hadoop fs -text wmarchive/test-json-aggregation1/*

{"AvgEventCPU":299587.125072929,"AvgEventTime":300340.72760207055,"MaxEventCPU":1351100.0,"MaxEventTime":1656177.8017510779,"MinEventCPU":1349112.0,"MinEventTime":300451.93912530516,"TotalEventCPU":151599144,"TotalJobCPU":1.520973474204235E8,"TotalJobTime":1652265.155986641}
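
The JSON output is easier to read back than the plain-text tuples, because each line is a self-contained JSON object. A minimal sketch using only the standard library, with the line copied from the output above (in practice it would be read from the files in the output directory):

```python
import json

# The single line written by toJSON().saveAsTextFile(...), copied from above
json_line = (
    '{"AvgEventCPU":299587.125072929,"AvgEventTime":300340.72760207055,'
    '"MaxEventCPU":1351100.0,"MaxEventTime":1656177.8017510779,'
    '"MinEventCPU":1349112.0,"MinEventTime":300451.93912530516,'
    '"TotalEventCPU":151599144,"TotalJobCPU":1.520973474204235E8,'
    '"TotalJobTime":1652265.155986641}'
)

record = json.loads(json_line)

# The scientific notation (1.520973474204235E8) is parsed as a regular float
print(record["TotalJobCPU"])
print(sorted(record))
```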


In [165]:
aggregated1DF.write.format("com.databricks.spark.avro").save("wmarchive/test-avro-aggregation1")

In [166]:
%%bash
hadoop fs -text wmarchive/test-avro-aggregation1/*

{"AvgEventCPU":{"double":299587.125072929},"AvgEventTime":{"double":300340.72760207055},"MaxEventCPU":{"double":1351100.0},"MaxEventTime":{"double":1656177.8017510779},"MinEventCPU":{"double":1349112.0},"MinEventTime":{"double":300451.93912530516},"TotalEventCPU":{"long":151599144},"TotalJobCPU":{"double":1.520973474204235E8},"TotalJobTime":{"double":1652265.155986641}}
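
In the text rendering of the Avro file, every value is wrapped in an object naming its type, such as `{"double": ...}`. This is most likely the Avro JSON encoding of a union type, used here because the columns are nullable. The sketch below shows one way to flatten such a line back into a plain dictionary. The `unwrap_unions` helper is hypothetical, the sample line is a shortened copy of the output above, and the code assumes flat records whose fields are all primitive values.

```python
import json

def unwrap_unions(record):
    """Replace {"<type>": value} wrappers with the bare value.

    Assumes a flat record with primitive fields: a genuine single-key
    nested object would be unwrapped as well.
    """
    flat = {}
    for field, wrapped in record.items():
        if isinstance(wrapped, dict) and len(wrapped) == 1:
            flat[field] = next(iter(wrapped.values()))
        else:
            flat[field] = wrapped   # null or already a plain value
    return flat

# Shortened copy of the line printed by `hadoop fs -text` above (three of the nine fields)
avro_text_line = (
    '{"AvgEventCPU":{"double":299587.125072929},'
    '"TotalEventCPU":{"long":151599144},'
    '"TotalJobCPU":{"double":1.520973474204235E8}}'
)

flat_record = unwrap_unions(json.loads(avro_text_line))
print(flat_record)
```

Within Spark itself this step should not be needed: loading the directory with the same `com.databricks.spark.avro` format used at the top of this notebook returns a regular DataFrame.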


### Second example

2) Coming soon