In [27]:
import json
import os

import numpy as np
import pandas as pd
import psycopg2
import requests

# Database connection settings passed to psycopg2.connect(**DB_CONFIG).
# Each value can be overridden with the standard libpq environment variable
# (PGHOST, PGDATABASE, PGUSER, PGPASSWORD); the literal fallbacks keep the
# original local-development defaults so existing setups keep working.
# NOTE(review): do not rely on the hard-coded fallback password outside local dev.
DB_CONFIG = {
    "host": os.environ.get("PGHOST", "127.0.0.1"),
    "database": os.environ.get("PGDATABASE", "crypto_db"),
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "123456"),
}


def fetch_crypto_data(*, timeout: float = 30.0):
    """
    Fetch the top 250 coins by market cap from the CoinGecko markets API, run
    quality checks, normalize the data and upsert it into PostgreSQL.

    Data that fails the quality checks is still normalized and saved. The
    check outcome ("passed_quality_check" / "failed_quality_check") is passed
    to ``save_to_db`` as ``status``.
    NOTE(review): ``save_to_db`` only logs ``status``; it is not one of the
    inserted columns, so the outcome is not persisted.

    Args:
        timeout: Seconds to wait for the HTTP response. Without a timeout,
            ``requests`` can block indefinitely on a stalled connection.

    Returns:
        The normalized DataFrame, or ``None`` if the API returned no rows or
        any step failed. Errors are printed, not raised.
    """
    print("🔄 Fetching data from CoinGecko API...")

    url = "https://api.coingecko.com/api/v3/coins/markets"
    params = {
        "vs_currency": "usd",
        "order": "market_cap_desc",
        "per_page": 250,
        "page": 1,
        "sparkline": "false"
    }

    try:
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()
        df = pd.DataFrame(response.json())
        print(f"✅ Fetched {len(df)} rows from API.")

        if df.empty:
            # An empty payload has no columns at all; the quality checks would
            # fail on the missing 'id' column, so stop here with a clear message.
            print("⚠️ API returned no rows; nothing to save.")
            return None

        if run_quality_checks(df):
            print("✅ Data passed quality checks.")
            quality_status = "passed_quality_check"
        else:
            print("⚠️ Data failed quality checks.")
            quality_status = "failed_quality_check"

        print("🔄 Proceeding to data normalization...")
        df_normalized = normalize_data(df)
        print("✅ Data normalization complete.")

        save_to_db(df_normalized, status=quality_status)
        print(f"✅ Data saved to database with status: {quality_status}.")

        return df_normalized

    except requests.exceptions.RequestException as e:
        print(f"❌ Error fetching data from API: {e}")
        return None
    except Exception as e:
        # Top-level boundary of the pipeline: report and signal failure via None.
        print(f"❌ Unexpected error: {e}")
        return None


def _to_db_value(value):
    """
    Convert one DataFrame cell into a value suitable for psycopg2 parameter binding.

    Missing markers (None, NaN, NaT) become None, i.e. SQL NULL, rather than
    being passed to the driver as NaN/NaT objects. NumPy scalars become plain
    Python scalars. Everything else is returned unchanged.
    """
    if value is None:
        return None
    try:
        if pd.isna(value):
            return None
    except (TypeError, ValueError):
        # For list-likes pd.isna returns an array with an ambiguous truth value;
        # pass such values through untouched.
        return value
    if isinstance(value, np.generic):
        return value.item()
    return value


def save_to_db(df, status=None):
    """
    Upsert the rows of ``df`` into the ``crypto_data`` table.

    Values are bound by column name, so missing columns are sent as NULL and
    extra columns are ignored. On an ``id`` conflict, only ``current_price``,
    ``market_cap``, ``total_volume`` and ``price_change_percentage_24h`` are
    updated. Each row is committed on its own, so one failing row is rolled
    back without aborting the rest.

    Args:
        df: Market data. It is not modified; ``roi`` dicts are JSON-encoded
            on an internal copy.
        status: Quality-check outcome. It is only printed, not stored.

    Errors are printed rather than raised.
    """
    # Order must match the column list in the INSERT statement below.
    columns = (
        "id", "symbol", "name", "image", "current_price", "market_cap",
        "market_cap_rank", "fully_diluted_valuation", "total_volume",
        "high_24h", "low_24h", "price_change_24h",
        "price_change_percentage_24h", "market_cap_change_24h",
        "market_cap_change_percentage_24h", "circulating_supply",
        "total_supply", "max_supply", "ath", "ath_change_percentage",
        "ath_date", "atl", "atl_change_percentage", "atl_date", "roi",
        "last_updated",
    )
    insert_sql = """
                    INSERT INTO crypto_data (
                        id, symbol, name, image, current_price, market_cap, market_cap_rank,
                        fully_diluted_valuation, total_volume, high_24h, low_24h,
                        price_change_24h, price_change_percentage_24h, market_cap_change_24h,
                        market_cap_change_percentage_24h, circulating_supply, total_supply,
                        max_supply, ath, ath_change_percentage, ath_date, atl,
                        atl_change_percentage, atl_date, roi, last_updated
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    ON CONFLICT (id) DO UPDATE 
                    SET current_price = EXCLUDED.current_price,
                        market_cap = EXCLUDED.market_cap,
                        total_volume = EXCLUDED.total_volume,
                        price_change_percentage_24h = EXCLUDED.price_change_percentage_24h
                """

    # Initialised up front so the finally-block never touches unbound names
    # when connecting (or creating the cursor) fails.
    conn = None
    cursor = None
    try:
        if 'roi' in df.columns:
            # Work on a copy so the caller's DataFrame keeps its roi dicts.
            df = df.copy()
            df['roi'] = df['roi'].apply(lambda x: json.dumps(x) if isinstance(x, dict) else None)

        conn = psycopg2.connect(**DB_CONFIG)
        cursor = conn.cursor()
        print(f"🔄 Saving data to the database... Status: {status}")

        records = df.to_dict("records")
        failed = 0
        for record in records:
            params = tuple(_to_db_value(record.get(col)) for col in columns)
            try:
                cursor.execute(insert_sql, params)
            except Exception as e:
                failed += 1
                print(f"❌ Failed to insert row: {record.get('id')} - Error: {e}")
                conn.rollback()
            else:
                conn.commit()

        if failed:
            print(f"⚠️ Saved {len(records) - failed} of {len(records)} rows; {failed} failed.")
        else:
            print("✅ Data saved successfully!")
    except Exception as e:
        print(f"❌ Failed to save data to database: {e}")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

def normalize_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a cleaned copy of a CoinGecko markets DataFrame.

    Steps:
    1. Drop rows missing any of ``current_price``, ``market_cap`` or
       ``total_volume`` (raises KeyError if one of these columns is absent).
    2. Fill gaps in less important columns: ``max_supply`` NaN -> 0,
       ``roi`` NaN -> None.
    3. Drop rows whose ``current_price`` is outside [1e-6, 1e6] or whose
       ``market_cap`` is negative.
    4. Coerce columns to their expected types:
       - numeric columns via ``pd.to_numeric(errors='coerce')``;
       - text columns via ``str``, with missing values kept as missing;
       - ``roi`` to a dict or None.
    5. Parse ``ath_date``, ``atl_date`` and ``last_updated`` with
       ``pd.to_datetime(errors='coerce')``; unparseable values become NaT.
    6. Print how many missing values remain.

    The input DataFrame is not modified.
    """
    print("🔄 Starting data normalization...")

    # 1. Remove rows with missing values in important columns
    important_columns = ["current_price", "market_cap", "total_volume"]
    print(f"📊 Dropping rows with missing values in important columns: {important_columns}")
    df = df.dropna(subset=important_columns).copy()
    print(f"✅ Remaining rows after dropping: {len(df)}")

    # 2. Handle missing values in less important columns
    print("\n📊 Handling missing values in less important columns...")
    less_important_columns = {
        "max_supply": 0,
        "roi": None,
    }
    for col, fill_value in less_important_columns.items():
        if col in df.columns:
            print(f"  - Filling missing values in column '{col}' with: {fill_value}")
            if fill_value is None:
                # fillna(None) is not a valid fill; map NaN-like values to None explicitly.
                df[col] = df[col].apply(lambda x: x if pd.notnull(x) else None)
            else:
                df[col] = df[col].fillna(fill_value)

    # 3. Remove unreasonable values
    # .copy() after each mask so later column assignments act on a standalone
    # frame rather than a filtered view (avoids SettingWithCopyWarning).
    print("\n📊 Removing unreasonable values...")
    if "current_price" in df.columns:
        initial_rows = len(df)
        df = df[(df["current_price"] <= 1e6) & (df["current_price"] >= 0.000001)].copy()
        print(f"✅ Removed {initial_rows - len(df)} rows with unreasonable 'current_price' values.")
    if "market_cap" in df.columns:
        initial_rows = len(df)
        df = df[df["market_cap"] >= 0].copy()
        print(f"✅ Removed {initial_rows - len(df)} rows with negative 'market_cap' values.")

    # 4. Normalize data types
    print("\n📊 Normalizing data types...")
    expected_types = {
        "id": str,
        "symbol": str,
        "name": str,
        "image": str,
        "current_price": float,
        "market_cap": float,
        "market_cap_rank": float,
        "fully_diluted_valuation": float,
        "total_volume": float,
        "high_24h": float,
        "low_24h": float,
        "price_change_24h": float,
        "price_change_percentage_24h": float,
        "market_cap_change_24h": float,
        "market_cap_change_percentage_24h": float,
        "circulating_supply": float,
        "total_supply": float,
        "max_supply": float,
        "ath": float,
        "ath_change_percentage": float,
        "ath_date": str,
        "atl": float,
        "atl_change_percentage": float,
        "atl_date": str,
        "roi": dict,
        "last_updated": str
    }
    for col, expected_type in expected_types.items():
        if col in df.columns:
            try:
                if expected_type == float:
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                elif expected_type == str:
                    # Keep missing values missing: astype(str) would turn them
                    # into the literal strings "None"/"nan" and hide them.
                    df[col] = df[col].map(lambda v: str(v) if pd.notna(v) else None)
                elif expected_type == dict:
                    df[col] = df[col].apply(lambda x: x if isinstance(x, dict) else None)
                print(f"✅ Normalized column '{col}' to type {expected_type}.")
            except Exception as e:
                print(f"⚠️ Error normalizing column '{col}': {e}")

    # 5. Handle date-time formatting
    print("\n📊 Normalizing date formats...")
    date_columns = ["ath_date", "atl_date", "last_updated"]
    for col in date_columns:
        if col in df.columns:
            try:
                df[col] = pd.to_datetime(df[col], errors='coerce')
                print(f"✅ Normalized column '{col}' to datetime format.")
            except Exception as e:
                print(f"⚠️ Error normalizing date column '{col}': {e}")

    # 6. Post-normalization checks
    print("\n🔍 Running post-normalization checks...")
    total_nulls = df.isnull().sum().sum()
    if total_nulls > 0:
        print(f"⚠️ Data still contains {total_nulls} missing values after normalization.")
    else:
        print("✅ No missing values detected after normalization.")

    print(f"✅ Normalization complete. Final row count: {len(df)}")
    return df

def run_quality_checks(df: pd.DataFrame):
    """
    Run data-quality checks on a raw CoinGecko markets DataFrame and print a report.

    Each check that finds a problem marks the overall result as failed:
    1. Any missing value in any column. This includes optional columns such
       as ``max_supply`` and ``roi``, so sparse live data fails this check.
    2. Negative values in current_price, market_cap, total_volume, ath, atl.
    3. Duplicate ``id`` values (requires an ``id`` column; KeyError otherwise).
    4. ``price_change_percentage_24h`` beyond ±1000%.
    5. Cell values whose Python type does not match the expected type for
       their column.
    6. ``current_price`` outside [1e-6, 1e6].
    7. Values in ath_date / atl_date / last_updated that are present but
       cannot be parsed as dates.

    Apart from check 3, checks on absent columns are skipped.

    Returns:
        True if every check passed, False otherwise.
    """
    print("🔍 Running Data Quality Checks...")
    passed = True

    # 1. Check for missing values across the entire DataFrame
    print("📊 Checking for missing values (nulls)...")
    null_summary = df.isnull().sum()
    total_nulls = null_summary.sum()
    if total_nulls > 0:
        print(f"⚠️ Total missing values across all columns: {total_nulls}")
        print("🔎 Missing values by column:")
        print(null_summary[null_summary > 0])
        passed = False
    else:
        print("✅ No missing values detected.")

    # 2. Check for negative values in numeric columns
    print("\n📊 Checking for negative values in numeric columns...")
    numeric_columns = ["current_price", "market_cap", "total_volume", "ath", "atl"]
    for col in numeric_columns:
        if col in df.columns:
            negative_count = (df[col] < 0).sum()
            if negative_count > 0:
                print(f"❌ Column '{col}' has {negative_count} negative values!")
                passed = False
            else:
                print(f"✅ Column '{col}' has no negative values.")

    # 3. Check for duplicate IDs
    print("\n📊 Checking for duplicate IDs...")
    duplicate_count = df.duplicated(subset=["id"]).sum()
    if duplicate_count > 0:
        print(f"⚠️ Found {duplicate_count} duplicate IDs.")
        passed = False
    else:
        print("✅ No duplicate IDs detected.")

    # 4. Check for extreme price changes
    print("\n📊 Checking for extreme price changes...")
    if "price_change_percentage_24h" in df.columns:
        extreme_changes = (df["price_change_percentage_24h"].abs() > 1000).sum()
        if extreme_changes > 0:
            print(f"⚠️ Found {extreme_changes} extreme price change(s) (>1000%).")
            passed = False
        else:
            print("✅ No extreme price changes detected.")

    # 5. Check for invalid data types
    print("\n📊 Checking for invalid data types...")
    expected_types = {
        "id": str,
        "symbol": str,
        "name": str,
        "image": str,
        "current_price": (float, int),
        "market_cap": (float, int),
        "market_cap_rank": (float, int),
        "fully_diluted_valuation": (float, int),
        "total_volume": (float, int),
        "high_24h": (float, int),
        "low_24h": (float, int),
        "price_change_24h": (float, int),
        "price_change_percentage_24h": (float, int),
        "market_cap_change_24h": (float, int),
        "market_cap_change_percentage_24h": (float, int),
        "circulating_supply": (float, int),
        "total_supply": (float, int),
        "max_supply": (float, int),
        "ath": (float, int),
        "ath_change_percentage": (float, int),
        "ath_date": str,
        "atl": (float, int),
        "atl_change_percentage": (float, int),
        "atl_date": str,
        "roi": (dict, type(None)),
        "last_updated": str
    }
    for col, expected_type in expected_types.items():
        if col in df.columns:
            invalid_type_count = (~df[col].apply(lambda x: isinstance(x, expected_type))).sum()
            if invalid_type_count > 0:
                print(f"❌ Column '{col}' has {invalid_type_count} invalid data type(s).")
                passed = False
            else:
                print(f"✅ Column '{col}' has valid data types.")

    # 6. Check for unreasonable values (e.g., extremely large or small values)
    print("\n📊 Checking for unreasonable values...")
    if "current_price" in df.columns:
        unreasonable_prices = ((df["current_price"] > 1e6) | (df["current_price"] < 0.000001)).sum()
        if unreasonable_prices > 0:
            print(f"⚠️ Found {unreasonable_prices} unreasonable 'current_price' values.")
            passed = False
        else:
            print("✅ All 'current_price' values are within a reasonable range.")

    # 7. Check for invalid date formats
    print("\n📊 Checking for invalid date formats...")
    date_columns = ["ath_date", "atl_date", "last_updated"]
    for col in date_columns:
        if col in df.columns:
            try:
                parsed = pd.to_datetime(df[col], errors='coerce')
            except Exception as e:
                print(f"❌ Column '{col}' has invalid date formats. Error: {e}")
                passed = False
                continue
            # errors='coerce' never raises on bad strings; it yields NaT instead.
            # Count values that were present but did not parse.
            invalid_dates = int((parsed.isna() & df[col].notna()).sum())
            if invalid_dates > 0:
                print(f"❌ Column '{col}' has {invalid_dates} invalid date format(s).")
                passed = False
            else:
                print(f"✅ Column '{col}' has valid date formats.")

    # Check results
    print("\n🔍 Data Quality Check Results:")
    if passed:
        print("✅ All checks passed successfully!")
    else:
        print("❌ One or more checks failed. Please review the issues above.")

    return passed

def _main() -> None:
    """Entry point: run the fetch -> quality-check -> normalize -> save pipeline once."""
    fetch_crypto_data()


if __name__ == "__main__":
    _main()


🔄 Fetching data from CoinGecko API...
✅ Fetched 250 rows from API.
🔍 Running Data Quality Checks...
📊 Checking for missing values (nulls)...
⚠️ Total missing values across all columns: 342
🔎 Missing values by column:
high_24h                              1
low_24h                               1
price_change_24h                      1
price_change_percentage_24h           1
market_cap_change_24h                 1
market_cap_change_percentage_24h      1
max_supply                          118
roi                                 218
dtype: int64

📊 Checking for negative values in numeric columns...
✅ Column 'current_price' has no negative values.
✅ Column 'market_cap' has no negative values.
✅ Column 'total_volume' has no negative values.
✅ Column 'ath' has no negative values.
✅ Column 'atl' has no negative values.

📊 Checking for duplicate IDs...
✅ No duplicate IDs detected.

📊 Checking for extreme price changes...
✅ No extreme price changes detected.

📊 Checking for invalid data types.