In [13]:
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load().cache()

In [105]:
from collections import OrderedDict
from pyspark.sql.functions import udf, array, round, col
from pyspark.sql.types import IntegerType, DateType, DoubleType,FloatType
from datetime import datetime

ref_date = datetime.strptime("2000-1-1", "%Y-%m-%d")

udf_convert_to_dt = udf(lambda i1, i2, i3: datetime.strptime("%s-%s-%s" %(i1, i2, i3), "%Y-%m-%d"), returnType=DateType())
udf_is_missing_data = udf(lambda element: 0 if element else 1, returnType=IntegerType())

udf_to_date = udf(lambda item: datetime.strftime(item, "%Y-%m-%d"))


data = df\
    .select(
        col("serial_number").alias("SN"), 
        col("time").alias("Time"),
        col("WL1.WL1-MEAN").alias("WL1-MEAN"),
        col("WL2.WL2-MEAN").alias("WL2-MEAN"),
        col("WL3.WL3-MEAN").alias("WL3-MEAN"),
        col("WHRL1.WHRL1-ACC").alias("WHRL1-ACC"),
        col("WHRL2.WHRL2-ACC").alias("WHRL2-ACC"),
        col("WHRL3.WHRL3-ACC").alias("WHRL3-ACC")
    )\
    .filter(df['time'] > ref_date)\
    .withColumn('missing', udf_is_missing_data('WL1-MEAN'))\
    .withColumn('date', udf_to_date("time"))\
    .withColumn('WL-MEAN', col("WL1-MEAN") + col("WL2-MEAN") + col("WL3-MEAN")/3)\
    .withColumn('WHRL-SUM', col("WHRL1-ACC") + col("WHRL2-ACC") + col("WHRL3-ACC"))\
    .groupBy("date")\
    .agg({
        'WL1-MEAN': "mean", 
        'WL2-MEAN': "mean", 
        "WL3-MEAN": "mean", 
        "missing":"sum", 
        'WL-MEAN':'mean',
        'WHRL1-ACC':'sum',
        'WHRL2-ACC':'sum',
        'WHRL3-ACC':'sum',
        'WHRL-SUM':'sum'
    })\
    .orderBy("date")\
    .drop('Time')\
    .select(
        'date', 
        col('sum(missing)').alias("Missing points"),
        #round(col('avg(WL1-MEAN)'), 4).alias('WL1-MEAN'),
        #round(col('avg(WL2-MEAN)'), 4).alias('WL2-MEAN'),
        #round(col('avg(WL3-MEAN)'), 4).alias('WL3-MEAN'),
        round(col('avg(WL-MEAN)'), 4).alias('Summed active power'),
        round(col('sum(WHRL1-ACC)'), 4).alias('WHRL1-ACC'),
        round(col('sum(WHRL2-ACC)'), 4).alias('WHRL2-ACC'),
        round(col('sum(WHRL3-ACC)'), 4).alias('WHRL3-ACC'),
        round(col('sum(WHRL-SUM)'), 4).alias('Mean active energy')
    )
data.show(10)

+----------+--------------+-------------------+---------+---------+---------+------------------+
|      date|Missing points|Summed active power|WHRL1-ACC|WHRL2-ACC|WHRL3-ACC|Mean active energy|
+----------+--------------+-------------------+---------+---------+---------+------------------+
|2016-05-23|             8|               null|     null|     null|     null|              null|
|2016-05-24|            69|             5.3115|   17.053|   17.032|   16.988|            51.073|
|2016-05-25|            88|             5.3952|    4.906|      4.9|    4.903|            14.709|
|2016-05-26|            96|               null|     null|     null|     null|              null|
|2016-05-27|            64|              0.949|    3.798|    2.796|    2.782|             9.376|
|2016-05-28|             1|             0.5617|    6.537|    4.829|    4.764|             16.13|
|2016-05-29|             0|             0.6367|    7.541|    5.572|    5.474|            18.587|
|2016-05-30|             1|   