In [14]:
from utility import MongoDBHandler
import pandas as pd
from pymongo import UpdateOne

In [15]:
def remove_suffixes(df, *args):
    """Strip each given suffix from the END of the frame's column names.

    Suffixes are applied one after another in the order given. A column name is
    only changed when it actually ends with the suffix, so the same text
    elsewhere in the name is left alone (the previous ``str.replace`` removed
    every occurrence, e.g. 'box_x_count' -> 'box_count'). Empty suffixes and
    non-string column labels are ignored.

    Args:
        df (pd.DataFrame): frame whose columns are renamed (not modified in place).
        *args (str): suffixes to strip, e.g. the '_x' / '_y' added by ``pd.merge``.

    Returns:
        pd.DataFrame: the renamed frame.
    """
    for suffix in args:
        if not suffix:
            # col[:-0] would be '' -- an empty suffix must be a no-op.
            continue
        df = df.rename(columns={
            col: col[:-len(suffix)]
            for col in df.columns
            if isinstance(col, str) and col.endswith(suffix)
        })
    return df

def add_suffixes(df, suffix):
    """Return a renamed copy of ``df`` with ``suffix`` appended to every column label."""
    labels = list(df.columns)
    mapping = dict(zip(labels, [label + suffix for label in labels]))
    return df.rename(columns=mapping)

In [16]:
# Shared MongoDB handle used by the cells below: local instance, 'ecommerce' database.
mongo_handler = MongoDBHandler(
    'mongodb://localhost:27017',
    'ecommerce',
)
mongo_handler.connect()

In [10]:
# Materialise the product-detail staging collection and the current best-seller rows.
df_product = pd.DataFrame(
    list(mongo_handler.find('stg_amz__product_details', {}))
)
df_bs = pd.DataFrame(
    list(mongo_handler.find('trf_amz__best_sellers', {'isLatest': True}))
)

In [13]:
# Distinct ASINs present both in the latest best-seller rows and in the product details.
df_bs.merge(df_product, on='asin', how='inner').loc[:, ['asin']].drop_duplicates()

Unnamed: 0,asin
0,B0BK1457X3
2,B07XTJDJHN
4,B07PK41FL4
6,B0C7BM18PB
8,B01GRJGM0Y
...,...
4578,B08WPWTZN4
4580,B0D2V7MY5Y
4582,B07RDX8MLH
4650,B0CXXLRQRW


In [20]:
# Demote superseded best-seller rows. Within each (category, subCategory, rank)
# slot, only rows from the most recent load (max loadTimestamp) keep isLatest=True.
df_trf = pd.DataFrame(list(mongo_handler.find('trf_amz__best_sellers', {'isLatest': True})))
if df_trf.empty:
    # No documents -> no columns to group on; produce an empty frame so the
    # next cell's `df_upsert.empty` check still works.
    df_upsert = pd.DataFrame(columns=['_id', 'isLatest'])
else:
    # ascending=False gives the NEWEST loadTimestamp dense rank 1. The previous
    # ascending=True ranked the oldest load first, so it demoted the newest rows.
    df_trf['d_rank'] = (
        df_trf.groupby(['category', 'subCategory', 'rank'])['loadTimestamp']
        .rank(method='dense', ascending=False)
    )
    df_upsert = df_trf.loc[df_trf['d_rank'] > 1, ['_id', 'isLatest']].assign(isLatest=False)
df_upsert

Unnamed: 0,_id,isLatest


In [19]:
if df_upsert.empty:
    print("NO DATA TO UPSERT")
else:
    # Write the demoted isLatest flags back, matching documents on _id
    # (exact semantics are those of MongoDBHandler.bulk_upsert -- defined elsewhere).
    mongo_handler.bulk_upsert(
        'trf_amz__best_sellers',
        df_upsert,
        filter_cols=['_id'],
        upsert=True,
    )
    

In [27]:
import psycopg2
from scrapy.utils.project import get_project_settings
from psycopg2.extras import RealDictCursor, DictCursor, execute_values

class PostgresDBHandler:
    """Small CRUD helper around a single psycopg2 connection.

    Call ``connect()`` before any other method. Query methods commit on success.
    On a ``psycopg2.Error`` they roll the transaction back, so the connection
    stays usable, then print the error and re-raise it.

    NOTE(review): ``table``, column names and ``conditions`` are interpolated
    directly into the SQL text. They must come from trusted code, never from
    scraped or user-supplied data. Row values are always sent as bound parameters.
    """

    def __init__(self, host, database, user, password, port=5432):
        """Store connection parameters; nothing is opened until ``connect()``."""
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.port = port
        self.connection = None

    def connect(self):
        """Open the connection and store it on ``self.connection``.

        Raises:
            psycopg2.Error: if the connection cannot be established.
        """
        try:
            self.connection = psycopg2.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password,
                port=self.port
            )
        except psycopg2.Error as e:
            print(f"Error connecting to PostgreSQL database: {e}")
            raise

    def close(self):
        """Close the database connection if one was opened."""
        if self.connection:
            self.connection.close()

    def insert(self, table, data):
        """
        Insert one record and return its ``id``.

        Args:
            table (str): Name of the table (must have an ``id`` column).
            data (dict): Column-value pairs to insert.

        Returns:
            The ``id`` value produced by ``RETURNING id``.
        """
        columns = ', '.join(data.keys())
        values = ', '.join(['%s'] * len(data))
        query = f"INSERT INTO {table} ({columns}) VALUES ({values}) RETURNING id;"

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, tuple(data.values()))
                # Read the RETURNING row before ending the transaction.
                new_id = cursor.fetchone()[0]
                self.connection.commit()
                return new_id
        except psycopg2.Error as e:
            self.connection.rollback()
            print(f"Error inserting into {table}: {e}")
            raise

    def read(self, table, columns='*', conditions=None):
        """
        Read records from the specified table.

        Args:
            table (str): Name of the table.
            columns (str or list): Columns to retrieve (default is '*').
            conditions (str): Raw WHERE clause text (optional, trusted input only).

        Returns:
            list[dict]: Retrieved records as dictionaries (RealDictCursor rows).
        """
        if isinstance(columns, list):
            columns = ', '.join(columns)
        query = f"SELECT {columns} FROM {table}"
        if conditions:
            query += f" WHERE {conditions}"

        try:
            with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(query)
                return cursor.fetchall()
        except psycopg2.Error as e:
            # A failed statement aborts the open transaction; without a rollback
            # every later query on this connection would be rejected.
            self.connection.rollback()
            print(f"Error reading from {table}: {e}")
            raise

    def update(self, table, data, conditions):
        """
        Update records in the specified table.

        Args:
            table (str): Name of the table.
            data (dict): Column-value pairs to set (sent as bound parameters).
            conditions (str): Raw WHERE clause text (trusted input only).

        Returns:
            int: Number of rows updated.
        """
        set_clause = ', '.join([f"{col} = %s" for col in data.keys()])
        query = f"UPDATE {table} SET {set_clause} WHERE {conditions}"

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query, tuple(data.values()))
                self.connection.commit()
                return cursor.rowcount  # Number of rows updated
        except psycopg2.Error as e:
            self.connection.rollback()
            print(f"Error updating {table}: {e}")
            raise

    def bulk_upsert(self, table, data, conflict_columns, update_columns):
        """
        Insert many rows, resolving key conflicts with ``ON CONFLICT``.

        Args:
            table (str): Name of the table.
            data (list[dict]): Rows to write; every dict must have the same keys as ``data[0]``.
            conflict_columns (list[str]): Conflict target (primary key / unique columns).
            update_columns (list[str]): Columns overwritten from the incoming row on
                conflict. When empty, conflicting rows are skipped (``DO NOTHING``).

        Raises:
            ValueError: if ``data`` is empty.
            psycopg2.Error: on database errors (after rollback).
        """
        if not data:
            raise ValueError("No data provided for upsert.")

        columns = list(data[0].keys())  # Assume all rows have the same keys
        column_list = ", ".join(columns)
        # Per-row template with named placeholders: execute_values renders it
        # once for each dict in `data`.
        row_template = "(" + ", ".join(f"%({col})s" for col in columns) + ")"

        conflict_clause = ", ".join(conflict_columns)
        if update_columns:
            update_clause = ", ".join(f"{col} = EXCLUDED.{col}" for col in update_columns)
            conflict_action = f"DO UPDATE SET {update_clause}"
        else:
            # "DO UPDATE SET" with nothing after it is a syntax error.
            conflict_action = "DO NOTHING"

        # execute_values requires exactly one bare %s where the VALUES list goes;
        # the previous per-column placeholders in the query made it fail.
        query = (
            f"INSERT INTO {table} ({column_list}) VALUES %s "
            f"ON CONFLICT ({conflict_clause}) {conflict_action}"
        )

        try:
            with self.connection.cursor() as cursor:
                execute_values(cursor, query, data, template=row_template)
                self.connection.commit()
                print(f"Bulk upsert completed for {len(data)} rows.")
        except psycopg2.Error as e:
            self.connection.rollback()
            print(f"Error during bulk upsert: {e}")
            raise

    def execute_stored_procedure(self, procedure_name):
        """
        Execute a stored procedure with no arguments via ``CALL`` and commit.

        Args:
            procedure_name (str): Schema-qualified procedure name (e.g. 'processed.sp_update_master_tables').
        """
        try:
            with self.connection.cursor() as cursor:
                cursor.execute(f"CALL {procedure_name}();")
                self.connection.commit()
                print(f"Stored procedure {procedure_name} executed successfully.")
        except psycopg2.Error as e:
            self.connection.rollback()
            print(f"Error executing stored procedure {procedure_name}: {e}")
            raise

    def execute(self, query, type=None):
        """
        Execute an arbitrary SQL statement and commit.

        Args:
            query (str): The whole SQL text (e.g. select * from processed.amz__product_category).
            type: When not None, the result set is never fetched (legacy flag for
                statements that return no rows).

        Returns:
            list[dict] | None: RealDictCursor rows when ``type`` is None and the
            statement produced a result set, otherwise None.
        """
        try:
            with self.connection.cursor(cursor_factory=RealDictCursor) as cursor:
                cursor.execute(query)
                self.connection.commit()
                print("Query executed successfully.")
                # cursor.description is None for statements with no result set
                # (e.g. CALL, plain INSERT/UPDATE, DDL); fetchall() would raise there.
                if type is None and cursor.description is not None:
                    return cursor.fetchall()
        except psycopg2.Error as e:
            self.connection.rollback()
            print(f"Error executing query {query}: {e}")
            raise

    def delete(self, table, conditions):
        """
        Delete records from the specified table.

        Args:
            table (str): Name of the table.
            conditions (str): Raw WHERE clause text (trusted input only).

        Returns:
            int: Number of rows deleted.
        """
        query = f"DELETE FROM {table} WHERE {conditions}"

        try:
            with self.connection.cursor() as cursor:
                cursor.execute(query)
                self.connection.commit()
                return cursor.rowcount  # Number of rows deleted
        except psycopg2.Error as e:
            self.connection.rollback()
            print(f"Error deleting from {table}: {e}")
            raise



def getCategoryUrls(db='mongo'):
    """Return the active category / sub-category best-seller URLs to scrape.

    Args:
        db (str): backing store to read from, ``'mongo'`` or ``'postgres'``.

    Returns:
        list[dict]: one dict per active category or sub-category.
            * ``'mongo'``: keys ``Category``, ``SubCategory`` ('' for top-level
              categories) and ``Url``.
            * ``'postgres'``: rows with keys ``category``, ``sub_category`` and
              ``url``. With setting ``LOAD_TYPE == 'INCREMENTAL'``, only rows whose
              ``last_refreshed_timestamp`` date is before today are returned. With
              ``'FULLREFRESH'``, all active rows are returned.
            NOTE(review): the two backends use different key casing, so callers
            indexing ``row['url']`` only work with the postgres backend.

    Raises:
        ValueError: if ``db`` is not a supported backend, or (postgres) if
            ``LOAD_TYPE`` is neither 'INCREMENTAL' nor 'FULLREFRESH'.
    """
    # Load settings before branching. It used to be assigned only in the postgres
    # branch, which made `settings` a local for the whole function and caused an
    # UnboundLocalError in the mongo branch.
    settings = get_project_settings()

    if db == 'mongo':
        mongo_handler = MongoDBHandler(
            settings.get('MONGO_URI'),
            settings.get('MONGO_DATABASE'))
        # Same pattern as the notebook's own handler: construct, connect(), then find().
        mongo_handler.connect()
        results_category = mongo_handler.find(
            "amz__product_category",
            {'IsActive': True},
            # Top-level categories have no sub-category; project '' so both sets share a shape.
            {'Category': 1, 'SubCategory': {'$literal': ''}, 'Url': 1, '_id': 0}
        )
        results_subCategory = mongo_handler.find(
            "amz__product_subcategory",
            {'IsActive': True},
            {'Category': 1, 'SubCategory': 1, 'Url': 1, '_id': 0}
        )
        return list(results_category) + list(results_subCategory)

    if db == 'postgres':
        queries = {
            'INCREMENTAL': """
                SELECT category, '' as sub_category, url FROM processed.amz__product_category
                WHERE is_active = true AND cast(last_refreshed_timestamp as date) < CURRENT_DATE
                UNION
                SELECT category, sub_category, url FROM processed.amz__product_subcategory
                WHERE is_active = true AND cast(last_refreshed_timestamp as date) < CURRENT_DATE
                """,
            'FULLREFRESH': """
                select category, '' as sub_category, url from processed.amz__product_category WHERE is_active = true
                union
                select category, sub_category, url from processed.amz__product_subcategory WHERE is_active = true
                """,
        }
        load_type = settings.get('LOAD_TYPE')
        if load_type not in queries:
            # Previously this fell through and returned None, crashing callers that iterate.
            raise ValueError(
                f"Unsupported LOAD_TYPE {load_type!r}; expected one of {sorted(queries)}"
            )
        postgres_handler = PostgresDBHandler(
            settings.get('POSTGRES_HOST'),
            settings.get('POSTGRES_DATABASE'),
            settings.get('POSTGRES_USERNAME'),
            settings.get('POSTGRES_PASSWORD'),
            settings.get('POSTGRES_PORT'),
        )
        postgres_handler.connect()
        try:
            return postgres_handler.execute(queries[load_type])
        finally:
            # execute() has already fetched all rows, so the connection can be released.
            postgres_handler.close()

    raise ValueError(f"Unsupported db {db!r}; expected 'mongo' or 'postgres'")

In [29]:
category_urls = [row['url'] for row in getCategoryUrls(db='postgres')]
print(category_urls)

# settings = get_project_settings()
# postgres_handler = PostgresDBHandler(
#         settings.get('POSTGRES_HOST'),
#         settings.get('POSTGRES_DATABASE'),
#         settings.get('POSTGRES_USERNAME'),
#         settings.get('POSTGRES_PASSWORD'),
#         settings.get('POSTGRES_PORT'),
#     )
# postgres_handler.connect()

# postgres_handler.execute('CALL processed.sp_process_best_seller();')

Query executed successfully.
['https://www.amazon.in/gp/bestsellers/luggage', 'https://www.amazon.in/gp/bestsellers/apparel', 'https://www.amazon.in/gp/bestsellers/industrial/7124091031', 'https://www.amazon.in/gp/bestsellers/kitchen', 'https://www.amazon.in/gp/bestsellers/shoes', 'https://www.amazon.in/gp/bestsellers/industrial/10572875031', 'https://www.amazon.in/gp/bestsellers/home-improvement/10615970031', 'https://www.amazon.in/gp/bestsellers/shoes/1983338031', 'https://www.amazon.in/gp/bestsellers/home-improvement/4286644031', 'https://www.amazon.in/gp/bestsellers/electronics', 'https://www.amazon.in/gp/bestsellers/home-improvement/10615921031', 'https://www.amazon.in/gp/bestsellers/home-improvement/29594128031', 'https://www.amazon.in/gp/bestsellers/automotive/5257478031', 'https://www.amazon.in/gp/bestsellers/hpc', 'https://www.amazon.in/gp/bestsellers/home-improvement/4286642031', 'https://www.amazon.in/gp/bestsellers/beauty', 'https://www.amazon.in/gp/bestsellers/industrial/7

In [30]:
def getUrlToScrap(db:'mongo | postgres'='mongo'):
    """Return best-seller product URLs to scrape for product details.

    Args:
        db: backing store, ``'mongo'`` or ``'postgres'``.

    Returns:
        list[dict]:
            * ``'mongo'``: ``{'productUrl': ...}`` for every best-seller document
              with ``isLatest`` true (no already-scraped filtering).
            * ``'postgres'``: ``{'product_url': ...}`` rows for best-sellers whose
              ASIN is not yet in ``staging.stg_amz__product_details``.

    Raises:
        ValueError: if ``db`` is not a supported backend.
    """
    settings = get_project_settings()
    if db == 'mongo':
        mongo_handler = MongoDBHandler(
            settings.get('MONGO_URI'),
            settings.get('MONGO_DATABASE'))
        # Same pattern as the notebook's own handler: construct, connect(), then find().
        mongo_handler.connect()
        urls = mongo_handler.find(
            "trf_amz__best_sellers",
            {'isLatest': True},
            {'productUrl': 1, '_id': 0}
        )
        return list(urls)

    if db == 'postgres':
        postgres_handler = PostgresDBHandler(
            settings.get('POSTGRES_HOST'),
            settings.get('POSTGRES_DATABASE'),
            settings.get('POSTGRES_USERNAME'),
            settings.get('POSTGRES_PASSWORD'),
            settings.get('POSTGRES_PORT'),
        )
        postgres_handler.connect()
        try:
            # NOT EXISTS instead of NOT IN: a single NULL asin in the staging table
            # makes `asin NOT IN (subquery)` evaluate to NULL for every row, so the
            # old query silently returned nothing. `bs.asin is not null` keeps
            # best-sellers without an ASIN excluded, as NOT IN did.
            return postgres_handler.execute(
                """
                select distinct bs.product_url
                from processed.amz__best_sellers bs
                where bs.asin is not null
                  and not exists (
                      select 1
                      from staging.stg_amz__product_details pd
                      where pd.asin = bs.asin
                  )
                """
            )
        finally:
            # execute() has already fetched all rows, so the connection can be released.
            postgres_handler.close()

    raise ValueError(f"Unsupported db {db!r}; expected 'mongo' or 'postgres'")

In [37]:
str.isdigit('.')  # a bare '.' is not a digit

False

In [38]:
def extract_number(num):
    """Return the first number in ``num`` as a plain digit string.

    Thousands separators (commas) are dropped and a decimal part is kept, e.g.
    '1,297 ratings' -> '1297', '4.5 out of 5 stars' -> '4.5', 'Rs. 499' -> '499'.
    (The previous version concatenated every digit and dot in the string,
    giving '4.55' and '.499' for the last two.)

    Returns '' when ``num`` is empty/None or contains no digits.
    """
    import re  # local: the notebook's `import re` cell executes after this one

    if not num:
        return ''
    match = re.search(r'\d[\d,]*(?:\.\d+)?|\.\d+', num)
    return match.group(0).replace(',', '') if match else ''

In [40]:
extract_number(num='1,297 ratings')

'1297'

In [79]:
import re
from datetime import datetime as dt

In [81]:
date_string = " 25 September 2023 "
trimmed = date_string.strip()  # strptime rejects the surrounding spaces
date_object = dt.strptime(trimmed, "%d %B %Y")
print(date_object)

2023-09-25 00:00:00


In [92]:
def extract_numeric_part(value):
    """Parse the single numeric token in ``value`` into a float.

    The string is split on spaces and the token containing a digit is used.
    Characters other than letters, digits, whitespace and '.' are stripped from
    it, so '10K+' -> 10000.0 and '1,297' -> 1297.0. A trailing 'k' multiplies
    by 1,000 and a trailing 'l' (lakh) by 100,000; the previous code used
    1,000,000, which is not a lakh.

    Args:
        value: the text to parse; anything that is not a ``str`` yields None.

    Returns:
        float | None: the parsed number, or None if no token contains a digit.

    Raises:
        ValueError: (an ``Exception`` subclass, as raised before) when more than
            one token contains a digit, or the token is not a valid number.
    """
    import re  # local: the notebook's `import re` cell is separate from this one

    if not isinstance(value, str):
        return None

    numeric_tokens = [token for token in value.split(' ') if re.search(r'\d', token)]
    if len(numeric_tokens) > 1:
        raise ValueError(f"Expected One Numeric Part, got {len(numeric_tokens)}:{numeric_tokens}")
    if not numeric_tokens:
        return None

    # remove any special character, keeping decimal point and alphanumerics
    token = re.sub(r'[^a-zA-Z0-9\s\.]', '', numeric_tokens[0]).lower()
    multipliers = {'k': 1_000, 'l': 100_000}
    if token[-1] in multipliers:
        return float(token[:-1]) * multipliers[token[-1]]
    return float(token)

In [None]:
extract_numeric_part(value='')
# value = ''
# if [i for i in value.split(' ') if re.search(r'\d', i)]:
#     print(True)
# else:
#     print(False)

: 

In [None]:

# Example Usage:
if __name__ == "__main__":
    import os

    # Connection parameters are read from the environment so real credentials are
    # never saved in the notebook; the defaults are placeholders only.
    db_config = {
        "host": os.environ.get("POSTGRES_HOST", "localhost"),
        "database": os.environ.get("POSTGRES_DATABASE", "your_database"),
        "user": os.environ.get("POSTGRES_USERNAME", "your_user"),
        "password": os.environ.get("POSTGRES_PASSWORD", "your_password"),
        "port": int(os.environ.get("POSTGRES_PORT", "5432")),
    }

    # PostgresDBHandler is this notebook's CRUD class. The example previously
    # used the undefined name `PostgreSQLCRUD` and the non-existent methods
    # `create` / `close_connection`.
    db = PostgresDBHandler(**db_config)

    # Connect to the database
    db.connect()

    # Example CRUD operations
    try:
        # CREATE
        data_to_insert = {"column1": "value1", "column2": "value2"}
        new_id = db.insert("your_table", data_to_insert)
        print(f"Inserted record with ID: {new_id}")

        # READ
        records = db.read("your_table", columns=["column1", "column2"], conditions="column1 = 'value1'")
        print(f"Retrieved records: {records}")

        # UPDATE
        updated_rows = db.update("your_table", {"column2": "new_value"}, "column1 = 'value1'")
        print(f"Updated rows: {updated_rows}")

        # DELETE
        deleted_rows = db.delete("your_table", "column1 = 'value1'")
        print(f"Deleted rows: {deleted_rows}")

    finally:
        # Close the connection
        db.close()
