#1.Importing necessary libraries

In [0]:
import requests
import pandas as pd
from datetime import datetime

#2.Extracting & Transforming data

In [0]:
response = requests.get("https://api.coinbase.com/v2/prices/spot")
bitcoin_data = response.json()
print(bitcoin_data)

{'data': {'amount': '87907.915', 'base': 'BTC', 'currency': 'USD'}}


In [0]:
bitcoin_data['data']['amount']

'87907.915'

In [0]:
def extract_bitcoin_data():
    """
    Extracts the current price of Bitcoin from the Coinbase API."""
    url = "https://api.coinbase.com/v2/prices/spot"
    response = requests.get(url)
    return response.json()
bitcoin_data = extract_bitcoin_data()
bitcoin_data

{'data': {'amount': '87907.915', 'base': 'BTC', 'currency': 'USD'}}

In [0]:
dbutils.widgets.text("api_key", "", "API Key")
def extract_usd_to_brl_rate():
    """
    Extracts the current exchange rate (USD-BRL) from the CurrencyFreaks Api."""
    api_key = dbutils.widgets.get("api_key")
    url = f'https://api.currencyfreaks.com/v2.0/rates/latest?apikey={api_key}'
    response = requests.get(url)
    return float(response.json()['rates']['BRL'])
usd_to_brl_rate = extract_usd_to_brl_rate()
usd_to_brl_rate

5.3759

In [0]:
def treat_bitcoin_data(json_data, rate_usd_to_brl):
    """
    Treats the extracted data from the Coinbase API, rename the columns, adds timestamp and converts the price to BRL.
    """
    usd_value = float(json_data['data']['amount'])
    crypto = json_data['data']['base']
    original_currency = json_data['data']['currency']
    # Convert USD to BRL
    brl_value = usd_value * rate_usd_to_brl

    #Adding timestamp as a datetime object
    timestamp = datetime.now()

    treated_data = [{
        "usd_value": usd_value,
        "brl_value": brl_value,
        "crypto": crypto,
        "original_currency": original_currency,
        "rate_usd_to_brl": rate_usd_to_brl,
        "timestamp": timestamp
    }]

    return treated_data

In [0]:
bitcoin_data = extract_bitcoin_data()
usd_to_brl_rate = extract_usd_to_brl_rate()
treated_btc_data = treat_bitcoin_data(bitcoin_data, usd_to_brl_rate)
treated_btc_data
#So we transformed the data, now we can save it to a dataframe

[{'usd_value': 87907.915,
  'brl_value': 472584.16024849995,
  'crypto': 'BTC',
  'original_currency': 'USD',
  'rate_usd_to_brl': 5.3759,
  'timestamp': datetime.datetime(2026, 1, 21, 18, 31, 31, 551286)}]

#3.Configuring the Unity Catalog

In [0]:
%sql
CREATE CATALOG IF NOT EXISTS pipeline_api_bitcoin
COMMENT 'This catalog is used for the pipeline api demo';

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS pipeline_api_bitcoin.lakehouse
COMMENT 'This schema is used for saving the processed data';

In [0]:
%sql
CREATE VOLUME IF NOT EXISTS pipeline_api_bitcoin.lakehouse.raw_files
COMMENT 'This volume is used for storing initial raw data'

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS pipeline_api_bitcoin.bitcoin_data
COMMENT 'This schema is used for saving the processed Bitcoin data';


#4.Creating the Pandas DataFrame

In [0]:
df_pandas =  pd.DataFrame(treated_btc_data)
df_pandas


Unnamed: 0,usd_value,brl_value,crypto,original_currency,rate_usd_to_brl,timestamp
0,87907.915,472584.160248,BTC,USD,5.3759,2026-01-21 18:31:31.551286


#5.Saving in JSON using Pandas

In [0]:
#picking the event timestamp
event_ts = treated_btc_data[0]['timestamp']

#converting it to a better format
ts = event_ts.strftime('%Y-%m-%d %H:%M:%S_%f')

#json file path
json_file = f'/Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_{ts}.json'

#saving the dataframe to a json file
df_pandas.to_json(json_file, orient='records', date_format='iso', indent=2)
print(f'JSON file saved: {json_file}')

JSON file saved: /Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_2026-01-21 18:31:31_551286.json


#6.Saving in CSV

In [0]:
csv_file = f'/Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_{ts}.csv'

df_pandas.to_csv(csv_file, index= False)
print(f'CSV file saved: {csv_file}')


CSV file saved: /Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_2026-01-21 18:31:31_551286.csv


#7.Saving in Parquet

In [0]:
parquet_file = f'/Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_{ts}.parquet'

df_pandas.to_parquet(parquet_file, index=False)
print(f'Parquet file saved: {parquet_file}')

Parquet file saved: /Volumes/pipeline_api_bitcoin/lakehouse/raw_files/bitcoin_2026-01-21 18:31:31_551286.parquet


#8.Delta Table

In [0]:
df_spark = spark.createDataFrame(df_pandas)

df_spark.printSchema()
df_spark

root
 |-- usd_value: double (nullable = true)
 |-- brl_value: double (nullable = true)
 |-- crypto: string (nullable = true)
 |-- original_currency: string (nullable = true)
 |-- rate_usd_to_brl: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



DataFrame[usd_value: double, brl_value: double, crypto: string, original_currency: string, rate_usd_to_brl: double, timestamp: timestamp]

#9.Converting Delta Table to DataFrame

In [0]:
df_spark.write.format('delta').mode('append').saveAsTable('pipeline_api_bitcoin.bitcoin_data.bitcoin_data')

In [0]:
%sql
SELECT * FROM pipeline_api_bitcoin.bitcoin_data.bitcoin_data;

usd_value,brl_value,crypto,original_currency,rate_usd_to_brl,timestamp
87907.915,472584.1602485,BTC,USD,5.3759,2026-01-21T18:31:31.551Z


## Verifying the Delta Table History (Time Travel)

In [0]:
%sql
DESCRIBE HISTORY pipeline_api_bitcoin.bitcoin_data.bitcoin_data

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
1,2026-01-21T18:39:40.000Z,75906575077486,joao3653@ufu.br,SET TBLPROPERTIES,"Map(properties -> {""comment"":""The table contains data related to Bitcoin and its value in different currencies. It includes the USD and BRL values of Bitcoin, the original currency used for transactions, and the exchange rate between USD and BRL. The timestamp indicates when the data was recorded. This table can be used for analyzing Bitcoin price trends, currency conversion rates, and understanding market fluctuations over time.""})",,,,0.0,WriteSerializable,True,Map(),,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
0,2026-01-21T18:38:10.000Z,75906575077486,joao3653@ufu.br,CREATE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(3952732395635648),0121-183132-doym67d1-v2n,,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 1, numOutputBytes -> 1710)",,Databricks-Runtime/17.3.x-aarch64-photon-scala2.13
