In [4]:
from sqlalchemy import create_engine
import pandas as pd

# Step 1: create the SQLAlchemy engine for the PostgreSQL database.
# Everything else in the notebook depends on this line: the PostgreSQL server
# must be running and reachable, and the URL placeholders (username, password,
# host, port, database) must be replaced with real values. The author's default
# port is 5432, but yours may differ.
# NOTE(review): prefer building this URL from environment variables so real
# credentials never end up saved in the notebook.
engine = create_engine('postgresql+psycopg2://username:password@host:port/database')


# Tables inspected by the exploration cells
specific_tables = [
    'categories', 'group_deals', 'groups', 'groups_carts', 'orders',
    'product_names', 'products', 'users', 'vendors',
]

# Column listing for one table, filled in per table below.
# NOTE(review): there is no table_schema filter, so a same-named table in
# another schema would contribute rows as well.
COLUMNS_QUERY_TEMPLATE = """
    SELECT column_name, data_type
    FROM information_schema.columns
    WHERE table_name = '{table}';
    """

# Print the column names and data types of every listed table
for table in specific_tables:
    query_columns = COLUMNS_QUERY_TEMPLATE.format(table=table)
    columns = pd.read_sql(query_columns, engine)
    print(f"\n{table}:", columns, sep="\n")


categories:
          column_name                    data_type
0                  id                         uuid
1              status                 USER-DEFINED
2          created_at  timestamp without time zone
3          updated_at  timestamp without time zone
4          deleted_at  timestamp without time zone
5          created_by                         uuid
6                name            character varying
7   short_description                         text
8    long_description                         text
9       detail_images                        ARRAY
10      primary_image            character varying

group_deals:
               column_name                    data_type
0                       id                         uuid
1               product_id                         uuid
2         max_group_member                      integer
3              group_price                      numeric
4               created_at  timestamp without time zone
5               updated_a

In [5]:
# Preview a single sample row from every table in specific_tables
ONE_ROW_QUERY_TEMPLATE = """
    SELECT *
    FROM {table}
    LIMIT 1;
    """

for table in specific_tables:
    query_data = ONE_ROW_QUERY_TEMPLATE.format(table=table)
    data = pd.read_sql(query_data, engine)
    print(f"\n{table}:", data, sep="\n")


categories:
                                     id       name short_description  \
0  af95ccc0-6ddb-4b10-82b2-4de8dabfcc46  Vegetable              desc   

  long_description  status                                  primary_image  \
0             desc  ACTIVE  category/318486d9-c483-4b8b-88ce-1ef17978ff3f   

                                       detail_images  \
0  [category/a7b3db05-0d38-4db7-9ee0-b10dfb446e6c...   

                  created_at updated_at deleted_at  \
0 2023-10-12 10:00:38.345655       None       None   

                             created_by  
0  ad752023-a7af-4b14-95d0-6399f95c6eb6  

group_deals:
                                     id                            product_id  \
0  07d7d32e-ee7e-4594-8917-f604db617b4d  7af7c574-12e0-4d0e-a030-2721d28271a7   

   max_group_member  group_price                 created_at updated_at  \
0                 3         67.0 2023-10-12 10:34:14.582614       None   

                  deleted_at             effective_from

# Part 1: Database and Data Understanding

## 1. Complex Querying

### 1.1.1 Write a SQL query to fetch the top 10 users who contributed the highest revenue within the last 30 days, along with the number of groups they participated in and the categories of products they purchased.

In [None]:
-- Top 10 users by completed-order revenue in the most recent 30 days of data.
-- The window is measured back from the newest orders.created_at value, not
-- from the current date.
WITH date_30_days_ago AS (
    SELECT MAX(created_at) - INTERVAL '30 days' AS date_30_days_ago
    FROM orders
),
-- COMPLETED orders inside the window, each tagged with the buying user
-- (groups_carts.user_id), the group deal and that deal's product.
filtered_orders AS (
    SELECT o.*, gc.user_id, g.group_deals_id, gd.product_id
    FROM orders o
    JOIN groups_carts gc ON o.groups_carts_id = gc.id
    JOIN groups g ON gc.group_id = g.id
    JOIN group_deals gd ON g.group_deals_id = gd.id
    WHERE o.created_at >= (SELECT date_30_days_ago FROM date_30_days_ago)
      AND o.status = 'COMPLETED' -- Include only completed orders
),
-- Sum of total_amount per user over the filtered orders.
user_revenue AS (
    SELECT 
        fo.user_id, 
        SUM(fo.total_amount) AS total_revenue
    FROM filtered_orders fo
    GROUP BY fo.user_id
),
-- Number of distinct groups per user, counting only carts that have a
-- filtered (completed, in-window) order.
user_group_count AS (
    SELECT 
        gc.user_id, 
        COUNT(DISTINCT gc.group_id) AS group_count
    FROM groups_carts gc
    JOIN filtered_orders fo ON gc.id = fo.groups_carts_id
    GROUP BY gc.user_id
),
-- Distinct category ids (product_names.category_id, not category names) of
-- the products behind each user's filtered orders.
user_product_categories AS (
    SELECT 
        fo.user_id, 
        ARRAY_AGG(DISTINCT pn.category_id) AS purchased_categories
    FROM filtered_orders fo
    JOIN products p ON fo.product_id = p.id
    JOIN product_names pn ON p.name_id = pn.id
    GROUP BY fo.user_id
)
-- Every user is kept by the LEFT JOINs; users without qualifying orders get
-- 0 revenue, 0 groups and an empty uuid array. Only the 10 highest revenues
-- are returned.
SELECT 
    u.id AS user_id,
    u.name AS user_name,
    COALESCE(ur.total_revenue, 0) AS total_revenue,
    COALESCE(ugc.group_count, 0) AS group_count,
    COALESCE(upc.purchased_categories, ARRAY[]::uuid[]) AS purchased_categories
FROM users u
LEFT JOIN user_revenue ur ON u.id = ur.user_id
LEFT JOIN user_group_count ugc ON u.id = ugc.user_id
LEFT JOIN user_product_categories upc ON u.id = upc.user_id
ORDER BY total_revenue DESC
LIMIT 10;

### 1.1.1 Top 10 Users by Revenue

The following table lists the top 10 users who contributed the highest revenue within the last 30 days, along with the number of groups they participated in and the categories of products they purchased.

| user_id                               | user_name | total_revenue | group_count | purchased_categories                                                                                           |
|---------------------------------------|-----------|---------------|-------------|---------------------------------------------------------------------------------------------------------------|
| 02afef07-11f8-4a29-848a-ebae8c3dd783  | Nahom     | 22107.60      | 79          | 33cd8255-6371-4baf-9888-555973935748,4f78f76d-ab7e-4553-9535-93b113150f20,6fe71c72-7796-466a-acbf-aa1c93f1a123,9a00a819-d1f5-49a3-85b6-b1d9a97ef9ed,af95ccc0-6ddb-4b10-82b2-4de8dabfcc46,d24dd74f-bb40-49bc-a19c-f9e90d77f4ea,ed470df7-f8ae-4d26-8865-7241f085785b |
| cdb909c0-8549-4c1a-9345-8a6c22e09dc8  | Dero      | 21982.00      | 68          | 33cd8255-6371-4baf-9888-555973935748,4f78f76d-ab7e-4553-9535-93b113150f20,6fe71c72-7796-466a-acbf-aa1c93f1a123,9a00a819-d1f5-49a3-85b6-b1d9a97ef9ed,af95ccc0-6ddb-4b10-82b2-4de8dabfcc46,d24dd74f-bb40-49bc-a19c-f9e90d77f4ea,ed470df7-f8ae-4d26-8865-7241f085785b |
| a69c75d9-31d2-49fc-98a3-93d937bb011f  | fikadu    | 18405.00      | 72          | 33cd8255-6371-4baf-9888-555973935748,4f78f76d-ab7e-4553-9535-93b113150f20,6fe71c72-7796-466a-acbf-aa1c93f1a123,9a00a819-d1f5-49a3-85b6-b1d9a97ef9ed,af95ccc0-6ddb-4b10-82b2-4de8dabfcc46,d24dd74f-bb40-49bc-a19c-f9e90d77f4ea,ed470df7-f8ae-4d26-8865-7241f085785b |
| d25a7eb5-3e7d-40d2-bc30-e370111e79f1  | Tsion     | 13338.00      | 47          | 33cd8255-6371-4baf-9888-555973935748,4f78f76d-ab7e-4553-9535-93b113150f20,6fe71c72-7796-466a-acbf-aa1c93f1a123,9a00a819-d1f5-49a3-85b6-b1d9a97ef9ed,af95ccc0-6ddb-4b10-82b2-4de8dabfcc46,ed470df7-f8ae-4d26-8865-7241f085785b |
| afdd0def-726e-41ed-b156-563647a629d0  | Solomon   | 12280.00      | 33          | 036ac34e-2d13-4a7e-a941-897b032e9c39,33cd8255-6371-4baf-9888-555973935748,4f78f76d-ab7e-4553-9535-93b113150f20,6f1ecaa5-fac0-4111-ac77-a717940c788d,6fe71c72-7796-466a-acbf-aa1c93f1a123,9a00a819-d1f5-49a3-85b6-b1d9a97ef9ed,af95ccc0-6ddb-4b10-82b2-4de8dabfcc46,ed470df7-f8ae-4d26-8865-7241f085785b |
| ef7c0d65-1619-4685-8527-3b65a194bfd6  | Etalem    | 11996.00      | 46          | 33cd8255-6371-4baf-9888-555973935748,4f78f76d-ab7e-4553-9535-93b113150f20,6fe71c72-7796-466a-acbf-aa1c93f1a123,9a00a819-d1f5-49a3-85b6-b1d9a97ef9ed,af95ccc0-6ddb-4b10-82b2-4de8dabfcc46,ed470df7-f8ae-4d26-8865-7241f085785b |
| 702ee90d-51ac-48c8-9dba-38f0dcb32c61  | ashu      | 10624.00      | 56          | 33cd8255-6371-4baf-9888-555973935748,4f78f76d-ab7e-4553-9535-93b113150f20,6fe71c72-7796-466a-acbf-aa1c93f1a123,9a00a819-d1f5-49a3-85b6-b1d9a97ef9ed,af95ccc0-6ddb-4b10-82b2-4de8dabfcc46,d24dd74f-bb40-49bc-a19c-f9e90d77f4ea,ed470df7-f8ae-4d26-8865-7241f085785b |
| 3cb765d7-76b5-47a8-9634-42f8181baa03  | Mickey    | 10533.60      | 31          | 036ac34e-2d13-4a7e-a941-897b032e9c39,33cd8255-6371-4baf-9888-555973935748,4f78f76d-ab7e-4553-9535-93b113150f20,6f1ecaa5-fac0-4111-ac77-a717940c788d,6fe71c72-7796-466a-acbf-aa1c93f1a123,9a00a819-d1f5-49a3-85b6-b1d9a97ef9ed,af95ccc0-6ddb-4b10-82b2-4de8dabfcc46 |
| e1e1ef41-fce1-4fdd-a1b2-1326bd373fe3  | Belsti    | 9652.00       | 25          | 036ac34e-2d13-4a7e-a941-897b032e9c39,33cd8255-6371-4baf-9888-555973935748,4f78f76d-ab7e-4553-9535-93b113150f20,6f1ecaa5-fac0-4111-ac77-a717940c788d,6fe71c72-7796-466a-acbf-aa1c93f1a123,9a00a819-d1f5-49a3-85b6-b1d9a97ef9ed,af95ccc0-6ddb-4b10-82b2-4de8dabfcc46,ed470df7-f8ae-4d26-8865-7241f085785b |
| 39df2681-cc46-43ca-acd7-33994caf06c3  | lemlem    | 8536.00       | 44          | 33cd8255-6371-4baf-9888-555973935748,4f78f76d-ab7e-4553-9535-93b113150f20,6fe71c72-7796-466a-acbf-aa1c93f1a123,9a00a819-d1f5-49a3-85b6-b1d9a97ef9ed,af95ccc0-6ddb-4b10-82b2-4de8dabfcc46,d24dd74f-bb40-49bc-a19c-f9e90d77f4ea |

### 1.1.2 Generate a query to calculate the conversion rate for group deals, defined as the ratio of completed orders (status = 'completed') to all group deals created.

In [None]:
-- Conversion rate = completed orders / group deals created.
-- NOTE(review): this is a ratio of orders to deals, so it can exceed 1; it is
-- not the share of deals that converted.
--
-- Completed orders per group deal. The WHERE filter on o.status discards the
-- NULL-extended rows the LEFT JOINs would otherwise keep, so deals without a
-- completed order do not appear in this CTE.
WITH completed_orders_count AS (
    SELECT 
        gd.id AS group_deals_id,
        COUNT(o.id) AS completed_orders
    FROM group_deals gd
    LEFT JOIN groups g ON gd.id = g.group_deals_id
    LEFT JOIN groups_carts gc ON g.id = gc.group_id
    LEFT JOIN orders o ON gc.id = o.groups_carts_id
    WHERE o.status = 'COMPLETED' -- Only count completed orders
    GROUP BY gd.id
),
-- Denominator: every row of group_deals, with or without orders.
total_group_deals AS (
    SELECT 
        COUNT(*) AS total_group_deals
    FROM group_deals
)
-- COALESCE turns an empty sum into 0; NULLIF makes the rate NULL instead of
-- raising a division-by-zero error when group_deals is empty.
SELECT 
    COALESCE(SUM(coc.completed_orders), 0) AS total_completed_orders,
    (SELECT total_group_deals FROM total_group_deals) AS total_group_deals,
    COALESCE(SUM(coc.completed_orders), 0) * 1.0 / NULLIF((SELECT total_group_deals FROM total_group_deals), 0) AS conversion_rate
FROM completed_orders_count coc;

### 1.1.2 Conversion Rate Analysis

The conversion rate for group deals is calculated as the ratio of completed orders to all group deals created. The following metrics are derived from the analysis above:

- **Total Completed Orders:** 292,279
- **Total Group Deals:** 3,603
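- **Conversion Rate:** 81.12 completed orders per group deal (292,279 ÷ 3,603). This is a ratio rather than a percentage; expressed as a percentage it would be 8,112%.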

These metrics provide insights into the effectiveness of group deals in converting potential customers into actual buyers.

# Part 2: Data Processing

## 2.2 Advanced Data Aggregation

The script below fetches data from the database and builds a monthly cohort analysis of users: users are grouped by their signup month (`created_at`), and the table shows retention percentages based on their group deal participation over the next 3 months.

In [6]:
import pandas as pd

# Pull one row per (user, group) pairing: the user's signup timestamp next to
# the creation timestamp of each group reached through groups_carts.
# The IS NOT NULL filter on g.created_at drops users that have no group at all.
query = """
SELECT
    u.id AS user_id,
    u.created_at AS signup_date,
    g.created_at AS group_deal_date
FROM users u
LEFT JOIN groups_carts gc ON u.id = gc.user_id
LEFT JOIN groups g ON gc.group_id = g.id
WHERE u.created_at IS NOT NULL
  AND g.created_at IS NOT NULL
"""
df = pd.read_sql(query, engine)

# Parse both timestamps, bucket them into calendar months, and count the
# calendar months between signup and the group's creation.
df = df.assign(
    signup_date=lambda frame: pd.to_datetime(frame['signup_date']),
    group_deal_date=lambda frame: pd.to_datetime(frame['group_deal_date']),
    signup_month=lambda frame: frame['signup_date'].dt.to_period('M'),
    group_deal_month=lambda frame: frame['group_deal_date'].dt.to_period('M'),
    month_diff=lambda frame: (
        (frame['group_deal_date'].dt.year - frame['signup_date'].dt.year) * 12
        + (frame['group_deal_date'].dt.month - frame['signup_date'].dt.month)
    ),
)

# Keep only activity from the signup month through three months later
# (this also removes groups created before the user signed up)
df = df[df['month_diff'].between(0, 3)]

# Distinct active users per signup cohort and month offset
cohort_data = (
    df.groupby(['signup_month', 'month_diff'])['user_id']
    .nunique()
    .reset_index()
)

# One row per signup month, one column per month offset
cohort_pivot = cohort_data.pivot_table(values='user_id', index='signup_month', columns='month_diff')

# Retention is expressed relative to the first pivot column, i.e. the users
# active at the smallest month offset present (offset 0 in the data shown).
cohort_size = cohort_pivot.iloc[:, 0]
retention_table = (
    cohort_pivot.div(cohort_size, axis=0)
    .mul(100)
    .round(2)
    .map(lambda pct: f"{pct}%" if pd.notna(pct) else "nan")
)
retention_table

month_diff,0,1,2,3
signup_month,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2023-10,100.0%,70.83%,50.0%,47.92%
2023-11,100.0%,33.98%,22.79%,17.76%
2023-12,100.0%,53.8%,45.76%,46.53%
2024-01,100.0%,50.45%,46.33%,50.99%
2024-02,100.0%,59.68%,58.56%,49.76%
2024-03,100.0%,46.91%,32.92%,34.23%
2024-04,100.0%,30.18%,23.95%,17.18%
2024-05,100.0%,25.89%,14.27%,14.7%
2024-06,100.0%,26.17%,21.74%,16.74%
2024-07,100.0%,34.05%,21.89%,13.88%


### 2.2 Cohort Analysis

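The cohort analysis below shows the retention rates of users based on their signup month. Each column is the number of months since signup (0 = the signup month itself). The retention rate is the number of distinct users who participated in a group deal in that month, as a percentage of the users who participated in a group deal during their signup month; month 0 is therefore always 100%. Users who never joined a group are not part of the cohort base.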

| signup_month | 0       | 1       | 2       | 3       |
|--------------|---------|---------|---------|---------|
| 2023-10      | 100.0%  | 70.83%  | 50.0%   | 47.92%  |
| 2023-11      | 100.0%  | 33.98%  | 22.79%  | 17.76%  |
| 2023-12      | 100.0%  | 53.8%   | 45.76%  | 46.53%  |
| 2024-01      | 100.0%  | 50.45%  | 46.33%  | 50.99%  |
| 2024-02      | 100.0%  | 59.68%  | 58.56%  | 49.76%  |
| 2024-03      | 100.0%  | 46.91%  | 32.92%  | 34.23%  |
| 2024-04      | 100.0%  | 30.18%  | 23.95%  | 17.18%  |
| 2024-05      | 100.0%  | 25.89%  | 14.27%  | 14.7%   |
| 2024-06      | 100.0%  | 26.17%  | 21.74%  | 16.74%  |
| 2024-07      | 100.0%  | 34.05%  | 21.89%  | 13.88%  |
| 2024-08      | 100.0%  | 37.8%   | 22.85%  | nan     |
| 2024-09      | 100.0%  | 36.46%  | nan     | nan     |
| 2024-10      | 100.0%  | nan     | nan     | nan     |

## 2.3. Dynamic Data Preparation
### 2.3.1. A Function to Dynamically Fetch the Most Popular Product Categories and Calculate Their Sales Growth Percentage

This function dynamically fetches the most popular product categories from the database and calculates their sales growth percentage compared to the previous period (weekly or monthly). The function performs the following steps:

1. **Database Connection:** Uses the SQLAlchemy engine created at the top of the notebook to query the PostgreSQL database.
2. **Fetch Sales Data:** Retrieves sales data by joining necessary tables in the database.
3. **Ensure Date Format:** Ensures the date column is in datetime format for accurate calculations.
4. **Determine Date Ranges:** Determines the date ranges for the current and previous periods based on the specified aggregation period (weekly or monthly).
5. **Filter Data:** Filters the sales data for the current and previous periods.
6. **Aggregate Sales:** Aggregates sales by category for both the current and previous periods.
7. **Merge Sales Data:** Merges the sales data from the two periods.
8. **Calculate Growth Percentage:** Calculates the sales growth percentage for each category.
9. **Handle NaN and Inf Values:** Replaces NaN and Inf growth values with 0 for categories with no previous sales.
10. **Sort Categories:** Sorts the categories by current sales in descending order, so the most popular categories come first.

The function returns a DataFrame containing the category, current sales, previous sales, and growth percentage for the top product categories.

In [7]:
import pandas as pd

def fetch_sales_data(engine):
    """
    Load one row per completed order together with its product category.

    The category is resolved by walking
    orders -> groups_carts -> groups -> group_deals -> products ->
    product_names -> categories, and only orders whose status is
    'COMPLETED' are returned.

    Args:
        engine: SQLAlchemy engine (or any connection accepted by
            ``pd.read_sql``) pointing at the database.

    Returns:
        pd.DataFrame: Columns ``order_date`` (orders.created_at),
        ``category`` (categories.name) and ``sales`` (orders.total_amount).
    """
    completed_sales_sql = """
    SELECT
        o.created_at AS order_date,
        c.name AS category,
        o.total_amount AS sales
    FROM orders o
    JOIN groups_carts gc ON o.groups_carts_id = gc.id
    JOIN groups g ON gc.group_id = g.id
    JOIN group_deals gd ON g.group_deals_id = gd.id
    JOIN products p ON gd.product_id = p.id
    JOIN product_names pn ON p.name_id = pn.id
    JOIN categories c ON pn.category_id = c.id
    WHERE o.status = 'COMPLETED'
    """
    return pd.read_sql(completed_sales_sql, engine)

def calculate_sales_growth(data, date_column, category_column, sales_column, period='weekly', top_n=None):
    """
    Calculate sales growth percentage for the most popular product categories.

    The "current" period is the last week/month ending at the newest date in
    ``data``; the "previous" period is the week/month immediately before it.
    The input DataFrame is not modified.

    Parameters:
        data (pd.DataFrame): Input DataFrame containing sales data.
        date_column (str): Column name for the order date (anything
            ``pd.to_datetime`` can parse).
        category_column (str): Column name for the product category.
        sales_column (str): Column name for the sales amount.
        period (str): Aggregation period ('weekly' or 'monthly'). Default is 'weekly'.
        top_n (int | None): If given, keep only the ``top_n`` categories with
            the highest current sales. Default ``None`` returns every category.

    Returns:
        pd.DataFrame: One row per category with the category column,
        ``current_sales``, ``previous_sales`` and ``growth_percentage``,
        sorted by ``current_sales`` in descending order. Categories with no
        previous sales get a growth of 0.

    Raises:
        ValueError: If ``period`` is neither 'weekly' nor 'monthly'.
    """
    # Length of one comparison window
    if period == 'weekly':
        window = pd.Timedelta(days=7)
    elif period == 'monthly':
        window = pd.DateOffset(months=1)
    else:
        raise ValueError("Invalid period. Use 'weekly' or 'monthly'.")

    # Parse the dates into a separate Series so the caller's frame keeps its
    # original column untouched
    order_dates = pd.to_datetime(data[date_column])

    # Both windows are anchored on the newest date present in the data
    latest_date = order_dates.max()
    current_start = latest_date - window
    previous_start = current_start - window

    # Windows are half-open on the left: (start, end]
    in_current = order_dates > current_start
    in_previous = (order_dates > previous_start) & (order_dates <= current_start)

    def _sales_by_category(mask, label):
        # Total sales per category for the rows selected by ``mask``
        return (
            data.loc[mask]
            .groupby(category_column)[sales_column]
            .sum()
            .rename(label)
            .reset_index()
        )

    current_sales = _sales_by_category(in_current, 'current_sales')
    previous_sales = _sales_by_category(in_previous, 'previous_sales')

    # Outer merge keeps categories that sold in only one of the two periods;
    # their missing side counts as 0 sales
    sales_comparison = pd.merge(
        current_sales, previous_sales, on=category_column, how='outer'
    ).fillna({'current_sales': 0, 'previous_sales': 0})

    # Zero previous sales are turned into NaN so the division cannot blow up
    sales_comparison['growth_percentage'] = (
        (sales_comparison['current_sales'] - sales_comparison['previous_sales']) /
        sales_comparison['previous_sales'].replace(0, float('nan'))
    ) * 100

    # Replace NaN and Inf growth values with 0 (for categories with no previous sales)
    sales_comparison['growth_percentage'] = (
        sales_comparison['growth_percentage']
        .fillna(0)
        .replace([float('inf'), float('-inf')], 0)
    )

    # Most popular categories (by current sales) first
    sorted_categories = sales_comparison.sort_values(by='current_sales', ascending=False)

    if top_n is not None:
        sorted_categories = sorted_categories.head(top_n)

    return sorted_categories

# Load the completed-order sales rows through the shared engine
sales_data = fetch_sales_data(engine)

# Compare the latest period with the one before it, per category
# (period may be 'weekly' or 'monthly')
result = calculate_sales_growth(sales_data, 'order_date', 'category', 'sales', period='monthly')

result_df = pd.DataFrame(result)
result_df

Unnamed: 0,category,current_sales,previous_sales,growth_percentage
11,Vegetable,1793329.0,2429761.0,-26.193193
5,Fruit,930411.0,874894.0,6.345569
9,Packed Food & Drink,187159.25,187761.9,-0.320965
3,Condiments,123603.0,188187.0,-34.319055
1,Baking Goods,115700.0,100122.0,15.559018
8,Meat and Poultry,64776.0,86776.0,-25.352632
10,Personal Care,62010.4,36971.35,67.72555
7,Household,18738.0,17782.3,5.374445
2,Cloth & Fashion,17013.0,38243.0,-55.513427
4,Dairy,15135.0,44578.0,-66.048275


### 2.3.1 Sales Growth Analysis

The table below shows the sales growth percentage for the most popular product categories. The growth percentage is calculated by comparing the current sales to the previous sales.

| Category              | Current Sales | Previous Sales | Growth Percentage |
|-----------------------|---------------|----------------|-------------------|
| Vegetable             | 1,793,329.00  | 2,429,761.00   | -26.19%           |
| Fruit                 | 930,411.00    | 874,894.00     | 6.35%             |
| Packed Food & Drink   | 187,159.25    | 187,761.90     | -0.32%            |
| Condiments            | 123,603.00    | 188,187.00     | -34.32%           |
| Baking Goods          | 115,700.00    | 100,122.00     | 15.56%            |
| Meat and Poultry      | 64,776.00     | 86,776.00      | -25.35%           |
| Personal Care         | 62,010.40     | 36,971.35      | 67.73%            |
| Household             | 18,738.00     | 17,782.30      | 5.37%             |
| Cloth & Fashion       | 17,013.00     | 38,243.00      | -55.51%           |
| Dairy                 | 15,135.00     | 44,578.00      | -66.05%           |
| Holiday               | 12,915.00     | 19,674.00      | -34.35%           |
| Baby Items            | 5,214.00      | 5,790.00       | -9.95%            |

### 2.3.2 Data Preprocessing Class


This class provides a reusable Python implementation to preprocess and normalize data for various tables. The class performs the following steps:

1. **Database Connection:** Receives an existing SQLAlchemy engine for the PostgreSQL database and uses it to load data.
2. **Load Data:** Loads data from the specified table in the database.
3. **Handle NULL Values:** Handles NULL values using the specified strategy: `mean`, `median`, and `mode` fill only the numeric columns with that statistic, while `constant` fills every column with 0.
4. **Encode Categorical Variables:** Encodes categorical variables using one-hot encoding or label encoding.
5. **Aggregate Timestamps:** Converts the timestamp column to datetime and adds a `<column>_period` column holding the period (daily, weekly, monthly, or yearly) that each timestamp falls in.
6. **Preprocess Data:** Combines all preprocessing steps into a single method to preprocess and normalize data for a specific table.

The `preprocess` method returns a DataFrame containing the preprocessed data (NULLs handled, categorical variables encoded, and timestamps mapped to periods).

In [8]:
import pandas as pd
from sklearn.preprocessing import LabelEncoder

class DataPreprocessor:
    """
    Reusable preprocessing steps for tables loaded through a database engine.

    The transformation methods (``handle_nulls``, ``encode_categorical``,
    ``aggregate_timestamps``) return a new DataFrame and leave the frame they
    were given unchanged. Label encoders fitted by ``encode_categorical`` are
    cached per column name in ``self.label_encoders`` and reused on later calls.
    """

    def __init__(self, engine):
        """
        Args:
            engine: SQLAlchemy engine (or any connection accepted by
                ``pd.read_sql``) used by ``load_data``.
        """
        self.engine = engine
        # column name -> fitted LabelEncoder
        self.label_encoders = {}

    def load_data(self, table_name):
        """
        Load data from the database.
        
        Args:
            table_name (str): Name of the table to load data from.
        
        Returns:
            pd.DataFrame: DataFrame containing the loaded data.
        """
        # NOTE(review): table_name is interpolated into the SQL text verbatim
        # (identifiers cannot be bound as parameters), so only pass trusted names.
        query = f"SELECT * FROM {table_name}"
        return pd.read_sql(query, self.engine)

    def handle_nulls(self, df, strategy='mean'):
        """
        Handle NULL values using the specified strategy.

        'mean', 'median' and 'mode' fill only the numeric columns with that
        column statistic; 'constant' fills every column with 0.
        
        Args:
            df (pd.DataFrame): Input DataFrame (not modified).
            strategy (str): Strategy to handle NULLs ('mean', 'median', 'mode', 'constant').
        
        Returns:
            pd.DataFrame: New DataFrame with NULL values handled.

        Raises:
            ValueError: If ``strategy`` is not one of the supported names.
        """
        if strategy == 'constant':
            return df.fillna(0)
        if strategy not in ('mean', 'median', 'mode'):
            raise ValueError("Invalid null strategy. Use 'mean', 'median', 'mode', or 'constant'.")

        # Work on a copy so every strategy behaves like 'constant' and leaves
        # the caller's frame alone
        result = df.copy()
        numeric_cols = result.select_dtypes(include=['number']).columns

        if strategy == 'mean':
            fill_values = result[numeric_cols].mean()
        elif strategy == 'median':
            fill_values = result[numeric_cols].median()
        else:
            modes = result[numeric_cols].mode()
            # mode() has no rows when no numeric column holds a non-null value
            # (e.g. an empty table); there is nothing to fill with in that case
            if len(modes) == 0:
                return result
            fill_values = modes.iloc[0]

        result[numeric_cols] = result[numeric_cols].fillna(fill_values)
        return result

    def encode_categorical(self, df, columns, encoding_type='onehot'):
        """
        Encode categorical variables using the specified encoding type.
        
        Args:
            df (pd.DataFrame): Input DataFrame (not modified).
            columns (list): List of categorical columns to encode.
            encoding_type (str): Encoding type ('onehot' or 'label').
        
        Returns:
            pd.DataFrame: New DataFrame with categorical variables encoded.

        Raises:
            ValueError: If ``encoding_type`` is neither 'onehot' nor 'label'.
        """
        if encoding_type == 'onehot':
            return pd.get_dummies(df, columns=columns)

        if encoding_type == 'label':
            result = df.copy()
            for column in columns:
                if column not in self.label_encoders:
                    # Cache the encoder only after a successful fit, so a
                    # failed fit cannot leave an unfitted encoder behind
                    encoder = LabelEncoder()
                    result[column] = encoder.fit_transform(result[column])
                    self.label_encoders[column] = encoder
                else:
                    # Reuse the mapping learned the first time this column was seen
                    result[column] = self.label_encoders[column].transform(result[column])
            return result

        raise ValueError("Invalid encoding type. Use 'onehot' or 'label'.")

    def aggregate_timestamps(self, df, timestamp_column, period='M'):
        """
        Map timestamps to the period they fall in.

        The timestamp column is converted to datetime and a new
        ``<timestamp_column>_period`` column is added; rows are not grouped.
        
        Args:
            df (pd.DataFrame): Input DataFrame (not modified).
            timestamp_column (str): Column containing timestamp values.
            period (str): Period frequency ('D', 'W', 'M', 'Y').
        
        Returns:
            pd.DataFrame: New DataFrame with the converted timestamp column and
            the added period column.
        """
        result = df.copy()
        result[timestamp_column] = pd.to_datetime(result[timestamp_column])
        result[f'{timestamp_column}_period'] = result[timestamp_column].dt.to_period(period)
        return result

    def preprocess(self, table_name, null_strategy='mean', categorical_columns=None, encoding_type='onehot', timestamp_column=None, period='M'):
        """
        Preprocess and normalize data for a specific table.

        Runs, in order: load the table, handle NULLs, encode the categorical
        columns (if any were given), add the timestamp period column (if a
        timestamp column was given).
        
        Parameters:
            table_name (str): Name of the database table to preprocess.
            null_strategy (str): Strategy to handle NULLs ('mean', 'median', 'mode', 'constant').
            categorical_columns (list): List of categorical columns to encode.
            encoding_type (str): Encoding type for categorical variables ('onehot' or 'label').
            timestamp_column (str): Column containing timestamp values.
            period (str): Period frequency for timestamps ('D', 'W', 'M', 'Y').
        
        Returns:
            pd.DataFrame: Preprocessed DataFrame.
        """
        data = self.load_data(table_name)

        # Handle NULL values
        data = self.handle_nulls(data, strategy=null_strategy)

        # Encode categorical variables
        if categorical_columns:
            data = self.encode_categorical(data, columns=categorical_columns, encoding_type=encoding_type)

        # Add the period column for the timestamp
        if timestamp_column:
            data = self.aggregate_timestamps(data, timestamp_column=timestamp_column, period=period)

        return data

# usage
if __name__ == "__main__":
    # Replace the table name, the categorical columns and the timestamp
    # column below with the ones from your own schema.
    preprocess_options = dict(
        table_name='orders',
        null_strategy='mean',
        categorical_columns=['status', 'payment_method'],
        encoding_type='onehot',
        timestamp_column='created_at',
        period='M',
    )

    preprocessor = DataPreprocessor(engine)
    preprocessed_data = preprocessor.preprocess(**preprocess_options)
    print(preprocessed_data)

                                          id personal_cart_id  \
0       349e932b-bf57-41b4-be2d-26c266ffeb1a             None   
1       0badd3d4-b4bd-4256-9189-b1f5a1eb033a             None   
2       ca0a288f-85eb-485d-b494-2c26b3ce23ff             None   
3       b3b3843f-cacd-4d24-a9b0-4220a11cf08b             None   
4       8bf85ebd-e96b-4409-b01a-b3a22cc2894e             None   
...                                      ...              ...   
377911  83a07166-f19d-4645-8764-04a669f54eef             None   
377912  bf578f5a-6770-4468-b42a-fec5d510dcd9             None   
377913  94785d86-f0a0-4308-aa74-a63166e682a3             None   
377914  092e7913-5ae8-4c85-b3fa-fb5d28062f64             None   
377915  46ea272a-e940-438d-a38c-e5ee2212212b             None   

                             groups_carts_id  total_amount  \
0       e917fa7d-17eb-4be4-a54e-216ddbdef68b        422.75   
1       48ae0049-dabc-4631-a670-6df813b0f431        750.50   
2       3c933d2c-04ef-466f-b797-1