In [21]:
import requests
import schedule
import time
import pandas as pd
from datetime import datetime

# Fetching data from CoinGecko API
def fetch_data():
    tickers = ["bitcoin", "ethereum", "zcash"]
    url = f"https://api.coingecko.com/api/v3/simple/price?ids={','.join(tickers)}&vs_currencies=usd&include_24hr_vol=true"

    try:
        response = requests.get(url)
        data = response.json()
        return data
    except Exception as e:
        print(f"Error fetching data: {e}")
        return None

# DataFrame para armazenar os dados
df = pd.DataFrame(columns=['ticker', 'price', 'volume', 'other_data'])

# Ingest data and run monitoring
def data_ingestion(df):
    data = fetch_data()
    if data:
        new_rows = []
        for ticker, values in data.items():
            price = values['usd']
            volume = values.get('usd_24h_vol', 0)
            other_data = {'usd_24h_vol': volume}
            # Criar um dicionário com os novos dados
            new_rows = pd.DataFrame({'ticker': ticker, 'price': price, 'volume': volume, 'other_data': other_data})
        # Concatenar os novos dados ao DataFrame existente
        if df.empty:
            df = new_rows
        else:
            df = pd.concat([df, new_rows], ignore_index=True)
    return df

# Executar a função uma vez antes de iniciar o agendamento
df = data_ingestion(df)

# Schedule data ingestion every second
schedule.every(1).second.do(data_ingestion, df)

while True:
    try:
        schedule.run_pending()
        time.sleep(1)
    except KeyboardInterrupt:
        print("Exiting...")
        break

AttributeError: 'DataFrame' object has no attribute 'append'

In [36]:
import requests
import schedule
import time
import psycopg2
from datetime import datetime, timedelta
from statistics import mean  # Import mean function
import pandas as pd
import numpy as np
# Configurações de conexão com o banco de dados
conn = psycopg2.connect(
    dbname="luxor",
    user="postgres",
    password="s0lutus#21",
    host="localhost",
    port="5432"
)
cur = conn.cursor()

# Fetching data from CoinGecko API
def fetch_data():
    tickers = ["bitcoin", "ethereum", "zcash"]
    url = f"https://api.coingecko.com/api/v3/simple/price?ids={','.join(tickers)}&vs_currencies=usd&include_24hr_vol=true"

    try:
        response = requests.get(url)
        data = response.json()
        return data
    except Exception as e:
        print(f"Error fetching data: {e}")
        return None

# Função para inserir dados na tabela tickers_data
def insert_data(conn, cur, ticker, price, volume, timestamp):
    try:
        insert_query = """
        INSERT INTO public.tickers_data (ticker, price, volume, timestamp)
        VALUES (%s, %s, %s, %s)
        """
        cur.execute(insert_query, (ticker, price, volume, timestamp))
        conn.commit()
    except Exception as e:
        print(f"Error inserting data: {e}")

# Monitoramento de mudanças e atualização
def monitor_changes(avg_data, data, timestamp):
    for ticker, values in data.items():
           
            price = values['usd']
            volume = values.get('usd_24h_vol', 0)
            prev_price = mean(float(avg_data[ticker]["price"]))
            prev_volume = mean(float(avg_data[ticker]["volume"]))
            price_change = np.divide((price - prev_price), prev_price) * 100 if prev_price != 0 else 0
            volume_change = np.divide((volume - prev_volume), prev_volume) * 100 if prev_volume != 0 else 0
            
            if price_change > 2 or volume_change > 2:
                with open("alerts.txt", "a") as alert_file:
                    alert_file.write(f"{datetime.now()}: Significant change detected for {ticker}. "
                                    f"Price change: {price_change:.2f}%, Volume change: {volume_change:.2f}%\n")

    # Filtrar registros com mais de 5 minutos
            five_minutes_ago = timestamp- timedelta(minutes=5)
            avg_data[ticker]['price'] = [p for p, t in zip(avg_data[ticker]['prices'], avg_data[ticker]['timestamps']) if datetime.fromisoformat(t) > five_minutes_ago]
            avg_data[ticker]['volume'] = [v for v, t in zip(avg_data[ticker]['volumes'], avg_data[ticker]['timestamps']) if datetime.fromisoformat(t) > five_minutes_ago]
            avg_data[ticker]['timestamp'] = [t for t in avg_data[ticker]['timestamps'] if datetime.fromisoformat(t) > five_minutes_ago]
            
            # Adicionar novos valores
            avg_data[ticker]['price'].append(price)
            avg_data[ticker]['volume'].append(volume)
            avg_data[ticker]['timestamp'].append(timestamp)

    return avg_data

# Ingest data and run monitoring
def data_ingestion(conn, cur, avg_data):
    print("stating")
    data = fetch_data()
    print(data)
    if data is not None:
        for ticker, values in data.items():
            price = values['usd']
            volume = values.get('usd_24h_vol', 0)
            timestamp = datetime.now().isoformat()
            print("inserting")
            insert_data(conn, cur, ticker, price, volume, timestamp)
        avg_data = get_5min_avg(cur)
        # Run monitoring for changes and update avg_data
        avg_data = monitor_changes(avg_data, data, timestamp)
    
    

# Função para buscar dados do banco de dados
def get_5min_avg(cur):
    cur.execute("""
        SELECT price, volume, ticker, timestamp
        FROM tickers_data
        WHERE timestamp > %s;
        """, (datetime.now() - timedelta(minutes=50),))
    rows = cur.fetchall()

    avg_data = {}
    for price, volume, ticker, timestamp in rows:
        if ticker not in avg_data:
            avg_data[ticker] = {"price": [], "volume": [], "timestamp": []}
        avg_data[ticker]["price"].append(price)
        avg_data[ticker]["volume"].append(volume)
        avg_data[ticker]["timestamp"].append(timestamp)

    return avg_data

avg_data = get_5min_avg(cur)

# Schedule data ingestion every minute
schedule.every(59).seconds.do( data_ingestion(conn, cur, avg_data))

while True:
    try:
        schedule.run_pending()
        time.sleep(1)  # Add sleep to avoid busy waiting
        
    except KeyboardInterrupt:
        print("Exiting...")
        break

# Close Connection
cur.close()
conn.close()


stating
{'bitcoin': {'usd': 61863, 'usd_24h_vol': 28577244418.276703}, 'ethereum': {'usd': 2398.63, 'usd_24h_vol': 13509473680.178377}, 'zcash': {'usd': 26.72, 'usd_24h_vol': 66657522.940430686}}
inserting
inserting
inserting


TypeError: float() argument must be a string or a real number, not 'list'

In [None]:
# Schedule data ingestion every second
schedule.every(60).seconds.do(data_ingestion)

while True:
    schedule.run_pending()
    time.sleep(60)

# Clean up
cur.close()
conn.close()