In [1]:
pip install kagglehub

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: C:\Users\Brian\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


In [2]:
import os
import pandas as pd
import json
import sqlite3
import requests
import kagglehub

dataset_path = kagglehub.dataset_download("jvanark/nvidia-daily-stock-price-data")
csv_file = os.path.join(dataset_path, "nvidia_stock_prices.csv")

def load_data(file_path_or_url, api_key=None):
    # Check if the input is a URL
    if file_path_or_url.startswith('http://') or file_path_or_url.startswith('https://'):
        params = {'access_key': api_key} if api_key else {}
        response = requests.get(file_path_or_url, params=params)
        
        if response.status_code != 200:
            raise ValueError(f"Error fetching data: {response.status_code}")
        
        content_type = response.headers['Content-Type']
        
        if 'application/json' in content_type:
            data = pd.DataFrame(response.json()['data'])  # Adjusted for MarketStack's API response structure
            print(f"Loaded {len(data)} records from API (JSON)")
            return data
        elif 'text/csv' in content_type:
            from io import StringIO
            data = pd.read_csv(StringIO(response.text))
            print(f"Loaded {len(data)} records from API (CSV)")
            return data
        else:
            raise ValueError("Unsupported Content-Type from API!")
    
    elif file_path_or_url.endswith('.csv'):
        data = pd.read_csv(file_path_or_url)
        print(f"Loaded {len(data)} records from {file_path_or_url}")
        return data
    elif file_path_or_url.endswith('.json'):
        with open(file_path_or_url, 'r') as f:
            data = pd.DataFrame(json.load(f))
        print(f"Loaded {len(data)} records from {file_path_or_url}")
        return data
    else:
        raise ValueError("Unsupported file format!")

def modify_columns(data, drop_columns=None, add_columns=None):
    if drop_columns:
        data = data.drop(columns=drop_columns)
        print(f"Dropped columns: {drop_columns}")
    if add_columns:
        for col, default_value in add_columns.items():
            data[col] = default_value
        print(f"Added columns: {add_columns.keys()} with default values.")    
    return data

def convert_data(data, output_format):
    if output_format == 'csv':
        output_file = "output_data.csv"
        data.to_csv(output_file, index=False)
        print(f"Data converted to CSV and saved to {output_file}")
    elif output_format == 'json':
        output_file = "output_data.json"
        data.to_json(output_file, orient='records', lines=True)
        print(f"Data converted to JSON and saved to {output_file}")
    else:
        raise ValueError("Unsupported output format!")
    return output_file

def store_in_sqlite(data, table_name):
    conn = sqlite3.connect('etl_data.db')
    data.to_sql(table_name, conn, if_exists='replace', index=False)
    print(f"Data stored in SQLite table '{table_name}'")
    conn.close()

def etl_process(file_path_or_url, output_format='csv', store_sql=False, drop_columns=None, add_columns=None, api_key=None):
    try:
        data = load_data(file_path_or_url, api_key=api_key)
        print(f"Initial Data Summary: {len(data)} records, {len(data.columns)} columns")
        data = modify_columns(data, drop_columns, add_columns)
        converted_file = convert_data(data, output_format)
        if store_sql:
            store_in_sqlite(data, table_name="stock_prices")
        print(f"Post-processing Data Summary: {len(data)} records, {len(data.columns)} columns")
    except Exception as e:
        print(f"Error during ETL process: {str(e)}")

# Example usage with a local CSV file
etl_process(
    csv_file, 
    output_format='json', 
    store_sql=True, 
    drop_columns=['Open', 'Close'],  
    add_columns={'New_Column': 'default_value'} 
)

# Example usage with an API URL (with API key for MarketStack)
api_url = 'http://api.marketstack.com/v1/currencies'
api_key = 'api-key'  # Replace with your actual API key

etl_process(
    api_url, 
    output_format='json', 
    store_sql=True, 
    api_key=api_key,
    drop_columns=None, 
    add_columns={'New_Column': 'default_value'}
)


  from .autonotebook import tqdm as notebook_tqdm


Loaded 5033 records from C:\Users\Brian\.cache\kagglehub\datasets\jvanark\nvidia-daily-stock-price-data\versions\2\nvidia_stock_prices.csv
Initial Data Summary: 5033 records, 6 columns
Dropped columns: ['Open', 'Close']
Added columns: dict_keys(['New_Column']) with default values.
Data converted to JSON and saved to output_data.json
Data stored in SQLite table 'stock_prices'
Post-processing Data Summary: 5033 records, 5 columns
Loaded 42 records from API (JSON)
Initial Data Summary: 42 records, 3 columns
Added columns: dict_keys(['New_Column']) with default values.
Data converted to JSON and saved to output_data.json
Data stored in SQLite table 'stock_prices'
Post-processing Data Summary: 42 records, 4 columns
