In [89]:
import requests
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime
import psycopg2
import time

In [90]:
BINANCE_API_URL = "https://api.binance.com/api/v3/klines"
SYMBOLS = ["BTCUSDT"]  # You can add more pairs
limit=30
interval="1d"
# PostgreSQL connection string
DB_USER = "postgres"
DB_PASSWORD = "200903"
DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "crypto_data"

log_file = "./code_log.txt"
output_path="./crypto_etl_pipeline_data.csv"
table_name='crypto_prices'

In [91]:
# Create SQLAlchemy engine
engine = create_engine(f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}")


In [92]:
def log_progress(message):
    ''' This function logs the mentioned message of a given stage of the
    code execution to a log file. Function returns nothing'''

    timestamp_format = "%Y-%h-%d-%H:%M:%S" # Year-Monthname-Day-Hour-Minute-Second
    now = datetime.now() # get current timestamp
    timestamp = now.strftime(timestamp_format)
    with open(log_file,"a") as f:
        f.write(timestamp + " : " + message + "\n")

In [93]:
def extract_binance_data(url,SYMBOLS,limit,interval):
    try:
        params = {
            "symbol": SYMBOLS,
            "interval": interval,
            "limit": limit
        }

        response = requests.get(url, params=params)
        response.raise_for_status()  # kiểm tra lỗi HTTP

        data = response.json()

        columns = [
            "Open Time", "Open", "High", "Low", "Close", "Volume",
            "Close Time", "Quote Asset Volume", "Number of Trades",
            "Taker Buy Base Asset Volume", "Taker Buy Quote Asset Volume", "Ignore"
        ]
        df = pd.DataFrame(data, columns=columns)

        # Chuyển đổi kiểu dữ liệu
        df["Open Time"] = pd.to_datetime(df["Open Time"], unit='ms')
        df["Close Time"] = pd.to_datetime(df["Close Time"], unit='ms')
        df[["Open", "High", "Low", "Close", "Volume"]] = df[["Open", "High", "Low", "Close", "Volume"]].astype(float)

        return df
    except Exception as e:
        print(f"❌ Error fetching Binance data: {e}")
        return pd.DataFrame()

In [94]:
def transform(df):
    df["Daily Change (%)"] = ((df["Close"] - df["Open"]) / df["Open"]) * 100
    df["Date"] = df["Close Time"].dt.date
    df = df[["Date", "Open", "High", "Low", "Close", "Volume", "Daily Change (%)"]]
    return df

In [95]:
def load_to_csv(df, output_path):
    df.to_csv(output_path)

In [96]:
def load_to_postgres(df,engine, table_name):
    """Load data into PostgreSQL database."""
    if df.empty:
        log_progress("No data to load.")
        return
    
    df.to_sql(table_name, engine, if_exists="replace", index=False)
    log_progress(f"Loaded {len(df)} rows into '{table_name}' table at")

In [97]:
def run_query(query_statement, sql_connection):
    ''' This function runs the query on the database table and
    prints the output on the terminal. Function returns nothing. '''

    print(query_statement)
    query_output = pd.read_sql(query_statement, sql_connection)
    print(query_output)

In [98]:
sql_connection = psycopg2.connect(database=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT)


In [99]:
log_progress("Preliminaries complete. Initiating ETL process")

    # 1. Extract
df = extract_binance_data(BINANCE_API_URL,SYMBOLS,limit,interval)
print("Extracted data:")
print(df)
log_progress("Data extraction complete. Initiating Transformation process")

# 2. Transform
print("Transformed data:")

print(df)

log_progress("Data transformation complete. Initiating Loading process")

load_to_csv(df, output_path)

log_progress("Data saved to CSV file")
    # 3. Load
load_to_postgres(df,engine,table_name)

log_progress("ETL pipeline completed at")

Extracted data:
    Open Time       Open       High        Low      Close       Volume  \
0  2025-10-15  113028.13  113612.35  110164.00  110763.28  22986.48811   
1  2025-10-16  110763.28  111982.45  107427.00  108194.28  29857.17252   
2  2025-10-17  108194.27  109240.00  103528.23  106431.68  37920.66838   
3  2025-10-18  106431.68  107499.00  106322.20  107185.01  11123.18766   
4  2025-10-19  107185.00  109450.07  106103.36  108642.78  15480.66423   
5  2025-10-20  108642.77  111705.56  107402.52  110532.09  19193.44160   
6  2025-10-21  110532.09  114000.00  107473.72  108297.67  37228.01659   
7  2025-10-22  108297.66  109163.88  106666.69  107567.44  28610.78451   
8  2025-10-23  107567.45  111293.61  107500.00  110078.18  17573.09294   
9  2025-10-24  110078.19  112104.98  109700.01  111004.89  15005.16913   
10 2025-10-25  111004.90  111943.19  110672.86  111646.27   6407.96864   
11 2025-10-26  111646.27  115466.80  111260.45  114559.40  13454.47737   
12 2025-10-27  114559.

In [100]:
load_to_postgres(df,engine,table_name)

log_progress("Data loaded to Database as a table, Executing queries")

In [101]:
query_statement = f"SELECT * from {table_name}"
run_query(query_statement, sql_connection)

SELECT * from crypto_prices
    Open Time       Open       High        Low      Close       Volume  \
0  2025-10-15  113028.13  113612.35  110164.00  110763.28  22986.48811   
1  2025-10-16  110763.28  111982.45  107427.00  108194.28  29857.17252   
2  2025-10-17  108194.27  109240.00  103528.23  106431.68  37920.66838   
3  2025-10-18  106431.68  107499.00  106322.20  107185.01  11123.18766   
4  2025-10-19  107185.00  109450.07  106103.36  108642.78  15480.66423   
5  2025-10-20  108642.77  111705.56  107402.52  110532.09  19193.44160   
6  2025-10-21  110532.09  114000.00  107473.72  108297.67  37228.01659   
7  2025-10-22  108297.66  109163.88  106666.69  107567.44  28610.78451   
8  2025-10-23  107567.45  111293.61  107500.00  110078.18  17573.09294   
9  2025-10-24  110078.19  112104.98  109700.01  111004.89  15005.16913   
10 2025-10-25  111004.90  111943.19  110672.86  111646.27   6407.96864   
11 2025-10-26  111646.27  115466.80  111260.45  114559.40  13454.47737   
12 2025-10

  query_output = pd.read_sql(query_statement, sql_connection)
