## **Mentoring Week 2 - Data Profiling and Data Quality** ##

### Building Data Pipeline with Python and Pyspark - Pacmann AI ###

As Data Engineers, we need to understand and assess the quality of a given dataset containing sales data. These responsibilities include:

1. **Data Profiling:** Explore the dataset to gain insights into its structure and attributes.

2. **Data Quality Check:** Assess the validity and consistency of the data. Identify any anomalies or missing values.

3. **Recommendations:** Based on your findings, provide recommendations for cleaning and improving the dataset.

### **The Dataset**

* Use Docker Compose to run the container: [repository](https://github.com/Kurikulum-Sekolah-Pacmann/data_pipeline_paccafe)
* This dataset provides detailed information about cafe sales, employees, member customers, and inventory tracking history.

## **Output:** ##

## **Data Profiling** ##

In [None]:
import pandas as pd
from sqlalchemy import create_engine
import json
import os
from datetime import datetime
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

DB_SOURCE = {
    "dbname": os.getenv("SRC_POSTGRES_DB"),
    "user": os.getenv("SRC_POSTGRES_USER"),
    "password": os.getenv("SRC_POSTGRES_PASSWORD"),
    "host": os.getenv("SRC_POSTGRES_HOST"),
    "port": os.getenv("SRC_POSTGRES_PORT")
}

def db_engine():
    """Creates a SQLAlchemy engine."""
    try:
        db_url = f"postgresql://{DB_SOURCE['user']}:{DB_SOURCE['password']}@{DB_SOURCE['host']}:{DB_SOURCE['port']}/{DB_SOURCE['dbname']}"
        engine = create_engine(db_url)
        return engine
    except Exception as e:
        print(f"Error creating engine: {e}")
        return None

def list_tables():
    """Retrieve all table names from the database."""
    engine = db_engine()
    query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"
    df = pd.read_sql(query, engine)
    return df["table_name"].tolist()

def extract_data():
    """Extract data from all tables into a dictionary of DataFrames."""
    tables = list_tables()
    engine = db_engine()
    data = {}
    for table in tables:
        data[table] = pd.read_sql(f"SELECT * FROM {table}", engine)
    return data

def table_shapes(data):
    """Return the shape (rows, columns) of each table."""
    result = {}
    for table, df in data.items():
        result[table] = df.shape
    return result

def column_types(data):
    """Return data types of columns in each table."""
    result = {}
    for table, df in data.items():
        result[table] = {}
        for col in df.columns:
            result[table][col] = str(df[col].dtype)
    return result

def unique_values(data):
    """Return the unique values of selected categorical columns in each table."""
    selected_columns = {
        'employees': ['role'],
        'orders': ['payment_method'],
        'products': ['category'],
        'inventory_tracking': ['reason']  # 'reason' is a column of inventory_tracking
    }

    result = {}

    # Iterate through selected tables and columns
    for table, columns in selected_columns.items():
        result[table] = {}
        if table in data:
            for col in columns:
                if col in data[table].columns:
                    result[table][col] = data[table][col].unique().tolist()
                else:
                    result[table][col] = []  # If the column doesn't exist in the table, return an empty list
        else:
            result[table] = {}  # If the table doesn't exist in the data, return an empty dict

    return result

def profile_report():
    """Generate a data profiling report and save as JSON."""
    data = extract_data()
    report = {
        "person_in_charge": "Reza",
        "date_profiling": str(datetime.now()),
        "result": {}
    }

    table_shapes_result = table_shapes(data)
    column_types_result = column_types(data)
    unique_values_result = unique_values(data)

    for table in data.keys():
        report["result"][table] = {
            "shape": table_shapes_result[table],
            "data_types": column_types_result[table],
            "unique_values": unique_values_result.get(table, {})
        }

    # Ensure the output folder exists
    output_folder = 'output'
    output_file = 'paccafe_profiling_report.json'

    os.makedirs(output_folder, exist_ok=True)

    output_path = os.path.join(output_folder, output_file)
    with open(output_path, "w") as f:
        json.dump(report, f, indent=4)

    return report

# Running profiling
profile_report()


## **Data Quality** ##

In [14]:
from dotenv import load_dotenv
import os
from sqlalchemy import create_engine
import pandas as pd
import re
import json
from datetime import datetime

# Load environment variables
load_dotenv()

DB_SOURCE = {
    "dbname": os.getenv("SRC_POSTGRES_DB"),
    "user": os.getenv("SRC_POSTGRES_USER"),
    "password": os.getenv("SRC_POSTGRES_PASSWORD"),
    "host": os.getenv("SRC_POSTGRES_HOST"),
    "port": os.getenv("SRC_POSTGRES_PORT")
}

def db_engine():
    """Creates a SQLAlchemy engine."""
    try:
        db_url = f"postgresql://{DB_SOURCE['user']}:{DB_SOURCE['password']}@{DB_SOURCE['host']}:{DB_SOURCE['port']}/{DB_SOURCE['dbname']}"
        engine = create_engine(db_url)
        return engine
    except Exception as e:
        print(f"Error creating engine: {e}")
        return None

def check_missing_values(table_name):
    """Check for missing values in a table."""
    df = pd.read_sql(f"SELECT * FROM {table_name}", db_engine())
    missing_values = {}
    for col in df.columns:
        missing_values[col] = int(df[col].isnull().sum())
    return missing_values

def is_valid_date(date_str, date_format):
    """Check that a string has the layout of the given date format (regex only; it does not verify that the date exists)."""
    # Define regex patterns based on the format
    patterns = {
        "%Y-%m-%d": r"^\d{4}-\d{2}-\d{2}$",  # Matches format 2025-02-16
        "%Y-%m-%d %H:%M:%S": r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$",  # Matches format 2025-02-16 14:30:00
        "%d/%m/%Y": r"^(0?[1-9]|[12][0-9]|3[01])/(0?[1-9]|1[0-2])/\d{4}$"  # Matches format 16/02/2025
    }

    # Check if the regex for the given format exists and matches the date string
    if date_format in patterns:
        pattern = patterns[date_format]
        return bool(re.match(pattern, date_str))  # Returns True if match found, False otherwise
    return False  # If no matching pattern, return False

def validate_date_format(data, selected_columns):
    """Check if the date format in a column is valid based on the specified date formats, and summarize counts of True/False."""
    result = {}
    for table, columns in selected_columns.items():
        if table in data:
            result[table] = {}
            for col, date_format in columns.items():
                if col in data[table].columns:
                    valid_dates = {}  # Counts per outcome; a key appears only if that outcome occurs
                    for date_str in data[table][col].astype(str):  # Loop through each date value in the column
                        key = str(is_valid_date(date_str, date_format))  # 'True' or 'False'
                        valid_dates[key] = valid_dates.get(key, 0) + 1
                    result[table][col] = valid_dates  # Save the counts for this column
    return result
    
def check_numeric_values(data, selected_columns):
    """Check if a column contains numeric values (using for loop)."""
    result = {}
    for table, columns in selected_columns.items():
        if table in data:
            result[table] = {}
            for col in columns:
                if col in data[table].columns:
                    df = data[table][col]
                    numeric_check = []  # List to store True/False results
                    for x in df:  # Iterate through each element in the Series
                        numeric_check.append(isinstance(x, (int, float)))

                    # Convert the list to a pandas Series for value_counts()
                    numeric_series = pd.Series(numeric_check)
                    result[table][col] = numeric_series.value_counts().to_dict()
    return result

def check_negative_values(data, selected_columns):
    """Check for negative values in numerical columns, considering both actual negative values and the presence of a '-' symbol."""
    result = {}
    for table, columns in selected_columns.items():
        if table in data:
            result[table] = {}
            for col in columns:
                if col in data[table].columns:
                    # Get the column data
                    df = data[table][col]

                    # Convert to numeric values (invalid entries become NaN)
                    numeric_df = pd.to_numeric(df, errors='coerce')

                    # Flag actual negative numbers
                    is_negative_number = numeric_df < 0

                    # Flag non-numeric entries that carry a '-' symbol (handles cases like "$-4" or "-$4")
                    has_minus_symbol = numeric_df.isna() & df.astype(str).str.contains('-', regex=False)

                    # Count each row at most once so a value such as -4 is not counted twice;
                    # store a native int so the result is JSON serializable
                    result[table][col] = int((is_negative_number | has_minus_symbol).sum())

    return result

def convert_to_serializable(value):
    """Convert pandas/NumPy values to native Python types for JSON serialization."""
    if isinstance(value, pd.Timestamp):
        return value.isoformat()  # Convert pandas Timestamp to ISO format
    if isinstance(value, pd.Timedelta):
        return str(value)  # Convert pandas Timedelta to string
    if isinstance(value, dict):
        # Nested count dictionaries: convert the values, keep the keys
        return {key: convert_to_serializable(val) for key, val in value.items()}
    if isinstance(value, (bool, int, float, str)) or value is None:
        return value  # Already a native Python type
    if hasattr(value, "item"):
        return value.item()  # NumPy scalars (e.g. numpy.int64) expose .item()
    return value  # Return other types as they are

def generate_data_quality_report():
    """Generate and save a data quality report."""
    # Extract data
    data = extract_data()

    # Define selected columns for validation
    date_columns = {
        "employees": {"hire_date": "%Y-%m-%d"},
        "inventory_tracking": {"change_date": "%Y-%m-%d"},
        "orders": {"order_date": "%Y-%m-%d %H:%M:%S"}
    }
    
    numeric_columns = {
        "products": ["unit_price", "cost_price"],
        "orders": ["total_amount"],
        "order_details": ["unit_price", "quantity", "subtotal"],
        "inventory_tracking": ["quantity_change"],
        "customers": ["loyalty_points"]
    }
    
    negative_columns = {
        "products": ["unit_price", "cost_price"],
        "orders": ["total_amount"],
        "order_details": ["unit_price", "quantity", "subtotal"],
        "inventory_tracking": ["quantity_change"],
        "customers": ["loyalty_points"]
    }

    # Initialize report structure
    report = {
        "person_in_charge": "Reza",
        "date_quality_check": str(datetime.now()),
        "result": {}
    }

    # Run each validation once for all tables instead of once per table
    date_validation_results = validate_date_format(data, date_columns)
    numeric_validation_results = check_numeric_values(data, numeric_columns)
    negative_validation_results = check_negative_values(data, negative_columns)

    # Generate the report for each table
    for table in data.keys():
        table_report = {}

        # Check for missing values
        missing_values = {}
        for col, val in check_missing_values(table).items():
            missing_values[col] = convert_to_serializable(val)
        table_report["missing_values"] = missing_values

        # Validate dates (empty if no date columns are configured for this table)
        date_validity = {}
        for col, val in date_validation_results.get(table, {}).items():
            date_validity[col] = convert_to_serializable(val)
        table_report["date_validity"] = date_validity

        # Check numeric values (empty if no numeric columns are configured for this table)
        numeric_validity = {}
        for col, val in numeric_validation_results.get(table, {}).items():
            numeric_validity[col] = convert_to_serializable(val)
        table_report["numeric_validity"] = numeric_validity

        # Check for negative values (empty if no columns are configured for this table)
        negative_validity = {}
        for col, val in negative_validation_results.get(table, {}).items():
            negative_validity[col] = convert_to_serializable(val)
        table_report["negative_validity"] = negative_validity
        
        # Add table report to main report
        report["result"][table] = table_report

    # Ensure the output folder exists
    output_folder = 'output'
    output_file = 'paccafe_quality_report.json'
    os.makedirs(output_folder, exist_ok=True)

    # Save the report as JSON in the /output/ folder
    output_path = os.path.join(output_folder, output_file)
    with open(output_path, "w") as f:
        json.dump(report, f, indent=4)

    print(f"Data quality report saved to {output_path}")

    return report

def extract_data():
    """Extract data from all tables into a dictionary of DataFrames."""
    tables = list_tables()
    engine = db_engine()
    data = {}
    for table in tables:
        data[table] = pd.read_sql(f"SELECT * FROM {table}", engine)
    return data

def list_tables():
    """Retrieve all table names from the database."""
    engine = db_engine()
    query = "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'"
    df = pd.read_sql(query, engine)
    return df["table_name"].tolist()

# Run the data quality check report
generate_data_quality_report()


Data quality report saved to output\paccafe_quality_report.json


{'person_in_charge': 'Reza',
 'date_quality_check': '2025-02-16 22:09:31.530729',
 'result': {'products': {'missing_values': {'product_id': 0,
    'product_name': 0,
    'category': 0,
    'unit_price': 0,
    'cost_price': 0,
    'in_stock': 0,
    'created_at': 0},
   'date_validity': {},
   'numeric_validity': {'unit_price': {False: 54}, 'cost_price': {False: 54}},
   'negative_validity': {'unit_price': 2, 'cost_price': 3}},
  'inventory_tracking': {'missing_values': {'tracking_id': 0,
    'product_id': 0,
    'quantity_change': 0,
    'change_date': 0,
    'reason': 0,
    'created_at': 0},
   'date_validity': {'change_date': {'True': 162}},
   'numeric_validity': {'quantity_change': {True: 162}},
   'negative_validity': {'quantity_change': 0}},
  'orders': {'missing_values': {'order_id': 0,
    'employee_id': 0,
    'customer_id': 250,
    'order_date': 0,
    'total_amount': 0,
    'payment_method': 0,
    'order_status': 0,
    'created_at': 0},
   'date_validity': {'order_date'
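
The `is_valid_date` function above checks only the layout of a string with a regex, so a value such as `2025-13-45` would still count as valid. A minimal sketch of a stricter check, assuming the standard library's `datetime.strptime` is acceptable here, is shown below. The names `is_real_date` and `count_valid_dates` and the sample values are hypothetical and not part of the original notebook.

```python
from datetime import datetime

import pandas as pd


def is_real_date(value, date_format):
    """Return True only if `value` parses as an actual calendar date in `date_format`."""
    try:
        datetime.strptime(str(value), date_format)
        return True
    except ValueError:
        return False


def count_valid_dates(series, date_format):
    """Count valid and invalid date strings in a pandas Series."""
    valid = sum(is_real_date(v, date_format) for v in series.astype(str))
    return {"True": valid, "False": len(series) - valid}


# Hypothetical sample values, not taken from the Pac Cafe database
sample = pd.Series(["2025-02-16", "2025-13-45", "2025-02-30", "16/02/2025"])
print(count_valid_dates(sample, "%Y-%m-%d"))
```

Unlike `validate_date_format`, this sketch always reports both keys. Note that `strptime` tolerates missing zero padding (for example `2025-2-6`), so the regex check can be kept alongside it when a strict layout matters.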

## **Recommendations** ##

After reviewing the dataset, the following steps are recommended to improve data quality and ensure consistency for further processing.

1.   **Data Type Conversion:** Convert the `unit_price` and `cost_price` columns in the `products` table from string to integer. This aligns with the integer data type of the `unit_price` column in the `order_details` table and facilitates numerical analysis.

2.   **Data Cleaning:** Advise users to remove any currency symbols or special characters from the `unit_price` and `cost_price` values in the `products` table. These characters can interfere with accurate numerical calculations.

3.   **Negative Value Handling:** Consult with users regarding the negative values present in the `unit_price` and `cost_price` columns of the `products` table. Determine whether these values should be removed, corrected, or handled in another specific way.

4.   **Missing Customer IDs:** Discuss with users the 250 missing `customer_id` values.  Explore options such as replacing them with a `guest_id`, ignoring them, or attempting to complete the missing data if sufficient information is available.

5.   **"ERROR" Payment Method:** Confirm with users whether the `payment_method = "ERROR"` entries in the `orders` table should be retained as is or corrected to the appropriate payment method.

6.   **Missing Phone Numbers:** Discuss with users how to handle the missing phone number data in the `customers` table. Options include leaving the data as missing, deleting the corresponding records, or planning to update the data at a later time.

7.   **Employee Role Data:** Consult with users about the unexpected values `today`, `third`, and `me` in the `role` column of the `employees` table. Decide whether these values should be corrected, kept as they are, or removed entirely.

8.   **Date Format Conversion:** Recommend converting the `hire_date` column in the `employees` table from its current object data type to the `datetime` format. This will enable proper date-based analysis and filtering.

9.   **Data Quality Metrics and Validation Strategy:** Discuss with users a comprehensive strategy for handling missing values, invalid dates, non-numeric values, and negative values throughout the dataset. This may involve agreeing on percentage thresholds (e.g., accepting a column when fewer than 5% of its records fail a check, and requiring correction when the share is higher), along with appropriate methods for imputation or removal.
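
As a starting point for recommendations 1 to 3 and 9, the sketch below shows one way the price strings could be cleaned and then compared against a threshold. It is only an illustration under stated assumptions: the helper names `parse_price`, `invalid_share`, and `needs_correction`, the sample rows, and the 5% default are hypothetical, and it assumes prices are whole numbers, so every non-digit character is simply dropped. How negative prices should finally be treated remains a decision for the users.

```python
import re

import pandas as pd


def parse_price(raw):
    """Turn a raw price such as "$4" or "-$4" into an int; return None if it has no digits.

    Assumption: prices are whole numbers, so every non-digit character is dropped.
    """
    text = str(raw)
    digits = re.sub(r"\D", "", text)
    if not digits:
        return None
    value = int(digits)
    # A '-' anywhere in the text is treated as a negative sign, e.g. "$-4" or "-$4"
    return -value if "-" in text else value


def invalid_share(values):
    """Fraction of values that are missing (None) or negative."""
    values = list(values)
    if not values:
        return 0.0
    invalid = sum(1 for v in values if v is None or v < 0)
    return invalid / len(values)


def needs_correction(share, threshold=0.05):
    """True when the invalid share exceeds the agreed threshold (5% is only an example)."""
    return share > threshold


# Hypothetical sample rows, not taken from the Pac Cafe database
products = pd.DataFrame({"unit_price": ["$4", "$-4", "-$3", "12", "n/a"]})
parsed = [parse_price(v) for v in products["unit_price"]]

# Nullable integer dtype keeps unparseable prices as <NA> instead of forcing floats
products["unit_price_clean"] = pd.Series(parsed, dtype="Int64", index=products.index)

share = invalid_share(parsed)
print(products)
print(f"invalid share: {share:.0%}, needs correction: {needs_correction(share)}")
```

Keeping the raw column next to the cleaned one makes it easy to review with users which entries were changed before anything is overwritten.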
