In [1]:
import logging
from logging.handlers import RotatingFileHandler
import os
import warnings
import time
from requests import Session
from requests.exceptions import ConnectionError, Timeout, TooManyRedirects
import json
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
import traceback

In [2]:
# Wait before touching the network: under Task Scheduler the connection may not be up yet.
# ETL_STARTUP_DELAY (seconds) overrides the default of 10, e.g. set it to 0 for interactive runs.
time.sleep(float(os.getenv("ETL_STARTUP_DELAY", "10")))

# Ignore irrelevant NumExpr warnings
warnings.filterwarnings("ignore", message="NumExpr defaulting to")

# Use a neutral, system-wide accessible directory
BASE_DIR = r"C:\ETL\crypto"
LOG_DIR = os.path.join(BASE_DIR, "logs")

# Ensure folders exist
os.makedirs(BASE_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

# Define consistent file paths
RAW_FILE = os.path.join(BASE_DIR, "crypto_raw.csv")
TRANSFORMED_FILE = os.path.join(BASE_DIR, "crypto_transformed.csv")
LOG_FILE = os.path.join(LOG_DIR, "crypto_pipeline.log")

# Logging configuration: one fresh log file per run.
# filemode='w' truncates the file on open, so it does not need deleting first.
# force=True (Python 3.8+) closes and removes any handlers already attached to the
# root logger. Without it, basicConfig silently does nothing when the cell is re-run
# in the same kernel, and the previous run's handler keeps the log file open.
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    filemode="w",
    force=True,
)

logging.info("ETL script started successfully")
logging.info(f"Base directory: {BASE_DIR}")


# Check write access before running: fail fast if BASE_DIR is not writable,
# rather than failing later halfway through the pipeline.
try:
    test_path = os.path.join(BASE_DIR, "write_test.txt")
    with open(test_path, "w") as f:
        f.write("Permission test OK")
    os.remove(test_path)
    logging.info("Write permission test passed.")
except Exception as e:
    logging.error(f"Write permission test failed: {e}")
    raise SystemExit(f"Cannot write to {BASE_DIR}: {e}")



# API configuration and extraction of data from CoinMarketCap
def extract_data(timeout=30):
    """Fetch the latest CoinMarketCap listings and merge them into the raw CSV.

    The API key is read from the ``CMC_API_KEY`` environment variable. The fetched
    rows are appended to ``RAW_FILE`` (created on the first run). Rows sharing an
    ``id`` are de-duplicated, keeping the most recently fetched one.

    Args:
        timeout: Seconds to wait for the API (passed to ``requests`` as the
            connect/read timeout). Without a timeout, a stalled connection can block
            forever and hang an unattended scheduled run.

    Returns:
        A DataFrame containing only the listings fetched by this call, or ``None``
        if the request, parsing or raw-file update fails (the error is logged).
    """
    url = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest'
    parameters = {
        'start': '1',
        'limit': '5000',
        'convert': 'USD'
    }

    api_key = os.getenv('CMC_API_KEY')
    if not api_key:
        # The placeholder is presumably rejected by the API; warn so the log explains why.
        logging.warning("CMC_API_KEY is not set; falling back to a placeholder key.")
        api_key = 'API_KEY'

    headers = {
        # Standard HTTP content-negotiation header.
        'Accept': 'application/json',
        'X-CMC_PRO_API_KEY': api_key,
    }

    # The context manager closes the session (and its pooled connections) on every path.
    with Session() as session:
        session.headers.update(headers)

        try:
            logging.info("Extracting data from CoinMarketCap API...")
            response = session.get(url, params=parameters, timeout=timeout)
            response.raise_for_status()

            data = response.json()

            # Flatten nested fields (e.g. quote -> USD -> price) into dotted column names.
            df = pd.json_normalize(data['data'])

            raw_file = RAW_FILE

            if os.path.exists(raw_file):
                old_df = pd.read_csv(raw_file)
                # Append the new snapshot, then keep only the latest row per coin id.
                combined = pd.concat([old_df, df], ignore_index=True)
                combined = combined.drop_duplicates(subset=['id'], keep='last')
                combined.to_csv(raw_file, index=False)
                logging.info(f"Raw data updated. Total rows: {len(combined)}")
                print(f"Raw data updated. Total rows: {len(combined)}")
            else:
                df.to_csv(raw_file, index=False)
                logging.info(f"Raw data extracted and saved: {raw_file}")
                print(f"Raw data extracted and saved: {raw_file}")

            return df

        except (ConnectionError, Timeout, TooManyRedirects) as e:
            logging.error(f"Error during extraction: {e}")
            return None
        except Exception as e:
            # Covers HTTP errors from raise_for_status, malformed JSON and file I/O errors.
            logging.error(f"Unexpected error: {e}")
            return None

if __name__ == "__main__":
    # Run the extraction step once and record the outcome in the pipeline log.
    result = extract_data()
    if result is None:
        logging.error("Extraction failed.")
    else:
        logging.info(f"Extraction successful. Retrieved {len(result)} records.")


Raw data updated. Total rows: 5089


In [3]:
#Transformation of extracted data (pulling only crucial columns and renaming)
def transform_data(raw_df, output_file=None):
    """Select, rename and clean the key listing columns, then save them to CSV.

    Args:
        raw_df: Flattened listings DataFrame (as produced by ``pd.json_normalize`` or
            read back from the raw CSV). It must contain ``id``, ``name``, ``symbol``
            and the ``quote.USD.price``/``volume_24h``/``percent_change_24h``/
            ``market_cap``/``last_updated`` columns.
        output_file: Destination CSV path. Defaults to ``TRANSFORMED_FILE``.

    Returns:
        The transformed DataFrame sorted by ``id``, with columns ``id, name, symbol,
        price, volume_24h, percent_change_24h, market_cap, last_updated,
        price_change_category``. Returns ``None`` if any step fails (the error is
        logged and printed).
    """
    try:
        logging.info("Starting data transformation...")
        if output_file is None:
            output_file = TRANSFORMED_FILE

        df2 = raw_df.copy()

        # Parse timestamps (unparseable values -> NaT) and strip any timezone so the
        # column is timezone-naive.
        df2["quote.USD.last_updated"] = pd.to_datetime(
            df2["quote.USD.last_updated"], errors="coerce"
        ).dt.tz_localize(None)

        # The top-level `last_updated` would collide with the renamed quote timestamp.
        if "last_updated" in df2.columns:
            df2 = df2.drop(columns=["last_updated"])

        df2 = df2.rename(columns={
            "quote.USD.price": "price",
            "quote.USD.volume_24h": "volume_24h",
            "quote.USD.percent_change_24h": "percent_change_24h",
            "quote.USD.market_cap": "market_cap",
            "quote.USD.last_updated": "last_updated"
        })

        numeric_cols = ["price", "volume_24h", "percent_change_24h", "market_cap"]
        base_cols = ["id", "name", "symbol", "price", "volume_24h",
                     "percent_change_24h", "market_cap", "last_updated"]
        transformed = df2[base_cols].copy()

        # Clean numeric columns: non-numeric values and +/-inf become NaN, then 0.
        for col in numeric_cols:
            cleaned = pd.to_numeric(transformed[col], errors="coerce")
            cleaned = cleaned.replace([np.inf, -np.inf], np.nan)
            transformed[col] = cleaned.fillna(0).round(8)

        # Categorise from the cleaned value, so the label always agrees with the stored
        # number (missing/invalid -> 0 -> "Drop") and string inputs cannot raise.
        # NOTE(review): an exact 0% change is labelled "Drop" — confirm this is intended.
        transformed["price_change_category"] = np.where(
            transformed["percent_change_24h"] > 0, "Rise", "Drop"
        )

        transformed = transformed.sort_values(by="id", ascending=True).reset_index(drop=True)

        # os.makedirs("") raises, so only create a directory when the path names one.
        out_dir = os.path.dirname(output_file)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        transformed.to_csv(output_file, index=False)

        logging.info(f"Transformation completed. File saved: {output_file}")
        print(f"Transformation completed. {len(transformed)} records processed.")

        return transformed

    except Exception as e:
        logging.error(f"Transformation failed: {e}")
        print(f"Transformation failed: {e}")
        return None

if __name__ == "__main__":
    try:
        # The transformation always works from the raw CSV on disk.
        raw_file = RAW_FILE

        if os.path.exists(raw_file):
            raw_df = pd.read_csv(raw_file)
            result = transform_data(raw_df)

            if result is None:
                logging.error("Transformation failed.")
            else:
                logging.info(f"Transformation successful. Retrieved {len(result)} records.")
        else:
            logging.error("Raw data file not found. Run extraction first.")
            print("Raw data file not found. Run extraction first.")

    except Exception as exc:
        logging.error(f"Error in main execution: {exc}")
        print(f"Error in main execution: {exc}")

Transformation completed. 5089 records processed.


In [4]:
#Loading the transformed data to SQL server
def load_to_sql(transformed_file, server='ALEXANDER', database='CryptoDB', table_name='CryptoData'):
    """Append rows from the transformed CSV whose ``id`` is not yet in SQL Server.

    Connects with Windows integrated authentication (``trusted_connection=yes``)
    through pyodbc and ODBC Driver 17 for SQL Server, then reads ``transformed_file``.
    It drops rows whose ``id`` already exists in ``table_name`` and appends the rest
    with ``DataFrame.to_sql`` (``if_exists="append"``).

    NOTE(review): rows whose ``id`` already exists are skipped, not updated, so newer
    prices for coins already in the table are never written — confirm this is intended.

    Args:
        transformed_file: Path to the CSV written by the transformation step.
        server: SQL Server host / instance name.
        database: Target database name.
        table_name: Target table. It is interpolated directly into SQL text, so only
            pass trusted identifiers.

    Returns:
        True when the load completes (including when there is nothing new to insert).
        False when the file is missing or any step fails; details are logged.
    """
    engine = None
    try:
        logging.info("Starting SQL Server load process...")

        # Validate input file
        if not os.path.exists(transformed_file):
            logging.error(f"Transformed file not found: {transformed_file}")
            return False

        logging.info(f"Reading transformed file: {transformed_file}")

        # Build connection string for SQLAlchemy
        conn_str = (
            f"mssql+pyodbc://@{server}/{database}"
            "?driver=ODBC+Driver+17+for+SQL+Server"
            "&trusted_connection=yes"
        )

        logging.info(f"Connecting to SQL Server: {server}/{database}")

        # fast_executemany enables pyodbc's fast executemany mode for the INSERTs issued by to_sql.
        engine = create_engine(conn_str, fast_executemany=True)

        # Test connection
        with engine.connect() as conn:
            result = conn.execute(text("SELECT @@VERSION"))
            version = result.fetchone()[0][:50]
            logging.info(f"Connected successfully! SQL Server: {version}...")

        # Step 1: Read transformed file
        df3 = pd.read_csv(transformed_file)
        logging.info(f"Loaded {len(df3)} records from transformed file")

        # The CSV stores timestamps as text; parse them back so SQL gets datetimes.
        if 'last_updated' in df3.columns:
            df3['last_updated'] = pd.to_datetime(df3['last_updated'], errors='coerce')

        # Step 2: Check if table exists and read existing IDs
        try:
            existing_ids = pd.read_sql(f"SELECT id FROM {table_name}", con=engine)
            logging.info(f"Found {len(existing_ids)} existing records in SQL table")
        except Exception as e:
            # Any failure here is treated as "table missing"; log the reason so a
            # different failure (e.g. permissions) is visible in the log.
            logging.warning(f"Table '{table_name}' might not exist yet. Will create it. ({e})")
            existing_ids = pd.DataFrame(columns=['id'])

        # Step 3: Filter out duplicates
        if not existing_ids.empty:
            df_filtered = df3[~df3["id"].isin(existing_ids["id"])]
            duplicates_count = len(df3) - len(df_filtered)
            if duplicates_count > 0:
                logging.info(f"Filtered out {duplicates_count} duplicate records")
        else:
            df_filtered = df3
            logging.info("No existing records - will insert all data")

        logging.info(f" {len(df_filtered)} new records ready for insert")

        # Step 4: Insert only new ones
        if not df_filtered.empty:
            logging.info(f"Inserting {len(df_filtered)} records into '{table_name}'...")
            # Insert into the table that was queried above (previously hard-coded).
            df_filtered.to_sql(table_name, con=engine, if_exists="append", index=False)

            # Verify insertion
            with engine.connect() as conn:
                result = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}"))
                total_rows = result.fetchone()[0]
                logging.info(f"Successfully inserted {len(df_filtered)} new records")
                logging.info(f" Total rows in '{table_name}': {total_rows}")

            logging.info(" SQL load completed successfully")
            return True
        else:
            logging.info("No new data to insert — all records already exist")
            return True

    except Exception as e:
        logging.error(f"Error during load: {e}")
        logging.error(traceback.format_exc())
        return False
    finally:
        if engine is not None:
            # Release pooled connections so repeated runs in one kernel don't leak them.
            engine.dispose()

if __name__ == "__main__":
    # Source file and SQL Server target for this run
    transformed_file = TRANSFORMED_FILE
    server, database, table_name = 'ALEXANDER', 'CryptoDB', 'CryptoData'

    success = load_to_sql(
        transformed_file=transformed_file,
        server=server,
        database=database,
        table_name=table_name,
    )

    print(
        "\nData successfully loaded to SQL Server!"
        if success
        else "\nFailed to load data. Check logs for details."
    )


Data successfully loaded to SQL Server!
