In [129]:
import requests
import pandas as pd
from datetime import datetime, timedelta, timezone
from deltalake import DeltaTable, write_deltalake
from deltalake.exceptions import TableNotFoundError
import pyarrow as pa
from pprint import pprint

#requests.get? 
# #Signature: requests.get(url, params=None, **kwargs)
#requests.post?
# #Signature: requests.post(url, data=None, json=None, **kwargs)

In [130]:
def get_data(base_url, endpoint, data_field, params=None, headers=None, timeout=30):
    """Fetch one top-level field from the JSON payload of a REST endpoint.

    Sends a GET request to ``{base_url}/{endpoint}`` and returns
    ``payload[data_field]`` from the decoded JSON body.

    Args:
        base_url: Root URL of the API, without a trailing slash.
        endpoint: Path appended to ``base_url``.
        data_field: Key to extract from the decoded JSON payload.
        params: Optional query-string parameters forwarded to ``requests.get``.
        headers: Optional HTTP headers forwarded to ``requests.get``.
        timeout: Seconds to wait for the server. Requests never times out
            unless a value is given, so a stalled connection would otherwise
            block the notebook indefinitely.

    Returns:
        The value stored under ``data_field``, or ``None`` when the request
        fails, the body is not valid JSON, or the field is missing. A message
        is printed in each failure case.
    """
    endpoint_url = f"{base_url}/{endpoint}"

    try:
        response = requests.get(
            endpoint_url, params=params, headers=headers, timeout=timeout
        )
        # Turn 4xx/5xx responses into an exception handled below.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        # Covers connection errors, timeouts and HTTP error statuses.
        print(f"La petición ha fallado. Código de error : {e}")
        return None

    try:
        return response.json()[data_field]
    except (ValueError, LookupError, TypeError):
        # ValueError: body is not JSON; LookupError: field is absent;
        # TypeError: payload is not subscriptable by this key.
        print("El formato de respuesta no es el esperado")
        return None

def build_table(json_data):
    """Flatten decoded JSON records into a pandas DataFrame.

    Args:
        json_data: Decoded JSON (typically a list of dicts) to normalize.

    Returns:
        The DataFrame produced by ``pd.json_normalize``, or ``None`` (after
        printing a message) when the input cannot be normalized.
    """
    try:
        return pd.json_normalize(json_data)
    except Exception:
        # Kept broad because the error type depends on how the input is
        # malformed. Unlike a bare ``except``, this still lets
        # KeyboardInterrupt and SystemExit propagate.
        print("Los datos no están en el formato esperado")
        return None

In [131]:

# Base URL of the CoinCap API (v2); get_data appends the endpoint name to it.
base_url = "https://api.coincap.io/v2"

In [132]:

# Endpoints queried on the API.
endpoint, endpoint2 = "assets", "markets"

# Assets: the records and the response's "timestamp" field.
# NOTE(review): each field is fetched with its own HTTP request, so the
# timestamp comes from a later response than the one that carried the records.
json_data = get_data(base_url, endpoint, "data")
json_data_times = get_data(base_url, endpoint, "timestamp")

# Markets: same two fields, same caveat.
json_data2 = get_data(base_url, endpoint2, "data")
json_data_times2 = get_data(base_url, endpoint2, "timestamp")


In [133]:
# Print only the first records of each payload; dumping the full lists
# floods the notebook output and hides the rest of the narrative.
PREVIEW_ROWS = 3

# The guards keep the original behaviour of printing None (or an empty list)
# when a request failed or returned nothing.
pprint(json_data[:PREVIEW_ROWS] if json_data else json_data)

pprint(json_data2[:PREVIEW_ROWS] if json_data2 else json_data2)

[{'changePercent24Hr': '0.4410115189809265',
  'explorer': 'https://blockchain.info/',
  'id': 'bitcoin',
  'marketCapUsd': '1790318125119.5495605746329380',
  'maxSupply': '21000000.0000000000000000',
  'name': 'Bitcoin',
  'priceUsd': '90494.5093554215387780',
  'rank': '1',
  'supply': '19783721.0000000000000000',
  'symbol': 'BTC',
  'volumeUsd24Hr': '12519563199.2030121898396041',
  'vwap24Hr': '90245.7684177295013732'},
 {'changePercent24Hr': '0.9786167989928653',
  'explorer': 'https://etherscan.io/',
  'id': 'ethereum',
  'marketCapUsd': '373836104330.3023309222233559',
  'maxSupply': None,
  'name': 'Ethereum',
  'priceUsd': '3104.3320219216994461',
  'rank': '2',
  'supply': '120424008.0282661200000000',
  'symbol': 'ETH',
  'volumeUsd24Hr': '8879590010.1099003588295158',
  'vwap24Hr': '3098.6174440684339828'},
 {'changePercent24Hr': '0.0063896807594976',
  'explorer': 'https://www.omniexplorer.info/asset/31',
  'id': 'tether',
  'marketCapUsd': '127540841059.9573799288127283

In [134]:
# Flatten the raw asset records into a DataFrame (None if normalization failed).
df_assets = build_table(json_data)

# Same for the raw market records.
df_markets = build_table(json_data2)

In [135]:
# A cell renders only its last bare expression, so the assets preview was
# silently dropped. display() is provided by the IPython kernel without an
# import and renders every argument.
display(df_assets.head(), df_markets.head())


Unnamed: 0,exchangeId,rank,baseSymbol,baseId,quoteSymbol,quoteId,priceQuote,priceUsd,volumeUsd24Hr,percentExchangeVolume,tradesCount24Hr,updated
0,alterdice,1,BTC,bitcoin,USDT,tether,90488.72,90487.2968923742,33972182.869167455,100.0,7.0,1731901429141
1,bibox,1,ETH,ethereum,USDT,tether,3104.72,3104.671172359296,28927631.454518043,23.403324211702618,,1731901423330
2,bibox,2,LINK,chainlink,USDT,tether,14.265,14.26477565568082,13486885.57166773,10.911296216426884,,1731901098057
3,bibox,3,BTC,bitcoin,USDT,tether,90517.3,90515.87644289923,6626180.520001143,5.36077940700654,,1731901408218
4,bibox,4,BNB,binance-coin,USDT,tether,626.08,626.0701536984681,5502034.545473061,4.451311490681436,,1731893357144


In [136]:
def add_snapshot_columns(df, timestamp_ms):
    """Stamp every row of ``df`` with the UTC date and time of a snapshot.

    Args:
        df: DataFrame to annotate; it is modified in place.
        timestamp_ms: Unix timestamp in milliseconds.

    Returns:
        The timezone-aware UTC ``datetime`` derived from ``timestamp_ms``.
    """
    # fromtimestamp expects seconds, the API timestamp is in milliseconds.
    snapshot = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)

    # Broadcast the calendar date to all rows, then convert the column to a
    # pandas datetime type so it can be written to the Delta table.
    df["date"] = snapshot.date()
    df["date"] = pd.to_datetime(df["date"], errors="coerce")

    # Store the time of day as text.
    df["time"] = snapshot.time()
    df["time"] = df["time"].astype(str)

    return snapshot


# Annotate each table only when both the table and its timestamp were retrieved.
if df_assets is not None and json_data_times is not None:
    timestamp_datetime = add_snapshot_columns(df_assets, json_data_times)

if df_markets is not None and json_data_times2 is not None:
    timestamp_datetime2 = add_snapshot_columns(df_markets, json_data_times2)



In [137]:
# The 'date' and 'time' columns should now appear in the DataFrame.
# Time-varying DataFrame (the author's label for the assets table).
df_assets.head()

Unnamed: 0,id,rank,symbol,name,supply,maxSupply,marketCapUsd,volumeUsd24Hr,priceUsd,changePercent24Hr,vwap24Hr,explorer,date,time
0,bitcoin,1,BTC,Bitcoin,19783721.0,21000000.0,1790318125119.5496,12519563199.203012,90494.50935542154,0.4410115189809265,90245.7684177295,https://blockchain.info/,2024-11-18,03:44:52.621000
1,ethereum,2,ETH,Ethereum,120424008.02826612,,373836104330.3023,8879590010.109901,3104.3320219216994,0.9786167989928652,3098.6174440684335,https://etherscan.io/,2024-11-18,03:44:52.621000
2,tether,3,USDT,Tether,127539192697.57086,,127540841059.95737,32612943391.488503,1.0000129243595763,0.0063896807594976,0.9997484425731084,https://www.omniexplorer.info/asset/31,2024-11-18,03:44:52.621000
3,solana,4,SOL,Solana,474610053.50994295,,113864476387.231,3099481184.7674775,239.91164018788652,10.13323977675906,234.2336246087076,https://explorer.solana.com/,2024-11-18,03:44:52.621000
4,binance-coin,5,BNB,BNB,166801148.0,166801148.0,104060595432.36269,481361241.2702568,623.8601872953698,1.5459438255052078,626.0740744732262,https://etherscan.io/token/0xB8c77482e45F1F44d...,2024-11-18,03:44:52.621000


In [138]:
# Static DataFrame (the author's label for the markets table).
df_markets.head()


Unnamed: 0,exchangeId,rank,baseSymbol,baseId,quoteSymbol,quoteId,priceQuote,priceUsd,volumeUsd24Hr,percentExchangeVolume,tradesCount24Hr,updated,date,time
0,alterdice,1,BTC,bitcoin,USDT,tether,90488.72,90487.2968923742,33972182.869167455,100.0,7.0,1731901429141,2024-11-18,03:44:53.800000
1,bibox,1,ETH,ethereum,USDT,tether,3104.72,3104.671172359296,28927631.454518043,23.403324211702618,,1731901423330,2024-11-18,03:44:53.800000
2,bibox,2,LINK,chainlink,USDT,tether,14.265,14.26477565568082,13486885.57166773,10.911296216426884,,1731901098057,2024-11-18,03:44:53.800000
3,bibox,3,BTC,bitcoin,USDT,tether,90517.3,90515.87644289923,6626180.520001143,5.36077940700654,,1731901408218,2024-11-18,03:44:53.800000
4,bibox,4,BNB,binance-coin,USDT,tether,626.08,626.0701536984681,5502034.545473061,4.451311490681436,,1731893357144,2024-11-18,03:44:53.800000


## Delta Lake

In [139]:
def save_data_as_delta(df, path, mode="overwrite", partition_cols=None):
    """Write ``df`` to the Delta table located at ``path``.

    Args:
        df: Data to write.
        path: Location of the Delta table.
        mode: Write mode forwarded to ``write_deltalake``; defaults to
            ``"overwrite"``.
        partition_cols: Optional partition columns, forwarded as
            ``partition_by``.
    """
    write_options = {"mode": mode, "partition_by": partition_cols}
    write_deltalake(path, df, **write_options)

def save_new_data_as_delta(new_data, data_path, predicate, partition_cols=None):
    """Insert into a Delta table only the rows that match no existing row.

    Source rows are compared with the table using ``predicate``. Rows without
    a match are inserted; matched rows are left untouched. When no table
    exists at ``data_path`` yet, ``new_data`` is written as a new table.

    Args:
        new_data: pandas DataFrame holding the candidate rows.
        data_path: Location of the Delta table.
        predicate: Merge condition written against the ``source`` and
            ``target`` aliases.
        partition_cols: Optional partition columns, used only when the table
            has to be created.
    """
    try:
        target_table = DeltaTable(data_path)
        # The merge source is handed over as an Arrow table.
        source_table = pa.Table.from_pandas(new_data)
        merge_builder = target_table.merge(
            source=source_table,
            source_alias="source",
            target_alias="target",
            predicate=predicate,
        )
        merge_builder.when_not_matched_insert_all().execute()
    except TableNotFoundError:
        # Nothing to merge into yet: create the table from these rows.
        save_data_as_delta(new_data, data_path, partition_cols=partition_cols)

def upsert_data_as_delta(data, data_path, predicate, partition_cols=None):
    """Update matching rows of a Delta table and insert the rest.

    Source rows are compared with the table using ``predicate``. Matched rows
    are overwritten with the source values and unmatched rows are inserted.
    When no table exists at ``data_path`` yet, ``data`` is written as a new
    table.

    Args:
        data: pandas DataFrame holding the rows to upsert.
        data_path: Location of the Delta table.
        predicate: Merge condition written against the ``source`` and
            ``target`` aliases.
        partition_cols: Optional partition columns, used only when the table
            has to be created. Mirrors ``save_new_data_as_delta``; the default
            keeps the previous unpartitioned behaviour.

    Note:
        Any error other than a missing table raised while opening or merging
        is printed and swallowed, so callers see no exception for it. Errors
        raised while creating the table do propagate.
    """
    try:
        table = DeltaTable(data_path)
        # The merge source is handed over as an Arrow table.
        source = pa.Table.from_pandas(data)
        (
            table.merge(
                source=source,
                source_alias="source",
                target_alias="target",
                predicate=predicate,
            )
            .when_matched_update_all()
            .when_not_matched_insert_all()
            .execute()
        )
    except TableNotFoundError:
        # First load: create the table from these rows.
        save_data_as_delta(data, data_path, partition_cols=partition_cols)
    except Exception as e:
        # Best-effort by design: report the failure and keep the notebook running.
        print(f"Ocurrió un error al realizar el upsert: {e}")

In [140]:
# Storage locations: one Delta table per endpoint under the bronze directory.
bronze_dir = "datalake/bronze/api_coincap"
assets_raw_dir = bronze_dir + "/assets"
markets_raw_dir = bronze_dir + "/markets"

### Assets
https://api.coincap.io/v2/assets

In [141]:
# Upsert the assets snapshot keyed on the asset id: rows whose id already
# exists in the table are updated, all other rows are inserted.
try:
    upsert_data_as_delta(
        data=df_assets,
        data_path=assets_raw_dir,
        predicate="target.id = source.id",
    )
except Exception as e:
    print(f"Ocurrió un error al guardar los datos: {e}")

In [142]:
# Re-open the assets table and report how many rows it holds.
canRow = DeltaTable(assets_raw_dir)
print(f"Cant de filas: {len(canRow.to_pandas())}")

Cant de filas: 100


In [143]:
# Read the whole assets table back as a pandas DataFrame for inspection.
#.sort_values("rank")
DeltaTable(assets_raw_dir).to_pandas()
# Disabled example kept for reference:
# Load the Delta table as a pandas DataFrame
#df = DeltaTable(f"{bronze_dir}/assets").to_pandas()

# Keep only the rows whose id is "bitcoin"
#df_bitcoin = df[df['id'] == 'bitcoin']

# Show the first 10 rows of the filtered DataFrame
#df_bitcoin.head(10)


Unnamed: 0,id,rank,symbol,name,supply,maxSupply,marketCapUsd,volumeUsd24Hr,priceUsd,changePercent24Hr,vwap24Hr,explorer,date,time
0,dogecoin,6,DOGE,Dogecoin,146835886383.7052600000000000,,54323132468.6564966601152408,2493603582.2985157775229262,0.3699581471977607,5.7502002447362146,0.3616966795665211,http://dogechain.info/chain/Dogecoin,2024-11-18,03:44:52.621000
1,fetch,28,FET,Artificial Superintelligence Alliance,2520000000.0000000000000000,2630547141.0000000000000000,3267902626.2117592560000000,91193332.0475563610285784,1.2967867564332378,1.4558362386439533,1.2816341905599402,https://etherscan.io/token/0x1d287cc25dad7ccaf...,2024-11-18,03:44:52.621000
2,vechain,34,VET,VeChain,80985041177.0000000000000000,86712634466.0000000000000000,2482537378.1382530007747611,26100521.9866755304213035,0.0306542707401043,2.8438212181689620,0.0292774235807196,https://explore.veforge.com/,2024-11-18,03:44:52.621000
3,filecoin,31,FIL,Filecoin,599550473.0000000000000000,,2717508805.8615097728799578,149229169.1296065367921374,4.5325772028229386,-0.6097653180955389,4.4805922026576955,https://protocol.ai,2024-11-18,03:44:52.621000
4,hedera-hashgraph,41,HBAR,Hedera Hashgraph,14832756028.0000000000000000,50000000000.0000000000000000,1686371552.3987041282124228,234010649.5535554744921912,0.1136923946713151,33.1124464870681606,0.0907561887154486,https://hash-hash.info/,2024-11-18,03:44:52.621000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,ethereum-classic,27,ETC,Ethereum Classic,149591222.2698405000000000,210700000.0000000000000000,3948159510.2899322748189396,229719378.8097647616414926,26.3929891766512535,-5.0508351070863081,26.6260894123707650,http://gastracker.io/,2024-11-18,03:44:52.621000
96,crypto-com-coin,25,CRO,Crypto.com Coin,25263013692.0000000000000000,30263013692.0000000000000000,4524442411.4952669919759356,23388765.7280004263784638,0.1790935343920593,13.0357487363660559,0.1632910425849030,https://etherscan.io/token/0xa0b73e1ff0b80914a...,2024-11-18,03:44:52.621000
97,monero,29,XMR,Monero,18446744.0737095500000000,,2958034939.0843445590990696,43650373.1742213480570341,160.3553953621636711,7.9043432902066693,152.6544189556331546,http://moneroblocks.info/,2024-11-18,03:44:52.621000
98,gnosis-gno,71,GNO,Gnosis,2589588.0000000000000000,3000000.0000000000000000,669253776.4190775537081468,4083631.0061882470921876,258.4402524336217011,-0.6876683495844674,255.6680484881150722,https://etherscan.io/token/Gnosis,2024-11-18,03:44:52.621000


### Markets
https://api.coincap.io/v2/markets

In [144]:

# Insert the markets snapshot; partition_cols applies only when the table is
# first created.
# NOTE(review): the predicate matches on date alone, so once the table holds
# any row for a date, every source row with that date counts as matched and
# is not inserted. Confirm that one snapshot per day is the intent.
save_new_data_as_delta(
    new_data=df_markets,
    data_path=markets_raw_dir,
    predicate="target.date = source.date",
    partition_cols=["date"],
)

In [145]:
# Re-open the markets table and report how many rows it holds.
canRowTwo = DeltaTable(markets_raw_dir)
print(f"Cant de filas: {len(canRowTwo.to_pandas())}")

Cant de filas: 100


In [146]:
# Read the whole markets table back as a pandas DataFrame for inspection.
DeltaTable(markets_raw_dir).to_pandas()


Unnamed: 0,exchangeId,rank,baseSymbol,baseId,quoteSymbol,quoteId,priceQuote,priceUsd,volumeUsd24Hr,percentExchangeVolume,tradesCount24Hr,updated,date,time
0,alterdice,1,BTC,bitcoin,USDT,tether,90493.5800000000000000,90491.1997356390204594,33915994.9717930165497698,100.0000000000000000,7,1731901354347,2024-11-18,03:43:38.954000
1,bibox,1,ETH,ethereum,USDT,tether,3104.8400000000000000,3104.7583329911520384,28882723.8299292803158661,23.3777260266278375,,1731901285184,2024-11-18,03:43:38.954000
2,bibox,2,LINK,chainlink,USDT,tether,14.2650000000000000,14.2646247858565285,13486742.9291120003610098,10.9161927747834771,,1731901098057,2024-11-18,03:43:38.954000
3,bibox,3,BTC,bitcoin,USDT,tether,90542.5000000000000000,90540.1184488899213618,6619736.4581741334791749,5.3580260019421284,,1731901270152,2024-11-18,03:43:38.954000
4,bibox,4,BNB,binance-coin,USDT,tether,626.0800000000000000,626.0635321366319901,5501976.3538123467077092,4.4533090632928981,,1731893357144,2024-11-18,03:43:38.954000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,bibox,95,COMP,compound,USDT,tether,50.9800000000000000,50.9786590664539657,98973.3129116484884251,0.0801091686095969,,1731900793438,2024-11-18,03:43:38.954000
96,bibox,96,ALGO,algorand,USDC,usd-coin,0.1910000000000000,0.1909546324229206,95641.1571956254842941,0.0774121160786120,,1731901088017,2024-11-18,03:43:38.954000
97,bibox,97,FEI,fei-protocol,USDT,tether,0.9778000000000000,0.9777742807998958,92663.8837017479005621,0.0750023058247021,,1731893360925,2024-11-18,03:43:38.954000
98,bibox,98,MXC,mxc,USDT,tether,0.0053000000000000,0.0052998605934132,85479.0422475346855024,0.0691868828732385,,1731901070425,2024-11-18,03:43:38.954000
