In [None]:
# Import required libraries
import requests
import pandas as pd
import json
import sqlite3
from io import StringIO
import os

# Install necessary packages
!pip install pandas requests

# CSV data source: https://www.kaggle.com/datasets/berkanoztas/synthetic-transaction-monitoring-dataset-aml
# Download the SAML-D.csv file from the above link and upload it to your Google Colab



In [None]:
# checking working directory to see if the file was uploaded correctly
print("Current working directory:", os.getcwd())
print("Files in the current directory:", os.listdir())

Current working directory: /content
Files in the current directory: ['.config', 'test', 'SAML-D.csv', '.ipynb_checkpoints', 'sample_data']


In [1]:
def fetch_data(source, file_path=None):
    """
    Fetch data from either a CSV file or an API
    """
    if source == 'csv':
        if not os.path.exists(file_path):
            print(f"Error: The file {file_path} does not exist.")
            return None
        try:
            df = pd.read_csv(file_path)
            print(f"Successfully read CSV file from {file_path}")
            return df
        except Exception as e:
            print(f"An unexpected error occurred while reading the file: {e}")
            return None
    elif source == 'api':
        API_KEY = "ffbc68e4-95fa-41a6-ae12-92538acdb860"
        BASE_URL = "https://api.coincap.io/v2"
        headers = {"Authorization": f"Bearer {API_KEY}"}
        url = f"{BASE_URL}/assets"
        try:
            response = requests.get(url, headers=headers)
            response.raise_for_status()
            data = response.json()["data"]
            print("Successfully fetched data from CoinCap API")
            return pd.DataFrame(data)
        except requests.RequestException as e:
            print(f"Error fetching data from API: {e}")
            return None
    else:
        raise ValueError("Invalid source. Choose 'csv' or 'api'.")

def convert_format(df, output_format):
    """
    Convert DataFrame to specified output format
    """
    if output_format == 'json':
        return df.to_json(orient='records')
    elif output_format == 'csv':
        return df.to_csv(index=False)
    elif output_format == 'sql':
        return df
    else:
        raise ValueError("Invalid output format. Choose 'json', 'csv', or 'sql'.")

def modify_columns(df, columns_to_keep=None, columns_to_add=None):
    """
    Modify DataFrame columns based on user input
    """
    if columns_to_keep:
        df = df[columns_to_keep]
    if columns_to_add:
        df = df.assign(**columns_to_add)
    return df

def store_data(data, output_format, filename=None, table_name=None):
    """
    Store data in specified format (file or SQL database)
    """
    if output_format in ['json', 'csv']:
        with open(filename, 'w') as f:
            f.write(data)
        print(f"Data stored in {filename}")
    elif output_format == 'sql':
        with sqlite3.connect('output.db') as conn:
            data.to_sql(table_name, conn, if_exists='replace', index=False)
        print(f"Data stored in SQLite database 'output.db', table '{table_name}'")

def generate_summary(df, stage):
    """
    Generate and print summary of the data
    """
    print(f"\n{stage} Summary:")
    print(f"Number of records: {len(df)}")
    print(f"Number of columns: {len(df.columns)}")

In [2]:
def get_user_input():
    """
    Get user input for data source and output format
    """
    source = input("Enter data source (csv/api): ").lower()
    file_path = None
    if source == 'csv':
        file_path = input("Enter the filename of your CSV file (e.g., 'SAML-D.csv'): ")
        file_path = f"/content/{file_path}"  # Adjust path for Colab
    output_format = input("Enter output format (json/csv/sql): ").lower()
    return source, file_path, output_format

def get_column_modifications():
    """
    Get user input for column modifications
    """
    columns_to_keep = input("Enter columns to keep (comma-separated, leave blank for all): ").split(',') if input("Do you want to keep specific columns? (y/n): ").lower() == 'y' else None
    columns_to_add = {col: input(f"Enter value for new column '{col}': ") for col in input("Enter new columns to add (comma-separated, leave blank for none): ").split(',') if col}
    return columns_to_keep, columns_to_add

In [None]:
def main():
    """
    Main function to run the ETL pipeline
    """
    try:
        # Get user input
        source, file_path, output_format = get_user_input()

        # Fetch data
        df = fetch_data(source, file_path)
        if df is None:
            raise ValueError("Failed to fetch data")

        # Generate pre-processing summary
        generate_summary(df, "Pre-processing")

        # Modify columns
        columns_to_keep, columns_to_add = get_column_modifications()
        df = modify_columns(df, columns_to_keep, columns_to_add)

        # Convert format
        converted_data = convert_format(df, output_format)

        # Store data
        if output_format in ['json', 'csv']:
            filename = f"/content/{input('Enter output filename: ')}"
            store_data(converted_data, output_format, filename)
        elif output_format == 'sql':
            table_name = input("Enter SQL table name: ")
            store_data(df, output_format, table_name=table_name)

        # Generate post-processing summary
        generate_summary(df, "Post-processing")

    except Exception as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    main()