In [21]:
import numpy as np
import pandas as pd
from datetime import datetime
import glob
import os

from bs4 import BeautifulSoup
import requests
from requests_html import HTMLSession

from sqlalchemy import create_engine, text
from pymongo import MongoClient


In [22]:
def intialize_html(url="http://localhost:8000", timeout=10):
    """Download a web page and parse it into a BeautifulSoup tree.

    The (misspelled) name is kept so existing callers keep working.

    Parameters
    ----------
    url : str, optional
        Page to fetch. Defaults to the local server the original version
        hard-coded.
    timeout : float, optional
        Seconds to wait for the server before giving up, so a stalled server
        cannot hang the notebook indefinitely.

    Returns
    -------
    bs4.BeautifulSoup
        The parsed document, using the ``html.parser`` backend.

    Raises
    ------
    requests.HTTPError
        If the server answers with a 4xx/5xx status.
    requests.Timeout
        If the server does not respond within ``timeout`` seconds.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly on error pages. Otherwise the caller would parse the error
    # page and later hit an unrelated error when the expected table is missing.
    response.raise_for_status()
    return BeautifulSoup(response.text, "html.parser")

def extract_from_website(columns_headers):
    """Scrape the first HTML table body on the source page into a DataFrame.

    Each ``<tr>`` in the first ``<tbody>`` becomes one row. Its ``<td>`` cells
    are mapped by position onto ``columns_headers``.

    Parameters
    ----------
    columns_headers : list of str
        Column names, in the same order as the table's cells.

    Returns
    -------
    pandas.DataFrame
        One row per data row, with exactly ``columns_headers`` as columns.
        Values are the cells' stripped text. Empty cells, and cells missing
        from rows shorter than ``columns_headers``, become NaN.

    Raises
    ------
    ValueError
        If the page contains no ``<tbody>`` element.
    """
    soup = intialize_html()

    table_body = soup.find("tbody")
    if table_body is None:
        raise ValueError("No <tbody> element found on the source page.")

    data_list = []
    for row in table_body.find_all("tr"):
        cells = row.find_all("td")
        if not cells:
            # Header-style rows (only <th>) or blank rows carry no data.
            continue
        # get_text() handles empty cells and cells that wrap nested tags;
        # ``.contents[0]`` would raise IndexError or return a Tag there.
        # zip() stops at the shorter sequence, so a short row leaves the
        # remaining columns as NaN instead of raising.
        data_list.append({
            header: (cell.get_text(strip=True) or None)
            for header, cell in zip(columns_headers, cells)
        })

    return pd.DataFrame(data_list, columns=columns_headers)

In [23]:
def extract_from_csv(csv_file):
    """Load one CSV source into a DataFrame.

    ``csv_file`` may be a filesystem path or an open file-like object. It is
    passed straight to :func:`pandas.read_csv` with default options.
    """
    return pd.read_csv(csv_file)

def extract_from_json(json_file):
    """Read a JSON source into a DataFrame, tolerating malformed files.

    Parameters
    ----------
    json_file : str or path-like or file-like
        Source accepted by :func:`pandas.read_json`.

    Returns
    -------
    pandas.DataFrame
        The parsed data, or an empty DataFrame if the source cannot be parsed.
        The function always returns a DataFrame, never ``None``, so callers
        need no special case.
    """
    try:
        return pd.read_json(json_file)
    except ValueError as e:
        # Best effort: report the bad file and continue with the remaining
        # sources rather than aborting the whole extraction.
        print(f"Error reading {json_file}: {e}")
        return pd.DataFrame()

In [38]:
def extract(columns_headers, data_dir="etl_files"):
    """Combine the website table, CSV files and JSON files into one DataFrame.

    Parameters
    ----------
    columns_headers : list of str
        Expected column names. They come first in the result, in this order.
    data_dir : str, optional
        Directory searched for ``*.csv`` and ``*.json`` source files.

    Returns
    -------
    pandas.DataFrame
        All sources stacked row-wise with a fresh 0..n-1 index. Columns are
        ``columns_headers`` followed by any extra columns found in the files.
    """
    frames = [extract_from_website(columns_headers)]

    # os.path.join replaces the hard-coded "etl_files\*.csv": "\*" is an
    # invalid escape sequence, and a backslash separates paths only on Windows.
    # sorted() makes the row order independent of directory listing order.
    for csv_file in sorted(glob.glob(os.path.join(data_dir, "*.csv"))):
        frames.append(pd.DataFrame(extract_from_csv(csv_file)))
    for json_file in sorted(glob.glob(os.path.join(data_dir, "*.json"))):
        frames.append(pd.DataFrame(extract_from_json(json_file)))

    # Concatenate once instead of growing the frame inside the loop (quadratic).
    combined = pd.concat(frames, ignore_index=True)

    # Expected columns first (present even if no source provided them),
    # then any extra columns in order of first appearance.
    extra_columns = [c for c in combined.columns if c not in columns_headers]
    return combined.reindex(columns=list(columns_headers) + extra_columns)

def transform(extracted_data):
    """Cast the Iris measurement, Cost and Id columns to numeric types.

    Parameters
    ----------
    extracted_data : pandas.DataFrame
        Raw data whose numeric columns may still hold strings (for example,
        text scraped from HTML). It must contain ``Id``, the four measurement
        columns and ``Cost``.

    Returns
    -------
    pandas.DataFrame
        A new frame in which the measurement and ``Cost`` columns are float
        and ``Id`` is int. The input frame is left unchanged, so re-running
        the cell starts again from the same raw data.

    Raises
    ------
    KeyError
        If a required column is missing.
    ValueError, TypeError
        If a value cannot be converted (for example, a missing ``Id``).
    """
    numeric_columns = ["SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm", "Cost"]
    dtypes = {column: float for column in numeric_columns}
    dtypes["Id"] = int
    # astype() with a mapping returns a converted copy, not an in-place change.
    return extracted_data.astype(dtypes)

def load_to_csv(df, csv_path):
    """Write ``df`` to CSV without the index, creating parent directories.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to save.
    csv_path : str or path-like or file-like
        Destination. For filesystem paths, missing parent directories (e.g.
        ``dataset/``) are created first, because ``DataFrame.to_csv`` does not
        create them and would fail with ``OSError``.
    """
    if isinstance(csv_path, (str, os.PathLike)):
        parent = os.path.dirname(os.fspath(csv_path))
        if parent:
            os.makedirs(parent, exist_ok=True)
    df.to_csv(csv_path, index=False)

def load_to_db_mySQL(df, my_sql_conn, table_name):
    """Write ``df`` to the table ``table_name`` through ``my_sql_conn``.

    Any existing table with that name is replaced, and the DataFrame index
    is not stored.
    """
    df.to_sql(
        name=table_name,
        con=my_sql_conn,
        index=False,
        if_exists="replace",
    )

def load_to_db_postgreSQL(df, postgres_conn, table_name):
    """Persist ``df`` as ``table_name`` via ``postgres_conn``.

    Replaces any pre-existing table of the same name and omits the index.
    """
    write_options = {"if_exists": "replace", "index": False}
    df.to_sql(table_name, postgres_conn, **write_options)

def load_to_db_mongodb(df, collection, replace=False):
    """Insert every row of ``df`` into a MongoDB collection as one document.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to load. Each row becomes a document keyed by column name.
    collection : pymongo.collection.Collection
        Target collection.
    replace : bool, optional
        If True, delete all existing documents first so repeated runs do not
        accumulate duplicates, mirroring ``if_exists='replace'`` for the SQL
        targets. Defaults to False (append), matching the previous behavior.
    """
    data = df.to_dict(orient="records")

    if replace:
        collection.delete_many({})

    # pymongo's insert_many rejects an empty document list, so skip the call
    # when there is nothing to insert.
    if data:
        collection.insert_many(data)

def log_progress(message, log_file):
    """Append a line ``<timestamp>,<message>`` to ``log_file``.

    The timestamp has the form year-month-day-hours:minutes:seconds, with an
    abbreviated, locale-dependent month name, e.g. ``2024-Jan-05-13:45:22``.
    """
    # "%b" is the portable spelling of "%h". Seconds are separated by ":";
    # the old "%H:%M-%S" put a stray "-" between minutes and seconds.
    timestamp_format = "%Y-%b-%d-%H:%M:%S"
    timestamp = datetime.now().strftime(timestamp_format)
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(f"{timestamp},{message}\n")
    

def run_query(query_statement, conn):
    """Execute a raw SQL statement and print each returned row.

    ``conn`` must provide ``.connect()`` (an SQLAlchemy engine in this
    notebook). A connection is opened for this call only and closed when the
    ``with`` block exits.
    """
    with conn.connect() as connection:
        statement = text(query_statement)
        for record in connection.execute(statement):
            print(record)

def run_noSQL_query(query_statement, collection):
    """Run a MongoDB ``find`` filter on ``collection`` and print each document.

    Parameters
    ----------
    query_statement : dict
        Filter document passed to ``collection.find``, e.g.
        ``{"Cost": {"$gt": 150}}``.
    collection : pymongo.collection.Collection
        Collection to query.

    Notes
    -----
    The caller owns the MongoClient behind ``collection`` and is responsible
    for closing it. This function neither opens nor closes connections.
    """
    # Use the filter passed in; do not rely on a global ``query`` or close a
    # global ``client`` the caller may still need.
    for doc in collection.find(query_statement):
        print(doc)

In [39]:
from dotenv import load_dotenv

# Load connection strings from a local .env file into the environment so no
# credentials are hard-coded in the notebook.
load_dotenv()

log_file = "log_file.txt"
columns_headers = ["Id", "SepalLengthCm", "SepalWidthCm", "PetalLengthCm", "PetalWidthCm", "Species", "Diseases", "Cost"]
csv_path = "dataset/dataset_final.csv"

# Keep table_name lowercase. SQLAlchemy quotes names containing upper-case
# letters when creating the table, while PostgreSQL folds unquoted identifiers
# to lowercase. An upper-case name would therefore have to be double-quoted in
# every hand-written SQL query.
table_name = "iris"
MYSQL_CONNECTION_STRING = os.getenv("MYSQL_CONNECTION_STRING")
POSTGRESQL_CONNECTION_STRING = os.getenv("POSTGRESQL_CONNECTION_STRING")
MONGODB_CONNECTION_STRING = os.getenv("MONGODB_CONNECTION_STRING")

MONGODB_DATABASE = os.getenv("MONGODB_DATABASE")

# Fail here with a clear message, rather than deep inside create_engine() or
# MongoClient, when the .env file lacks an entry.
_required_env = {
    "MYSQL_CONNECTION_STRING": MYSQL_CONNECTION_STRING,
    "POSTGRESQL_CONNECTION_STRING": POSTGRESQL_CONNECTION_STRING,
    "MONGODB_CONNECTION_STRING": MONGODB_CONNECTION_STRING,
    "MONGODB_DATABASE": MONGODB_DATABASE,
}
_missing_env = [name for name, value in _required_env.items() if not value]
if _missing_env:
    raise RuntimeError(
        "Missing environment variables (set them in .env): " + ", ".join(_missing_env)
    )

In [40]:
log_progress("Preliminaries complete. Initiating ETL process.", log_file)

# The final CSV doubles as a cache: if a previous run already produced it,
# skip re-scraping the website and re-reading the source files.
if os.path.exists(csv_path) and os.path.getsize(csv_path) > 0:
    print("Already Extracted")
    raw_extracted_data = pd.read_csv(csv_path)
else:
    raw_extracted_data = extract(columns_headers)

log_progress("Data extraction complete. Initiating Transformation process.", log_file)

transformed_data = transform(raw_extracted_data)
log_progress("Data transformation complete. Initiating loading process.", log_file)

load_to_csv(transformed_data, csv_path)
log_progress("Data saved to CSV file.", log_file)

log_progress("SQL Connection initiated.", log_file)
my_sql_engine = create_engine(MYSQL_CONNECTION_STRING)

load_to_db_mySQL(transformed_data, my_sql_engine, table_name)
log_progress("Data loaded to MySQL Database as table.", log_file)
# No later cell uses the MySQL engine; release its pooled connections.
my_sql_engine.dispose()

log_progress("PostGreSQL Connection initiated.", log_file)
# Not disposed here: a later query cell reuses postgreSQL_engine.
postgreSQL_engine = create_engine(POSTGRESQL_CONNECTION_STRING)

load_to_db_postgreSQL(transformed_data, postgreSQL_engine, table_name)
log_progress("Data loaded to PostGreSQL Database as table.", log_file)

log_progress("MongoDB Connection initiated.", log_file)
# The with-block closes the client even if the load raises.
with MongoClient(MONGODB_CONNECTION_STRING) as client:
    db = client[MONGODB_DATABASE]
    collection = db[table_name]
    # Clear earlier documents so re-running the notebook does not duplicate
    # them. This matches if_exists='replace' used for the SQL loads.
    collection.delete_many({})
    load_to_db_mongodb(transformed_data, collection)
log_progress("Data loaded to Mongodb Database as table.", log_file)

log_progress("Process Complete.", log_file)

Already Extracted


In [None]:
# Show every row that was loaded into the PostgreSQL table.
query = "SELECT * FROM " + table_name + ";"
run_query(query, postgreSQL_engine)

In [None]:
# Documents with a Cost above this threshold are listed.
COST_THRESHOLD = 150

# The with-block closes the client when the query finishes, even on error.
with MongoClient(MONGODB_CONNECTION_STRING) as client:
    db = client[MONGODB_DATABASE]
    collection = db[table_name]

    query = {"Cost": {"$gt": COST_THRESHOLD}}

    run_noSQL_query(query, collection)