In [26]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.conf import SparkConf
# Build (or reuse) the Spark session with the hadoop-azure connector on the
# classpath; it supplies the org.apache.hadoop.fs.azurebfs classes configured
# below for abfss:// access.
# NOTE(review): keep this artifact version in step with the Hadoop version the
# Spark distribution was built against — confirm. Also, if a session already
# exists, getOrCreate() returns it and this package setting may not take effect.
_builder = SparkSession.builder.config(
    'spark.jars.packages', 'org.apache.hadoop:hadoop-azure:3.3.1'
)
spark = _builder.getOrCreate()

In [27]:
import os
from getpass import getpass

# Service-principal credentials for OAuth (client-credentials) access to ADLS Gen2.
# They are read from the environment so no secret is committed with the notebook.
# The non-secret IDs keep their previous values as defaults. If the secret is
# not in the environment, it is prompted for interactively.
azure_application_id = os.environ.get("AZURE_CLIENT_ID", "4413f43e-1232-47a3-b0dd-b224a5189b30")
azure_application_secret = os.environ.get("AZURE_CLIENT_SECRET") or getpass("Azure client secret: ")
azure_ad_id = os.environ.get("AZURE_TENANT_ID", "2fd0bd59-1136-48e1-884a-0938d52022ce")

container_name = "adzd-adl-data"
storage_account_name = "adzdadl"
mount_path = "/mnt/azuredata"

# Every ABFS OAuth key is scoped to one storage account by this suffix.
_account_suffix = f"{storage_account_name}.dfs.core.windows.net"

# Same keys/values and the same order as setting them one by one.
_oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": azure_application_id,
    "fs.azure.account.oauth2.client.secret": azure_application_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{azure_ad_id}/oauth2/token",
}
for _key, _value in _oauth_settings.items():
    spark.conf.set(f"{_key}.{_account_suffix}", _value)

In [28]:
# Root of the container, addressed through the abfss:// (ADLS Gen2) scheme.
_container_root = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/"

file_path = "2014-01-14.ndjson"                 # the single file used for the small runs
single_file_url = _container_root + file_path   # one file
all_files_url = _container_root                 # every file in the container

In [29]:
import functools
import time


def time_measure(f):
    """Decorator that prints how long each call to ``f`` took.

    The wrapped function's return value is passed through unchanged. If ``f``
    raises, the exception propagates and no timing line is printed.
    ``functools.wraps`` copies ``f``'s name and docstring onto the wrapper.

    Printed format: ``func:'<name>' <args> took <seconds> sec``. Keyword
    arguments, when present, are shown between the positional args and
    ``took``.
    """
    @functools.wraps(f)
    def timed(*args, **kw):
        # perf_counter is monotonic and high-resolution. time.time() can jump
        # if the system clock is adjusted mid-measurement.
        start = time.perf_counter()
        result = f(*args, **kw)
        elapsed = time.perf_counter() - start

        if kw:
            print('func:%r %r %r took %2.4f sec' % (f.__name__, args, kw, elapsed))
        else:
            print('func:%r %r took %2.4f sec' % (f.__name__, args, elapsed))
        return result
    return timed

In [30]:
@time_measure
def load_file(url, schema=None):
    """Read JSON-lines data at ``url`` into a Spark DataFrame.

    Args:
        url: Path or URL accepted by ``spark.read.json``, for example an
            ``abfss://`` file or directory.
        schema: Optional ``StructType`` or DDL string. When omitted, Spark
            makes one pass over the input to infer the schema. That pass is
            the bulk of the work this timing captures; passing a schema
            skips it.

    Returns:
        The lazy DataFrame. No action is executed on it here.
    """
    return spark.read.json(url, schema=schema)

@time_measure
def process_file(url, schema=None):
    """Mean of ``value`` per (``country``, ``parameter``) for the JSON data at ``url``.

    Args:
        url: Path or URL accepted by ``spark.read.json``, for example an
            ``abfss://`` file or directory.
        schema: Optional ``StructType`` or DDL string. When omitted, Spark
            infers the schema with an extra pass over the input before the
            aggregation runs.

    Returns:
        list[Row]: Rows with fields ``country``, ``parameter`` and
        ``avg(value)``, collected to the driver. Row order is not guaranteed
        because no ordering is applied.
    """
    df = spark.read.json(url, schema=schema)
    return df.groupBy("country", "parameter").avg("value").collect()

In [31]:
# Time load_file on a single file. With no schema given, spark.read.json scans
# the input to infer one; no action is run on the returned DataFrame.
load_file(single_file_url)

func:'load_file' ('abfss://adzd-adl-data@adzdadl.dfs.core.windows.net/2014-01-14.ndjson',) took 0.5975 sec


DataFrame[attribution: array<struct<name:string,url:string>>, averagingPeriod: struct<unit:string,value:bigint>, city: string, coordinates: struct<latitude:double,longitude:double>, country: string, date: struct<local:string,utc:string>, location: string, mobile: boolean, parameter: string, sourceName: string, sourceType: string, unit: string, value: double]

In [32]:
# Time the read + groupBy/avg + collect over a single file.
process_file(single_file_url)

func:'process_file' ('abfss://adzd-adl-data@adzdadl.dfs.core.windows.net/2014-01-14.ndjson',) took 0.9167 sec


[Row(country='CN', parameter='pm25', avg(value)=170.56666666666666)]

In [8]:
# Time load_file over every file in the container (schema inference scans all of them).
# NOTE(review): this cell's execution count (In [8]) is older than the setup
# cells (In [26]-[30]); its output presumably comes from an earlier kernel
# session — re-run in order to reproduce.
load_file(all_files_url)

func:'load_file' ('abfss://adzd-adl-data@adzdadl.dfs.core.windows.net/',) took 10.3954 sec


DataFrame[attribution: array<struct<name:string,url:string>>, averagingPeriod: struct<unit:string,value:double>, city: string, coordinates: struct<latitude:double,longitude:double>, country: string, date: struct<local:string,utc:string>, location: string, mobile: boolean, parameter: string, sourceName: string, sourceType: string, unit: string, value: double]

In [9]:
# Time the read + groupBy/avg + collect over every file in the container.
# NOTE(review): execution count In [9] is older than the setup cells
# (In [26]-[30]); re-run in order to reproduce this output.
process_file(all_files_url)

func:'process_file' ('abfss://adzd-adl-data@adzdadl.dfs.core.windows.net/',) took 11.2645 sec


[Row(country='BG', parameter='so2', avg(value)=9.879365079365076),
 Row(country='NO', parameter='co', avg(value)=458.87128571428565),
 Row(country='RO', parameter='pm25', avg(value)=26.60277544487179),
 Row(country='RS', parameter='pm10', avg(value)=77.20296876923075),
 Row(country='AU', parameter='so2', avg(value)=0.0010872294372294374),
 Row(country='AT', parameter='pm10', avg(value)=25.38077178143156),
 Row(country='IE', parameter='no2', avg(value)=8.439883720930231),
 Row(country='KV', parameter='pm25', avg(value)=65.28571428571429),
 Row(country='RS', parameter='so2', avg(value)=16.800024266055043),
 Row(country='BG', parameter='no2', avg(value)=21.479666666666684),
 Row(country='BD', parameter='pm25', avg(value)=122.47619047619048),
 Row(country='FR', parameter='co', avg(value)=389.3953488372093),
 Row(country='HK', parameter='pm10', avg(value)=39.853174603174615),
 Row(country='KZ', parameter='pm25', avg(value)=2.422857142857143),
 Row(country='RS', parameter='co', avg(value)=12