In [0]:
#Import the necessary libraries
import requests
from pyspark.sql import DataFrame

In [0]:
# Download the glacier CSV from its URL and save it as '/dbfs/glacier.csv' using the requests library.
# The response is streamed so the body is written in chunks instead of being held in memory at once.
with requests.get(
    'https://datahub.io/core/glacier-mass-balance/r/glaciers.csv',
    stream=True,
    timeout=60,  # do not hang forever if the server stops responding
) as r:

  # Fail fast on HTTP errors so an error page is never saved as if it were the CSV
  r.raise_for_status()

  # Open the target file in binary write mode to save the downloaded content
  with open('/dbfs/glacier.csv', 'wb') as f:

    # Iterate over the content in chunks (each of size 8192 bytes) and write each one out
    for chunk in r.iter_content(chunk_size=8192):
      f.write(chunk)


In [0]:
#Define a function get_data to download a file from a given URL and save it locally
def get_data(url: str):
  """Download the file at `url` to /dbfs/<name> and return <name>.

  <name> is the last '/'-separated segment of `url`.

  Args:
    url: Address of the file to download.

  Returns:
    The file name (without the /dbfs/ prefix) the content was written to.

  Raises:
    ValueError: if `url` ends with '/' and therefore has no file name.
    requests.HTTPError: if the server answers with an error status.
  """
  filename = url.split('/')[-1]
  if not filename:
    raise ValueError("cannot derive a file name from URL: {!r}".format(url))

  # Use the caller's URL (the previous version always fetched a hard-coded one).
  with requests.get(url, stream=True, timeout=60) as r:
    # Fail fast so an HTTP error page is never saved as data.
    r.raise_for_status()
    with open("/dbfs/{}".format(filename), 'wb') as f:
      # Stream the body to disk in 8192-byte chunks.
      for chunk in r.iter_content(chunk_size=8192):
        f.write(chunk)
  return filename


In [0]:
file_name = get_data('https://datahub.io/core/glacier-mass-balance/r/glaciers.csv')

In [0]:
file_name

'glaciers.csv'

In [0]:
#Read the CSV file into a Spark DataFrame using the Spark SQL API.
spark.read.format("csv").option("header","true").load("file:/dbfs/glacier.csv")

DataFrame[Year: string, Mean cumulative mass balance: string, Number of observations: string]

In [0]:
#Determine the file format by extracting the extension from the filename.
file_format = file_name.split(".")[-1]

In [0]:
#Define a function read_data to read data based on its file format into a Spark DataFrame.
def read_data(file_name):
  """Load /dbfs/<file_name> into a Spark DataFrame, choosing the reader by extension.

  Supported extensions: csv (first line treated as header), json, parquet, txt.
  Relies on the notebook-provided `spark` session.

  Args:
    file_name: Name of a file under /dbfs/, including its extension.

  Returns:
    The DataFrame produced by the matching Spark reader.

  Raises:
    ValueError: if the extension is not one of the supported formats.
  """
  # Derive the format from the argument itself rather than from a notebook
  # global, so the function works for any file name passed in.
  file_format = file_name.rsplit(".", 1)[-1].lower()
  path = "file:/dbfs/{}".format(file_name)

  if file_format == 'csv':
    return spark.read.format(file_format).option("header","true").load(path)
  if file_format == 'json':
    try:
      return spark.read.format(file_format).load(path)
    except Exception:
      # Retry as multi-line JSON.
      # NOTE(review): confirm the single-line read actually raises for
      # multi-line files instead of returning corrupt-record rows.
      return spark.read.format(file_format).option("multiline","true").load(path)
  if file_format == 'parquet':
    return spark.read.format(file_format).load(path)
  if file_format == 'txt':
    return spark.read.text(path)

  raise ValueError("unsupported file format {!r} for file {!r}".format(file_format, file_name))

In [0]:
df = read_data(file_name)

In [0]:
#Display the contents of the DataFrame using Databricks display
display(df)

Year,Mean cumulative mass balance,Number of observations
1945,0.0,
1946,-1.13,1.0
1947,-3.19,1.0
1948,-3.19,1.0
1949,-3.82,3.0
1950,-4.887,3.0
1951,-5.217,3.0
1952,-5.707,3.0
1953,-6.341,7.0
1954,-6.825,6.0


In [0]:
df.createOrReplaceTempView("df")

In [0]:
%sql
-- Run a SQL query on the `df` temp view using Spark SQL.
select * from df

Year,Mean cumulative mass balance,Number of observations
1945,0.0,
1946,-1.13,1.0
1947,-3.19,1.0
1948,-3.19,1.0
1949,-3.82,3.0
1950,-4.887,3.0
1951,-5.217,3.0
1952,-5.707,3.0
1953,-6.341,7.0
1954,-6.825,6.0


In [0]:
%sql
-- Create temporary views for the 1900s (Year starting with '19') and the modern era (Year starting with '20').
create or replace temp view nintys as select * from df where Year like '19%' order by Year asc;
create or replace temp view modern as select * from df where Year like '20%' order by Year asc;

In [0]:
#Execute SQL queries on the temporary views to create two new DataFrames.
nintys_df = spark.sql("select * from nintys")
modern_df = spark.sql("select * from modern")

In [0]:
#Define a function transform_data to transform the original DataFrame into two new DataFrames based on the era
def transform_data(df: DataFrame):
  """Split `df` into a 19xx DataFrame and a 20xx DataFrame, each ordered by Year.

  The split is applied to the DataFrame passed in (the previous version
  ignored the argument and queried whatever temp view was named "df").
  As before, the results are also registered as the temp views `nintys`
  and `modern` so later SQL cells can query them.

  Args:
    df: DataFrame with a `Year` column.

  Returns:
    Tuple (nintys_df, modern_df): rows whose Year matches '19%' and '20%'
    respectively, ordered by Year ascending.
  """
  nintys_df = df.filter("Year like '19%'").orderBy("Year")
  modern_df = df.filter("Year like '20%'").orderBy("Year")

  # Keep the views available for the SQL that reads `nintys` / `modern`.
  nintys_df.createOrReplaceTempView("nintys")
  modern_df.createOrReplaceTempView("modern")
  return nintys_df, modern_df

In [0]:
#Apply the transform_data function to create two DataFrames, x for the 1900s (years 19xx) and y for the modern era (years 20xx)
x, y = transform_data(df)

In [0]:
display(y)

Year,Mean cumulative mass balance,Number of observations
2000,-17.727,37
2001,-18.032,37
2002,-18.726,37
2003,-19.984,37
2004,-20.703,37
2005,-21.405,37
2006,-22.595,37
2007,-23.255,37
2008,-23.776,37
2009,-24.459,37


In [0]:
display(x)

Year,Mean cumulative mass balance,Number of observations
1945,0.0,
1946,-1.13,1.0
1947,-3.19,1.0
1948,-3.19,1.0
1949,-3.82,3.0
1950,-4.887,3.0
1951,-5.217,3.0
1952,-5.707,3.0
1953,-6.341,7.0
1954,-6.825,6.0


In [0]:
display(nintys_df)

Year,Mean cumulative mass balance,Number of observations
1945,0.0,
1946,-1.13,1.0
1947,-3.19,1.0
1948,-3.19,1.0
1949,-3.82,3.0
1950,-4.887,3.0
1951,-5.217,3.0
1952,-5.707,3.0
1953,-6.341,7.0
1954,-6.825,6.0


In [0]:
display(modern_df)

Year,Mean cumulative mass balance,Number of observations
2000,-17.727,37
2001,-18.032,37
2002,-18.726,37
2003,-19.984,37
2004,-20.703,37
2005,-21.405,37
2006,-22.595,37
2007,-23.255,37
2008,-23.776,37
2009,-24.459,37


In [0]:
#Build DataFrames holding the first and last rows of each era.
nintys_file_namez = spark.sql("(select * from nintys order by Year ASC limit 1) union (select * from nintys order by Year DESC limit 1)")
modern_file_namez = spark.sql("(select * from modern order by Year ASC limit 1) union (select * from modern order by Year DESC limit 1)")

In [0]:
display(nintys_file_namez)

Year,Mean cumulative mass balance,Number of observations
1945,0.0,
1999,-17.697,37.0


In [0]:
display(modern_file_namez)

Year,Mean cumulative mass balance,Number of observations
2000,-17.727,37
2014,-28.652,24


In [0]:
modern_file_namez_df = modern_file_namez.collect()

In [0]:
modern_file_name = f"{modern_file_namez_df[0]['Year']}-{modern_file_namez_df[1]['Year']}"

In [0]:
def create_file_names():
    """Build "<first year>-<last year>" labels for the `nintys` and `modern` views.

    Each label comes from a single min/max aggregate over the view's `Year`
    column. The previous version took rows [0] and [1] of a UNION of two
    LIMIT 1 queries; a UNION result has no guaranteed row order and
    de-duplicates, so a view holding a single year produced only one row.

    Relies on the notebook-provided `spark` session and on the temp views
    `nintys` and `modern` already being registered.

    Returns:
        Tuple (nintys_file_name, modern_file_name).

    Raises:
        ValueError: if a view has no non-null Year values.
    """

    def _year_range(view_name):
        # An aggregate without GROUP BY yields one row; min/max are NULL when the view is empty.
        row = spark.sql(
            "select min(Year) as first_year, max(Year) as last_year from {}".format(view_name)
        ).collect()[0]
        if row['first_year'] is None:
            raise ValueError("view '{}' has no Year values to build a file name from".format(view_name))
        return "{}-{}".format(row['first_year'], row['last_year'])

    # Return the formatted file names for both eras
    return _year_range("nintys"), _year_range("modern")

In [0]:
#Use the create_file_names function to get and print the formatted year ranges.
m,n = create_file_names()

In [0]:
print(m, n)

1945-1999 2000-2014


In [0]:
# Save the 19xx DataFrame as Parquet; overwrite so re-running this cell does not fail on an existing path.
nintys_df.write.format('parquet').mode('overwrite').save("/dbfs/nintysdf.parquet")

In [0]:
#Define a function write_df to write DataFrames to different file formats.
def write_df(file_type: str, dfs, file_names, mode="error"):
    """Write each DataFrame in `dfs` to /dbfs/<file_name>.<file_type>.

    This is the single definition of write_df; the notebook previously
    defined it twice, with the later cell silently shadowing the earlier one.

    Args:
        file_type: Format passed to the DataFrame writer, also used as the
            path extension (e.g. "parquet").
        dfs: DataFrames to write.
        file_names: Base names, paired with `dfs` by position.
        mode: Save mode passed to the writer's `.mode()`. The default "error"
            fails if the path already exists; pass "overwrite" to replace it.

    Returns:
        None.

    Raises:
        ValueError: if `dfs` and `file_names` differ in length.
    """
    dfs = list(dfs)
    file_names = list(file_names)
    # zip() would silently skip the extra DataFrames or names; reject the mismatch instead.
    if len(dfs) != len(file_names):
        raise ValueError(
            "got {} DataFrames but {} file names".format(len(dfs), len(file_names))
        )

    for frame, name in zip(dfs, file_names):
        frame.write.format(file_type).mode(mode).save("/dbfs/{}.{}".format(name, file_type))
    return None

In [0]:
write_df("parquet", [x, y], [m, n], mode="overwrite")

In [0]:
# Write the same DataFrames under "new_"-prefixed names; overwrite so the cell can be re-run.
write_df("parquet", [x, y], ["new_{}".format(m), "new_{}".format(n)], mode="overwrite")

In [0]:
#List the files in the Databricks File System.
dbutils.fs.ls('/dbfs/')

[FileInfo(path='dbfs:/dbfs/1945-1999.parquet/', name='1945-1999.parquet/', size=0, modificationTime=1704393132000),
 FileInfo(path='dbfs:/dbfs/2000-2014.parquet/', name='2000-2014.parquet/', size=0, modificationTime=1704393133000),
 FileInfo(path='dbfs:/dbfs/new_1945-1999.parquet/', name='new_1945-1999.parquet/', size=0, modificationTime=1704400389000),
 FileInfo(path='dbfs:/dbfs/new_2000-2014.parquet/', name='new_2000-2014.parquet/', size=0, modificationTime=1704400390000),
 FileInfo(path='dbfs:/dbfs/new_nintys_df.parquet/', name='new_nintys_df.parquet/', size=0, modificationTime=1704398358000),
 FileInfo(path='dbfs:/dbfs/nintys.parquet/', name='nintys.parquet/', size=0, modificationTime=1704400142000),
 FileInfo(path='dbfs:/dbfs/nintys_df.parquet/', name='nintys_df.parquet/', size=0, modificationTime=1704393106000),
 FileInfo(path='dbfs:/dbfs/nintysdf.parquet/', name='nintysdf.parquet/', size=0, modificationTime=1704400225000)]