In [0]:
# Report the Spark version as exposed by the SparkContext (two access paths)
# and by the SparkSession, then the Python version reported by the SparkContext.
print(
    "Spark version",
    sc.version,
    spark.sparkContext.version,
    spark.version,
)
print("Python version", sc.pythonVer)

Spark version 3.5.0 3.5.0 3.5.0
Python version 3.10


In [0]:
import requests

In [0]:
# Download the USD -> EUR time series as CSV text.
# requests applies no timeout by default, so an unresponsive server would hang
# the notebook indefinitely; bound the wait explicitly.
r = requests.get("https://timeseries.surge.sh/usd_to_eur.csv", timeout=30)
# Fail fast on 4xx/5xx so an error page is never parsed as CSV downstream.
r.raise_for_status()

In [0]:
# The CSV reader accepts an RDD of strings, so the downloaded text is split
# into lines and parallelized instead of being staged as a file first.
# header=True uses the first line as column names; inferSchema=True lets Spark
# derive the column types from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(sc.parallelize(r.text.splitlines()))
)

In [0]:
# Render the DataFrame with the notebook's display helper to eyeball the
# parsed Date/Rate rows.
display(df)

Date,Rate
2000-01-01,0.9954210631096956
2000-01-02,0.9954210631096956
2000-01-03,0.9910802775024778
2000-01-04,0.970402717127608
2000-01-05,0.9645061728395062
2000-01-06,0.9626492106276472
2000-01-07,0.972384286269934
2000-01-08,0.972384286269934
2000-01-09,0.972384286269934
2000-01-10,0.9776126698602016


In [0]:
# Blob container and storage account that hold the raw data.
container = "raw"
storageAccount = "sourceblobstorageacc"

# SECURITY: the storage account key must never be pasted into the notebook;
# anything committed in plain text is readable by everyone with access to the
# notebook or its history. The key that was previously hard-coded here has to
# be treated as compromised and rotated in the Azure portal.
# The key is read from a Databricks secret scope instead. The scope and key
# names below are placeholders: create them (or adjust the names) before running.
accessKey = dbutils.secrets.get(scope="blob-storage", key="sourceblobstorageacc-access-key")

# Name of the configuration property under which the account key is registered.
accountKey = "fs.azure.account.key.{}.blob.core.windows.net".format(storageAccount)

In [0]:
# Register the storage account key on the Spark session configuration.
spark.conf.set(accountKey, accessKey)

In [0]:
# Hadoop options set through spark.conf.set(...) are visible to the DataFrame
# and Dataset APIs but are not accessible via the SparkContext, so the RDD API
# does not see them. Register the key on the SparkContext's Hadoop
# configuration as well so that blobs can also be read through RDDs.
spark._jsc.hadoopConfiguration().set(accountKey, accessKey)

In [0]:
# Settings for mounting the container so that native (non-Spark) Python code
# can reach it through a DBFS path.
mountPoint = "/mnt/" + container
inputSource = "wasbs://{}@{}.blob.core.windows.net".format(container, storageAccount)

# Credentials handed to the mount call, keyed by the account-key property name.
extraConfig = {accountKey: accessKey}

print("Mounting: {}".format(mountPoint))

Mounting: /mnt/raw


In [0]:
# Mount the container unless the mount point is already in use.
# The existing mounts are inspected up front instead of attempting the mount
# and matching on the exception text, which is brittle and depends on the
# exact wording of the runtime's error message. Any genuine mount failure now
# propagates with its original traceback.
if any(mount.mountPoint == mountPoint for mount in dbutils.fs.mounts()):
  print("=> Directory {} already mounted".format(mountPoint))
else:
  dbutils.fs.mount(
    source = inputSource,
    mount_point = mountPoint,
    extra_configs = extraConfig
  )
  print("=> Succeeded")

=> Succeeded


In [0]:
# List the directory that "." resolves to on DBFS; judging by the recorded
# output this is the DBFS root, where the mnt/ directory is visible.
dbutils.fs.ls(".")

[FileInfo(path='dbfs:/Volume/', name='Volume/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/Volumes/', name='Volumes/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-datasets/', name='databricks-datasets/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/databricks-results/', name='databricks-results/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/mnt/', name='mnt/', size=0, modificationTime=1705948734000),
 FileInfo(path='dbfs:/volume/', name='volume/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/volumes/', name='volumes/', size=0, modificationTime=0)]

In [0]:
%fs

ls

path,name,size,modificationTime
dbfs:/Volume/,Volume/,0,0
dbfs:/Volumes/,Volumes/,0,0
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-results/,databricks-results/,0,0
dbfs:/mnt/,mnt/,0,1705948734000
dbfs:/volume/,volume/,0,0
dbfs:/volumes/,volumes/,0,0


In [0]:
# Read the CSV straight from blob storage through the wasbs:// scheme.
# The template already ends in "/{}", so the blob name is passed without a
# leading slash; otherwise the URI contains a double slash after the host.
inputFilePath = "wasbs://{}@{}.blob.core.windows.net/{}".format(container, storageAccount, "usd_to_eur.csv")

# header=True takes column names from the first line; inferSchema=True lets
# Spark derive the column types from the data.
df = spark.read.format("csv").load(inputFilePath, header=True, inferSchema=True)
display(df)

Date,Rate
2000-01-01,0.9954210631096956
2000-01-02,0.9954210631096956
2000-01-03,0.9910802775024778
2000-01-04,0.970402717127608
2000-01-05,0.9645061728395062
2000-01-06,0.9626492106276472
2000-01-07,0.972384286269934
2000-01-08,0.972384286269934
2000-01-09,0.972384286269934
2000-01-10,0.9776126698602016


In [0]:
# Print the schema to check which column types inferSchema picked.
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Rate: double (nullable = true)



In [0]:
# Print summary statistics (count, mean, stddev, min, max) as a text table.
df.describe().show()

+-------+------------------+
|summary|              Rate|
+-------+------------------+
|  count|              6663|
|   mean|0.8409084713683753|
| stddev| 0.131864995212834|
|    min|0.6253908692933083|
|    max|1.2118274357731458|
+-------+------------------+



In [0]:
# Render the DataFrame again; the recorded output indicates a Databricks
# visualization is attached to this cell.
display(df)

Date,Rate
2000-01-01,0.9954210631096956
2000-01-02,0.9954210631096956
2000-01-03,0.9910802775024778
2000-01-04,0.970402717127608
2000-01-05,0.9645061728395062
2000-01-06,0.9626492106276472
2000-01-07,0.972384286269934
2000-01-08,0.972384286269934
2000-01-09,0.972384286269934
2000-01-10,0.9776126698602016


Databricks visualization. Run in Databricks to view.

In [0]:
# Register the DataFrame as the temporary view "currencyRate" so it can be
# queried with SQL (replaces any existing temp view of the same name).
df.createOrReplaceTempView("currencyRate")

In [0]:
# Query the temp view through Spark SQL and print the result as text;
# the recorded output shows only the top 20 rows.
spark.sql("SELECT * FROM currencyRate").show()

+----------+------------------+
|      Date|              Rate|
+----------+------------------+
|2000-01-01|0.9954210631096955|
|2000-01-02|0.9954210631096955|
|2000-01-03|0.9910802775024778|
|2000-01-04| 0.970402717127608|
|2000-01-05|0.9645061728395062|
|2000-01-06|0.9626492106276473|
|2000-01-07|0.9723842862699339|
|2000-01-08|0.9723842862699339|
|2000-01-09|0.9723842862699339|
|2000-01-10|0.9776126698602015|
|2000-01-11|0.9750390015600623|
|2000-01-12|0.9701202949165697|
|2000-01-13|0.9731413001167769|
|2000-01-14|0.9779951100244499|
|2000-01-15|0.9779951100244499|
|2000-01-16|0.9779951100244499|
|2000-01-17|0.9906875371507826|
|2000-01-18|0.9907856930545922|
|2000-01-19|0.9896091044037606|
|2000-01-20|0.9911785112498762|
+----------+------------------+
only showing top 20 rows



In [0]:
# Average rate per calendar month (1-12), pooled over all years and rounded
# to two decimals. A triple-quoted string replaces the backslash line
# continuations inside the literal, which break silently as soon as a
# trailing space follows a backslash.
spark.sql("""
    SELECT month(Date) AS month, ROUND(AVG(Rate), 2) AS avg_rate
    FROM currencyRate
    GROUP BY month
    ORDER BY month
""").show()

+-----+--------+
|month|avg_rate|
+-----+--------+
|    1|    0.84|
|    2|    0.84|
|    3|    0.84|
|    4|    0.85|
|    5|    0.84|
|    6|    0.84|
|    7|    0.84|
|    8|    0.84|
|    9|    0.84|
|   10|    0.84|
|   11|    0.84|
|   12|    0.83|
+-----+--------+



In [0]:
# Average rate per calendar year, rounded to two decimals and sorted by year.
# A triple-quoted string replaces the backslash line continuations inside the
# literal, which break silently as soon as a trailing space follows a backslash.
df2 = spark.sql("""
    SELECT year(Date) AS year, ROUND(AVG(Rate), 2) AS avg_rate
    FROM currencyRate
    GROUP BY year
    ORDER BY year
""")
display(df2)

year,avg_rate
2000,1.09
2001,1.12
2002,1.06
2003,0.89
2004,0.81
2005,0.8
2006,0.8
2007,0.73
2008,0.68
2009,0.72


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
-- Average rate per calendar year, rounded to two decimals and sorted by year.
SELECT year(Date) AS year, ROUND(AVG(Rate), 2) AS avg_rate
FROM currencyRate
GROUP BY year
ORDER BY year

year,avg_rate
2000,1.09
2001,1.12
2002,1.06
2003,0.89
2004,0.81
2005,0.8
2006,0.8
2007,0.73
2008,0.68
2009,0.72


In [0]:
import pyspark.sql.functions as f


# Per-year row count and mean rate via the DataFrame API, newest year first.
# The column is referenced as "Rate", exactly as it appears in the schema; the
# lowercase spelling only resolves while Spark's column resolution is
# case-insensitive.
DF = (
    df.groupBy(f.year("Date").alias("year"))
    .agg(
        f.count("Date").alias("count"),
        f.mean("Rate").alias("mean"),
    )
    .sort(f.desc("year"))
)

display(DF)


year,count,mean
2018,88,0.8136286205366102
2017,365,0.8870457315050745
2016,366,0.9040333893110878
2015,365,0.90152086665803
2014,365,0.7539379989861992
2013,365,0.7532984461830754
2012,366,0.7784882843021298
2011,365,0.7190862465439314
2010,365,0.7553223608492069
2009,365,0.719216033026926
