##Dolar Exchange Project<br/>


**1-API connection and extraction**

In [0]:
import pyspark
import pandas as pd
from datetime import datetime,timedelta,date
import requests


The date format to access a specific time period must be in the format MM-DD-YYYY. For more information on the API:
[API documentation](https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/documentacao)

###Set the period parameters

In [0]:
#Date paramters
todays_date = (date.today()).strftime("%m-%d-%Y")
yesterday = (date.today() - timedelta(days=1)).strftime("%m-%d-%Y") #Today's date -1 day

#print(yesterday)

In [0]:
# Get the CSV file from the URL
#Previous Months
#start_date="01-01-2023"
#end_date="09-30-2023"
#url = f"https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata/CotacaoDolarPeriodo(dataInicial=@dataInicial,dataFinalCotacao=@dataFinalCotacao)?@dataInicial=%27{start_date}%27&@dataFinalCotacao=%27{end_date}%27&$top=9000&$format=text/csv&$select=cotacaoCompra,cotacaoVenda,dataHoraCotacao"

url = f"https://olinda.bcb.gov.br/olinda/servico/PTAX/versao/v1/odata/CotacaoDolarPeriodo(dataInicial=@dataInicial,dataFinalCotacao=@dataFinalCotacao)?@dataInicial=%27{yesterday}%27&@dataFinalCotacao=%27{todays_date}%27&$top=9000&$format=text/csv&$select=cotacaoCompra,cotacaoVenda,dataHoraCotacao"
response = requests.get(url)

# Check if the request was successful
if response.status_code == 200:
    # Read the CSV file directly from the URL using pandas
    pandas_df = pd.read_csv(url)
    print(pandas_df)

else:
    print(f"Failed to retrieve data. Status code: {response.status_code}")

  cotacaoCompra cotacaoVenda          dataHoraCotacao
0         5,166       5,1666  2023-10-09 13:22:11.067


When we try to convert a pandas dataframe to Spark dataframe, we run into the following error:
- AttributeError: 'DataFrame' object has no attribute 'iteritems'\
This occurs because this method was removed in Pandas version 2.0.0. So I needed to use **items** instead.

In [0]:
pandas_df.iteritems = pandas_df.items

In [0]:

# Create a Spark DataFrame from the pandas DataFrame
dolarspark_df = spark.createDataFrame(pandas_df)
display(dolarspark_df.limit(10))

  'DataFrame' object has no attribute 'iteritems'
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


cotacaoCompra,cotacaoVenda,dataHoraCotacao
5166,51666,2023-10-09 13:22:11.067


#2-Connection to ADLS

In [0]:
#Create some widgets to safely store the information to our data lake
dbutils.widgets.text(name="Storage_Account",defaultValue="")
dbutils.widgets.text(name="Storage_Key",defaultValue="")
dbutils.widgets.text(name="Storage_Scope",defaultValue="")

In [0]:
#Acess the widgets to get our key and scope
scope = dbutils.widgets.get("Storage_Scope")
key = dbutils.widgets.get("Storage_Key")
storageAccount = dbutils.widgets.get("Storage_Account")
containerName = "ingrid-sollim"
mountpoint = "/mnt/ingrid-sollim/"
storageEndpoint = f"wasbs://{containerName}@{storageAccount}.blob.core.windows.net" 
storageKey = dbutils.secrets.get(scope=scope,key=key)
storageConn = f"fs.azure.account.key.{storageAccount}.blob.core.windows.net"

try:
    if not any(mount.mountPoint==mountpoint for mount in dbutils.fs.mounts()):
        dbutils.fs.mount(
        source = storageEndpoint,
        mount_point = mountpoint,
        extra_configs = {storageConn:storageKey}
    )
        print(f"{mountpoint} has been mounted")
    else:
        print(f"Mount point '{mountpoint}' is already mounted.")
except Exception as e:
    raise e  # Re-raise the exception if mounting fails

Mount point '/mnt/ingrid-sollim/' is already mounted.


In [0]:

path=mountpoint+f"dolar/{todays_date}"


In [0]:
%sql
--Disable commit, start files and success files
SET spark.sql.sources.commitProtocolClass=org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol;

key,value
spark.sql.sources.commitProtocolClass,org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol


In [0]:
#Save to datalake
dolarspark_df.repartition(1).write.format("parquet").option("header",True).option("sep",";").option("mode", "append").save(path)

In [0]:
display(dbutils.fs.ls(path))

path,name,size,modificationTime
dbfs:/mnt/ingrid-sollim/dolar/10-10-2023/_SUCCESS,_SUCCESS,0,1696901571000
dbfs:/mnt/ingrid-sollim/dolar/10-10-2023/_common_metadata,_common_metadata,651,1696901571000
dbfs:/mnt/ingrid-sollim/dolar/10-10-2023/_metadata,_metadata,1219,1696901570000
dbfs:/mnt/ingrid-sollim/dolar/10-10-2023/part-00000-a0bf2236-bddb-4327-ab22-9313819c8e95-c000.snappy.parquet,part-00000-a0bf2236-bddb-4327-ab22-9313819c8e95-c000.snappy.parquet,1294,1696901570000


#Rename the file

In [0]:
# Get the path to the first part file
part_files = [file for file in dbutils.fs.ls(path) if file.name.startswith("part-00000")]

if part_files:
    part_file_path = part_files[0].path
    
    # Extract the directory path (without the filename)
    directory_path = "/".join(part_file_path.split("/")[:-1])

    # Construct the new path using the directory and the desired filename
    new_file_path = f"{directory_path}/{todays_date}.parquet"

    # Rename the file
    dbutils.fs.mv(part_file_path, new_file_path)
else:
    print("No matching part file found.")



In [0]:
dbutils.fs.unmount(mountpoint)

/mnt/ingrid-sollim/ has been unmounted.
Out[82]: True