In [None]:
# importing libraries and modules
import json
import csv
import requests
import logging
import logging
import psycopg2
import pandas as pd
from bs4 import BeautifulSoup
from sqlalchemy import create_engine

In [None]:
def request_json_data(params, timeout=30):
    """
    Fetch and decode the JSON document served at ``params['urls'][0]``.

    The request is sent with ``params['headers']``. A response with an HTTP
    error status (4xx/5xx) is treated as a failure even when its body is
    valid JSON, so an error payload is never mistaken for data.

    Args:
        params: mapping with a ``'urls'`` sequence (first entry is used)
            and a ``'headers'`` mapping.
        timeout: seconds to wait for the server before giving up.

    Returns:
        The decoded JSON data, or ``None`` if the request or the decoding
        failed (the error is logged).
    """
    url = params['urls'][0]
    headers = params['headers']
    try:
        # Without a timeout a stalled server would block the program forever.
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logging.error(
            f"An error occurred while retrieving data from {url}: {e}")
        return None

In [None]:
def write_json_data(params, data):
    """
    Serialise ``data`` as JSON into the file named by ``params['filenames'][0]``.

    The file is opened for writing (truncating any previous content), a
    progress message is printed, and ``data`` is dumped into it. Any
    exception raised while opening or writing is logged instead of being
    propagated.
    """
    target_path = params['filenames'][0]
    try:
        with open(target_path, 'w') as out_handle:
            print(f"Response is sucessful. Writing json data to {target_path}.")
            json.dump(data, out_handle)
    except Exception as err:
        logging.error(f"An error occurred while saving data: {err}")

In [None]:
def _strip_to_number(series, pattern):
    """Remove every character matching ``pattern`` and parse the rest; unparsable values become NaN."""
    stripped = series.str.replace(pattern, '', regex=True)
    return pd.to_numeric(stripped, errors='coerce')


def _write_category_csv(json_path, record_key, output_csv):
    """Flatten the records stored under ``record_key`` in a JSON file and write them to ``output_csv``."""
    raw = pd.read_json(json_path)
    flat_df = pd.json_normalize(raw[record_key])
    flat_df = flat_df.drop_duplicates(subset=['id'])
    flat_df['category_id'] = flat_df['id'].astype(int)
    flat_df['category_name'] = flat_df['name'].astype(str)
    flat_df = flat_df.drop(columns=['id', 'name', 'link'])
    flat_df.to_csv(output_csv, index=False)


def _write_laptop_and_sales_csv(columns, laptop_csv, sales_csv):
    """Build the laptop and sales tables from six parallel lists of scraped text and write both CSV files."""
    field_names = ['laptop_name', 'quantity_sold', 'unit_price',
                   'shipping_cost', 'store_name', 'discount_percent']
    raw = {name: columns[position] for position, name in enumerate(field_names)}
    # orient='index' followed by a transpose pads lists of unequal length
    # with missing values instead of raising.
    df = pd.DataFrame.from_dict(raw, orient='index').transpose()

    df['laptop_name'] = df['laptop_name'].astype(str)
    # A missing or digit-free quantity counts as zero units sold.
    df['quantity_sold'] = _strip_to_number(
        df['quantity_sold'], r'[^0-9]').fillna(0).astype(int)
    df['unit_price'] = _strip_to_number(
        df['unit_price'], r'[^0-9.]').astype(float)
    shipping_text = df['shipping_cost'].str.replace(
        'Free shipping', '0', regex=False)
    df['shipping_cost'] = _strip_to_number(
        shipping_text, r'[^0-9.]').astype(float)
    df['discount_percent'] = _strip_to_number(
        df['discount_percent'], r'[^0-9]').fillna(0).astype(float)

    # laptop_id is a 1-based running number and goes first.
    df.insert(0, 'laptop_id', list(range(1, len(df) + 1)))
    # every scraped row gets the default category id 702 (laptop)
    df['category_id'] = 702

    laptop_df = df[['laptop_id', 'laptop_name', 'store_name', 'category_id']]
    laptop_df.to_csv(laptop_csv, index=False)

    # Only rows with at least one unit sold become sales records.
    sales_columns = ['laptop_id', 'quantity_sold', 'unit_price',
                     'shipping_cost', 'discount_percent']
    sales_df = df.loc[df['quantity_sold'] != 0, sales_columns]
    # Renumber the rows before creating sales_id: the filter leaves gaps in
    # the index, and ids aligned against the old index would come out as
    # NaN / non-consecutive values.
    sales_df = sales_df.reset_index(drop=True)
    sales_df.insert(0, 'sales_id', list(range(1, len(sales_df) + 1)))
    sales_df.to_csv(sales_csv, index=False)


def data_cleaning(params, laptop_csv="laptop.csv", sales_csv="sales.csv"):
    """
    Clean extracted data and write it to CSV files.

    What is done depends on the type of ``params[0]``:

    * ``str``: ``params`` is ``[json_path, record_key, [category_csv, ...]]``.
      The records under ``record_key`` of the JSON file are flattened,
      de-duplicated on ``id``, ``id``/``name`` are renamed to
      ``category_id``/``category_name``, the ``link`` column is dropped and
      the result is written to ``category_csv``.
    * ``list``: ``params`` holds six parallel lists of scraped text in the
      order laptop name, quantity sold, unit price, shipping cost, store
      name, discount percent. The numeric fields are reduced to numbers,
      ``laptop_id`` and ``category_id`` are added, and two files are
      written: ``laptop_csv`` (laptop_id, laptop_name, store_name,
      category_id) and ``sales_csv`` (sales_id, laptop_id, quantity_sold,
      unit_price, shipping_cost, discount_percent) containing only rows
      with a non-zero quantity.

    Args:
        params: input in one of the two shapes described above.
        laptop_csv: output path for the laptop table (list input only).
        sales_csv: output path for the sales table (list input only).

    Errors raised while processing are logged and not re-raised.
    """
    if not params:
        logging.error(
            "An error occurred while cleaning or loading data into csv files: no input data")
        return
    if isinstance(params[0], str):
        try:
            _write_category_csv(params[0], params[1], params[2][0])
        except Exception as e:
            logging.error(
                f"An error occurred while creating csv file {params[2][0]}: {e}")
    elif isinstance(params[0], list):
        try:
            _write_laptop_and_sales_csv(params, laptop_csv, sales_csv)
            print("Data is successfully extracted, cleaned and loaded into csv files.")
        except Exception as e:
            logging.error(
                f"An error occurred while cleaning or loading data into csv files: {e}")

In [None]:
def parser_html(response, tag, attribute):
    """
    Collect the text of every ``tag`` element whose class matches ``attribute``.

    ``response.content`` is parsed with the ``html.parser`` backend and the
    text of each matching element is gathered into a list, in the order the
    elements are found. ``None`` is returned when ``response`` is ``None``
    or when parsing fails; in the latter case the error is logged.
    """
    if response is None:
        return None
    try:
        document = BeautifulSoup(response.content, 'html.parser')
        texts = []
        for element in document.find_all(tag, class_=attribute):
            texts.append(element.text)
        return texts
    except Exception as err:
        logging.error(f"An error occurred while parsing HTML content: {err}")
        return None

In [None]:
def request_html_data(params):
    """
    Download the search-result pages and collect the configured element texts.

    Pages ``params['webpages']['start']`` up to, but not including,
    ``params['webpages']['end']`` are requested from ``params['urls'][1]``
    with the page number sent as the ``page`` query parameter. For each
    downloaded page ``parser_html`` is called once per pair taken from
    ``params['tags']`` / ``params['attributes']`` and the texts it returns
    are appended to the list at the same position in ``params['all_list']``.

    Downloading stops at the first failed request; the error is logged and
    whatever was collected so far is returned.

    Returns:
        ``params['all_list']`` (the same list object, extended in place).
    """
    pages = params['webpages']
    base_url = params['urls'][1]
    selectors = list(zip(params['tags'], params['attributes']))
    all_list = params['all_list']
    for num in range(pages['start'], pages['end']):
        try:
            # The page number has to be part of every request; otherwise the
            # same first page is downloaded over and over.
            response = requests.get(
                base_url, params={'page': num}, timeout=30)
            response.raise_for_status()
        except requests.exceptions.RequestException as err:
            logging.error(f"An error occurred: {err}")
            break
        print(
            f"Response is sucessful. Extracting html data from webpage {num}.")
        for i, (tag, attribute) in enumerate(selectors):
            texts = parser_html(response, tag, attribute)
            # parser_html returns None when parsing fails; skip it rather
            # than crash on `list += None`.
            if texts:
                all_list[i] += texts
    return all_list

In [None]:
def db_connect(params):
    """
    Open a database connection with psycopg2 and return it.

    The ``database``, ``user``, ``password``, ``host`` and ``port`` entries
    of ``params`` are forwarded to ``psycopg2.connect`` as keyword
    arguments of the same name.
    """
    connection_fields = ('database', 'user', 'password', 'host', 'port')
    settings = {field: params[field] for field in connection_fields}
    return psycopg2.connect(**settings)

In [None]:
# sql_stmt_list holds the SQL used by the pipeline:
#   sql_stmt_list[0] - statements that (re)create the schema and its tables
#   sql_stmt_list[1] - parameterised INSERT statements, one per table, in the
#                      order category, laptop, sales
sql_stmt_list = [
    [
        # The database itself cannot be dropped or created here: PostgreSQL
        # refuses DROP DATABASE on the database the session is connected to
        # and inside a transaction block. Resetting the schema gives the same
        # clean slate for a rerun.
        "DROP SCHEMA IF EXISTS lap CASCADE;",
        "CREATE SCHEMA lap;",
        """
        CREATE TABLE IF NOT EXISTS lap.category (
            category_id INTEGER PRIMARY KEY,
            category_name VARCHAR(255)
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS lap.laptop (
            laptop_id SERIAL PRIMARY KEY,
            laptop_name VARCHAR(255),
            store_name VARCHAR(255), 
            category_id INTEGER
        );
        """,
        """
        CREATE TABLE IF NOT EXISTS lap.sales (
            sales_id SERIAL PRIMARY KEY,
            laptop_id INTEGER,
            unit_price DECIMAL(10, 2),
            quantity_sold INTEGER, 
            discount_percent DECIMAL(10, 2),
            shipping_cost DECIMAL(10, 2),
            FOREIGN KEY (laptop_id) REFERENCES lap.laptop(laptop_id)
        );
        """
    ],
    [
        "INSERT INTO lap.category (category_id,category_name) VALUES (%s, %s);",
        """INSERT INTO lap.laptop (laptop_id,laptop_name,store_name,category_id) VALUES (%s, %s, %s, %s);""",
        # Values are bound by position, so the column list must follow the
        # column order of sales.csv: shipping_cost comes before
        # discount_percent there.
        """INSERT INTO lap.sales (sales_id,laptop_id,quantity_sold,unit_price,shipping_cost,discount_percent) VALUES (%s, %s, %s, %s, %s, %s);"""
    ]
]

In [None]:
def create_tables(sql_stmt_list, conn):
    """
    Execute the schema and table creation statements and commit them.

    Every statement in ``sql_stmt_list[0]`` is executed in order on a new
    cursor of ``conn``. The changes are committed once all statements have
    succeeded. If a statement fails, the transaction is rolled back and the
    exception is re-raised. The cursor is closed in either case; the
    connection is left open for the caller.

    Args:
        sql_stmt_list: sequence whose first element is the list of
            statements to run.
        conn: open database connection.
    """
    cursor = conn.cursor()
    try:
        for stmt in sql_stmt_list[0]:
            cursor.execute(stmt)
        conn.commit()
    except Exception:
        # Leave the connection in a clean state for whoever handles the error.
        conn.rollback()
        raise
    finally:
        cursor.close()

In [None]:
def insert_data(params, conn, sql_stmt_list):
    """
    Load the CSV files into their database tables, then close the connection.

    The files listed in ``params['filenames'][2]`` are processed in order;
    file number ``i`` is inserted row by row with statement
    ``sql_stmt_list[1][i]`` and reported under ``params['table_names'][i]``.
    The header row of each file is skipped, blank lines are ignored and
    empty cells are sent as ``NULL``.

    Each file is committed on its own. If loading a file fails, that file's
    rows are rolled back, the error is logged and loading continues with
    the next file. The cursor and ``conn`` are always closed before
    returning.

    Args:
        params: mapping with ``'filenames'`` (CSV paths at index 2) and
            ``'table_names'``.
        conn: open database connection; it is closed by this function.
        sql_stmt_list: sequence whose second element is the list of INSERT
            statements.
    """
    sql_stmt = sql_stmt_list[1]
    cursor = conn.cursor()
    print("Loading data into tables in database.")
    try:
        for i, csv_file in enumerate(params['filenames'][2]):
            try:
                with open(csv_file, 'r', newline='', encoding='utf-8') as file:
                    reader = csv.reader(file)
                    next(reader, None)  # skip the header row
                    for row in reader:
                        if not row:
                            continue
                        # An empty cell means "no value"; '' is not valid
                        # input for numeric columns, NULL is.
                        values = [cell if cell != '' else None for cell in row]
                        cursor.execute(sql_stmt[i], values)
                # Commit per file so one bad file cannot discard the others.
                conn.commit()
                print(
                    f"Finished loading data into {params['table_names'][i]} tables")
            except Exception as e:
                # A failed statement aborts the open transaction; roll back
                # so the next file starts from a usable state.
                conn.rollback()
                logging.error(
                    f"An error occurred while loading data into {params['table_names'][i]} tables: {e}")
    finally:
        cursor.close()
        conn.close()

In [None]:
def main():
    """
    Run the whole pipeline.

    Steps, in order:
      1. fetch the category JSON, save it and turn it into the category CSV
         (skipped when the request fails),
      2. scrape the search-result pages and turn the collected lists into
         the laptop and sales CSV files,
      3. connect to the database, create the schema and tables, and load
         the CSV files.
    """
    import os  # local import: only needed for the environment lookup below

    params = {
        'database': 'AliExpress',
        'user': 'postgres',
        # NOTE(review): credentials are hard-coded; consider moving them out
        # of the source.
        'password': 'password',
        'host': 'localhost',
        'port': '5432',
        'table_names': ['lap.category', 'lap.laptop', 'lap.sales'],
        "filenames": ['categories.json', 'categories', ['category.csv', 'laptop.csv', 'sales.csv']],
        'urls': [
                    "https://ali-express1.p.rapidapi.com/categories",
                    "https://www.aliexpress.com/w/wholesale-laptop.html?g=y&SearchText=laptop"
        ],
        'headers': {
            # SECURITY: an API key should not live in source control. The
            # RAPIDAPI_KEY environment variable takes precedence; the literal
            # is kept only so existing setups keep working.
            "X-RapidAPI-Key": os.environ.get(
                "RAPIDAPI_KEY",
                "664a752285msh4ae3ff3d99b12a1p13a3e9jsn5dbb9e535753"),
            "X-RapidAPI-Host": "ali-express1.p.rapidapi.com"
        },
        'webpages': {
            'start': 1,
            'end': 5
        },
        'tags': ["h1", "span", "div", "span", "span", "span"],
        'attributes': [
            "multi--titleText--nXeOvyr",
            "multi--trade--Ktbl2jB",
            "multi--price-sale--U-S0jtj",
            "tag--text--1BSEXVh tag--textStyle--3dc7wLU multi--serviceStyle--1Z6RxQ4",
            "cards--store--3GyJcot",
            "tag--text--1BSEXVh tag--textStyle--3dc7wLU multi--superStyle--1jUmObG"
        ],
        # The length of the all_list must be the same as the length of tags or attributes
        'all_list': [[], [], [], [], [], []]
    }

    json_data = request_json_data(params)
    if json_data is not None:
        write_json_data(params, json_data)
        data_cleaning(params['filenames'])

    all_list = request_html_data(params)
    data_cleaning(all_list)

    connection = db_connect(params)
    try:
        create_tables(sql_stmt_list, connection)
    except Exception:
        # insert_data, which normally closes the connection, will not run.
        connection.close()
        raise
    insert_data(params, connection, sql_stmt_list)


# Run the pipeline only when executed as a script or notebook, not when the
# module is imported.
if __name__ == "__main__":
    main()