In [1]:
import glob
import pandas as pd
import xml.etree.ElementTree as ET
from datetime import datetime
from sqlalchemy import create_engine
import urllib.parse

In [2]:
class ETLFactory:
    def __init__(self, host, user, password, database, table_name):
        self.host = host
        self.user = user
        self.password = urllib.parse.quote_plus(password)
        self.database = database
        self.table_name = table_name

        self.columns = ['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'charges']
        self.engine = create_engine(f"mysql+mysqlconnector://{user}:{self.password}@{host}/{database}")
        

    def extract_from_csv(self, file_to_process):
        dataframe = pd.read_csv(file_to_process)
        return dataframe

    
    def extract_from_json(self, file_to_process):
        dataframe= pd.read_json(file_to_process)
        return dataframe

    
    def extract_from_xml(self, file_to_process):
        dataframe = pd.DataFrame(columns=['age', 'sex', 'bmi', 'children', 'smoker', 'region', 'charges'])
        tree = ET.parse(file_to_process)
        root = tree.getroot()
        for row in root:
            age = int(row.find("age").text)
            sex = row.find("sex").text
            bmi = float(row.find("bmi").text)
            children = row.find("children").text
            smoker = row.find("smoker").text
            region = row.find("region").text
            charges = float(row.find("charges").text)
            row_data = pd.DataFrame({'age': [age], 'sex': [sex], 'bmi': [bmi], 'children': [children], 'smoker': [smoker], 'region': [region], 'charges': [charges]})
            dataframe = pd.concat([dataframe, row_data], ignore_index=True)
        return dataframe

    
    def extract(self, file_pattern):
        extracted_data = pd.DataFrame(columns=self.columns)
        for file_to_process in glob.glob(file_pattern):
            if file_to_process.endswith('.csv'):
                extracted_data = pd.concat([extracted_data, self.extract_from_csv(file_to_process)], ignore_index=True)
            elif file_to_process.endswith('.json'):
                extracted_data = pd.concat([extracted_data, self.extract_from_json(file_to_process)], ignore_index=True)
            elif file_to_process.endswith('.xml'):
                extracted_data = pd.concat([extracted_data, self.extract_from_xml(file_to_process)], ignore_index=True)
        return extracted_data

    
    def transform(self, data):
        data['bmi'] = round(data['bmi'], 3)
        data['charges'] = round(data['charges'], 3)
        data['smoker'] = data['smoker'].map({'yes': 1, 'no': 0})
        return data

    
    def load(self, data, if_exists='replace'):
        data.to_sql(name=self.table_name, con=self.engine, if_exists=if_exists, index=False)

    
    def log(self, message):
        timestamp_format = '%Y-%m-%d %H:%M:%S.%f'
        now = datetime.now()
        timestamp = now.strftime(timestamp_format)
        with open("./log/logfile.txt", "a") as f:
            f.write(timestamp + ', ' + message + '\n')

    
    def run(self, file_pattern):
        self.log("ETL Job Started")
        self.log("Extracting Data")
        extracted_data = self.extract(file_pattern)
        self.log("Data Extraction Completed")
        self.log("Transforming Data")
        transformed_data = self.transform(extracted_data)
        self.log("Data Transformation Completedd")
        self.log("Loading Data")
        self.load(transformed_data)
        self.log("Data Loaded")
        self.log("ETL Job Completed")

In [3]:
host = 'localhost'
user = '' # add your user
password = '' # add your password
database = 'insurance_db'
table_name = 'tbl_insurance'
file_pattern = "./datasets/*.*"

etl = ETLFactory(host, user, password, database, table_name)
etl.run(file_pattern)

  extracted_data = pd.concat([extracted_data, self.extract_from_csv(file_to_process)], ignore_index=True)
  dataframe = pd.concat([dataframe, row_data], ignore_index=True)
