In [None]:
import glob
import pandas as pd
import xml.etree.ElementTree as ET
from datetime import datetime

In [None]:
# File to log ETL events
log_file = "log_file.txt"

# Final output file
target_file = "transformed_data.csv"

In [None]:
# Log any message with timestamp into the log file
def log_progress(message):
    timestamp_format = '%Y-%h-%d-%H:%M:%S'  # Format: Year-Monthname-Day-Hour-Minute-Second
    now = datetime.now()
    timestamp = now.strftime(timestamp_format)
    with open(log_file, "a") as f:
        f.write(f"{timestamp},{message}\n")

In [None]:
# Check XML file headers function
def get_xml_headers(file_to_process):
    tree = ET.parse(file_to_process)
    root = tree.getroot()

    # Grab first row for header preview
    first_row = root[0]
    headers = [elem.tag for elem in first_row]
    
    print("XML Headers Found:", headers)
    return headers

In [None]:
# Replace your_file,xml with your file
get_xml_headers("your_file.xml")

In [None]:
# Extraction function for CSV files
def extract_from_csv(file_to_process):
    # Directly read CSV into DataFrame
    dataframe = pd.read_csv(file_to_process)
    return dataframe

# Extraction function for JSON files
def extract_from_json(file_to_process):
    # Read JSON (line-delimited) into DataFrame
    dataframe = pd.read_json(file_to_process, lines=True)
    return dataframe

In [None]:

# Extraction function for XML files with dynamic parsing
def extract_from_xml(file_to_process):
    # Define expected schema and datatypes (customize this to match your needs)
    dtype_map = {
        "car_model": str,
        "year_of_manufacture": int,
        "price": float,
        "fuel": str
    }

    # Initialize empty DataFrame with correct types
    dataframe = pd.DataFrame({col: pd.Series(dtype=dtype) for col, dtype in dtype_map.items()})

    # Parse XML file
    tree = ET.parse(file_to_process)
    root = tree.getroot()

    # Iterate over each entry in the XML
    for row in root:
        record = {}
        for key in dtype_map:
            element = row.find(key)
            if element is not None and element.text is not None:
                try:
                    record[key] = dtype_map[key](element.text.strip())
                except ValueError:
                    record[key] = None  # Handle type conversion errors
            else:
                record[key] = None  # Handle missing fields
        dataframe = pd.concat([dataframe, pd.DataFrame([record])], ignore_index=True)

    return dataframe

In [None]:
# General extraction function for all supported file types
def extract():
    # Create an empty DataFrame with predefined columns
    extracted_data = pd.DataFrame(columns=['car_model', 'year_of_manufacture', 'price', 'fuel'])

    # Extract from all CSV files in the directory, excluding the output target file
    for csvfile in glob.glob("*.csv"):
        if csvfile != target_file:
            extracted_data = pd.concat([extracted_data, extract_from_csv(csvfile)], ignore_index=True)

    # Extract from all JSON files
    for jsonfile in glob.glob("*.json"):
        extracted_data = pd.concat([extracted_data, extract_from_json(jsonfile)], ignore_index=True)

    # Extract from all XML files
    for xmlfile in glob.glob("*.xml"):
        extracted_data = pd.concat([extracted_data, extract_from_xml(xmlfile)], ignore_index=True)

    return extracted_data

In [None]:
# Transform function to clean and normalize the data
def transform(data):
    # Round price to 2 decimal places
    if 'price' in data.columns:
        data['price'] = pd.to_numeric(data['price'], errors='coerce').round(2)
    return data

In [None]:
# Load the final transformed data into a CSV
def load_data(target_file, transformed_data):
    transformed_data.to_csv(target_file, index=False)

In [None]:
# --------- ETL Pipeline Execution Starts Below ---------

log_progress("ETL Job Started")

log_progress("Extract phase Started")
extracted_data = extract()
log_progress("Extract phase Ended")

log_progress("Transform phase Started")
transformed_data = transform(extracted_data)
print("Transformed Data")
print(transformed_data)
log_progress("Transform phase Ended")

log_progress("Load phase Started")
load_data(target_file, transformed_data)
log_progress("Load phase Ended")

log_progress("ETL Job Ended")


In [None]:
# Preview what the extracted data looks like
extracted_data.head()