### Demo Session: Spark on Databricks with Azure Blob Storage

In [0]:
# Every Databricks notebook starts with these Spark entry points pre-defined:
#   sc         -> SparkContext
#   sqlContext -> SQLContext / HiveContext
#   spark      -> SparkSession (Spark 2.x)
# Printing the version from each handle side by side lets you confirm they all report the same runtime.
spark_versions = (sc.version, spark.sparkContext.version, spark.version)
print("Spark version", *spark_versions)
print("Python version", sc.pythonVer)

In [0]:
%scala
// Same version check from the Scala side: the notebook's Scala cells also see the predefined `sc`.
println(sc.version)

In [0]:
# Fetch a small public CSV over HTTP and turn it into a Spark DataFrame without touching storage.
import requests

HTTP_TIMEOUT_S = 30  # avoid hanging the cell forever if the server never responds

r = requests.get("https://timeseries.surge.sh/usd_to_eur.csv", timeout=HTTP_TIMEOUT_S)
# Fail loudly on HTTP errors instead of parsing an error page as CSV data.
r.raise_for_status()
# spark.read.csv accepts an RDD of strings, so each line of the response becomes one CSV row.
df = spark.read.csv(sc.parallelize(r.text.splitlines()), header=True, inferSchema=True)
display(df)

In [0]:
# Read the storage parameters from notebook widgets and configure Azure Blob Storage (WASB) access.
container = dbutils.widgets.get("container")
# Publish the container name so other tasks in the same job can read it.
dbutils.jobs.taskValues.set(key = 'container', value = container)
storageAccount = dbutils.widgets.get("storageAccount")
# NOTE(review): widget values are visible in the notebook UI and in job run parameters;
# consider dbutils.secrets.get(scope=..., key=...) for the access key instead.
accessKey = dbutils.widgets.get("accessKey")
# Example values for interactive runs -- never commit a real account key here:
# container = "demo-container"
# storageAccount = "demoaccountstora"
# accessKey = "<storage-account-access-key>"

accountKey = "fs.azure.account.key.{}.blob.core.windows.net".format(storageAccount)

# Set the credentials in the Spark session configuration (used by the DataFrame/Dataset API).
spark.conf.set(
  accountKey,
  accessKey)

# Hadoop configuration options set via spark.conf.set(...) are not visible through SparkContext,
# i.e. they are visible to the DataFrame and Dataset API but not to the RDD API.
# Set the key on the Hadoop configuration too so RDD-based reads of the blob also work.
spark._jsc.hadoopConfiguration().set(
  accountKey,
  accessKey)

# Mount the container at /mnt/<container> so it can also be read via dbfs:/mnt/<container>/... paths.
inputSource = "wasbs://{}@{}.blob.core.windows.net".format(container, storageAccount)
mountPoint = "/mnt/" + container
extraConfig = {accountKey: accessKey}

print("Mounting: {}".format(mountPoint))

try:
  dbutils.fs.mount(
    source = inputSource,
    mount_point = mountPoint,
    extra_configs = extraConfig
  )
  print("=> Succeeded")
except Exception as e:
  # Re-running this cell is expected; an existing mount is reported rather than treated as an error.
  if "Directory already mounted" in str(e):
    print("=> Directory {} already mounted".format(mountPoint))
  else:
    # Bare raise re-raises the original exception with its traceback intact.
    raise

In [0]:
# Show the built-in help for dbutils.
dbutils.help()

In [0]:
# List the entries at the path "." via dbutils.fs.
# NOTE(review): confirm which directory "." resolves to here; an explicit path (e.g. the mount point) would be clearer.
dbutils.fs.ls(".")

In [0]:
# Read the CSV directly from Blob Storage using a WASB file path formatted like this:
#   wasbs://<containername>@<accountname>.blob.core.windows.net/<partialPath>
# WASB (Windows Azure Storage Blob) is an extension built on top of the HDFS APIs. HDFS, the Hadoop
# Distributed File System, is one of the core Hadoop components that manage data and storage on multiple nodes.
# This relies on the account key having been set in the Spark configuration earlier in the notebook.
# The template already supplies the "/" separator, so the partial path must not start with one
# (previously "/demo_data.csv" produced ".../​/demo_data.csv").
inputFilePath = "wasbs://{}@{}.blob.core.windows.net/{}".format(container, storageAccount, "demo_data.csv")
df = spark.read.format("csv").load(inputFilePath, header=True, inferSchema=True)
display(df)

In [0]:

# Read the same file through the mount point instead of a WASB URL.
# Databricks FS paths for a mounted container look like: dbfs:/mnt/<containername>/<partialPath>
inputFilePath = f"dbfs:/mnt/{container}/demo_data.csv"
df = (
    spark.read
    .format("csv")
    .load(inputFilePath, header=True, inferSchema=True)
)
display(df)

In [0]:
# Print the column names and types (types were inferred because inferSchema=True was used when reading).
df.printSchema()

In [0]:
# Compute basic summary statistics for each column and print them as a text table.
df.describe().show()

In [0]:
# First five rows, collected to the driver as a list of Row objects (head(n) and take(n) are equivalent).
df.take(5)

In [0]:
# Register the input DataFrame as a temporary view named "erate" so the next cell can query it with spark.sql.
df.createOrReplaceTempView("erate")

In [0]:
# Yearly row count and mean rate, newest year first, computed with Spark SQL over the "erate" view.
# A triple-quoted query avoids backslash continuations inside the string literal, which silently
# break (SyntaxError) if trailing whitespace ever follows a "\".
# AVG is the standard SQL spelling of Spark's MEAN aggregate.
e_df = spark.sql("""
    SELECT YEAR(Date)  AS year,
           COUNT(Date) AS count,
           AVG(Rate)   AS mean
    FROM erate
    GROUP BY YEAR(Date)
    ORDER BY year DESC
""")
display(e_df)

In [0]:
import pyspark.sql.functions as f

# Same yearly aggregation as the SQL cell, expressed with the DataFrame API.
retDF = (
  df
  .groupBy(f.year("Date").alias("year"))
  .agg(f.count("Date").alias("count"), f.mean("Rate").alias("mean"))
  .sort(f.desc("year"))
)

# limit() keeps the result a Spark DataFrame (sorted, top 4 years), which display() renders natively;
# head(4) would instead collect a plain Python list of Row objects to the driver.
display(retDF.limit(4))

In [0]:
# Plot Rate over Date with Plotly and render the chart as HTML in the notebook.
import plotly.offline as py
import plotly.graph_objs as go
import plotly.figure_factory as ff

# Bring only the two plotted columns to the driver, sorted by Date so the line is drawn
# chronologically regardless of the row order in the source file.
pandaData = (
  df.select("Date", "Rate")
  .orderBy("Date")
  .toPandas()
  .set_index("Date")
)
hd_trace = go.Scatter(x=pandaData.index, y=pandaData["Rate"], name="Rate")
plots = [hd_trace]

layout = go.Layout(
  title="Rate over time",
  xaxis=dict(title="Date"),
  yaxis=dict(title="Rate"),
)
fig = go.Figure(data=plots, layout=layout)

# output_type='div' returns the chart as an HTML string instead of writing a file.
p = py.plot(fig, output_type='div')

displayHTML(p)

In [0]:
# Rows for a single date, showing only the Date and Rate columns.
rowsOnDate = df.where("Date = '2000-01-01'").select("Date", "Rate")
display(rowsOnDate)

In [0]:
# Import only the function needed: a wildcard import from pyspark.sql.functions would shadow
# Python builtins such as sum, min, max, round, abs and filter for the rest of the notebook.
from pyspark.sql.functions import year

# New DataFrame with an extra "year" column derived from Date; df itself is left as-is.
df_with_year = df.withColumn("year", year(df["Date"]))

In [0]:
# Print the leading rows, including the new year column, as a text table.
df_with_year.show()

In [0]:
# Write the enriched data back to Blob Storage with a header row, replacing any previous output.
# NOTE(review): Spark writes a directory of part files at this path, not a single .csv file.
output_path = f"wasbs://{container}@{storageAccount}.blob.core.windows.net/demo_data_process.csv"
csv_writer = df_with_year.write
csv_writer.csv(output_path, mode="overwrite", header=True)