In [1]:
import time
import datetime
from pyspark.sql.functions import lit,unix_timestamp,col,concat

In [2]:
interval_1_df = spark.read.option("header", True).option("inferSchema", True).csv('file:///home/guest/data/meter_data_1.csv')

In [3]:
timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
ts_df = interval_1_df.withColumn('received_ts',unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
ert_combined_id_df = ts_df.withColumn('ert_id',concat(col('ert_id_ms_bits'),col('ert_id_ls_bits')))
sequence_id_df = ert_combined_id_df.withColumn('meter_data_id',col('_c0')).drop('_c0')
sequence_id_df.printSchema()

root
 |-- sync_bit: integer (nullable = true)
 |-- preamble: string (nullable = true)
 |-- ert_id_ms_bits: integer (nullable = true)
 |-- reserved: string (nullable = true)
 |-- physical_tamper: string (nullable = true)
 |-- ert_type: string (nullable = true)
 |-- encoder_tamper: string (nullable = true)
 |-- consumption_data: integer (nullable = true)
 |-- ert_id_ls_bits: decimal(24,0) (nullable = true)
 |-- checksum: long (nullable = true)
 |-- received_ts: timestamp (nullable = true)
 |-- ert_id: string (nullable = true)
 |-- meter_data_id: integer (nullable = true)



In [4]:
sequence_id_df.write.format("org.apache.spark.sql.cassandra").mode('append')\
.options(table="meter_data", keyspace="tangez_demo").save()

In [33]:
sqlContext=SQLContext(spark.sparkContext)
stage_df = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="meter_data", keyspace="tangez_demo").load()
stage_df.show()

+-------------+-----------------+----------------+--------------+--------------------+--------------------+--------------+--------+---------------+--------+--------------------+--------+--------+
|meter_data_id|         checksum|consumption_data|encoder_tamper|              ert_id|      ert_id_ls_bits|ert_id_ms_bits|ert_type|physical_tamper|preamble|         received_ts|reserved|sync_bit|
+-------------+-----------------+----------------+--------------+--------------------+--------------------+--------------+--------+---------------+--------+--------------------+--------+--------+
|          745|47498238489494848|            3392|          null|28100000000000000...|10000000000000000...|            28|electric|           null| 0xF2A60|2018-12-27 01:02:...|    null|       1|
|          863|47498238489494848|            3055|          null|19100000000000000...|10000000000000000...|            19|electric|           null| 0xF2A60|2018-12-27 01:02:...|    null|       1|
|          885|47498

In [31]:
customer_data = spark.read.option("header", True).option("inferSchema", True).csv('file:///home/guest/data/customer_data.csv')
cleaned_customer_data = customer_data.drop('_c0').withColumnRenamed('st_address','address')

cleaned_customer_data = cleaned_customer_data.where("ert_id is not null")

cleaned_customer_data.write.format("org.apache.spark.sql.cassandra").mode('append')\
.options(table="customer_data", keyspace="tangez_demo").save()


In [32]:
customer_data_from_db = stage_df = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="customer_data", keyspace="tangez_demo").load()
customer_data_from_db.count()

2678

In [36]:
joined_df = stage_df.join(customer_data_from_db,stage_df.ert_id==customer_data_from_db.ert_id)
joined_df.printSchema()
joined_df.count()

root
 |-- meter_data_id: string (nullable = true)
 |-- checksum: string (nullable = true)
 |-- consumption_data: integer (nullable = true)
 |-- encoder_tamper: string (nullable = true)
 |-- ert_id: string (nullable = true)
 |-- ert_id_ls_bits: string (nullable = true)
 |-- ert_id_ms_bits: string (nullable = true)
 |-- ert_type: string (nullable = true)
 |-- physical_tamper: string (nullable = true)
 |-- preamble: string (nullable = true)
 |-- received_ts: timestamp (nullable = true)
 |-- reserved: string (nullable = true)
 |-- sync_bit: integer (nullable = true)
 |-- ert_id: string (nullable = true)
 |-- address: string (nullable = true)
 |-- name: string (nullable = true)



202