In [1]:
# API interactions and data handling
import requests
import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta

# Database connections
import psycopg2
from sqlalchemy import create_engine

# Visualization (helpful during development)
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
def fetch_coinbase_prices(crypto_pairs, timeframe='1d', limit=300):
    results = {}
    base_url = "https://api.exchange.coinbase.com"
    
    for pair in crypto_pairs:
        endpoint = f"/products/{pair}/candles"
        params = {
            'granularity': timeframe_to_seconds(timeframe),
            'limit': limit
        }
        
        response = requests.get(f"{base_url}{endpoint}", params=params)
        if response.status_code == 200:
            # Coinbase returns [timestamp, low, high, open, close, volume]
            data = response.json()
            df = pd.DataFrame(data, columns=['timestamp', 'low', 'high', 'open', 'close', 'volume'])
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')
            df['trading_pair'] = pair
            results[pair] = df
        else:
            print(f"Error fetching data for {pair}: {response.status_code}, {response.text}")
    
    return results

def timeframe_to_seconds(timeframe):
    unit = timeframe[-1]
    value = int(timeframe[:-1])
    
    if unit == 'm':
        return value * 60
    elif unit == 'h':
        return value * 60 * 60
    elif unit == 'd':
        return value * 60 * 60 * 24
    else:
        raise ValueError(f"Unsupported timeframe: {timeframe}")

In [3]:
def transform_price_data(price_data_dict):
    # Combine all pairs into a single dataframe
    combined_df = pd.concat(price_data_dict.values(), ignore_index=True)
    
    # Add derived metrics
    combined_df['date'] = combined_df['timestamp'].dt.date
    combined_df['hour'] = combined_df['timestamp'].dt.hour
    combined_df['price_change'] = combined_df['close'] - combined_df['open']
    combined_df['price_change_pct'] = (combined_df['price_change'] / combined_df['open']) * 100
    combined_df['volatility'] = combined_df['high'] - combined_df['low']
    
    # Extract asset from trading pair
    combined_df['base_asset'] = combined_df['trading_pair'].apply(lambda x: x.split('-')[0])
    combined_df['quote_asset'] = combined_df['trading_pair'].apply(lambda x: x.split('-')[1])
    
    return combined_df

In [6]:
def load_to_postgres(df, table_name, connection_string):
    engine = create_engine(connection_string)
    
    # Create table if it doesn't exist
    # In production, you'd typically handle schema separately
    df.to_sql(table_name, engine, if_exists='append', index=False, schema='nafuu_ke')
    
    print(f"Loaded {len(df)} rows to {table_name}")

In [7]:
# Configuration
crypto_pairs = ["BTC-USD", "ETH-USD", "SOL-USD"]
pg_connection = "postgresql://postgres:1234@localhost:5432/products"

# Extract
price_data = fetch_coinbase_prices(crypto_pairs)

# Transform
transformed_prices = transform_price_data(price_data)

# Load
load_to_postgres(transformed_prices, "crypto_prices", pg_connection)


Loaded 1050 rows to crypto_prices


![image.png](attachment:image.png)