In [1]:
# Aaron Park ync4hn

import csv
import json
import requests
import sqlite3
from typing import List, Dict

class ETLProcessor:
    """Minimal extract-transform-load helper working on a list of dict records.

    Typical use is ``fetch_data`` -> ``modify_columns`` -> ``store_data``.
    Records are held in ``self.data`` as a list of dictionaries, one per row,
    keyed by column name.

    Every public method reports failures by raising a plain ``Exception`` with
    a descriptive message (the original cause is chained via ``__cause__``).
    """

    # INITIALIZE VALUES
    def __init__(self, input_format: str, output_format: str):
        """Remember the formats to read ('csv'/'json') and write ('csv'/'json'/'sql')."""
        self.input_format = input_format
        self.output_format = output_format
        self.data = []

    # ---- private helpers -------------------------------------------------
    def _column_count(self) -> int:
        """Return the number of columns of the first record, or 0 when there is no data."""
        return len(self.data[0]) if self.data else 0

    def _all_columns(self) -> List[str]:
        """Return every column name found in any record, in first-seen order."""
        seen = {}  # dict used as an ordered set
        for record in self.data:
            for key in record:
                seen.setdefault(key, None)
        return list(seen)

    @staticmethod
    def _quote_identifier(name) -> str:
        """Quote a column name as an SQL identifier (embedded double quotes are doubled)."""
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _sql_value(value):
        """Convert one cell to the value bound into the TEXT column."""
        if value is None:
            return None  # store a real NULL rather than the text 'None'
        if isinstance(value, (dict, list)):
            return json.dumps(value)
        return str(value)

    # 1. Fetching the data from a URL or a local file
    def fetch_data(self, source: str):
        """Load records from ``source`` into ``self.data`` and print a summary.

        Args:
            source: An ``http://``/``https://`` URL, otherwise a local file path.

        Raises:
            Exception: on network errors, a missing file, invalid JSON, an
                unsupported ``input_format`` or JSON that is not an object or
                an array of objects.
        """
        try:
            if self.input_format not in ('csv', 'json'):
                raise ValueError(f"Unsupported input format '{self.input_format}'")

            # Only real URL schemes count as remote; a local file whose name
            # merely starts with "http" is still opened from disk.
            if source.lower().startswith(('http://', 'https://')):  # URL
                # A timeout keeps an unresponsive server from hanging the run forever.
                response = requests.get(source, timeout=30)
                response.raise_for_status()
                if self.input_format == 'csv':  # csv files
                    # keepends=True preserves line breaks inside quoted fields.
                    records = list(csv.DictReader(response.text.splitlines(keepends=True)))
                else:  # json files
                    records = response.json()
            else:
                # newline='' is what the csv module expects for file objects.
                with open(source, 'r', newline='', encoding='utf-8') as file:
                    if self.input_format == 'csv':  # csv
                        records = list(csv.DictReader(file))
                    else:  # json
                        records = json.load(file)

            if self.input_format == 'json':
                # A single top-level object is treated as one record.
                if isinstance(records, dict):
                    records = [records]
                if not isinstance(records, list) or not all(isinstance(r, dict) for r in records):
                    raise ValueError("JSON input must be an object or an array of objects")

            self.data = records

            # Print the Ingestion Summary
            print("Ingestion Summary:")
            print(f"Number of records: {len(self.data)}")
            print(f"Number of columns: {self._column_count()}")

        # ERROR STATEMENTS
        except requests.RequestException as e:
            raise Exception(f"Error fetching data from URL: {str(e)}") from e
        except FileNotFoundError as e:
            raise Exception(f"Error: File '{source}' not found") from e
        except json.JSONDecodeError as e:
            raise Exception("Error: Invalid JSON format") from e
        except Exception as e:
            raise Exception(f"Error fetching data: {str(e)}") from e

    # 3. Modify the Columns from the source to the destination
    def modify_columns(self, columns_to_add: Dict[str, str] = None, columns_to_delete: List[str] = None):
        """Add and/or remove columns on every record, then print a summary.

        Both arguments may be given in the same call; additions are applied
        first, deletions second.

        Args:
            columns_to_add: Mapping of column name to the constant value set on
                every record (existing columns of the same name are overwritten).
            columns_to_delete: Column names to drop. Names that are not present
                are ignored. A single string is treated as one column name.

        Raises:
            Exception: if the records cannot be modified.
        """
        try:
            # ADDING COLUMNS TO THE ORIGINAL DATA
            if columns_to_add is not None:
                for record in self.data:
                    record.update(columns_to_add)

            # DELETING COLUMNS FROM THE ORIGINAL DATA
            # Independent `if` (not `elif`) so a call supplying both arguments
            # performs both operations.
            if columns_to_delete is not None:
                if isinstance(columns_to_delete, str):
                    # Avoid treating a bare string as a collection of characters.
                    columns_to_delete = [columns_to_delete]
                doomed = set(columns_to_delete)
                self.data = [
                    {key: value for key, value in record.items() if key not in doomed}
                    for record in self.data
                ]

            # PRINT OUT MODIFICATION SUMMARY
            print("\nColumn Modification Summary:")
            print(f"Number of records: {len(self.data)}")
            print(f"Number of columns: {self._column_count()}")
        except Exception as e:
            raise Exception(f"Error modifying columns: {str(e)}") from e

    # Convert the data AND STORE THE DATA [both 2 and 4]
    def store_data(self, destination: str):
        """Write ``self.data`` to ``destination`` in ``self.output_format``.

        * ``'csv'``  - header row plus one line per record; columns are the
          union of all record keys, missing cells are written empty.
        * ``'json'`` - the record list dumped with ``indent=2``.
        * ``'sql'``  - rows appended to table ``newTable`` (all columns TEXT)
          of the SQLite database file at ``destination``; nested dict/list
          values are stored as JSON text, ``None`` as NULL.

        Raises:
            Exception: if there are no records, the output format is
                unsupported, or writing fails (SQLite failures are reported
                with a ``SQLite error:`` prefix).
        """
        try:
            if self.output_format not in ('csv', 'json', 'sql'):
                raise ValueError(f"Unsupported output format '{self.output_format}'")
            if not self.data:
                # Checked before the destination is opened so an existing file
                # is not truncated for nothing.
                raise ValueError("no records to store")

            columns = self._all_columns()

            # if output is a csv file
            if self.output_format == 'csv':
                with open(destination, 'w', newline='', encoding='utf-8') as file:
                    writer = csv.DictWriter(file, fieldnames=columns, restval='')
                    writer.writeheader()
                    writer.writerows(self.data)
            # if output is a json file [dump]
            elif self.output_format == 'json':
                with open(destination, 'w', encoding='utf-8') as file:
                    json.dump(self.data, file, indent=2)
            # if output is sql file [connect to sql and then create it through the instance]
            else:
                conn = sqlite3.connect(destination)
                try:
                    # Identifiers cannot be bound as parameters, so they are
                    # quoted explicitly; values always go through placeholders.
                    quoted = [self._quote_identifier(column) for column in columns]
                    column_defs = ', '.join(f"{name} TEXT" for name in quoted)
                    conn.execute(f"CREATE TABLE IF NOT EXISTS newTable ({column_defs})")

                    # insert the data that we have modified
                    placeholders = ', '.join('?' for _ in columns)
                    insert_sql = (
                        f"INSERT INTO newTable ({', '.join(quoted)}) VALUES ({placeholders})"
                    )
                    conn.executemany(
                        insert_sql,
                        (
                            tuple(self._sql_value(record.get(column)) for column in columns)
                            for record in self.data
                        ),
                    )
                    conn.commit()
                finally:
                    # Always release the database handle, even when a statement fails.
                    conn.close()

            # POST PROCESSING SUMMARY
            print("\nPost-processing Summary:")
            print(f"Number of records: {len(self.data)}")
            print(f"Number of columns: {self._column_count()}")
        except sqlite3.Error as e:
            raise Exception(f"SQLite error: {str(e)}") from e
        except Exception as e:
            raise Exception(f"Error storing data: {str(e)}") from e

def main():
    """Interactively run one fetch -> modify -> store pass of ETLProcessor.

    Prompts for the input/output formats, the source and destination, and
    whether to add the example ``Processed`` column or delete a user-supplied
    list of columns. Invalid answers and pipeline failures are printed rather
    than raised, so the function always returns normally.
    """
    try:
        input_format = input("Enter input format (csv/json): ").strip().lower()
        if input_format not in ['csv', 'json']:
            raise ValueError("Invalid input format. Please choose 'csv' or 'json'.")

        output_format = input("Enter output format (csv/json/sql): ").strip().lower()
        if output_format not in ['csv', 'json', 'sql']:
            raise ValueError("Invalid output format. Please choose 'csv', 'json', or 'sql'.")

        source = input("Enter your Source File or URL: ").strip()
        destination = input("Enter your Destination File Name: ").strip()

        processor = ETLProcessor(input_format, output_format)

        processor.fetch_data(source)

        # Both start as None so neither name can be unbound further down.
        columns_to_add = None
        columns_to_delete = None

        modify_format = input("Do you want to add some columns or delete some columns (add/delete): ").strip().lower()
        if modify_format == "delete":
            raw_columns = input("Enter columns to delete (comma-separated): ").split(',')
            # Drop empty entries produced by stray or trailing commas.
            columns_to_delete = [col.strip() for col in raw_columns if col.strip()]
        elif modify_format == "add":
            columns_to_add = {'Processed': 'True'}  # Example of adding a column
        else:
            # Reject anything else explicitly, like the format prompts above,
            # instead of failing later on an unassigned variable.
            raise ValueError("Invalid modification option. Please choose 'add' or 'delete'.")

        processor.modify_columns(columns_to_add, columns_to_delete)
        processor.store_data(destination)

        print("ETL process completed successfully.")
    except ValueError as e:
        print(f"Error: {str(e)}")
    except Exception as e:
        print(f"An error occurred: {str(e)}")

# Start the interactive pipeline only when this code is executed directly
# (as a script or as a notebook cell), not when it is imported as a module.
if __name__ == "__main__":
    main()

Enter input format (csv/json):  csv
Enter output format (csv/json/sql):  json
Enter your Source File or URL:  spongebob_episodes.csv
Enter your Destination File Name:  episodes.json


Ingestion Summary:
Number of records: 583
Number of columns: 25


Do you want to add some columns or delete some columns (add/delete):  add



Column Modification Summary:
Number of records: 583
Number of columns: 26

Post-processing Summary:
Number of records: 583
Number of columns: 26
ETL process completed successfully.


Testing Code Below

In [5]:
# CSV TO JSON - DELETE           [add done above]
# Non-interactive check of the delete path: read the CSV, drop two columns,
# and write the remaining columns as JSON.
input_format = "csv"
output_format = "json"

source = "spongebob_episodes.csv"
destination = "episodes_delete.json"

processor = ETLProcessor(input_format, output_format)
processor.fetch_data(source)
# Exactly one of the two modification arguments is set; the other stays None.
columns_to_delete = ["Airdate", "Animation"]
columns_to_add = None
processor.modify_columns(columns_to_add, columns_to_delete)
processor.store_data(destination)

print("ETL process completed successfully.")


Ingestion Summary:
Number of records: 583
Number of columns: 25

Column Modification Summary:
Number of records: 583
Number of columns: 23

Post-processing Summary:
Number of records: 583
Number of columns: 23
ETL process completed successfully.


In [7]:
# JSON TO SQL - ADD 
# Non-interactive check of the add path: read the JSON records, add a constant
# "Processed" column to every record, and store them through the 'sql' output.
input_format = "json"
output_format = "sql"

source = "spongebob_episodes.json"
# NOTE: the 'sql' output goes through sqlite3.connect(destination), so this file
# is a SQLite database rather than a text file of SQL statements.
destination = "episodes_add.sql"

processor = ETLProcessor(input_format, output_format)
processor.fetch_data(source)
columns_to_delete = None
columns_to_add = {"Processed": "True"}
processor.modify_columns(columns_to_add, columns_to_delete)
processor.store_data(destination)

print("ETL process completed successfully.")


Ingestion Summary:
Number of records: 583
Number of columns: 4

Column Modification Summary:
Number of records: 583
Number of columns: 5

Post-processing Summary:
Number of records: 583
Number of columns: 5
ETL process completed successfully.


In [8]:
# JSON TO SQL - DELETE
# Non-interactive check of the delete path with JSON input and the 'sql' output.
input_format = "json"
output_format = "sql"

source = "spongebob_episodes.json"
destination = "episodes_delete.sql"

processor = ETLProcessor(input_format, output_format)
processor.fetch_data(source)
# NOTE(review): the recorded output for this cell shows 4 columns both before and
# after the modification, which suggests neither name below is a key of the JSON
# records (unknown names are skipped silently) -- confirm the intended columns.
columns_to_delete = ["Airdate", "Creative"]
columns_to_add = None
processor.modify_columns(columns_to_add, columns_to_delete)
processor.store_data(destination)

print("ETL process completed successfully.")


Ingestion Summary:
Number of records: 583
Number of columns: 4

Column Modification Summary:
Number of records: 583
Number of columns: 4

Post-processing Summary:
Number of records: 583
Number of columns: 4
ETL process completed successfully.
