## TASKS
```
DESCRIPTION                             STATUS 
- Load seed data into DB.               done
- Clean and transform scraped data      done
- Create an idempotent etl job          ongoing
```

In [50]:
import pandas as pd
from pathlib import Path
import os
from datetime import datetime
import logging

In [51]:
# Setup logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


In [70]:
scraped_data_source = r"C:\Users\APIN PC\OneDrive\Documents\DS\DE_Inter\data_epic_capstone\etl\data\20250601_213652_ai_tools_scraped.csv"
seed_data_source = r"C:\Users\APIN PC\OneDrive\Documents\DS\DE_Inter\data_epic_capstone\etl\data\seeded_ai_agents.csv"

## Data Loading

In [71]:
def read_data(source_path: str) -> pd.DataFrame:
    """

    Args:
        source_path (str): Data Path

    Raises:
        ValueError: Raises error for unsupported data type.

    Returns:
        dataframe: Pandas Dataframe.
    """

    try:
        ext = Path(source_path).suffix
        if ext == ".csv":
            return pd.read_csv(source_path)
        elif ext == ".json":
            return pd.read_json(source_path)
        elif ext == ".parquet":
            return pd.read_parquet(source_path)
        logger.info("Data successfully read!")
    except Exception as e:
        logger.error(f"Error: {e}. Unsupported file format! Use csv, json or parquet.")


## Data Preview

In [72]:
scraped_df = read_data(scraped_data_source)
seed_df = read_data(seed_data_source)

scraped_df.info()
seed_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4233 entries, 0 to 4232
Data columns (total 5 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   name         4233 non-null   object
 1   description  4233 non-null   object
 2   url          4231 non-null   object
 3   source       4233 non-null   object
 4   category     4232 non-null   object
dtypes: object(5)
memory usage: 165.5+ KB
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 72 entries, 0 to 71
Data columns (total 8 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   name          72 non-null     object
 1   description   72 non-null     object
 2   homepage_url  72 non-null     object
 3   category      72 non-null     object
 4   source        72 non-null     object
 5   created_at    72 non-null     object
 6   updated_at    72 non-null     object
 7   trending      72 non-null     object
dtypes: object(8)
memory usage: 4.6+ KB


## Cleaning

Cleaning "tags" column
- separate the values in the list and choose the unique tag. 
- Each tag must be just a value. (i.e list of len 1)
    + no # in value 
    + no duplicate.

In [73]:
def baseline_cleaning(df: pd.DataFrame) -> pd.DataFrame:
    try:
        df = df.drop(columns=[col for col in ['pricing', 'page'] if col in df.columns])
        new_df = df.dropna()
        new_df = new_df.reset_index(drop=True)
        logger.info("Columns dropped and null values dropped.",
                    extra={
                     "Cols dropped": ['pricing', 'page'],
                     "Null Values Dropped": len(df) - len(new_df)
                    }
                    )
    except Exception as e:
        logger.error(f"Error Raised {e}! Is the input a dataframe? Use a pandas dataframe.",  exc_info=True)
    return df


def remove_hashtags(tags):
    try:
        if isinstance(tags, list):
            clean = [tag for tag in tags if '#' not in tag]
        elif isinstance(tags, str):
            clean = [tags] if "#" not in tags else []
        else:
            clean = []
        clean = ','.join(clean)

        if len(clean) < 4:
            clean = clean.upper()
        else:
            clean = clean.lower().capitalize()
    except Exception as e:
        logger.error(f"Error Raised at tags column cleaning {e}! Use tags column.",  exc_info=True)
    return clean


def clean_data(df):
    try:
        # df = baseline_cleaning(df=scraped_df)
        df = df.drop(columns=[col for col in ['pricing', 'page'] if col in df.columns])
        new_df = df.dropna()
        new_df = new_df.reset_index(drop=True)

        if 'tags' in df.columns:
            new_df['tags'] = new_df['tags'].apply(remove_hashtags)
        else:
            pass

        logger.info("Columns dropped and null values dropped.",
                    extra={
                     "Cols dropped": ['pricing', 'page'],
                     "Null Values Dropped": len(df) - len(new_df)
                    }
                    )
        logger.info("Tags Column Successfully cleaned.")
        logger.info("Data successfully cleaned!")
    except Exception as e:
        logger.error(f"Error Raised at full cleaning process: {e}!",  exc_info=True)
    return new_df




## Transformation

In [74]:
def get_created_at(filepath: str) -> str:
    try:
        created_timestamp = os.path.getctime(filepath)
        created_date = datetime.fromtimestamp(created_timestamp)
    except Exception as e:
        logger.error(f"Error Raised: {e}!", exc_info=True)
    return created_date.strftime("%Y-%M-%d")


def transform_data(df: pd.DataFrame, source = None) -> pd.DataFrame:
    try:
        created_day = get_created_at(scraped_data_source)
        if 'source' in df.columns:
            if df['source'] is not None:
                pass
            else:
                df['source'] = source
        else:
            df['source'] = source


        if 'created_at' in df.columns:
            if df['created_at'] is not None:
                pass
            else:
                df['created_at'] = created_day
        else:
            df['created_at'] = created_day
        if 'updated_at' in df.columns:
            if df['updated_at'] is not None:
                pass
            else:
                df['updated_at'] = None
        else:
            df['updated_at'] = None
        

        if 'trending' not in df.columns:
            df['trending'] = None
            df['trending'] = df['trending'].notna().astype(bool)
        else:
            df["trending"] = df["trending"].apply(
                lambda x: False if x == 'Low' else True
                )
            df['trending'] = df['trending'].astype(bool)
    
        trans_df = df.rename(columns={'url': 'homepage_url', 'tags': 'category'})
        
        trans_df['created_at'] = pd.to_datetime(trans_df['created_at'], format="%Y-%M-%d", errors="coerce")
        trans_df['updated_at'] = pd.to_datetime(trans_df['updated_at'], format="%Y-%M-%d", errors="coerce")

        logger.info("Data successfully transformed!")
    except Exception as e:
        logger.error(f"Error Raised at transformation: {e}!", exc_info=True)
    return trans_df

In [76]:
def merging_dfs(new_df, existing_df) -> pd.DataFrame:
    """
    Merging DFs to extract unique ai_tools
    Returns:
        pd.DataFrame: Merged DF with unique Ai tools
    """
    try:
        merged_df = pd.merge(existing_df, new_df, how="outer", suffixes="_existing")
        merged_df.drop_duplicates(subset=[
            "name", "homepage_url"
            ], inplace=True)
        merged_df = merged_df.reset_index(drop=True)
        logger.info("Existing DB Data and Scraped Data successfully merged!")
    except Exception as e:
        logger.error("Error merging DFs: %s", e, exc_info=True)
    return merged_df

## ETL

In [77]:
def run_basic_etl() -> pd.DataFrame:
    # Extract
    scraped_df = read_data(scraped_data_source)

    # Clean
    clean_scraped_df = clean_data(scraped_df)

    # Transform
    trans_scraped_df = transform_data(
        clean_scraped_df, source="https://aitoolsdirectory.com/"
    )

    return trans_scraped_df
    

In [78]:
comp_df = run_basic_etl()

2025-06-01 22:03:55,423 - INFO - Columns dropped and null values dropped.
2025-06-01 22:03:55,431 - INFO - Tags Column Successfully cleaned.


2025-06-01 22:03:55,471 - INFO - Data successfully cleaned!
2025-06-01 22:03:55,637 - INFO - Data successfully transformed!


## Idempotent ETL Job

In [79]:
"""
Ai_tools ETL Local DB Setup and data upload

Name: Arowosegbe Victor Iyanuoluwa\n
Email: Iyanuvicky@gmail.com\n
GitHub: https://github.com/Iyanuvicky22/projects
"""

import os
from dotenv import load_dotenv
from sqlalchemy import (
    create_engine,
    Column,
    String,
    Text,
    Boolean,
    DateTime,
    func,
    Integer,
)
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import SQLAlchemyError
import pandas as pd

load_dotenv(dotenv_path=".env")

DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")

DB_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

Base = declarative_base()


def connect_db():
    """
    Database connector
    """
    try:
        engine = create_engine(DB_URL)
        Session = sessionmaker(bind=engine, autoflush=False)
        Base.metadata.create_all(engine)
        logger.info("Database succesfully connected to.")
    except SQLAlchemyError as e:
        logger.error("Databse connection error: %s", e, exc_info=True)
    return Session, engine


class Agent(Base):
    """
    Agents table model creation
    Args:
        Base (): SQLAlchemy Base model
    """

    __tablename__ = "agents"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String, nullable=False, index=True)
    description = Column(Text)
    homepage_url = Column(String)
    category = Column(String)
    source = Column(String)
    trending = Column(Boolean, default=False)
    created_at = Column(DateTime, server_default=func.now(), nullable=False)
    updated_at = Column(
        DateTime, server_default=func.now(), onupdate=func.now(), nullable=False
    )


def load_data(df: pd.DataFrame):
    """
    Function to load data into the Database (PostgreSQL).
    Args:
        df (pd.DataFrame): Cleaned and Transformed data to be loaded.
    """
    Session, engine = connect_db()
    data = df

    def change_to_none(row):
        row.replace(pd.NaT, '')
        return row


    with Session.begin() as session:
        for _, row in data.iterrows():
            ai_tool = session.query(Agent).filter_by(name=str(row["name"])).first()

            if not ai_tool:
                agent = Agent(
                    name=str(row["name"]),
                    description=str(row["description"]),
                    homepage_url=row["homepage_url"],
                    category=row["category"],
                    source=row["source"],
                    trending=row["trending"],
                    created_at=row["created_at"],
                    updated_at=row["updated_at"],
                )
                session.add(agent)
        session.commit()
        logger.info("Data successfully loaded in database!")


In [80]:
path = r"C:\Users\APIN PC\OneDrive\Documents\DS\DE_Inter\data_epic_capstone\etl\data\20250601_213652_ai_tools_scraped.csv"

def trans_load_seed_df():
    """
    Function to Load Seed Data into Database
    """
    data = read_data(source_path=path)

    clean_df = clean_data(data)

    trans_seed_df = transform_data(df=clean_df)

    # load_data(trans_seed_df)

    return trans_seed_df

## Checking ETL 

In [81]:
ab = trans_load_seed_df()

2025-06-01 22:04:00,715 - INFO - Columns dropped and null values dropped.
2025-06-01 22:04:00,733 - INFO - Tags Column Successfully cleaned.
2025-06-01 22:04:00,748 - INFO - Data successfully cleaned!


2025-06-01 22:04:00,838 - INFO - Data successfully transformed!


In [82]:
ab.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4230 entries, 0 to 4229
Data columns (total 8 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   name          4230 non-null   object        
 1   description   4230 non-null   object        
 2   homepage_url  4230 non-null   object        
 3   source        4230 non-null   object        
 4   category      4230 non-null   object        
 5   created_at    4230 non-null   datetime64[ns]
 6   updated_at    0 non-null      datetime64[ns]
 7   trending      4230 non-null   bool          
dtypes: bool(1), datetime64[ns](2), object(5)
memory usage: 235.6+ KB


In [None]:
ab.drop_duplicates(subset=['homepage_url'])

Unnamed: 0,name,description,homepage_url,category,source,created_at,updated_at,trending
0,X-Design,Specialist AI tool for product photography. Up...,https://www.x-design.com/,Image Editing,https://aitoolsdirectory.com,2025-01-29 00:18:00,NaT,False
1,KIVA,KIVA is an AI SEO tool that automates keyword ...,https://wellows.com/kiva/,SEO,https://aitoolsdirectory.com,2025-01-29 00:18:00,NaT,False
2,Parliant,Parliant AI offers AI-driven conversational su...,https://www.parliant.ai/,Business Intelligence,https://aitoolsdirectory.com,2025-01-29 00:18:00,NaT,False
3,Galaxy AI,Your go-to hub for AI tools. This all-in-one A...,https://link.aitoolsdirectory.com/galaxy,Productivity,https://aitoolsdirectory.com,2025-01-29 00:18:00,NaT,False
4,Dropmagic,Build Shopify stores from AliExpress or produc...,https://link.aitoolsdirectory.com/dropmagic,Marketing,https://aitoolsdirectory.com,2025-01-29 00:18:00,NaT,False
...,...,...,...,...,...,...,...,...
8066,Photofeeler,Photofeeler provides unbiased photo feedback t...,https://www.photofeeler.com?utm_source=toolify,https://www.toolify.ai,Other,2025-01-29 00:18:00,NaT,False
8067,Thundr.tv,"Anonymous, AI-moderated video and text chat pl...",https://www.thundr.tv?utm_source=toolify,https://www.toolify.ai,Other,2025-01-29 00:18:00,NaT,False
8068,ShowZone,MLB The Show tools and resources for strategic...,https://showzone.gg?utm_source=toolify,https://www.toolify.ai,Other,2025-01-29 00:18:00,NaT,False
8069,Dover,Dover helps startups hire top talent with frac...,https://www.dover.com/trial?utm_source=toolify,https://www.toolify.ai,Other,2025-01-29 00:18:00,NaT,False


In [None]:
ab.duplicated(subset=['name', 'homepage_url']).value_counts()

ab[ab.duplicated(subset=['name'])]

def turn_to_none(col):
    for row in col:
        if row == pd.NaT:
            row = None
        else: 
            row = row
    return row

ac = turn_to_none(ab['updated_at'])
print(ac)

NaT


In [None]:
from sqlalchemy import select

def fetch_db_records():
    session, engine = connect_db()

    with engine.connect() as conn:
        db_df = pd.read_sql("SELECT * from agents", con=conn)
        conn.commit()
    return db_df

db_df = fetch_db_records()


2025-06-01 21:32:23,276 - ERROR - Databse connection error: (psycopg2.OperationalError) connection to server at "localhost" (::1), port 5432 failed: Connection refused (0x0000274D/10061)
	Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused (0x0000274D/10061)
	Is the server running on that host and accepting TCP/IP connections?

(Background on this error at: https://sqlalche.me/e/20/e3q8)
Traceback (most recent call last):
  File "c:\Users\APIN PC\OneDrive\Documents\DS\DE_Inter\data_epic_capstone\.venv\Lib\site-packages\sqlalchemy\engine\base.py", line 145, in __init__
    self._dbapi_connection = engine.raw_connection()
                             ~~~~~~~~~~~~~~~~~~~~~^^
  File "c:\Users\APIN PC\OneDrive\Documents\DS\DE_Inter\data_epic_capstone\.venv\Lib\site-packages\sqlalchemy\engine\base.py", line 3297, in raw_connection
    return self.pool.connect()
           ~~~~~~~~~~~~~~~~~^

OperationalError: (psycopg2.OperationalError) connection to server at "localhost" (::1), port 5432 failed: Connection refused (0x0000274D/10061)
	Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused (0x0000274D/10061)
	Is the server running on that host and accepting TCP/IP connections?

(Background on this error at: https://sqlalche.me/e/20/e3q8)

In [None]:
db_df.trending.value_counts()

trending
True     60
False    12
Name: count, dtype: int64

## Functions to Work On

In [None]:
def delta_check(new_df, existing_df):
    # Merge and check for differences
    merged = new_df.merge(
        existing_df, on=["name", "homepage_url"], how="left", suffixes=("", "_existing")
    )
    changed = merged[
        (merged["name"] != merged["name_existing"])
        | (merged["homepage_url"] != merged["homepage_url_existing"])
    ]
    return changed[new_df.columns] 



def upsert_records(conn, df):
    cursor = conn.cursor()
    for _, row in df.iterrows():
        cursor.execute(
            """
            INSERT INTO agents (name, homepage_url, email, phone)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(name, homepage_url)
            DO UPDATE SET email=excluded.email, phone=excluded.phone
        """,
            (row["name"], row["homepage_url"], row["email"], row["phone"]),
        )
    conn.commit()


def etl_job(source_path):
    df = read_data(source_path)

    if needs_transformation(df):
        df = transform_data(df)

    with engine.connect("agents.db") as conn:
        existing_df = fetch_existing_records(conn)
        delta_df = delta_check(df, existing_df)

        if not delta_df.empty:
            upsert_records(conn, delta_df)
            print(f"Upserted {len(delta_df)} records.")
        else:
            print("No changes detected. Idempotent run.")


## Notes

### Pending tasks
- Load Seed data separately.    ```done```
- Load scraped data and check for duplicates with name(lower) and homepage_url.
- Write tests to check for:
    + test for duplicates.
    + test for invalid rows.
    + test for correct upserts.


## FUNCTIONALITY CHECK

In [3]:
from pathlib import Path
import os
from datetime import datetime, timezone
import boto3
from dotenv import load_dotenv
import pandas as pd
import logging

In [4]:
load_dotenv(dotenv_path='.env')
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY")
AWS_REGION = os.getenv("AWS_REGION")
s3 = boto3.client("s3", region_name=AWS_REGION,
                  aws_access_key_id=AWS_ACCESS_KEY_ID,
                  aws_secret_access_key=AWS_SECRET_KEY)

bucket_name = 'scraped-ai-agent'

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s",
    filemode='w',
)
logger = logging.getLogger(__name__)

### Utils Functions

In [50]:

def read_data(source_path: str) -> pd.DataFrame:
    """

    Args:
        source_path (str): Data Path

    Raises:
        ValueError: Raises error for unsupported data type.

    Returns:
        dataframe: Pandas Dataframe.
    """

    try:
        ext = Path(source_path).suffix
        if ext == ".csv":
            return pd.read_csv(source_path)
        elif ext == ".json":
            return pd.read_json(source_path)
        elif ext == ".parquet":
            return pd.read_parquet(source_path)
        logger.info("Data successfully read!")
    except Exception as e:
        logger.error(
            "Error raised at data loading. Unsupported file format! Use csv, json or parquet: %s",
            e,
            exc_info=True,
        )


def remove_hashtags(tags):
    """
    Method to clean category column from "https://aitoolsdirectory.com/"
    Args:
        tags (Series): Column to be cleaned

    Returns:
        Series: Cleaned column.
    """
    try:
        if isinstance(tags, list):
            clean = [tag for tag in tags if "#" not in tag]
        elif isinstance(tags, str):
            clean = [tags] if "#" not in tags else []
        else:
            clean = []
        clean = ",".join(clean)

        if len(clean) < 4:
            clean = clean.upper()
        else:
            clean = clean.lower().capitalize()
    except Exception as e:
        logger.error("Error Raised at tags column cleaning:  %s", e, exc_info=True)
    return clean


def clean_data(df):
    """
    Custom function to clean scraped/manual ai_tools dataset
    Args:
        df (pd.DataFrame): Scraped/manually created ai_tools dataset.

    Returns:
        pd.DataFrame: Cleaned ai_tools dataset.
    """
    try:
        df = df.drop(columns=[col for col in ["pricing", "page"] if col in df.columns])
        new_df = df.dropna()
        new_df = new_df.reset_index(drop=True)

        if "category" in df.columns:
            new_df["category"] = new_df["category"].apply(remove_hashtags)
        else:
            pass

        logger.info(
            "Columns dropped and null values dropped.",
            extra={
                "Cols dropped": ["pricing", "page"],
                "Null Values Dropped": len(df) - len(new_df),
            },
        )
        logger.info("Tags Column Successfully cleaned.")
        logger.info("Data successfully cleaned!")
    except Exception as e:
        logger.error("Error Raised at full cleaning process: %s", e, exc_info=True)
    return new_df


def get_created_at(filepath: str) -> str:
    """
    Extracting file creation date
    Args:
        filepath (str): filepath

    Returns:
        str: creation time in strings
    """
    try:
        created_timestamp = os.path.getctime(filepath)
        created_date = datetime.fromtimestamp(created_timestamp)
    except Exception as e:
        logger.error(f"Error Raised: {e}!", exc_info=True)
    return created_date.strftime("%Y-%M-%d")


def transform_data(df: pd.DataFrame, source=None) -> pd.DataFrame:
    """
    Custom function to transform any scraped and cleaned ai_tools dataset.
    Args:
        df (pd.DataFrame): Clean ai_tools dataset

    Returns:
        pd.DataFrame: Transformed ai_tools dataset
    """
    try:
        # created_day = get_created_at(scraped_data_source)
        if "source" in df.columns:
            if df["source"] is not None:
                pass
            else:
                df["source"] = source
        else:
            df["source"] = source

        if "created_at" in df.columns:
            if df['created_at'] is not None:
                pass
            else:
                df["created_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        else:
            df["created_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        df["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        if "trending" not in df.columns:
            df["trending"] = 0
            # df["trending"] = df["trending"].notna().astype(bool)
        else:
            df["trending"] = df["trending"].apply(
                lambda x: 0 if x == "Low" else 1
            )
        df["trending"] = 0

        trans_df = df.rename(columns={"url": "homepage_url", "tags": "category"})

        trans_df["created_at"] = pd.to_datetime(
            trans_df["created_at"], format="%Y-%m-%d %H:%M:%S", errors="coerce"
        )
        trans_df["updated_at"] = pd.to_datetime(
            trans_df["updated_at"], format="%Y-%m-%d %H:%M:%S", errors="coerce"
        )
        trans_df = trans_df.drop_duplicates(subset=['name', 'homepage_url'])

        logger.info("Data successfully transformed!")
    except Exception as e:
        logger.error("Error Raised at transformation: %s", e, exc_info=True)
    return trans_df


def merging_dfs(new_df, existing_df) -> pd.DataFrame:
    """
    Merging DFs to extract unique ai_tools
    Returns:
        pd.DataFrame: Merged DF with unique Ai tools
    """
    try:
        merged_df = pd.merge(new_df, existing_df, how="outer")
        merged_df.drop_duplicates(subset=[
            "name", "homepage_url"
            ], inplace=True)
        merged_df = merged_df.reset_index(drop=True)
        logger.info("Existing DB Data and Scraped Data successfully merged!")
    except Exception as e:
        logger.error("Error merging DFs: %s", e, exc_info=True)
    return merged_df


def fetch_db_records():
    session, engine = connect_db()

    with engine.connect() as conn:
        db_df = pd.read_sql("SELECT * from agents", con=conn)
        conn.commit()
    return db_df


def dump_raw_data_to_s3(file_path: str):
    try:
        s3.upload_file(file_path, bucket_name, f"{os.path.basename(file_path)}")
        logger.info(f"Successfully upload to s3://{bucket_name}/{file_path}")
        os.remove(file_path)
    except Exception as e:
        logger.error(f"Uploading failed: {e}")


def fetch_latest_csv_from_s3(download_dir='downloads'):

    try:
        response = s3.list_objects_v2(Bucket=bucket_name)
        contents = response.get('Contents', [])

        # Filter for CSV files and sort by last modified time
        csv_files = [obj for obj in contents if obj['Key'].endswith('.csv')]
        if not csv_files:
            logger.info("❌ No CSV files found.")
            return None

        latest_file = max(csv_files, key=lambda x: x['LastModified'])
        latest_key = latest_file['Key']
        filename = os.path.basename(latest_key)
        local_path = os.path.join(download_dir, filename)

        os.makedirs(download_dir, exist_ok=True)
        s3.download_file(bucket_name, latest_key, local_path)

        logger.info(f"✅ Downloaded latest CSV: {latest_key} → {local_path}")
        return local_path

    except Exception as e:
        logger.error(f"❌ Failed to fetch from S3: {e}")
        return None


### Models Functions

In [None]:
import os
from dotenv import load_dotenv
from sqlalchemy import (
    create_engine,
    Column,
    String,
    Text,
    Boolean,
    DateTime,
    func,
    Integer,
)
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import SQLAlchemyError
import pandas as pd

load_dotenv(dotenv_path=".env")

DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")

DB_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

Base = declarative_base()


def connect_db():
    """
    Database connector
    """
    try:
        engine = create_engine(DB_URL)
        Session = sessionmaker(bind=engine, autoflush=False)
        Base.metadata.create_all(engine)
        logger.info("Database succesfully connected to.")
    except SQLAlchemyError as e:
        logger.error("Databse connection error: %s", e, exc_info=True)
    return Session, engine


class AiAgent(Base):
    """
    Agents table model creation
    Args:
        Base (): SQLAlchemy Base model
    """

    __tablename__ = "ai_agents"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String, nullable=False, index=True)
    description = Column(Text)
    homepage_url = Column(String)
    category = Column(String)
    source = Column(String)
    trending = Column(Boolean, default=False)
    created_at = Column(DateTime, server_default=func.now(), nullable=False)
    updated_at = Column(
        DateTime, server_default=func.now(), onupdate=func.now(), nullable=False)


def load_data(df: pd.DataFrame):
    """
    Function to load data into the Database (PostgreSQL).
    Args:
        df (pd.DataFrame): Cleaned and Transformed data to be loaded.
    """
    Session, engine = connect_db()
    data = df

    with Session.begin() as session:
        try:
            for _, row in data.iterrows():
                ai_tool = session.query(AiAgent).filter_by(
                    name=str(row["name"]),
                    homepage_url=row['homepage_url']                                       
                    ).first()

                if ai_tool:
                    ai_tool.description = row.get("description", ai_tool.description)
                    ai_tool.category = row.get("category", ai_tool.category)
                    ai_tool.source = row.get("source", ai_tool.source)
                    ai_tool.updated_at = row.get("updated_at", ai_tool.updated_at)
                else:
                    ai_tool = AiAgent(
                        name=str(row["name"]),
                        description=str(row["description"]),
                        homepage_url=row["homepage_url"],
                        category=row["category"],
                        source=row["source"],
                        trending=row["trending"],
                        created_at=row["created_at"],
                        updated_at=row["updated_at"],
                    )
                    session.add(ai_tool)
            session.commit()
            logger.info("Data successfully loaded in database!")
        except Exception as e:
            logger.error("Data upload failed: %s", e, exc_info=True)
            session.rollback()
        finally:
            session.close()


### ETL Check

In [52]:
def run_basic_etl() -> pd.DataFrame:
    """
    Basic ETL Job.
    Returns:
        pd.DataFrame: Ai tools data to run etl job on.
    """
    # download latest file from s3
    scraped_data_source = fetch_latest_csv_from_s3()

    scraped_df = read_data(scraped_data_source)

    clean_scraped_df = clean_data(scraped_df)

    trans_scraped_df = transform_data(clean_scraped_df)
    
    existing_db_df = fetch_db_records()

    final_df = merging_dfs(trans_scraped_df, existing_db_df)

    load_data(final_df)

    return final_df

In [53]:
ab = run_basic_etl()
ab.info()

2025-06-02 14:54:43,579 - INFO - ✅ Downloaded latest CSV: 20250601_234748_ai_tools_scraped.csv → downloads\20250601_234748_ai_tools_scraped.csv
2025-06-02 14:54:44,364 - INFO - Columns dropped and null values dropped.
2025-06-02 14:54:44,368 - INFO - Tags Column Successfully cleaned.
2025-06-02 14:54:44,370 - INFO - Data successfully cleaned!
2025-06-02 14:54:44,517 - INFO - Data successfully transformed!
2025-06-02 14:54:46,000 - INFO - Database succesfully connected to.
2025-06-02 14:54:46,368 - INFO - Existing DB Data and Scraped Data successfully merged!
2025-06-02 14:54:46,999 - INFO - Database succesfully connected to.
2025-06-02 14:55:12,474 - INFO - Data successfully loaded in database!


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6986 entries, 0 to 6985
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   name          6986 non-null   object        
 1   description   6986 non-null   object        
 2   homepage_url  6986 non-null   object        
 3   source        6986 non-null   object        
 4   category      6986 non-null   object        
 5   created_at    6986 non-null   datetime64[ns]
 6   updated_at    6986 non-null   datetime64[ns]
 7   trending      6986 non-null   object        
 8   id            72 non-null     float64       
dtypes: datetime64[ns](2), float64(1), object(6)
memory usage: 491.3+ KB


In [42]:
# scraped['created_at'] = datetime.now().strftime("%d/%m/%Y, %H:%M:%S")
ab.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6986 entries, 0 to 6985
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   name          6986 non-null   object        
 1   description   6986 non-null   object        
 2   homepage_url  6986 non-null   object        
 3   source        6986 non-null   object        
 4   category      6986 non-null   object        
 5   created_at    6986 non-null   datetime64[ns]
 6   updated_at    6986 non-null   datetime64[ns]
 7   trending      6986 non-null   bool          
 8   id            72 non-null     float64       
dtypes: bool(1), datetime64[ns](2), float64(1), object(5)
memory usage: 443.6+ KB
