In [0]:
# listing the month folders stored under the 2024 bronze directory
display(dbutils.fs.ls('dbfs:/databricks-results/bronze/2024'))

path,name,size,modificationTime
dbfs:/databricks-results/bronze/2024/02/,02/,0,1712245564000
dbfs:/databricks-results/bronze/2024/03/,03/,0,1711856759000
dbfs:/databricks-results/bronze/2024/04/,04/,0,1712246659000


In [0]:
# previewing the first rows read from every file matched three levels below 'bronze'
# (show() with no argument prints at most 20 rows)
spark.read.parquet('dbfs:/databricks-results/bronze/*/*/*').show()

+-----+----------+----------+
|moeda|      taxa|      data|
+-----+----------+----------+
|  USD|  0.201077|2024-02-05|
|  GBP|  0.160427|2024-02-05|
|  EUR|  0.187198|2024-02-05|
|  JPY| 29.889123|2024-02-05|
|  CNY|  1.431367|2024-02-05|
|  CAD|  0.272331|2024-02-05|
|  ZAR|  3.833315|2024-02-05|
|  ARS|166.702274|2024-02-05|
|  EUR|  0.187288|2024-02-06|
|  JPY| 29.779429|2024-02-06|
|  CNY|  1.432392|2024-02-06|
|  CAD|  0.271765|2024-02-06|
|  ZAR|  3.793992|2024-02-06|
|  ARS|167.106885|2024-02-06|
|  USD|  0.201175|2024-02-07|
|  GBP|  0.159326|2024-02-07|
|  EUR|  0.186701|2024-02-07|
|  JPY| 29.792147|2024-02-07|
|  CNY|  1.430573|2024-02-07|
|  CAD|  0.270912|2024-02-07|
+-----+----------+----------+
only showing top 20 rows



In [0]:
# loading every matched bronze parquet file into a single dataframe and previewing 5 rows
df_dados_juntos = spark.read.parquet('dbfs:/databricks-results/bronze/*/*/*')
df_dados_juntos.show(n=5)

+-----+---------+----------+
|moeda|     taxa|      data|
+-----+---------+----------+
|  USD| 0.201077|2024-02-05|
|  GBP| 0.160427|2024-02-05|
|  EUR| 0.187198|2024-02-05|
|  JPY|29.889123|2024-02-05|
|  CNY| 1.431367|2024-02-05|
+-----+---------+----------+
only showing top 5 rows



In [0]:
# counting the number of rows in the combined dataframe
df_dados_juntos.count()

344

In [0]:
# counting how many different currencies appear in the 'moeda' column of the dataset
df_dados_juntos.select(df_dados_juntos["moeda"]).distinct().count()

8

In [0]:
# keeping only the rows whose 'moeda' is one of the three currencies listed below
moedas = ['USD', 'EUR', 'GBP']

df_moedas = df_dados_juntos.where(df_dados_juntos["moeda"].isin(moedas))
df_moedas.show(n=5)

+-----+--------+----------+
|moeda|    taxa|      data|
+-----+--------+----------+
|  USD|0.201077|2024-02-05|
|  GBP|0.160427|2024-02-05|
|  EUR|0.187198|2024-02-05|
|  EUR|0.187288|2024-02-06|
|  USD|0.201175|2024-02-07|
+-----+--------+----------+
only showing top 5 rows



In [0]:
# re-checking the number of distinct currencies, now on the filtered dataframe
df_moedas.select(df_moedas["moeda"]).distinct().count()

3

In [0]:
# inspecting the dataframe schema (column names, types and nullability)
df_moedas.printSchema()

root
 |-- moeda: string (nullable = true)
 |-- taxa: double (nullable = true)
 |-- data: string (nullable = true)



In [0]:
# converting the 'data' column from string to date
# (no format is passed, so to_date applies Spark's default string-to-date casting)
from pyspark.sql.functions import to_date

df_moedas = df_moedas.withColumn("data", to_date(df_moedas["data"]))

In [0]:
# inspecting the schema again to check the type now reported for the 'data' column
df_moedas.printSchema()

root
 |-- moeda: string (nullable = true)
 |-- taxa: double (nullable = true)
 |-- data: date (nullable = true)



In [0]:
# one row per date, with the rate of each of the 3 currencies in its own column,
# ordered by date descending
from pyspark.sql.functions import first

# The pivot values are passed explicitly: Spark then skips the extra job it would
# otherwise run to discover the distinct currencies, and the result always has one
# column per entry of `moedas` (null where a currency has no rate for that date).
# `sorted` keeps the alphabetical column order pivot produces when it infers the values.
resultado_taxas_conversao = (
    df_moedas
    .groupBy('data')
    .pivot('moeda', sorted(moedas))
    .agg(first('taxa'))
    .orderBy('data', ascending=False)
)
resultado_taxas_conversao.show()

+----------+--------+--------+--------+
|      data|     EUR|     GBP|     USD|
+----------+--------+--------+--------+
|2024-04-04| 0.18374|0.157575|0.199664|
|2024-04-03|0.183063|0.156812|0.198409|
|2024-04-02|0.183317|0.156959|0.197398|
|2024-04-01|0.184242|0.157692| 0.19782|
|2024-03-31|0.184771|0.157804|0.199414|
|2024-03-30|0.184629|0.157923|0.199394|
|2024-03-27|0.185265|0.158791|0.200329|
|2024-03-25|0.185512|0.159099|0.201058|
|2024-03-24|0.184969|0.158678|0.199904|
|2024-03-23|0.185875|0.160247|0.201927|
|2024-03-22|0.183994|0.158625|0.199884|
|2024-03-21|0.185018|0.158734|0.200966|
|2024-03-20|0.184056|0.157292|  0.2013|
|2024-03-19|0.182958|0.156266|0.198748|
|2024-03-18|0.182815|0.156195|0.198784|
|2024-03-17|0.184115|0.157381|0.200361|
|2024-03-16|0.183642| 0.15721|0.200144|
|2024-03-15|0.183642|0.157185|0.200144|
|2024-03-14|0.183984|0.157103|0.200232|
|2024-03-12|0.184126|0.157247|0.201203|
+----------+--------+--------+--------+
only showing top 20 rows



In [0]:
# selecting every column of the pivoted rates table into a separate dataframe
# and previewing its first 5 rows
resultado_valores_em_reais = resultado_taxas_conversao.select("*")
resultado_valores_em_reais.show(n=5)

+----------+--------+--------+--------+
|      data|     EUR|     GBP|     USD|
+----------+--------+--------+--------+
|2024-04-04| 0.18374|0.157575|0.199664|
|2024-04-03|0.183063|0.156812|0.198409|
|2024-04-02|0.183317|0.156959|0.197398|
|2024-04-01|0.184242|0.157692| 0.19782|
|2024-03-31|0.184771|0.157804|0.199414|
+----------+--------+--------+--------+
only showing top 5 rows



In [0]:
# computing the equivalent value in R$ for each currency available in the dataframe:
# every currency column becomes 1 / rate, rounded to 4 decimal places
# `round` is imported under an alias so the Python builtin `round` is not shadowed
from pyspark.sql.functions import col, round as spark_round

# Built directly from `resultado_taxas_conversao` rather than from the previous value
# of `resultado_valores_em_reais`, so re-running this cell never inverts values that
# were already inverted. Columns not listed in `moedas` (e.g. 'data') pass through
# unchanged and the original column order is kept.
resultado_valores_em_reais = resultado_taxas_conversao.select(
    *[
        spark_round(1 / col(nome), 4).alias(nome) if nome in moedas else col(nome)
        for nome in resultado_taxas_conversao.columns
    ]
)

In [0]:
# displaying the result of the conversion (show() prints at most 20 rows by default)
resultado_valores_em_reais.show()

+----------+------+------+------+
|      data|   EUR|   GBP|   USD|
+----------+------+------+------+
|2024-04-04|5.4437|6.3452|5.0075|
|2024-04-03|5.4615|6.3776|5.0403|
|2024-04-02|5.4555|6.3694|5.0659|
|2024-04-01|5.4289|6.3412|5.0556|
|2024-03-31|5.4113|6.3371| 5.015|
|2024-03-30|5.4171|6.3331| 5.015|
|2024-03-27|5.3967|6.2972|4.9925|
|2024-03-25|5.3908|6.2854|4.9727|
|2024-03-24|5.4054|6.3012|5.0025|
|2024-03-23|5.3792|6.2422|4.9529|
|2024-03-22|5.4348|6.3052|5.0025|
|2024-03-21|5.4054|6.3012|4.9751|
|2024-03-20|5.4318|6.3573|4.9677|
|2024-03-19|5.4645| 6.398|5.0327|
|2024-03-18|5.4705| 6.402|5.0302|
|2024-03-17|5.4318|6.3532|  4.99|
|2024-03-16|5.4466|6.3613|4.9975|
|2024-03-15|5.4466|6.3613|4.9975|
|2024-03-14|5.4348|6.3654| 4.995|
|2024-03-12|5.4318|6.3613|4.9702|
+----------+------+------+------+
only showing top 20 rows



In [0]:
# reducing both dataframes to a single partition; in this project that keeps things
# simple and makes each write below produce a single output file
resultado_taxas_conversao, resultado_valores_em_reais = (
    tabela.coalesce(1)
    for tabela in (resultado_taxas_conversao, resultado_valores_em_reais)
)

In [0]:
# saving the transformed data to the 'prata' layer as CSV with a header row,
# replacing whatever already exists at each destination
resultado_taxas_conversao.write.csv(
    "dbfs:/databricks-results/prata/taxas_conversao",
    mode="overwrite",
    header=True,
)

resultado_valores_em_reais.write.csv(
    "dbfs:/databricks-results/prata/valores_reais",
    mode="overwrite",
    header=True,
)


In [0]:
# listing the files that the CSV write left in the 'valores_reais' output folder
display(dbutils.fs.ls("dbfs:/databricks-results/prata/valores_reais/"))

path,name,size,modificationTime
dbfs:/databricks-results/prata/valores_reais/_SUCCESS,_SUCCESS,0,1712260962000
dbfs:/databricks-results/prata/valores_reais/_committed_7855316289302317667,_committed_7855316289302317667,113,1712260962000
dbfs:/databricks-results/prata/valores_reais/_started_7855316289302317667,_started_7855316289302317667,0,1712260961000
dbfs:/databricks-results/prata/valores_reais/part-00000-tid-7855316289302317667-ada049ac-0069-4849-b36d-babb64992508-802-1-c000.csv,part-00000-tid-7855316289302317667-ada049ac-0069-4849-b36d-babb64992508-802-1-c000.csv,1378,1712260961000


In [0]:
# listing the files that the CSV write left in the 'taxas_conversao' output folder
display(dbutils.fs.ls("dbfs:/databricks-results/prata/taxas_conversao"))

path,name,size,modificationTime
dbfs:/databricks-results/prata/taxas_conversao/_SUCCESS,_SUCCESS,0,1712260957000
dbfs:/databricks-results/prata/taxas_conversao/_committed_77638854441662894,_committed_77638854441662894,111,1712260957000
dbfs:/databricks-results/prata/taxas_conversao/_started_77638854441662894,_started_77638854441662894,0,1712260956000
dbfs:/databricks-results/prata/taxas_conversao/part-00000-tid-77638854441662894-69850205-6289-42d9-8fea-0231f7c280d6-792-1-c000.csv,part-00000-tid-77638854441662894-69850205-6289-42d9-8fea-0231f7c280d6-792-1-c000.csv,1635,1712260957000


In [0]:
# deleting the 'taxas_conversao' output folder together with every file inside it
dbutils.fs.rm("dbfs:/databricks-results/prata/taxas_conversao", recurse=True)

True

In [0]:
# deleting the 'valores_reais' output folder together with every file inside it
dbutils.fs.rm("dbfs:/databricks-results/prata/valores_reais", recurse=True)

True