# Spark FiloDB read/write

### 1. Reading raw points

In [2]:
# Load the points CSV (header row, inferred column types), discard the "_c0"
# column, and preview the first five rows.
raw_points_df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferSchema='true')
    .load('timeseries/input/points.csv')
    .drop("_c0")
)
raw_points_df.show(5)

+--------------------+------+----+---------+-----------+---------+------+------+-----------+-------+-------+----+------+---------+----+------------+------+-----+---------+------------------+---------+----+-------+---------+----+--------+--------------------+------+--------+----------+----------+-----------+----------+-----------+-----+------------------+-------+----+-----+------+------------+---------------+--------------------+--------------+----+-------+----+---+--------------------+---------+------+-------+------+---------+-------+--------------------+---------+--------------+----+--------+----+--------+------+-------+----------+----+---------+----+---------+-------+--------------+--------+-------+----+--------+-------+-----+--------+--------------+----+----------------+------+----+--------+------+---------+-------+----+-----+-----+------+----------+----+-----------+------+----+-----+----+----+-----+--------------+-------+----+--------------------+
|                  id|ahuRef| air|

### 2. Reading raw histories

In [13]:
# Load the history CSV with a header row; no inferSchema option is set here.
history_df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true')
    .load('timeseries/input/histories.csv')
    .cache()  # the frame is read again by later cells, so keep it cached
)

history_df.show(5)

+--------------------+----+----+---------+---------+---------+----------+------------+---------+-----------+---------+---------+---------+----------+---------+----------+----------+----------+------------+------------+------------+------------+----------------+----------------+----------+----------+----------+-------+-------+-------+-------+-------+---------+----------+----------+----------+-----------+-----------+-----------+------------+------------+------------+------------+---------------+---------------+---------------+---------------+---------------+---------+---------+---------+---------+-----------+---------+---------+----------+---------+---------+----------+----------+----------+----------+----------+----------+-------+-------+-------+-------+-------+---------+---------+---------+---------+----------------+----------------+---------------+---------------+---------------+---------------+---------+---------+----------+----------+----------+-------------+-------------+----------

In [14]:
import datetime, time
from pyspark.sql.types import *
# Names of every history column except "Timestamp" (first occurrence removed,
# if present); these are the columns iterated over when the rows are unpivoted.
columns = list(history_df.columns)
if columns.count('Timestamp'):
    columns.remove('Timestamp')

def clean_as_schema(row, columns):
    """Unpivot one wide history row into (timestamp, column name, value) records.

    Args:
        row: A mapping-like row that has a 'Timestamp' entry plus one entry per
            name in ``columns``.
        columns: Names of the value columns to emit.

    Returns:
        A list of ``Row(timestamp_ns, column, value)``, one per column whose
        value is not None, not the text 'null', and not the empty string.
    """
    # Imported locally so the function does not depend on Row having been
    # brought into the notebook namespace by some other cell.
    from pyspark.sql import Row

    # Named timestamp_ns (not "datetime") so the datetime module is not shadowed.
    timestamp_ns = convert_to_timestamp(row['Timestamp'])

    records = []
    for column in columns:
        value = row[column]
        if value is None or value in ('null', ''):
            continue  # no reading for this column at this timestamp
        records.append(Row(timestamp_ns, column, value))
    return records


def convert_to_timestamp(date_string):
    """Convert a ``YYYY-MM-DDTHH:MM:SS...`` string to integer nanoseconds since 1970-01-01T00:00:00.

    Only the first 19 characters are parsed; anything after them (fractional
    seconds, zone suffix, ...) is discarded, and the parsed wall-clock value is
    measured against the epoch as-is.
    NOTE(review): if the input carries a UTC offset it is ignored here — confirm
    that this is intended.

    Args:
        date_string: Timestamp text whose first 19 characters match
            ``%Y-%m-%dT%H:%M:%S``.

    Returns:
        int: Nanoseconds between the epoch and the parsed value.

    Raises:
        ValueError: if the first 19 characters do not match the format.
    """
    parsed = datetime.datetime.strptime(date_string[:19], "%Y-%m-%dT%H:%M:%S")
    elapsed = parsed - datetime.datetime(1970, 1, 1)
    # Whole-second integer arithmetic keeps the result exact and avoids both a
    # float round-trip and the deprecated datetime.utcfromtimestamp().
    return (elapsed.days * 86400 + elapsed.seconds) * 10**9
        
# Explode every wide history row into (timestamp, point name, raw value) records.
cleaned_his_rdd = history_df.rdd.flatMap(lambda his_row: clean_as_schema(his_row, columns))

# All three fields are declared non-nullable; raw_value is kept as a string here.
schema = StructType([
    StructField(field_name, field_type, False)
    for field_name, field_type in (
        ("datetime", LongType()),
        ("pointName", StringType()),
        ("raw_value", StringType()),
    )
])

# Build the long-format frame, register it for SQL access, and preview it.
cleaned_his_df = sqlContext.createDataFrame(cleaned_his_rdd, schema)
cleaned_his_df.registerTempTable("cleanedHisDf")
cleaned_his_df.show()

+-------------------+----------------+---------+
|           datetime|       pointName|raw_value|
+-------------------+----------------+---------+
|1478310223000000000|OAF-1-Tenant_ENB|    false|
|1478310223000000000|RAF-1-Tenant_ENB|    false|
|1478310246000000000|OAF-1-Tenant_STS|    false|
|1478310246000000000|RAF-1-Tenant_STS|    false|
|1486105205000000000|OAF-2-Tenant_ENB|     true|
|1486105205000000000|RAF-2-Tenant_ENB|     true|
|1486105210000000000|OAF-2-Tenant_STS|     true|
|1486105211000000000|RAF-2-Tenant_STS|     true|
|1502117823000000000|     STCWP-1_STS|     true|
|1502117823000000000|     PTCWP-1_ENB|     true|
|1505982247000000000|    Boiler-1_ENB|    false|
|1505982247000000000|    Boiler-2_ENB|    false|
|1505982546000000000|       HWP-9_ENB|    false|
|1505982547000000000|       HWP-8_ENB|    false|
|1506586598000000000| Tenant-CT-2_ENB|    false|
|1506586628000000000| Tenant-CT-2_STS|    false|
|1506586761000000000|     PTCWP-1_STS|     true|
|1506591532000000000

In [15]:
# NOTE(review): points_df is not assigned in any cell visible here (the points
# load produces raw_points_df) — confirm where it is defined before a fresh run.
points_df.registerTempTable("points")

# With both frames available as temp tables, the left join is written in SQL
# (the DataFrame API could do the same); the "dis" column is dropped right after.
joined_his_df = sqlContext.sql(
    "SELECT * from cleanedHisDf as h left join points as p on h.pointName = p.dis"
).drop("dis")
joined_his_df.show()

+-------------------+----------+--------------------+--------------------+------+---+---------+-----------+---------+------+------+-----------+-------+-------+----+------+---------+----+------------+------+-----+--------+---------+----+-------+---------+----+--------+-------------+------+--------+----+----------+-----------+----------+-----------+-----+----------+-------+----+-----+------+------------+---------------+--------------------+--------------+----+-------+----+---+--------------------+----------+------+-------+-----+---------+-------+--------------------+---------+--------------+----+--------+----+--------+------+-------+----------+----+---------+----+---------+-------+--------------------+--------+-------+----+--------+-------+-----+--------+--------------+----+----------------+------+----+--------+------+---------+-------+----+-----+-----+------+----------+----+-----------+------+----+-----+----+----+-----+--------------+-------+----+--------------------+
|           dateti

In [16]:
from pyspark.sql.functions import udf, col
def clean_raw_value(value, unit, kind):
    """Convert a raw history reading to a float.

    Args:
        value: Raw reading as text, e.g. ``"true"`` or a number followed by its unit.
        unit: Unit text to remove from ``value``; may be None or empty.
        kind: Point kind; ``"Bool"`` selects boolean handling, anything else is
            treated as numeric.

    Returns:
        For ``"Bool"`` points, 1.0 when ``value`` is exactly ``"true"`` and 0.0
        otherwise. For other points, the parsed number, or None when ``value``
        is None.

    Raises:
        ValueError: if a non-Bool value is not numeric once the unit is removed.
    """
    # TODO: only Bool and plain numeric readings are handled so far.
    if kind == "Bool":
        return 1.0 if value == "true" else 0.0

    if value is None:
        return None
    # unit can be None (e.g. a null column value); str.replace(None, "") would
    # raise TypeError, so the unit is only stripped when one is present.
    if unit:
        value = value.replace(unit, "")
    return float(value.strip())

# Wrap the parser as a double-returning UDF and derive the numeric "value"
# column from each row's raw reading, unit and kind.
clean_udf = udf(clean_raw_value, DoubleType())
joined_his_df = joined_his_df.withColumn(
    "value",
    clean_udf(col("raw_value"), col("unit"), col("kind")),
)
joined_his_df.show()

+-------------------+----------+--------------------+--------------------+------+---+---------+-----------+---------+------+------+-----------+-------+-------+----+------+---------+----+------------+------+-----+--------+---------+----+-------+---------+----+--------+-------------+------+--------+----+----------+-----------+----------+-----------+-----+----------+-------+----+-----+------+------------+---------------+--------------------+--------------+----+-------+----+---+--------------------+----------+------+-------+-----+---------+-------+--------------------+---------+--------------+----+--------+----+--------+------+-------+----------+----+---------+----+---------+-------+--------------------+--------+-------+----+--------+-------+-----+--------+--------------+----+----------------+------+----+--------+------+---------+-------+----+-----+-----+------+----------+----+-----------+------+----+-----+----+----+-----+--------------+-------+----+--------------------+------------------+

In [17]:
# Drop the raw text column and print the resulting schema.
joined_his_df = joined_his_df.drop("raw_value")
joined_his_df.printSchema()

root
 |-- datetime: long (nullable = false)
 |-- pointName: string (nullable = false)
 |-- id: string (nullable = true)
 |-- ahuRef: string (nullable = true)
 |-- air: string (nullable = true)
 |-- analytics: string (nullable = true)
 |-- armsAssetId: string (nullable = true)
 |-- averazing: string (nullable = true)
 |-- boiler: string (nullable = true)
 |-- bypass: string (nullable = true)
 |-- calendarRef: string (nullable = true)
 |-- chilled: string (nullable = true)
 |-- chiller: string (nullable = true)
 |-- cmd: string (nullable = true)
 |-- common: string (nullable = true)
 |-- condenser: string (nullable = true)
 |-- cool: string (nullable = true)
 |-- coolingTower: string (nullable = true)
 |-- damper: string (nullable = true)
 |-- delta: string (nullable = true)
 |-- disMacro: string (nullable = true)
 |-- discharge: string (nullable = true)
 |-- dk: string (nullable = true)
 |-- economy: string (nullable = true)
 |-- effective: string (nullable = true)
 |-- elec: string (nu

In [18]:
import hashlib
from pyspark.sql.functions import lit

def generate_id(siteRef, levelRef, point, separator):
    """Build a deterministic hex id from site, level and point name.

    Args:
        siteRef: Site reference text.
        levelRef: Level reference text.
        point: Point name.
        separator: Text placed between the three components before hashing.

    Returns:
        The 32-character MD5 hex digest of the UTF-8 encoded
        ``siteRef<sep>levelRef<sep>point`` string, or None when any of the
        three components is None.
    """
    parts = [siteRef, levelRef, point]
    # A None component (e.g. a null column value) cannot be joined — str.join
    # would raise TypeError — so no id is produced for such rows.
    if any(part is None for part in parts):
        return None
    return hashlib.md5(separator.join(parts).encode("utf-8")).hexdigest()
    
# Expose generate_id as a string-returning UDF and add a "pointId" column built
# from siteRef, levelRef and pointName joined with ":".
generate_id_udf = udf(generate_id, StringType())

joined_his_df = joined_his_df.withColumn(
    "pointId",
    generate_id_udf(col("siteRef"), col("levelRef"), col("pointName"), lit(":")),
)

In [23]:
def get_year_month(val):
    """Format an epoch-nanosecond timestamp as a ``YYYY-MM`` string.

    The conversion is done relative to 1970-01-01T00:00:00 without applying the
    machine's local timezone, so it is the exact inverse of how
    convert_to_timestamp() produces these values and gives the same result on
    every host.

    Args:
        val: Integer nanoseconds since the epoch.

    Returns:
        str: Year and month of the timestamp, e.g. ``"2017-10"``.
    """
    # Integer floor-division avoids a float round-trip; sub-second precision is
    # irrelevant for a year-month bucket.
    moment = datetime.datetime(1970, 1, 1) + datetime.timedelta(seconds=val // 10**9)
    return moment.strftime("%Y-%m")
    
year_month_udf = udf(get_year_month, StringType())

# yearMonth is generated as an extra partition key: partitioning on siteRef
# alone may produce too-large partitions, and a Cassandra partition should
# stay below 1 GB.
final_df = (
    joined_his_df
    .withColumn("yearMonth", year_month_udf(col("datetime")))
    .select("datetime", "yearMonth", "pointName", "siteRef", "levelRef", "equipRef", "value")
)
final_df.printSchema()

root
 |-- datetime: long (nullable = false)
 |-- yearMonth: string (nullable = true)
 |-- pointName: string (nullable = false)
 |-- siteRef: string (nullable = true)
 |-- levelRef: string (nullable = true)
 |-- equipRef: string (nullable = true)
 |-- value: double (nullable = true)



In [26]:
# Persist the final frame as Parquet, replacing any previous output at this path.
final_df.write.mode("overwrite").parquet("timeseries/output/metadataLessHistory.parquet")

In [27]:
# Read the Parquet output back and count its rows.
finalDF = sqlContext.read.parquet("timeseries/output/metadataLessHistory.parquet")
finalDF.count()

21952

In [29]:
# Write the frame to the FiloDB dataset "iot_history_metaless" through the
# filodb.spark data source, overwriting existing data.
(
    finalDF.write
    .format("filodb.spark")
    .options(
        dataset="iot_history_metaless",
        partition_keys="siteRef,yearMonth",
        row_keys="datetime,pointName,equipRef,levelRef",
        chunk_size="100",
    )
    .mode("overwrite")
    .save()
)

In [31]:
# Load the dataset back from FiloDB to verify the write.
iotDF = (
    sqlContext.read
    .format("filodb.spark")
    .option("dataset", "iot_history_metaless")
    .load()
)
# NOTE(review): this count is not the cell's last expression and is not printed,
# so its result is never displayed — confirm whether it is still wanted.
iotDF.count()
iotDF.show()

+-------------+--------------+-------------------+----------+-------+---------+-----+
|    pointName|      equipRef|           datetime|  levelRef|siteRef|yearMonth|value|
+-------------+--------------+-------------------+----------+-------+---------+-----+
|Chiller-2_STS|Site Chiller 2|1507046744000000000|Site Plant|   Site|  2017-10|  0.0|
|Chiller-2_ENB|Site Chiller 2|1507050417000000000|Site Plant|   Site|  2017-10|  0.0|
|   CDWP-2_ENB|   Site CDWP 2|1507050476000000000|Site Plant|   Site|  2017-10|  0.0|
|   CDWP-2_STS|   Site CDWP 2|1507050489000000000|Site Plant|   Site|  2017-10|  0.0|
|   CHWP-2_ENB|   Site CHWP 2|1507050535000000000|Site Plant|   Site|  2017-10|  0.0|
|   CHWP-2_STS|   Site CHWP 2|1507050551000000000|Site Plant|   Site|  2017-10|  0.0|
| Boiler-1_STS| Site Boiler 1|1507544565000000000|Site Plant|   Site|  2017-10|  0.0|
|    HWP-8_STS|    Site HWP 8|1507544634000000000|Site Plant|   Site|  2017-10|  0.0|
| Boiler-2_STS| Site Boiler 2|1507545158000000000|Site