### **2.2 SQL Analysis**

---

#### **Objectives**

Demonstrate advanced SQL skills by creating a database-driven analysis workflow and answering complex business questions through structured queries.

---

In [6]:
import psycopg2
import polars as pl
import pandas as pd
from tqdm import tqdm
from typing import List
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
from psycopg2.extras import execute_values

In [27]:
class DataSqlIntegration(ABC):
      """
      Abstract base class for e-commerce event data SQL integration operations.

      This class defines the interface for loading e-commerce event data into a
      PostgreSQL database and querying the results. Concrete implementations must
      provide the actual database interaction logic.

      Attributes:
            dataPath (str): Path to the data file to be processed
      """

      def __init__(self, dataPath: str) -> None:
            """
            Initialize the data integration handler.

            Args:
                  dataPath: Path to the e-commerce events Parquet file
            """
            self.dataPath = dataPath

      @abstractmethod
      def createTable(self, connection: psycopg2.extensions.connection) -> None:
            """
            Create the ecommerce_events table in the database.

            Args:
                  connection: Active PostgreSQL database connection

            Raises:
                  psycopg2.Error: If table creation fails
            """

      @abstractmethod
      def answerQueryAnswer(self, connection: psycopg2.extensions.connection, queryString: str, index: List[str]) -> pd.DataFrame:
            """
            Execute a SQL query and convert its results into a pandas DataFrame.

            Args:
                  connection: Active PostgreSQL database connection
                  queryString: SQL query to execute
                  index: Index labels for the resulting DataFrame rows

            Returns:
                  pd.DataFrame: Formatted DataFrame from query results
            """

      @abstractmethod
      def insertRecordsIntoTable(self, connection: psycopg2.extensions.connection, alreadyAdded: bool, batchSize: int = 100_000) -> None:
            """
            Insert records from the data file into the database table.

            Args:
                  connection: Active PostgreSQL database connection
                  alreadyAdded: If True, skip loading because the table is already populated
                  batchSize: Number of rows inserted and committed per batch

            Raises:
                  psycopg2.Error: If data insertion fails
            """


class EcommerceDataSqlIntegration(DataSqlIntegration):
      """
      Concrete implementation of e-commerce event data SQL integration.

      Provides a PostgreSQL-specific implementation for loading e-commerce event data
      from Parquet and converting query results to pandas DataFrames.
      """
      
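      def createTable(self, connection: psycopg2.extensions.connection) -> None:
            """
            Create the ecommerce_events table with its schema if it doesn't exist.

            Note: PostgreSQL folds unquoted identifiers to lowercase, so camelCase
            columns such as isFreeItem are stored as isfreeitem.
            """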
            cursor = connection.cursor()
            try:
                  cursor.execute("""
                        CREATE TABLE IF NOT EXISTS ecommerce_events (
                        event_id BIGSERIAL PRIMARY KEY,
                        event_time TIMESTAMPTZ NOT NULL,
                        event_type VARCHAR(20) NOT NULL CHECK (event_type IN ('view', 'cart', 'purchase')),
                        product_id BIGINT,
                        category_id BIGINT,
                        brand TEXT,
                        price NUMERIC(12,2),
                        user_id BIGINT,
                        user_session TEXT,

                        isFreeItem BOOLEAN,
                        isLuxuryItem BOOLEAN,
                        isExtremeOutlier BOOLEAN,
                        isAbnormal BOOLEAN,
                        has_purchase BOOLEAN,
                        isMultiDaySession BOOLEAN,
                        hasCategoryCode BOOLEAN,

                        category_l1 TEXT,
                        category_l2 TEXT,
                        category_l3 TEXT,
                        isLowFrequencyCategory BOOLEAN,

                        sessionDuration REAL,
                        sessionStartHour SMALLINT,
                        sessionDayOfWeek TEXT,
                        isWeekendSession BOOLEAN,
                        isMidnightActivity BOOLEAN,
                        isDirectPurchase BOOLEAN,
                        isDirectPurchase_right BOOLEAN,
                        isAbandonedCart BOOLEAN,
                        advancedToCart BOOLEAN,
                        advancedToPurchase BOOLEAN,
                        EngagementDepth INTEGER,

                        popularityScore REAL,
                        categoryConversionRate REAL,
                        isInMultiCategories BOOLEAN
                        );
                  """)
                  connection.commit()
            except psycopg2.Error:
                  connection.rollback()
                  raise
            finally:
                  cursor.close()

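      def addTableIndexes(self, connection: psycopg2.extensions.connection) -> None:
            """
            Create the lookup indexes on ecommerce_events and refresh planner statistics.

            Intended to run after the bulk load, so each index is built once
            instead of being maintained row by row during inserts.
            """
            cursor = connection.cursor()
            try:
                  # B-tree indexes for customer-, product- and funnel-level filters
                  cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_id ON ecommerce_events(user_id);")
                  cursor.execute("CREATE INDEX IF NOT EXISTS idx_product_id ON ecommerce_events(product_id);")
                  cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_type ON ecommerce_events(event_type);")
                  # BRIN index: very small, effective when event_time follows insertion order
                  cursor.execute("CREATE INDEX IF NOT EXISTS idx_event_time_brin ON ecommerce_events USING BRIN(event_time);")
                  # Composite index for per-user recency queries
                  cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_event_time ON ecommerce_events(user_id, event_time);")
                  # Refresh statistics so the planner accounts for the loaded data
                  cursor.execute("ANALYZE ecommerce_events;")
                  connection.commit()

            except psycopg2.Error:
                  # Roll back and re-raise so a failed index build is not silently ignored
                  connection.rollback()
                  raise

            finally:
                  cursor.close()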

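      def insertRecordsIntoTable(self, connection: psycopg2.extensions.connection, alreadyAdded: bool, batchSize: int = 100_000) -> None:
            """
            Load the Parquet file into ecommerce_events in batches, committing after each batch.

            Assumes the Parquet columns appear in the same order as the INSERT column list.
            A failure rolls back only the current batch; earlier batches stay committed.
            """
            if alreadyAdded:
                  return

            cursor = connection.cursor()
            try:
                  # Lazy scan: each batch is sliced and collected on its own, so the
                  # whole file is never materialized in memory at once.
                  # pl.len() needs Polars 0.20.5 or later (earlier releases: pl.count()).
                  lazy_df = pl.scan_parquet(self.dataPath)
                  total_rows = lazy_df.select(pl.len()).collect().item()

                  # Iterate in chunks with a progress bar
                  for start in tqdm(range(0, total_rows, batchSize), desc="Inserting rows"):
                        chunk = lazy_df.slice(start, batchSize).collect()
                        # rows() returns tuples of native Python values, which psycopg2 adapts directly
                        data_tuples = chunk.rows()

                        if data_tuples:
                              # page_size defaults to 100 rows per statement; a larger value sends fewer INSERTs
                              execute_values(cursor, """
                                    INSERT INTO ecommerce_events (
                                          event_time, event_type, product_id, category_id, brand, price, user_id, user_session,
                                          isFreeItem, isLuxuryItem, isExtremeOutlier, isAbnormal, has_purchase, isMultiDaySession,
                                          hasCategoryCode,
                                          category_l1, category_l2, category_l3,
                                          isLowFrequencyCategory,
                                          sessionDuration, sessionStartHour, sessionDayOfWeek, isWeekendSession,
                                          isMidnightActivity, isDirectPurchase, isDirectPurchase_right,
                                          isAbandonedCart, advancedToCart, advancedToPurchase, EngagementDepth,
                                          popularityScore, categoryConversionRate,
                                          isInMultiCategories
                                    ) VALUES %s
                              """, data_tuples, page_size=10_000)
                              connection.commit()

            except Exception:
                  connection.rollback()
                  raise
            finally:
                  cursor.close()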
            
      def answerQueryAnswer(self, connection: psycopg2.extensions.connection, queryString: str, index: List[str]) -> pd.DataFrame:
            """
            Execute a SQL query and convert the results to a pandas DataFrame.

            Args:
                  connection: Active PostgreSQL database connection
                  queryString: SQL query to execute
                  index: Index labels for the DataFrame rows; truncated to the row count,
                        or ignored (default RangeIndex) when empty

            Returns:
                  pd.DataFrame: Query results with the query's column names, or an
                  empty DataFrame if the query fails

            Example:
                  >>> df = dataSqlIntegration.answerQueryAnswer(
                  ...     connection,
                  ...     "SELECT event_type, COUNT(*) AS events FROM ecommerce_events GROUP BY event_type",
                  ...     [],
                  ... )
            """
            cursor = connection.cursor()
            try:
                  cursor.execute(queryString)
                  queryRows = cursor.fetchall()
                  columns = [desc[0] for desc in cursor.description]

                  # Debug: print raw query results
                  print(f"Query returned {len(queryRows)} rows")
                  if not queryRows:
                        # Keep the column names even when no rows match
                        return pd.DataFrame(columns=columns)
                  print("First row sample:", queryRows[0])

                  # Convert to DataFrame; trim the index so it matches the row count
                  return pd.DataFrame.from_records(
                        queryRows,
                        columns=columns,
                        index=index[:len(queryRows)] if index else None,
                  )

            except psycopg2.Error as e:
                  # A failed statement aborts the transaction; roll back so later queries still work
                  connection.rollback()
                  print(f"Database error: {e}")
                  return pd.DataFrame()
            finally:
                  cursor.close()

In [30]:
# Raw string so the backslashes in the Windows path are not treated as escape sequences
dataSqlIntegration = EcommerceDataSqlIntegration(dataPath=r"D:\programming\Data Analysis\E-commerceCustomerBehaviorAnalysis\Data\Processed\prsc_fe9_nov_2019.parquet")
connection = psycopg2.connect(database="ecommerceCustomers",host="localhost",user="postgres",
                                    password="postgres",port="5432")

In [None]:
dataSqlIntegration.createTable(connection=connection)

In [None]:
dataSqlIntegration.insertRecordsIntoTable(connection=connection, alreadyAdded=False, batchSize=100000)

In [31]:
dataSqlIntegration.addTableIndexes(connection=connection)


---

# **Task 2.2.1 – Database Design and Data Loading**

---

### 1) Database Schema Design

**Q1. What is the optimal table structure to normalize the e-commerce data while maintaining query performance?**
**A (Strategy):**

* Use a **single wide, denormalized `ecommerce_events` table** (as defined in `createTable()`) rather than a fully normalized schema, because:

  * The data is already clean, so no staging tables are needed.
  * Analytical queries and BI dashboards avoid joins when everything lives in one table.
* Keep dimension tables (`products`, `customers`, `categories`) as an optional future extension; they are not required now.
* Code strategy: rely on the `createTable()` implementation, which defines the schema with appropriate data types and constraints.

---

**Q2. How should we design indexes on user\_id, product\_id, and event\_time to optimize common analytical queries?**
**A (Strategy):**

* Create **BTREE indexes** on:

  * `user_id` → for customer-level queries.
  * `product_id` → for product performance analysis.
  * `event_type` → for funnel/conversion queries.
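* Create a **BRIN index** on `event_time` for fast time-range filtering. BRIN indexes stay small even at 285M rows, but they are only effective when `event_time` roughly follows the physical insertion order of the rows.
* Add a composite `(user_id, event_time)` index for recency analysis.
* Code strategy: `addTableIndexes()` runs these `CREATE INDEX` statements (followed by `ANALYZE`) after the data has been loaded.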
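To confirm that the planner actually uses these indexes, a query's plan can be inspected with `EXPLAIN (FORMAT JSON)`. The helper below is a minimal sketch: `plan_nodes` is a hypothetical name, and `cursor` is assumed to be any psycopg2-style cursor. It uses plain `EXPLAIN`, so the query is planned but not executed.

```python
import json
from typing import Any, List, Tuple


def plan_nodes(cursor: Any, query: str) -> List[Tuple[str, str]]:
    """Return (node type, index name) pairs from the query's EXPLAIN plan.

    `cursor` is assumed to behave like a psycopg2 cursor. Plain EXPLAIN only
    plans the query; it does not run it.
    """
    cursor.execute("EXPLAIN (FORMAT JSON) " + query)
    raw = cursor.fetchone()[0]
    # psycopg2 normally decodes json results already; accept raw text as well
    plan = json.loads(raw) if isinstance(raw, str) else raw

    nodes: List[Tuple[str, str]] = []
    stack = [plan[0]["Plan"]]  # EXPLAIN's JSON output is a one-element list
    while stack:
        node = stack.pop()
        nodes.append((node["Node Type"], node.get("Index Name", "")))
        stack.extend(node.get("Plans", []))  # child plan nodes, if any
    return nodes
```

For example, `plan_nodes(connection.cursor(), "SELECT COUNT(*) FROM ecommerce_events WHERE event_time >= '2019-11-15' AND event_time < '2019-11-16'")`. A BRIN index typically shows up as a `Bitmap Index Scan` on `idx_event_time_brin` beneath a `Bitmap Heap Scan`; a `Seq Scan` means the planner decided not to use an index.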

---

**Q3. What foreign key relationships should be established between customers, products, and events tables?**
**A (Strategy):**

* Since all analysis happens in the **single wide table**, foreign keys aren’t required for integrity (data is already clean).
* If dimension tables are added in the future:

  * `ecommerce_events.user_id → customers.user_id`
  * `ecommerce_events.product_id → products.product_id`
* Code strategy: skip FK constraints for now to keep inserts fast; revisit if you add dimension tables later.
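If `customers` and `products` dimension tables are added later, the foreign keys can be attached to the already-loaded events table without a long blocking check by adding them as `NOT VALID` and validating them in a separate step. This is a sketch: the `customers`/`products` tables, their key columns, and the constraint and function names are hypothetical, and `connection` is assumed to be a psycopg2-style connection.

```python
from typing import Any, List

# Hypothetical dimension tables; each referenced column must be a PRIMARY KEY
# or UNIQUE column whose type matches the BIGINT columns in ecommerce_events.
FK_STATEMENTS: List[str] = [
    # NOT VALID skips checking existing rows; new inserts and updates are checked immediately
    "ALTER TABLE ecommerce_events ADD CONSTRAINT fk_events_user "
    "FOREIGN KEY (user_id) REFERENCES customers (user_id) NOT VALID;",
    "ALTER TABLE ecommerce_events ADD CONSTRAINT fk_events_product "
    "FOREIGN KEY (product_id) REFERENCES products (product_id) NOT VALID;",
    # VALIDATE scans existing rows under a SHARE UPDATE EXCLUSIVE lock,
    # so normal reads and writes on ecommerce_events can continue meanwhile
    "ALTER TABLE ecommerce_events VALIDATE CONSTRAINT fk_events_user;",
    "ALTER TABLE ecommerce_events VALIDATE CONSTRAINT fk_events_product;",
]


def add_foreign_keys(connection: Any) -> None:
    """Run each statement in its own transaction.

    Committing after every statement releases the short ADD CONSTRAINT locks
    before the long-running VALIDATE scans start.
    """
    cursor = connection.cursor()
    try:
        for statement in FK_STATEMENTS:
            cursor.execute(statement)
            connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        cursor.close()
```

The sketch is not idempotent: PostgreSQL has no `ADD CONSTRAINT IF NOT EXISTS`, so re-running it fails once the constraints exist.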

---

### 2) Data Import and Validation

**Q4. How can we efficiently load 285 million records into a relational database with proper data type conversions?**
**A (Strategy):**

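* The loader reads the Parquet file with Polars and sends batches of row tuples to Postgres.
* Optimize by:

  * Using `execute_values()` in batches (already implemented). Its `page_size` argument defaults to 100 rows per statement, so raising it reduces the number of `INSERT` round trips per batch.
  * Tuning `batchSize` (100k is a reasonable starting point; test 250k–500k for speed).
  * Keeping autocommit off and committing once per batch (as implemented).
  * Creating indexes after the load rather than before, so they are built once instead of updated on every insert.
* Code strategy: keep the current `insertRecordsIntoTable()` logic; for the largest loads, `COPY ... FROM STDIN` is usually faster than multi-row `INSERT`s.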
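As an alternative to `execute_values()`, PostgreSQL's `COPY ... FROM STDIN` can ingest a whole batch as a single data stream. The sketch below assumes a psycopg2 cursor (its `copy_expert(sql, file)` method) and tuples whose values are already in the order of `columns`; `copy_rows` is a hypothetical helper, not part of the notebook.

```python
import csv
import io
from typing import Any, Iterable, Sequence


def copy_rows(cursor: Any, table: str, columns: Sequence[str],
              rows: Iterable[Sequence[Any]]) -> int:
    """Bulk-load rows with COPY ... FROM STDIN in CSV format; return the row count.

    None is written as an empty unquoted field, which COPY reads as NULL in CSV
    mode (empty strings in multi-column rows are written the same way, so they
    also load as NULL). Booleans are written as True/False and datetimes via
    str(); PostgreSQL accepts both spellings as input.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    count = 0
    for row in rows:
        writer.writerow(row)
        count += 1
    buffer.seek(0)  # rewind so copy_expert reads from the start

    column_list = ", ".join(columns)
    cursor.copy_expert(
        f"COPY {table} ({column_list}) FROM STDIN WITH (FORMAT csv)", buffer
    )
    return count
```

Inside the existing batch loop, it could replace the `execute_values()` call, e.g. `copy_rows(cursor, "ecommerce_events", columnNames, chunk.iter_rows())` followed by `connection.commit()`, where `columnNames` is a hypothetical list holding the same columns as the `INSERT`. The CSV buffer holds one batch in memory, so `batchSize` still bounds memory use.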

---

**Q5. What SQL constraints should be implemented to ensure data quality during the import process?**
**A (Strategy):**

* Since data is **already clean**, keep constraints minimal for performance:

  * `NOT NULL` only where logically required (`event_time`, `event_type`).
  * `CHECK` on `event_type IN ('view','cart','purchase')`.
* Skip extra checks such as duplicate or null validation (handled upstream in the data pipeline).
* Code strategy: your current `createTable()` is sufficient.
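If a batch ever does contain a bad value, Postgres rejects the whole multi-row `INSERT`. A cheap client-side check that mirrors the two constraints can pinpoint the offending rows before the insert. This is a hypothetical helper; it assumes each row is a tuple with `event_time` at position 0 and `event_type` at position 1, matching the `INSERT` column order in `insertRecordsIntoTable()`.

```python
from typing import Any, List, Sequence

# Mirrors CHECK (event_type IN ('view', 'cart', 'purchase')) in createTable()
ALLOWED_EVENT_TYPES = frozenset({"view", "cart", "purchase"})


def find_constraint_violations(
    rows: Sequence[Sequence[Any]],
    event_time_pos: int = 0,
    event_type_pos: int = 1,
) -> List[int]:
    """Return the positions of rows that would fail NOT NULL or the CHECK constraint."""
    bad: List[int] = []
    for pos, row in enumerate(rows):
        missing_time = row[event_time_pos] is None  # NOT NULL event_time
        # None is not in the set either, which matches NOT NULL event_type
        bad_type = row[event_type_pos] not in ALLOWED_EVENT_TYPES
        if missing_time or bad_type:
            bad.append(pos)
    return bad
```

For example, calling `find_constraint_violations(data_tuples)` just before `execute_values()` returns an empty list for a clean batch.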

---

**Q6. How do we handle duplicate records and maintain referential integrity during bulk data loading?**
**A (Strategy):**

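* No duplicates exist (handled upstream), so deduplication logic is skipped. Note that `event_id` is a surrogate `BIGSERIAL` key, so the primary key alone would not reject a repeated event.
* Referential integrity is not enforced, since no FK constraints are defined.
* Code strategy: `insertRecordsIntoTable()` loads Parquet into Postgres without a deduplication pass, which keeps the load simple and fast.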
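Because the no-duplicates guarantee comes from upstream, a one-off check after the load can confirm it. The sketch below treats `(event_time, event_type, product_id, user_id, user_session)` as the natural key of an event; that choice is an assumption, as is the helper name `count_duplicate_groups`. It scans the full table, so it is meant to run once after loading rather than routinely.

```python
from typing import Any

# Assumed natural key of an event. GROUP BY puts NULLs in the same group,
# so rows that differ only by missing values still count as duplicates.
DUPLICATE_GROUPS_SQL = """
    SELECT COUNT(*)
    FROM (
        SELECT 1
        FROM ecommerce_events
        GROUP BY event_time, event_type, product_id, user_id, user_session
        HAVING COUNT(*) > 1
    ) AS duplicate_groups;
"""


def count_duplicate_groups(cursor: Any) -> int:
    """Number of natural-key groups that occur more than once (0 means no duplicates)."""
    cursor.execute(DUPLICATE_GROUPS_SQL)
    return int(cursor.fetchone()[0])
```

If duplicates did turn up, a `UNIQUE` index on the same columns plus `INSERT ... ON CONFLICT DO NOTHING` would reject them at load time, at the cost of slower inserts; rows with NULLs in those columns would still not conflict, because unique indexes treat NULLs as distinct by default.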

---

### 3) Performance Optimization

**Q7. What partitioning strategy should be used for the events table based on event\_time to improve query performance?**
**A (Strategy):**

* Use **range partitioning by month** on `event_time` (optional future step).
* For now, with clean wide data, a **BRIN index** on `event_time` is likely enough.
* Code strategy:

  * Add BRIN index after table creation.
  * If queries slow down, evolve to monthly partitions.
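Moving to monthly partitions would mean creating a new partitioned table and copying the data into it, because an existing regular table cannot be converted to a partitioned one in place. The sketch below only generates the per-month DDL; the parent name `ecommerce_events_part` and the function name are hypothetical. On a partitioned table the primary key must include the partition column, so the parent would need `PRIMARY KEY (event_id, event_time)` instead of `event_id` alone.

```python
from datetime import date

# Hypothetical parent table, created once with the same columns as ecommerce_events:
#   CREATE TABLE ecommerce_events_part (..., PRIMARY KEY (event_id, event_time))
#   PARTITION BY RANGE (event_time);


def monthly_partition_ddl(parent: str, year: int, month: int) -> str:
    """Build CREATE TABLE ... PARTITION OF for one calendar month.

    Range bounds are inclusive FROM, exclusive TO. For a TIMESTAMPTZ column the
    date literals are interpreted in the session's TimeZone setting.
    """
    start = date(year, month, 1)
    # First day of the following month (December rolls over to January)
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    name = f"{parent}_{year:04d}_{month:02d}"
    return (
        f"CREATE TABLE IF NOT EXISTS {name} PARTITION OF {parent} "
        f"FOR VALUES FROM ('{start.isoformat()}') TO ('{end.isoformat()}');"
    )
```

Indexes created on the partitioned parent, including the BRIN index on `event_time`, are created on each partition automatically in PostgreSQL 11 and later.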

---

**Q8. How should we implement proper indexing for both transactional and analytical workloads?**
**A (Strategy):**

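* Transactional (row lookups): the `BTREE` index PostgreSQL creates automatically for the primary key (`event_id`).
* Analytical: `BRIN` on `event_time`, `BTREE` on `user_id`, `product_id`, `event_type`.
* Composite indexes: `(user_id, event_time)` is already created; `(product_id, event_type)` is an optional addition for product funnel queries.
* Code strategy: `addTableIndexes()` already runs after `createTable()` and the bulk load; extend it if the optional composite index is needed.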
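On a table that is still receiving writes, the optional `(product_id, event_type)` index can be built with `CREATE INDEX CONCURRENTLY`, which does not block inserts during the build. PostgreSQL does not allow it inside a transaction block, so this sketch switches the psycopg2 connection to autocommit for the call. The function and index names are hypothetical, and it assumes no transaction is open on the connection, since psycopg2 refuses to change `autocommit` mid-transaction.

```python
from typing import Any


def add_product_event_index(connection: Any) -> None:
    """Build the optional (product_id, event_type) index without blocking writes.

    Assumes `connection` is a psycopg2-style connection with no open transaction.
    """
    previous = connection.autocommit
    # CREATE INDEX CONCURRENTLY cannot run inside a transaction block
    connection.autocommit = True
    cursor = connection.cursor()
    try:
        cursor.execute(
            "CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_product_event_type "
            "ON ecommerce_events (product_id, event_type);"
        )
    finally:
        cursor.close()
        connection.autocommit = previous  # restore the caller's setting
```

If a concurrent build fails, it leaves an `INVALID` index behind, and `IF NOT EXISTS` would then skip it on a retry; drop that index before rebuilding.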

---

**Q9. What materialized views or summary tables would accelerate common business intelligence queries?**
**A (Strategy):**

* Since BI queries will hit 285M rows, create summary layers:

  * `daily_revenue` (date, revenue, unique\_customers).
  * `product_perf` (product\_id, views, carts, purchases, conversion\_rate).
  * `customer_rfm` (user\_id, recency, frequency, monetary).
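* Refresh nightly with `REFRESH MATERIALIZED VIEW CONCURRENTLY`, which keeps each view readable during the refresh but requires a unique index on it.
* Code strategy: keep these `CREATE MATERIALIZED VIEW` scripts in a dedicated method rather than in `answerQueryAnswer()`, which calls `fetchall()` and therefore expects statements that return rows.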
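A minimal sketch of the `product_perf` summary, assuming the conversion rate is defined as purchases divided by views (the text does not pin down the formula). `REFRESH ... CONCURRENTLY` needs a unique index that covers all rows, hence `product_perf_pk`; rows with a `NULL` `product_id` are filtered out so `product_id` alone identifies each row. The constant, index, and function names are hypothetical, and `connection` is assumed to be a psycopg2-style connection.

```python
from typing import Any, List

PRODUCT_PERF_DDL: List[str] = [
    """
    CREATE MATERIALIZED VIEW IF NOT EXISTS product_perf AS
    SELECT product_id,
           COUNT(*) FILTER (WHERE event_type = 'view')     AS views,
           COUNT(*) FILTER (WHERE event_type = 'cart')     AS carts,
           COUNT(*) FILTER (WHERE event_type = 'purchase') AS purchases,
           -- NULLIF avoids division by zero for products that were never viewed
           (COUNT(*) FILTER (WHERE event_type = 'purchase'))::numeric
               / NULLIF(COUNT(*) FILTER (WHERE event_type = 'view'), 0) AS conversion_rate
    FROM ecommerce_events
    WHERE product_id IS NOT NULL
    GROUP BY product_id;
    """,
    # REFRESH ... CONCURRENTLY requires a unique, non-partial, column-only index
    "CREATE UNIQUE INDEX IF NOT EXISTS product_perf_pk ON product_perf (product_id);",
]


def _run(connection: Any, statements: List[str]) -> None:
    """Execute statements in one transaction; roll back on failure."""
    cursor = connection.cursor()
    try:
        for statement in statements:
            cursor.execute(statement)
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        cursor.close()


def create_product_perf(connection: Any) -> None:
    _run(connection, PRODUCT_PERF_DDL)


def refresh_product_perf(connection: Any) -> None:
    # Readers can keep querying product_perf while the new contents are computed
    _run(connection, ["REFRESH MATERIALIZED VIEW CONCURRENTLY product_perf;"])
```

`daily_revenue` and `customer_rfm` would follow the same pattern, each with its own unique index (on the date and on `user_id`, respectively).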

---


