### Crypto Data API pipeline
- Get data from CoinGecko API
- Transform and get the data we need from the whole JSON set
- Load into a db that isn't on port 5432, change in postgresql.conf file where port = 5432 to port = 8000 or sth else


#### 1. Extract data from API

In [9]:
import os
import requests
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine

load_dotenv()
API_KEY = os.getenv("API_KEY")
DB_URL = os.getenv("DB_URL")
print(DB_URL)

postgresql+psycopg2://avnadmin:AVNS_6R6wTVPJlYNzgjqLkEW@pg-18b913ee-denzel-kinyua.h.aivencloud.com:12287/defaultdb


In [10]:

coin_list = ['cardano', 'polkadot', 'chainlink', 'litecoin', 'uniswap', 'stellar', 'aptos', 'ripple', 'avalanche-2']
name_list = []
price_list = []
cap_list = []
volume_list = []
time_list = []

for coin in coin_list: # loop thru the list of coins and extract its data
    url = f"https://api.coingecko.com/api/v3/coins/{coin}?localization=false&tickers=false&community_data=false&developer_data=false&sparkline=false&dex_pair_format=symbol"

    headers = {
        "accept": "application/json",
        "x-cg-demo-api-key": API_KEY
    }

    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        data = response.json()

        name_list.append(data["name"])
        price_list.append(data["market_data"]['current_price']['usd'])
        cap_list.append(data['market_data']['market_cap']['usd'])
        volume_list.append(data['market_data']['total_volume']['usd'])
        time_list.append(data['market_data']['last_updated'])

        coin_data = {
            'name': name_list,
            'current_price': price_list,
            'market_cap': cap_list,
            'total_volume': volume_list,
            'time': time_list
        }
    else:
        print(f"Requests error: {response.status_code}, {response.text}")

coin_data

{'name': ['Cardano',
  'Polkadot',
  'Chainlink',
  'Litecoin',
  'Uniswap',
  'Stellar',
  'Aptos',
  'XRP',
  'Avalanche'],
 'current_price': [0.704871,
  4.19,
  14.34,
  90.0,
  6.52,
  0.274635,
  5.03,
  2.19,
  21.67],
 'market_cap': [25414639055,
  6387555350,
  9421339764,
  6831911884,
  3913665809,
  8555281645,
  3176031001,
  128328237913,
  9127929480],
 'total_volume': [890939787,
  298610449,
  657344317,
  604891690,
  644872762,
  245542771,
  279236667,
  3271384364,
  628965132],
 'time': ['2025-05-30T15:23:23.465Z',
  '2025-05-30T15:23:32.737Z',
  '2025-05-30T15:23:29.342Z',
  '2025-05-30T15:23:32.622Z',
  '2025-05-30T15:23:37.277Z',
  '2025-05-30T15:23:33.212Z',
  '2025-05-30T15:23:36.479Z',
  '2025-05-30T15:23:35.728Z',
  '2025-05-30T15:23:34.293Z']}

In [11]:
df = pd.DataFrame(coin_data)
df.head()

Unnamed: 0,name,current_price,market_cap,total_volume,time
0,Cardano,0.704871,25414639055,890939787,2025-05-30T15:23:23.465Z
1,Polkadot,4.19,6387555350,298610449,2025-05-30T15:23:32.737Z
2,Chainlink,14.34,9421339764,657344317,2025-05-30T15:23:29.342Z
3,Litecoin,90.0,6831911884,604891690,2025-05-30T15:23:32.622Z
4,Uniswap,6.52,3913665809,644872762,2025-05-30T15:23:37.277Z


In [12]:
# let's set time column as a pd.to_datetime obj
df['time'] = pd.to_datetime(df['time'], utc=True)
df.head()

Unnamed: 0,name,current_price,market_cap,total_volume,time
0,Cardano,0.704871,25414639055,890939787,2025-05-30 15:23:23.465000+00:00
1,Polkadot,4.19,6387555350,298610449,2025-05-30 15:23:32.737000+00:00
2,Chainlink,14.34,9421339764,657344317,2025-05-30 15:23:29.342000+00:00
3,Litecoin,90.0,6831911884,604891690,2025-05-30 15:23:32.622000+00:00
4,Uniswap,6.52,3913665809,644872762,2025-05-30 15:23:37.277000+00:00


In [13]:
# set the time column as the index for time-series analysis
#f.set_index('time', inplace=True)
df.head()

Unnamed: 0,name,current_price,market_cap,total_volume,time
0,Cardano,0.704871,25414639055,890939787,2025-05-30 15:23:23.465000+00:00
1,Polkadot,4.19,6387555350,298610449,2025-05-30 15:23:32.737000+00:00
2,Chainlink,14.34,9421339764,657344317,2025-05-30 15:23:29.342000+00:00
3,Litecoin,90.0,6831911884,604891690,2025-05-30 15:23:32.622000+00:00
4,Uniswap,6.52,3913665809,644872762,2025-05-30 15:23:37.277000+00:00


In [14]:
# checking the columns dtype
df.dtypes

name                          object
current_price                float64
market_cap                     int64
total_volume                   int64
time             datetime64[ns, UTC]
dtype: object

In [None]:
# the dataset is clean, loading it into a mock table in the public schema
engine = create_engine(DB_URL)
try:
    df.to_sql(name='mock_crypto_data', con=engine, schema='public', index=False)
    print("Data loaded successfully")
except Exception as e:
    print(f"Loading data: {e}")


Loading data: 'Engine' object has no attribute 'cursor'


  df.to_sql(name='mock_crypto_data', con=engine, schema='public')


### Test ETL complete.
 Airflow can now be initialized using this info and hourly DAG for this ETL pipeline be set up.
 Tasks are:
- extract_data: extracting data from API by looping through the list of coin IDs
- transform_data: cleans data by setting time into a pd.to_datetime obj for time series analysis, setting time as the index and checking for column datatypes
- load_to_db: loads the transformed dataframe into a Postgres DB whose port != 5432. Change this in the postgresql.conf file manually.
