In [1]:
# flows/bitcoin_flow.py
from prefect import flow, task
import requests
import boto3
import json
from decimal import Decimal
from datetime import datetime, UTC
import time

In [18]:
url = "https://api.coingecko.com/api/v3/simple/price"
params = {
    'ids': 'bitcoin,ethereum,tether,binancecoin,solana,ripple,usd-coin,cardano,dogecoin,tron',
    'vs_currencies': 'eur',
    'include_24hr_change': 'false',
    'include_24hr_vol': 'false',
    'include_market_cap': 'false'
}

response = requests.get(url, params=params)

In [None]:
data_flatten = {key: item['eur'] for key, item in data.items()}

{'bitcoin': 99546, 'cardano': 0.624768, 'ethereum': 3613.61}

In [19]:
@task
def transform_data(raw_data):
    """Add timestamp and format data"""
    return {
       'timestamp': datetime.now(UTC).isoformat(),
       **{key: item['eur'] for key, item in raw_data.items()}
    }

In [21]:
data_

{'timestamp': '2025-10-13T07:51:09.888416+00:00',
 'bitcoin': 99334,
 'cardano': 0.619527,
 'ethereum': 3593.53}

In [20]:
data = fetch_bitcoin_data()
data_ = transform_data(data)

In [9]:
data_

{'timestamp': '2025-10-13T07:41:35.420753+00:00',
 'bitcoin': {'bitcoin': {'eur': 99546},
  'cardano': {'eur': 0.624768},
  'ethereum': {'eur': 3613.61}}}

In [None]:
@task(retries=3, retry_delay_seconds=10)
def fetch_bitcoin_data():
    """Fetch Bitcoin price data"""
    url = "https://api.coingecko.com/api/v3/simple/price"
    params = {
        'ids': 'bitcoin,ethereum,cardano',
        'vs_currencies': 'eur',
        'include_24hr_change': 'false',
        'include_24hr_vol': 'false',
        'include_market_cap': 'false'
    }

    response = requests.get(url, params=params)
    response.raise_for_status()
    return response.json()

@task
def transform_data(raw_data):
    """Add timestamp and format data"""
    return {
       'timestamp': datetime.now(UTC).isoformat(),
       **{key: item['eur'] for key, item in raw_data.items()}
    }

@task
def save_to_dynamodb(data, table_name='crypto-prices'):
    """Save to DynamoDB - all cryptos in one item per timestamp"""
    from decimal import Decimal
    import time
    
    dynamodb = boto3.resource('dynamodb', region_name='eu-west-1')
    table = dynamodb.Table(table_name)
    
    timestamp = data['timestamp']
    ttl = int(time.time()) + (180 * 24 * 3600)  # Expire in 1800 days
    
    item = {
        'PK': 'CRYPTO_PRICES',  # Fixed partition key
        'timestamp': timestamp,  # Sort key
        'ttl': ttl
    }
    
    # Add all crypto prices as separate attributes
    for key, value in data.items():
        if key == 'timestamp':
            continue
        item[key] = Decimal(str(value))
    
    table.put_item(Item=item)
    
    return 1 

@flow(name="Bitcoin Price Tracker")
def bitcoin_tracking_flow():
    """Main flow to track Bitcoin prices"""
    raw_data = fetch_bitcoin_data()
    transformed = transform_data(raw_data)
    dynamodb_path = save_to_dynamodb(transformed)

    print(f"Saved Bitcoin data to {dynamodb_path}")



In [None]:
if __name__ == "__main__":
    bitcoin_tracking_flow()

In [None]:
from prefect import flow, task
import requests
import boto3
from datetime import datetime
from decimal import Decimal

@task
def save_to_dynamodb(data, table_name='crypto-prices'):
    """Save to DynamoDB - designed for time-series"""
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(table_name)
    
    # Write each crypto as separate item
    with table.batch_writer() as batch:
        for crypto, values in data.items():
            batch.put_item(Item={
                'crypto': crypto,
                'timestamp': timestamp,
                'price_eur': Decimal(str(values['eur'])),
                'ttl': int(datetime.utcnow().timestamp()) + (90 * 24 * 3600)  # Auto-delete after 90 days
            })
    
    return len(data)

@flow
def scrape_flow():
    data = fetch_crypto_prices()
    count = save_to_dynamodb(data)
    print(f"✅ Saved {count} prices to DynamoDB")