HDFS file interactions with another storage account (configured through Ambari)

In [None]:
container_url = 'wasbs://meetup@pn123dev.blob.core.windows.net/'
user = 'marcomansi80-eh-dev'
folder = 'dambd/*/2017/02/*/*/*'

!hdfs dfs -ls -R -h $container_url/$user/$folder | head -n10    

In [None]:
!hdfs dfs -du -h wasbs://meetup@pn123dev.blob.core.windows.net/marcomansi80-eh-dev/

In [None]:
!hdfs dfs -cat wasbs://meetup@pn123dev.blob.core.windows.net/marcomansi80-eh-dev/dambd/0/2017/02/22/09/10/26 | head -c 700    

Now let's get started with a Spark session:

In [None]:
import os

import findspark

# Point findspark at the Spark 2 client and the Python 3.5 interpreter.
# Keep this call ahead of the pyspark import below (presumably findspark is
# what makes pyspark importable here — confirm for this environment).
findspark.init(
    spark_home='/usr/hdp/current/spark2-client/',
    python_path='/usr/bin/anaconda/envs/py35/bin/python',
)

from pyspark.sql import SparkSession

# The application is named after the current OS user (raises KeyError if the
# USER environment variable is not set).
session_builder = SparkSession.builder.appName(os.environ['USER'])
# Extra settings can be added before the session is created, e.g.:
# session_builder = session_builder.config('customproperty', 'customvalue')
spark = session_builder.getOrCreate()

# Show the Spark version as the cell output.
spark.version

In [None]:
# To free the resources on the cluster, please stop your session when finished.
# NOTE(review): this cell appears before the cells that use `spark`; executing
# the notebook top to bottom stops the session before it is used. Run this cell
# last (or skip it during a full run).
spark.stop()

In [None]:
# Fetch the Hadoop configuration object behind the Spark context (reached
# through the private `_jsc` handle).
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
# Presumably required so that the Avro reader also picks up files without an
# `.avro` extension (the file cat-ed earlier has none) — TODO confirm.
hadoop_conf.set('avro.mapred.ignore.inputs.without.extension', 'false')

In [None]:
# Build the input location with '/' separators: this is a URI, not a local
# filesystem path, so os.sep is the wrong separator (it is '\\' on Windows).
# Stripping the trailing '/' of container_url also avoids a '//' in the result.
input_path = '/'.join([container_url.rstrip('/'), user, folder])

# Read every file matched by the glob with the Databricks Avro data source.
input_sdf = (
    spark.read
    .format("com.databricks.spark.avro")
    .load(input_path)
)

In [None]:
# Print the first 5 rows of the raw input ...
input_sdf.show(5)
# ... and display the total number of rows as the cell output.
input_sdf.count()

In [None]:
from pyspark.sql import functions as F


def _body_field(json_path):
    """Return a column extracting `json_path` from the JSON text in `Body`."""
    return F.get_json_object('Body', json_path)


# Cast Body to a string, pull the individual fields out of it as typed columns,
# then drop the Body column itself.
meter_sdf = (
    input_sdf
    .withColumn("Body", F.col("Body").cast('string'))
    .withColumn("Datetime", _body_field("$.Date").cast('timestamp'))
    .withColumn("ElectricityUsage", _body_field('$.ElectricityUsage').cast('integer'))
    .withColumn("CustomerId", _body_field('$.CustomerId'))
    .drop("Body")
)

In [None]:
# Mark the parsed DataFrame for persistence so later cells can reuse it.
meter_sdf.persist()
# Print the columns together with the first 5 rows ...
meter_sdf.show(5)
# ... and display the number of records as the cell output.
meter_sdf.count()

Transform the DataFrame into an aggregation and visualize the result with a plot:

In [None]:
def _round_to_seconds(col, seconds):
    """Round the timestamp in `col` to a multiple of `seconds`.

    The value is converted to epoch seconds with F.unix_timestamp, divided by
    `seconds`, rounded with F.round, scaled back up and passed through
    F.from_unixtime.
    """
    epoch_seconds = F.unix_timestamp(col)
    rounded_seconds = F.round(epoch_seconds / seconds) * seconds
    return F.from_unixtime(rounded_seconds)


def to_date_hour(col):
    """Round the timestamp in `col` to a multiple of 3600 seconds (one hour)."""
    return _round_to_seconds(col, 3600)


def to_date_minute(col):
    """Round the timestamp in `col` to a multiple of 60 seconds (one minute)."""
    return _round_to_seconds(col, 60)

# Average electricity usage per rounded minute, ordered by time.
# The timestamp column is referenced as 'Datetime', exactly as it is named when
# meter_sdf is built; the previous 'DateTime' spelling only resolved because
# Spark matches column names case-insensitively by default.
avg_per_date_sdf = (
    meter_sdf
    .withColumn('Date', to_date_minute(F.col('Datetime')))
    .groupBy('Date')
    .agg(F.mean('ElectricityUsage').alias("AvgElectricityUsage"))
    .sort('Date')
)

In [None]:
# Convert the aggregated result to a pandas DataFrame and display it as the
# cell output.
avg_per_date_sdf.toPandas()

In [None]:
def plot_sdf(sdf, x, y):
    """Plot column `y` against column `x` of a Spark DataFrame.

    The DataFrame is first converted with `toPandas()`; the plot is then drawn
    with x tick labels rotated by 90 degrees and a figure size of 18 x 8.
    Nothing is returned.
    """
    pandas_df = sdf.toPandas()
    pandas_df.plot(x=x, y=y, rot=90, figsize=(18, 8))

# Enable inline plotting of charts
%matplotlib inline

plot_sdf(avg_per_date_sdf, x='Date', y='AvgElectricityUsage')

Spark SQL example:

In [None]:
# Expose meter_sdf to Spark SQL under the name `meter_data`.
# createOrReplaceTempView is used because registerTempTable is deprecated
# since Spark 2.0.
meter_sdf.createOrReplaceTempView('meter_data')

# Per customer and calendar day: number of readings plus the minimum, maximum
# and average usage. The `min` and `max` aliases now match the aggregates they
# label (they were swapped before), and the timestamp column uses the same
# 'Datetime' spelling as in meter_sdf.
spark.sql("""
select
    CustomerId,
    date(Datetime) as date,
    count(ElectricityUsage) as cnt,
    min(ElectricityUsage) as min,
    max(ElectricityUsage) as max,
    avg(ElectricityUsage) as avg
from
    meter_data
group by
    date(Datetime),
    CustomerId
""").show()