In [0]:
import os
from urllib.parse import urlsplit

import requests
from pyspark.sql import DataFrame

In [0]:
# Source dataset: glacier mass-balance CSV published on datahub.io.
file_loc = 'https://datahub.io/core/glacier-mass-balance/r/glaciers.csv'

# Manual one-off download to /dbfs/glacier.csv (singular name). Later cells use
# get_data(), which saves the same URL under its basename, glaciers.csv.
with requests.get(file_loc, stream=True, timeout=60) as r:  # timeout in seconds
    # Abort on a 4xx/5xx status instead of saving the server's error page as CSV.
    r.raise_for_status()
    with open('/dbfs/glacier.csv', 'wb') as f:
        # Stream in 8 KiB chunks so the whole body is never held in memory.
        for chunk in r.iter_content(chunk_size=8192):
            f.write(chunk)

In [0]:
def get_data(url: str) -> str:
    """Download ``url`` to ``/dbfs/<basename>`` and return the basename.

    The response body is streamed to disk in 8 KiB chunks, so large files are
    never held fully in memory.

    Args:
        url: HTTP(S) URL whose last path segment names the file to create.
            Any query string or fragment is ignored when deriving the name.

    Returns:
        The bare file name written under ``/dbfs`` (e.g. ``'glaciers.csv'``).

    Raises:
        ValueError: if the URL path has no final segment to use as a name.
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    # Derive the name from the URL *path* only. Splitting the raw URL on '/'
    # would carry a query string such as '?raw=true' into the file name.
    filename = urlsplit(url).path.rsplit("/", 1)[-1]
    if not filename:
        raise ValueError(f"Cannot derive a file name from URL: {url!r}")
    with requests.get(url, stream=True, timeout=60) as r:  # timeout in seconds
        # Fail loudly rather than saving an HTML error page under a data name.
        r.raise_for_status()
        with open(f"/dbfs/{filename}", "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    return filename

In [0]:
# Download the dataset; keep the saved basename for the reader cells below.
file_name = get_data(url=file_loc)

In [0]:
  file_name

'glaciers.csv'

In [0]:
import os

# Quick look at the CSV schema, read from the local /dbfs mount with a header row.
(
    spark.read
    .format("csv")
    .option("header", "true")
    .load(os.path.join("file:/", "dbfs", file_name))
)

DataFrame[Year: string, Mean cumulative mass balance: string, Number of observations: string]

In [0]:
# Extension = text after the last dot (the whole name if there is no dot).
file_format = file_name.rpartition(".")[2]
file_format

'csv'

In [0]:
 def read_data(file_name):
    path = os.path.join("file:/","dbfs",file_name)
    file_format = file_name.split(".")[-1]
    if file_format == 'csv':
        df = spark.read.format(file_format).option("header","true").load(path)
    elif file_format == 'json':
        try:
            df = spark.read.format(file_format).load(path)
        except:
            df = spark.read.format(file_format).option("multiline","true").load(path)
    elif file_format == 'parquet':
        df =  spark.read.text(path)
    return df        

In [0]:
# Load the downloaded file into a Spark DataFrame, picking the reader by extension.
df = read_data(file_name=file_name)

In [0]:
# Render the loaded DataFrame as a table (notebook `display` helper).
display(df)

Year,Mean cumulative mass balance,Number of observations
1945,0.0,
1946,-1.13,1.0
1947,-3.19,1.0
1948,-3.19,1.0
1949,-3.82,3.0
1950,-4.887,3.0
1951,-5.217,3.0
1952,-5.707,3.0
1953,-6.341,7.0
1954,-6.825,6.0


In [0]:
# Expose the DataFrame to SQL as temp view "df"; the %sql cells below query it by name.
df.createOrReplaceTempView("df")

In [0]:
%sql
-- Query the "df" temp view registered from the Python DataFrame.
select * from df

Year,Mean cumulative mass balance,Number of observations
1945,0.0,
1946,-1.13,1.0
1947,-3.19,1.0
1948,-3.19,1.0
1949,-3.82,3.0
1950,-4.887,3.0
1951,-5.217,3.0
1952,-5.707,3.0
1953,-6.341,7.0
1954,-6.825,6.0


In [0]:
%sql
-- Split rows by the leading digits of the string Year column:
--   nintys: Year LIKE '19%'. Despite the name, this is every 19xx year, not just the 1990s.
--   modern: Year LIKE '20%' (every 20xx year).
-- NOTE(review): ORDER BY inside a view definition is not a portable ordering
-- guarantee for queries over the view; sort in the consuming query if order matters.
create or replace temp view nintys as select * from df where Year like '19%' order by Year asc;
create or replace temp view modern as select * from df where Year like '20%' order by Year asc;

In [0]:
# DataFrame handles over the two temp views created by the %sql cell above.
nintys_df, modern_df = (
    spark.sql("select * from nintys"),
    spark.sql("select * from modern"),
)


In [0]:
def transform_data(df: "DataFrame"):
    """Split ``df`` into its 19xx and 20xx rows and register both as temp views.

    Rows whose string ``Year`` starts with '19' become temp view ``nintys``.
    Rows starting with '20' become ``modern``. Both are sorted by Year. The
    views are (re)registered because later cells query them by name.

    Args:
        df: DataFrame with a string ``Year`` column (as produced by read_data).

    Returns:
        ``(nintys_df, modern_df)``: the two filtered, Year-sorted DataFrames.
    """
    # Filter the DataFrame actually passed in. Querying the global temp view
    # "df" through spark.sql would silently ignore this argument.
    nintys_df = df.where("Year LIKE '19%'").orderBy("Year")
    nintys_df.createOrReplaceTempView("nintys")
    modern_df = df.where("Year LIKE '20%'").orderBy("Year")
    modern_df.createOrReplaceTempView("modern")
    return nintys_df, modern_df

In [0]:
# x: 19xx rows, y: 20xx rows (also registered as temp views nintys / modern).
x, y = transform_data(df=df)

In [0]:
# Show the 20xx subset returned by transform_data.
display(y)

Year,Mean cumulative mass balance,Number of observations
2000,-17.727,37
2001,-18.032,37
2002,-18.726,37
2003,-19.984,37
2004,-20.703,37
2005,-21.405,37
2006,-22.595,37
2007,-23.255,37
2008,-23.776,37
2009,-24.459,37


In [0]:
# First and last row (by Year) of each temp view. UNION gives no row-order
# guarantee, so sort the combined result: row 0 is then the earliest year and
# row 1 the latest, which the label cell below indexes into.
# NOTE(review): UNION also de-duplicates, so a view holding a single year
# yields only one row here.
nintys_filename = spark.sql(
    "(select * from nintys order by Year ASC limit 1) union (select * from nintys order by Year DESC limit 1)"
).orderBy("Year")
modern_filename = spark.sql(
    "(select * from modern order by Year ASC limit 1) union (select * from modern order by Year DESC limit 1)"
).orderBy("Year")


In [0]:
# Earliest and latest 19xx rows, whose Year values form the output label.
display(nintys_filename)

Year,Mean cumulative mass balance,Number of observations
1945,0.0,
1999,-17.697,37.0


In [0]:
# Pull the (first, last) rows to the driver and join their Year strings.
modern_filename_df = modern_filename.collect()

modern_filename_df[0]["Year"] + "-" + modern_filename_df[1]["Year"]

'2000-2014'

In [0]:
def create_file_name():
    """Build ``'<first year>-<last year>'`` labels for the two temp views.

    Reads the ``nintys`` and ``modern`` temp views and returns one label per
    view, e.g. ``('1945-1999', '2000-2014')``. The labels are used as output
    file names.

    Returns:
        Tuple ``(nintys_label, modern_label)``.

    Raises:
        ValueError: if a view has no non-null Year values.
    """
    def _year_span(view):
        # Aggregate with MIN/MAX instead of a UNION of two LIMIT 1 queries. That
        # UNION has no guaranteed row order, collapses to one row when the view
        # holds a single year, and (ASC puts NULLs first) could pick a NULL year.
        row = spark.sql(
            f"select min(Year) as first_year, max(Year) as last_year from {view}"
        ).first()
        if row is None or row["first_year"] is None:
            raise ValueError(f"Temp view {view!r} has no Year values")
        # Year is a string column, so plain concatenation builds the label.
        return row["first_year"] + "-" + row["last_year"]

    return _year_span("nintys"), _year_span("modern")

In [0]:
# m: label for the 19xx view, n: label for the 20xx view.
(m, n) = create_file_name()

In [0]:
def write_df(file_type: str, dfs, file_names, mode: str = "errorifexists", base_dir: str = "/dbfs"):
    """Save each DataFrame as ``<base_dir>/<name>.<file_type>`` via Spark.

    Args:
        file_type: Spark data source name, also used as the extension
            (e.g. ``'parquet'``).
        dfs: DataFrames to write.
        file_names: Base names, paired positionally with ``dfs``.
        mode: Spark save mode. The default ``'errorifexists'`` (Spark's own
            default) fails if a target already exists. Pass ``'overwrite'`` to
            make notebook re-runs succeed.
        base_dir: Prefix for the output paths. This is a Spark path: the
            directory listing at the end of this notebook shows the default
            ``'/dbfs'`` landing at ``dbfs:/dbfs/...``.

    Raises:
        ValueError: if ``dfs`` and ``file_names`` differ in length.
    """
    dfs = list(dfs)
    file_names = list(file_names)
    # zip() would silently drop the unmatched tail; fail before writing anything.
    if len(dfs) != len(file_names):
        raise ValueError(
            f"Got {len(dfs)} DataFrames but {len(file_names)} file names"
        )
    for frame, name in zip(dfs, file_names):
        frame.write.format(file_type).mode(mode).save(f"{base_dir}/{name}.{file_type}")
        

In [0]:
# Render the 19xx rows; the bare `m` on the last line echoes the label they will be saved under.
display(x)
m

Year,Mean cumulative mass balance,Number of observations
1945,0.0,
1946,-1.13,1.0
1947,-3.19,1.0
1948,-3.19,1.0
1949,-3.82,3.0
1950,-4.887,3.0
1951,-5.217,3.0
1952,-5.707,3.0
1953,-6.341,7.0
1954,-6.825,6.0


'1945-1999'

In [0]:
# Render the 20xx rows; the bare `n` on the last line echoes the label they will be saved under.
display(y)
n

Year,Mean cumulative mass balance,Number of observations
2000,-17.727,37
2001,-18.032,37
2002,-18.726,37
2003,-19.984,37
2004,-20.703,37
2005,-21.405,37
2006,-22.595,37
2007,-23.255,37
2008,-23.776,37
2009,-24.459,37


'2000-2014'

In [0]:
# Persist both subsets as parquet, named by their year spans.
write_df(file_type="parquet", dfs=[x, y], file_names=[m, n])

In [0]:
# List the Spark path /dbfs/: write_df's "/dbfs/<name>.parquet" outputs appear here as dbfs:/dbfs/<name>.parquet/.
dbutils.fs.ls('/dbfs/')

[FileInfo(path='dbfs:/dbfs/1945-1999.parquet/', name='1945-1999.parquet/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/dbfs/2000-2014.parquet/', name='2000-2014.parquet/', size=0, modificationTime=0)]