## Filtering data

## Imports:

In [0]:
from pyspark.sql.functions import to_date, first, col, round # Functions Pyspark

## Reading files

In [0]:
# Bronze-layer input: Parquet files matched by a three-level glob under data/bronze
file_type = 'parquet'
file_location = 'dbfs:/Workspace/Repos/otnielgomes/Pipelines_with_Airflow_and_Azure-Databricks/data/bronze/*/*/*'

# Load every matching file into a single DataFrame
df_all_data = spark.read.load(file_location, format=file_type)

# Optional preview of the loaded data:
#df_all_data.limit(5).display()

## Analyzing and processing data

In [0]:
# Number of rows in the dataset
#df_all_data.count() 

8286

In [0]:
# Coins in dataset
#df_all_data \
    #.select('coin') \
    #.distinct() \
    #.display()

coin
SYP
SOS
XPF
TJS
XCD
SVC
SRD
VES
ZWL
ZMW


In [0]:
# Number of distinct coins in the dataset
#df_all_data \
    #.select('coin') \
    #.distinct() \
    #.count()

170

In [0]:
# Currency codes requested for this project; every later step is restricted to these
coins = [
    'USD',
    'EUR',
    'GBP',
]

In [0]:
# Keep only the rows whose `coin` value is one of the selected currencies
df_coin = df_all_data.where(col('coin').isin(coins))

# Optional preview of the filtered data:
#df_coin.limit(10).display()

coin,rate,date
USD,0.20105,2024-02-15
USD,0.201325,2024-02-18
USD,0.201325,2024-02-16
USD,0.201155,2024-02-17
USD,0.201215,2024-02-14
USD,0.20602,2024-01-01
USD,0.203088,2024-01-02
USD,0.203223,2024-01-03
USD,0.204203,2024-01-04
USD,0.205124,2024-01-05


In [0]:
# Coins in dataset
#df_coin \
    #.select('coin') \
    #.distinct() \
    #.display()

coin
USD
GBP
EUR


In [0]:
# Check the data type of the dataset
#df_coin.printSchema()

root
 |-- coin: string (nullable = true)
 |-- rate: double (nullable = true)
 |-- date: string (nullable = true)



In [0]:
# Convert the `date` column from string to a proper date type.
# (The original statement assigned the result twice: `df_coin = df_coin = ...`.)
df_coin = df_coin.withColumn('date', to_date('date'))

In [0]:
# Check the data type of the dataset
#df_coin.printSchema()

root
 |-- coin: string (nullable = true)
 |-- rate: double (nullable = true)
 |-- date: date (nullable = true)



In [0]:
# Pivot to one row per date with one rate column per selected currency.
# - The pivot values are passed explicitly (sorted, so the columns keep the alphabetical
#   order Spark would produce on its own): Spark then does not have to compute the
#   distinct coins first, and every currency in `coins` gets a column even when it has
#   no rows (filled with null), so later per-coin steps do not fail on a missing column.
# - first('rate') keeps one rate per (date, coin); this assumes there is at most one
#   rate per pair - with duplicates, which one is kept is not guaranteed.
# - Rows are sorted from the most recent date to the oldest.
conversion_rate_results = (
    df_coin
    .groupBy('date')
    .pivot('coin', sorted(coins))
    .agg(first('rate'))
    .orderBy('date', ascending=False)
)

#conversion_rate_results.limit(5).display()

date,EUR,GBP,USD
2024-01-01,0.186651,0.161866,0.20602
2024-01-02,0.185614,0.160894,0.203088
2024-01-03,0.18599,0.160353,0.203223
2024-01-04,0.18654,0.161001,0.204203
2024-01-05,0.187247,0.16118,0.205124


In [0]:
# Start the BRL-value dataset from the pivoted conversion rates.
# This is only a name binding: no copy is made and no value is changed here; the rates
# themselves are converted in a later cell.
result_value_BRL = conversion_rate_results
#result_value_BRL.limit(5).display()

date,EUR,GBP,USD
2024-01-01,0.186651,0.161866,0.20602
2024-01-02,0.185614,0.160894,0.203088
2024-01-03,0.18599,0.160353,0.203223
2024-01-04,0.18654,0.161001,0.204203
2024-01-05,0.187247,0.16118,0.205124


In [0]:
# Turn each conversion rate into the amount of BRL needed to buy one unit of the
# currency: 1 / rate, rounded to 4 decimal places.
# The loop always starts again from conversion_rate_results, so re-running this cell
# gives the same result instead of inverting values that were already inverted.
# Note: `round` is pyspark.sql.functions.round (imported at the top), not the builtin.
result_value_BRL = conversion_rate_results
for coin in coins:
    result_value_BRL = result_value_BRL.withColumn(coin, round(1 / col(coin), 4))

In [0]:
#result_value_BRL.limit(5).display()

date,EUR,GBP,USD
2024-01-01,5.3576,6.1779,4.8539
2024-01-02,5.3875,6.2153,4.924
2024-01-03,5.3766,6.2362,4.9207
2024-01-04,5.3608,6.2111,4.8971
2024-01-05,5.3405,6.2042,4.8751


## Saving data

In [0]:
# Collapse each result to a single partition (coalesce(1)) before it is written out
conversion_rate_results, result_value_BRL = (
    conversion_rate_results.coalesce(1),
    result_value_BRL.coalesce(1),
)

In [0]:
# Dataset: conversion_rate_results

# Output location (silver layer) and file type
file_location = 'dbfs:/Workspace/Repos/otnielgomes/Pipelines_with_Airflow_and_Azure-Databricks/data/silver/conversion_rate_results'
file_type = 'csv'

# CSV options: write a header row and use a comma as the field separator.
# `inferSchema` is not set: it is an option for reading CSV and has no effect on a writer.
first_row_is_header = 'True'
delimiter = ','

# Save mode: replace whatever already exists at the output location
mode = 'overwrite'

# Write the dataset
(
    conversion_rate_results.write.format(file_type)
    .mode(mode)
    .option('header', first_row_is_header)
    .option('sep', delimiter)
    .save(file_location)
)

#display(dbutils.fs.ls(file_location))

In [0]:
# Dataset: result_value_BRL

# Output location (silver layer) and file type
file_location = 'dbfs:/Workspace/Repos/otnielgomes/Pipelines_with_Airflow_and_Azure-Databricks/data/silver/result_value_BRL'
file_type = 'csv'

# CSV options: write a header row and use a comma as the field separator.
# `inferSchema` is not set: it is an option for reading CSV and has no effect on a writer.
first_row_is_header = 'True'
delimiter = ','

# Save mode: replace whatever already exists at the output location
mode = 'overwrite'

# Write the dataset
(
    result_value_BRL.write.format(file_type)
    .mode(mode)
    .option('header', first_row_is_header)
    .option('sep', delimiter)
    .save(file_location)
)

#display(dbutils.fs.ls(file_location))