<a href="https://colab.research.google.com/github/davidmarip/DS2002-PROJECT/blob/main/DataProject1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [42]:
import pandas as pd
import requests
import json
import sqlite3
import os

#Resources we used:
#https://www.geeksforgeeks.org/what-is-etl-extract-transform-load/
#https://aws.amazon.com/what-is/etl/
#https://stackoverflow.com/questions/71004420/trying-to-read-json-from-url-and-parse-into-csv-format
#https://www.geeksforgeeks.org/save-api-data-into-csv-format-using-python/
#https://www.sqlitetutorial.net/sqlite-python/insert/
#https://pythonforthelab.com/blog/storing-data-with-sqlite/



In [18]:
def load_input_data(input_file, input_format):
    try:
        if input_format == 'csv':
            return pd.read_csv(input_file)
        elif input_format == 'json':
            with open(input_file, 'r') as f:
                data = json.load(f)
                return pd.json_normalize(data)
        else:
            raise ValueError("Unsupported input format. Choose 'csv' or 'json'.")
    except FileNotFoundError as e:
        raise FileNotFoundError(f"File {input_file} not found: {e}")
    except json.JSONDecodeError as e:
        raise ValueError(f"Error decoding JSON: {e}")

    
#loads data either from CSV or JSON, and converts the JSON data into a pandas DataFrame

In [20]:
def modify_columns(data, keep_columns=None, add_columns=None, remove_columns=None):
    # Check if columns exist before modifying
    if keep_columns:
        missing_keep = [col for col in keep_columns if col not in data.columns]
        if missing_keep:
            raise ValueError(f"Columns to keep not found: {missing_keep}")
        data = data[keep_columns]

    if add_columns:
        for column, value in add_columns.items():
            data[column] = value

    if remove_columns:
        missing_remove = [col for col in remove_columns if col not in data.columns]
        if missing_remove:
            raise ValueError(f"Columns to remove not found: {missing_remove}")
        data = data.drop(columns=remove_columns)

    return data

# to keep, add, or drop specified columns

In [22]:
def convert_and_save(data, output_file, output_format, db_table=None, db_file=None):

    if output_format == 'csv':
        data.to_csv(output_file, index=False)
        print(f"Data saved as CSV to {output_file}")
    elif output_format == 'json':
        data_json = data.to_json(orient='records', indent=4)
        with open(output_file, 'w') as f:
            f.write(data_json)
        print(f"Data saved as JSON to {output_file}")
    elif output_format == 'sqlite':
        if db_file is None or db_table is None:
            raise ValueError("For SQL output, specify both db_file and db_table.")
        conn = sqlite3.connect(db_file)  # Use db_file, not output_file
        try:
            data.to_sql(db_table, conn, if_exists='replace', index=False)
            print(f"Data saved to SQLite table '{db_table}' in database {db_file}")
        finally:
            conn.close()
    else:
        raise ValueError("Unsupported output format. Choose 'csv', 'json', or 'sqlite'.")

#convert the data format (CSV, Json, or SQL) to the desired output format

In [24]:
def store_data(data, output_format, output_file, db_table=None, db_file=None):
  convert_and_save(data, output_file, output_format, db_table, db_file)

#function to store the data in SQl or write to disk (as CSV or Json)

In [26]:
def view_sql_data(db_file, db_table):
    conn = sqlite3.connect(db_file)
    query = f"SELECT * FROM {db_table}"
    df = pd.read_sql_query(query, conn)
    conn.close()
    return df

In [28]:
def etl_processor(input_file, input_format='csv', output_format='csv',
                  output_file='output_data', keep_columns=None, add_columns=None, remove_columns=None,
                  db_table=None, db_file=None):
    data = load_input_data(input_file, input_format)
    modified_data = modify_columns(data, keep_columns, add_columns, remove_columns)
    store_data(modified_data, output_format, output_file, db_table, db_file)
    return modified_data

The Input CSV file, US_Census_Tract_Area_2010.csv, is mounted through a local file. It's from Charlottesville open data with information about specific demographics. It contains 12 records and 353 columns of multiple demographic attributes.

In [31]:
input_file = '/Users/davidmarip/Downloads/DS2002/US_Census_Tract_Area_2010.csv'  #file path VARIES BY DEVICE LOCAL STORAGE!
transformed_US2010census_data = etl_processor(
    input_file=input_file,
    input_format='csv',  # Input file format
    output_format='json',  # Desired output format ('csv', 'json', 'sql')
    output_file='transformed_US2010census_data',  # Base name for output file
    keep_columns=['ID','AREA_','Population','White','Black', 'AmIndian', 'Asian', 'Hawaiian', 'Other'],  # Columns to keep
    add_columns=None,
    remove_columns=None,
    db_table='input_file',  # For SQL output
    db_file='database.db'  # SQLite database for SQL output
)
df = pd.read_json('transformed_US2010census_data')
df.head()
# Example Usage for US_Census_Tract_Area_2010 for Charlottesville
# takes inputted cvs file and transforms (keeps columns ObjectID, ID, Area, Population, White, and Black)
# output formatted as Json saved to transformed_US2010census_data file

Data saved as JSON to transformed_US2010census_data


Unnamed: 0,ID,AREA_,Population,White,Black,AmIndian,Asian,Hawaiian,Other
0,9011,1.186296,4675,3505,706,3,180,2,130
1,9030,0.474023,3305,1253,1708,21,135,0,80
2,9047,0.374655,4351,2681,355,2,1087,1,55
3,9065,0.639455,3324,1471,1588,7,44,0,122
4,9084,0.370543,5617,3936,1097,12,339,3,59


In [33]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12 entries, 0 to 11
Data columns (total 9 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   ID          12 non-null     int64  
 1   AREA_       12 non-null     float64
 2   Population  12 non-null     int64  
 3   White       12 non-null     int64  
 4   Black       12 non-null     int64  
 5   AmIndian    12 non-null     int64  
 6   Asian       12 non-null     int64  
 7   Hawaiian    12 non-null     int64  
 8   Other       12 non-null     int64  
dtypes: float64(1), int64(8)
memory usage: 996.0 bytes


The output DataFrame is a transformed version of the US_Census_Tract_Area_2010.csv with 12 Records and only 9 columns. Of these columns includes: ID, AREA_, Population, White, Black, AmIndian, Asian, Hawaiian, Other. It focuses the data to more specific demographics and get's rid of columns such as State and County, which had the same values for each row.

In [36]:
def test_json_to_csv():
    try:
        input_file = '/Users/davidmarip/Downloads/DS2002/Airports.json'
        output_file = '/Users/davidmarip/Downloads/DS2002/airports_transformed.csv'
        
        transformed_data = etl_processor(
            input_file=input_file,
            input_format='json',
            output_format='csv',
            output_file=output_file
        )
        
        assert os.path.exists(output_file), "CSV file was not created"
        print(f"Test for JSON to CSV passed")
    except Exception as e:
        print(f"Test for JSON to CSV failed: {e}")

# Call the test function
test_json_to_csv()


Data saved as CSV to /Users/davidmarip/Downloads/DS2002/airports_transformed.csv
Test for JSON to CSV passed


In [40]:
import requests
import json
import os
#https://www.geeksforgeeks.org/save-api-data-into-csv-format-using-python/

def test_api_to_csv():
    print("Starting API to CSV test")
    try:
        # NASA API URL for meteorite landings dataset in JSON format
        url = "https://data.nasa.gov/api/views/gh4g-9sfh/rows.json?accessType=DOWNLOAD"
        response = requests.get(url)
        data = response.json()

        # Save the JSON response to a file
        input_file = 'meteorite.json'
        with open(input_file, 'w') as f:
            json.dump(data, f, indent=4)
        
        # Convert the JSON to CSV
        output_file = 'meteorite_transformed.csv'
        transformed_data = etl_processor(
            input_file=input_file,
            input_format='json',
            output_format='csv',
            output_file=output_file
        )
        
        assert os.path.exists(output_file), "CSV file was not created from API data"
        print(f"Test for NASA API to CSV passed")
    except Exception as e:
        print(f"Test for NASA API to CSV failed: {e}")

# Call the function to run the test
test_api_to_csv()


Starting API to CSV test
Data saved as CSV to meteorite_transformed.csv
Test for NASA API to CSV passed


In [198]:
def test_generate_summary():
    print("Starting summary test")
    try:
        input_file = '/Users/davidmarip/Downloads/DS2002/New_Market_Tax_Credit_Eligible_Area_2015.csv'
        output_file = '/Users/davidmarip/Downloads/DS2002/market_data_summary.csv'
        keep_columns = ['COUNTY', 'TRACT', 'NAME', 'STATUS', 'FIPS']
        
        # Pre-processing summary
        data = pd.read_csv(input_file)
        pre_summary = {
            'number_of_records': data.shape[0],
            'number_of_columns': data.shape[1]
        }
        print(f"Pre-processing summary: {pre_summary}")
        
        # Run ETL
        transformed_data = etl_processor(
            input_file=input_file,
            input_format='csv',
            output_format='csv',
            output_file=output_file,
            keep_columns=keep_columns
        )
        
        # Post-processing summary
        post_summary = {
            'number_of_records': transformed_data.shape[0],
            'number_of_columns': transformed_data.shape[1]
        }
        print(f"Post-processing summary: {post_summary}")
        assert post_summary['number_of_columns'] == len(keep_columns), "Column modification failed"
        print(f"Test for summarizing data passed")
    except Exception as e:
        print(f"Test for summarizing data failed: {e}")

test_generate_summary()


Starting summary test
Pre-processing summary: {'number_of_records': 12, 'number_of_columns': 6}
Data saved as CSV to /Users/davidmarip/Downloads/DS2002/market_data_summary.csv
Post-processing summary: {'number_of_records': 12, 'number_of_columns': 5}
Test for summarizing data passed


In [199]:
def test_missing_file():
    print("Starting missing file test")
    try:
        input_file = '/Users/davidmarip/Downloads/DS2002/missing_file.csv'
        
        etl_processor(
            input_file=input_file,
            input_format='csv',
            output_format='csv',
            output_file='output.csv'
        )
        print("Test for missing file failed: no exception raised")
    except FileNotFoundError:
        print(f"Test for missing file passed")

test_missing_file()


Starting missing file test
Test for missing file passed


In [200]:
def test_unsupported_format():
    print("Starting unsupported format test")
    try:
        input_file = '/Users/davidmarip/Downloads/DS2002/US_Census_Tract_Area_2010.csv'
        
        etl_processor(
            input_file=input_file,
            input_format='csv',
            output_format='xml',  # Unsupported format
            output_file='output.xml'
        )
        print("Test for unsupported format failed: no exception raised")
    except ValueError as e:
        assert str(e) == "Unsupported output format. Choose 'csv', 'json', or 'sqlite'."
        print(f"Test for unsupported format passed")

test_unsupported_format()

Starting unsupported format test
Test for unsupported format passed


In [46]:
#https://pythonforthelab.com/blog/storing-data-with-sqlite/
#https://pythonforthelab.com/blog/storing-data-with-sqlite/
def test_save_to_sqlite():
    print("Starting CSV to SQLite test")
    try:
        input_file = '/Users/davidmarip/Downloads/DS2002/US_Census_Tract_Area_2010.csv'
        db_file = 'test_census_data.db'
        db_table = 'census_data'
        
        etl_processor(
            input_file=input_file,
            input_format='csv',
            output_format='sqlite',
            output_file=None,
            keep_columns=None,
            add_columns=None,
            remove_columns=None,
            db_table=db_table,
            db_file=db_file
        )
        
        # Check if the data was written to the SQLite table
        conn = sqlite3.connect(db_file)
        query = f"SELECT * FROM {db_table} LIMIT 5"
        result = pd.read_sql_query(query, conn)
        conn.close()
        
        assert not result.empty, "SQLite table is empty"
        print(f"Test for saving data to SQLite passed")
    except Exception as e:
        print(f"Test for saving data to SQLite failed: {e}")

test_save_to_sqlite()

Starting CSV to SQLite test
Data saved to SQLite table 'census_data' in database test_census_data.db
Test for saving data to SQLite passed
