In [7]:
import pandas as pd
import json
import sqlite3

# Step 1: Fetch data from the given source
def fetch(file_path, file_type):
    if file_type == 'csv':
        df = pd.read_csv(file_path)
        print("Data extracted from CSV file.")
    elif file_type == 'json':
        df = pd.read_json(file_path)
        print("Data extracted from JSON file.")
    elif file_type == 'sql':
        conn = sqlite3.connect(file_path)
        df = pd.read_sql("SELECT * FROM movies", conn)
        conn.close()
        print("Data extracted from SQL database.")
    else:
        raise ValueError(f"Unsupported file type: {file_type}")

    # Generate a brief summary of the ingested data
    num_records = len(df)
    num_columns = len(df.columns)
    print(f"Initial Data Summary:\nNumber of records: {num_records}\nNumber of columns: {num_columns}")

    return df

# Step 2: Modify data columns
def modify(df, columns_to_remove=None, new_columns=None):
    # Remove specified columns
    if columns_to_remove:
        df = df.drop(columns=columns_to_remove, errors='ignore')
        print(f"Columns removed: {columns_to_remove}")

    # Add new columns
    if new_columns:
        for col, data in new_columns.items():
            df[col] = data
        print(f"New columns added: {list(new_columns.keys())}")

    # Generate a brief summary after modifying the data
    num_records = len(df)
    num_columns = len(df.columns)
    print(f"Post-Modifiying Data Summary:\nNumber of records: {num_records}\nNumber of columns: {num_columns}")

    print("Data modified.")
    return df

# Step 3: Convert and store data frame to the target format
def convert_and_store(df, target_format, output_file='output'):
    if target_format == 'csv':
        df.to_csv(f'{output_file}.csv', index=False)
        print(f"Data converted to CSV and stored as '{output_file}.csv'.")

    elif target_format == 'json':
        json_data = df.to_json(orient='records')
        with open(f'{output_file}.json', 'w') as json_file:
            json_file.write(json_data)
        print(f"Data converted to JSON and stored as '{output_file}.json'.")

    elif target_format == 'sql':
        conn = sqlite3.connect(f'{output_file}.db')
        df.to_sql('movies', conn, if_exists='replace', index=False)
        conn.close()
        print(f"Data converted to SQL and stored as '{output_file}.db'.")

    else:
        raise ValueError(f"Unsupported target format: {target_format}")

    # Display the final output to the user
    print("\nFinal Transformed Data:")
    return df

# Step 4: ETL overall function
def fetch_modify_convert_store(file_path, source_format, target_format, columns_to_remove=None, new_columns=None):
    # Step 1: Fetch data from the input file
    df = fetch(file_path, source_format)

    # Step 2: Modify data (remove/add columns)
    modified_df = modify(df, columns_to_remove, new_columns)

    # Step 3: Convert and load data into the target format
    final_df = convert_and_store(modified_df, target_format)
    return final_df

In [8]:
# ETL pipeline using CSV file
file_path = 'TOP 100 IMDB MOVIES.csv'  # Uploaded file path from local drive from Kaggle
source_format = 'csv'  # 'csv', 'json', or 'sql' for the input file type
target_format = 'json'  # 'csv', 'json', or 'sql' for the desired output

# Specify columns to remove and new columns to add
columns_to_remove = ['description']
new_columns = {'source': 'IMDB'}

# Run the ETL pipeline with column modifications and file storage
fetch_modify_convert_store(file_path, source_format, target_format, columns_to_remove, new_columns)

Data extracted from CSV file.
Initial Data Summary:
Number of records: 100
Number of columns: 6
Columns removed: ['description']
New columns added: ['source']
Post-Modifiying Data Summary:
Number of records: 100
Number of columns: 6
Data modified.
Data converted to JSON and stored as 'output.json'.

Final Transformed Data:


Unnamed: 0,rank,title,genre,rating,year,source
0,18,Spider-Man: Across the Spider-Verse,"['Animation', 'Action', 'Adventure']",8.7,2023,IMDB
1,32,Oppenheimer,"['Biography', 'Drama', 'History']",8.6,2023,IMDB
2,77,Joker,"['Crime', 'Drama', 'Thriller']",8.4,2019,IMDB
3,76,Avengers: Endgame,"['Action', 'Adventure', 'Drama']",8.4,2019,IMDB
4,37,Parasite,"['Drama', 'Thriller']",8.5,2019,IMDB
...,...,...,...,...,...,...
95,99,Citizen Kane,"['Drama', 'Mystery']",8.3,1941,IMDB
96,65,The Great Dictator,"['Comedy', 'Drama', 'War']",8.4,1940,IMDB
97,49,Modern Times,"['Comedy', 'Drama', 'Romance']",8.5,1936,IMDB
98,54,City Lights,"['Comedy', 'Drama', 'Romance']",8.5,1931,IMDB


In [9]:
# ETL pipeline using JSON file
file_path = 'iris.json'  # Uploaded file path from local drive from Kaggle
source_format = 'json'  # 'csv', 'json', or 'sql' for the input file type
target_format = 'csv'  # 'csv', 'json', or 'sql' for the desired output

# Specify columns to remove and new columns to add
columns_to_remove = ['petalWidth']

# Run the ETL pipeline with column modifications and file storage
fetch_modify_convert_store(file_path, source_format, target_format, columns_to_remove)

Data extracted from JSON file.
Initial Data Summary:
Number of records: 150
Number of columns: 5
Columns removed: ['petalWidth']
Post-Modifiying Data Summary:
Number of records: 150
Number of columns: 4
Data modified.
Data converted to CSV and stored as 'output.csv'.

Final Transformed Data:


Unnamed: 0,sepalLength,sepalWidth,petalLength,species
0,5.1,3.5,1.4,setosa
1,4.9,3.0,1.4,setosa
2,4.7,3.2,1.3,setosa
3,4.6,3.1,1.5,setosa
4,5.0,3.6,1.4,setosa
...,...,...,...,...
145,6.7,3.0,5.2,virginica
146,6.3,2.5,5.0,virginica
147,6.5,3.0,5.2,virginica
148,6.2,3.4,5.4,virginica


CSV file of top 100 IMDB movies retrieved from Kaggle https://www.kaggle.com/datasets/mayurkadam9833/top-100-imdb-movies?resource=download and downloaded to local drive

JSON file of Iris data retrieved from Kaggle https://www.kaggle.com/datasets/rtatman/iris-dataset-json-version and downloaded to local drive

# Reflection:

Creating an ETL pipeline that fetched data from various formats, converted it to a target format, allowed for data frame modifications, and stored it as a new file was both insightful and challenging. This project allowed me to apply the python knowledge that I learned in class while also providing me an opportunity to gain new skills in working with multiple data frame formats. While the information I learned in lecture gave me a basis to complete this project, I also had to conduct some research on my own for small syntax problems that I encountered. This allowed me to gain a deeper understanding of how ETL pipelines are built and implemented using python, SQL, and other data systems.

The first challenge I encountered while completing this project was the initial confusion on how to start writing the code. I was able to combat this issue by reading about how ETL pipelines are written and then figuring out how to write code to complete each operation in the pipeline individually. Another challenge I faced was handling multiple data formats such as CSV, JSON, and Sql while maintaining flexibility. Since each format was unique, I had to learn how to make sure there were no errors that arose from converting data formats and modifying columns.

One thing that was easier than expected in this project was data conversion. This is because libraries like pandas and sqlite3 had the ability to read and write data from various formats without complicated code. Also, pandas allowed for flexibility that made a challenge I had faced in modifying columns simpler by allowing for seamless manipulation of the data.

A utility like this could be useful for other data projects I may encounter involving large data sets from multiple sources or with various formats. The ETL pipeline can be a very helpful tool in switching between data formats like CSV and JSON and save time for repetitive processes of data fetching, modifying, converting, and storage. It can also be useful for projects dealing with other data sources like APIs making it convenient and reliable.

Overall, I think this project was very helpful in applying my knowledge from class while also expanding my skills in working with multiple data systems and sources.
