# Great Expectation
## Data Pipeline Automation with PySpark, Airflow, and MongoDB

Program ini dibuat untuk membuat sistem automasi data pipeline (ETL) dengan mengkombinasikan penggunaan PySpark, Airflow, dan MongoDB.

In [1]:
# Import Libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when
from great_expectations.data_context import FileDataContext
from great_expectations.core.batch import RuntimeBatchRequest
import great_expectations as gx

# Spark Session Initialization
spark = SparkSession.builder \
    .appName("RetailTransactionAnalysis") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Create Dataframe from Retail Dataset

In [None]:
retail_transaction = spark.read.csv('Data_Raw.csv', header=True, inferSchema=True, sep=",", quote='"', escape='\\', multiLine=True)
retail_transaction.show()

+----------+---------+--------+-----------+----------------+-------------+--------------------+---------------+------------------+-----------+
|CustomerID|ProductID|Quantity|      Price| TransactionDate|PaymentMethod|       StoreLocation|ProductCategory|DiscountApplied(%)|TotalAmount|
+----------+---------+--------+-----------+----------------+-------------+--------------------+---------------+------------------+-----------+
|    109318|        C|       7|80.07984415|12/26/2023 12:32|         Cash|176 Andrew Cliffs...|          Books|        18.6770995|455.8627638|
|    993229|        C|       4|75.19522942|   8/5/2023 0:00|         Cash|11635 William Wel...|     Home Decor|       14.12136502|258.3065464|
|    579675|        A|       8|31.52881648| 3/11/2024 18:51|         Cash|910 Mendez Ville ...|          Books|       15.94370066|212.0156509|
|    799826|        D|       5|98.88021828|10/27/2023 22:00|       PayPal|87522 Sharon Corn...|          Books|        6.68633678|461.3437694|

In [22]:
from pyspark.sql.functions import regexp_replace

retail_transaction = retail_transaction.withColumn(
    "StoreLocation",
    regexp_replace("StoreLocation", r"\n", " ")
)

In [23]:
retail_transaction.show()

+----------+---------+--------+-----------+----------------+-------------+--------------------+---------------+------------------+-----------+------------------+------------------+
|CustomerID|ProductID|Quantity|      Price| TransactionDate|PaymentMethod|       StoreLocation|ProductCategory|DiscountApplied(%)|TotalAmount|     ExpectedTotal| UndiscountedPrice|
+----------+---------+--------+-----------+----------------+-------------+--------------------+---------------+------------------+-----------+------------------+------------------+
|    109318|        C|       7|80.07984415|12/26/2023 12:32|         Cash|176 Andrew Cliffs...|          Books|        18.6770995|455.8627638|  455.862763850617|      560.55890905|
|    993229|        C|       4|75.19522942|   8/5/2023 0:00|         Cash|11635 William Wel...|     Home Decor|       14.12136502|258.3065464| 258.3065463839015|      300.78091768|
|    579675|        A|       8|31.52881648| 3/11/2024 18:51|         Cash|910 Mendez Ville ...|

# Simple Data Exploration

In [24]:
retail_transaction.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- TransactionDate: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- StoreLocation: string (nullable = true)
 |-- ProductCategory: string (nullable = true)
 |-- DiscountApplied(%): double (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- ExpectedTotal: double (nullable = true)
 |-- UndiscountedPrice: double (nullable = true)



# Great Expectation

In [25]:
import great_expectations as gx
import os

# Initialize Data Context in current directory
context = gx.get_context(project_root_dir='./')

In [None]:
# Define unique datasource and asset name
datasource_name = "local_csv_retail"
asset_name = "retail_transaction"

# Path to local file (adjust if in another folder)
path_to_data = "Data_Raw.csv"

# Add pandas datasource
if datasource_name in context.list_datasources():
    datasource = context.get_datasource(datasource_name)
else:
    datasource = context.sources.add_pandas(datasource_name)

# Add CSV asset from local file
asset = datasource.add_csv_asset(name=asset_name, filepath_or_buffer=path_to_data)

# Build batch request
batch_request = asset.build_batch_request()

In [41]:
# Define expectation suite name
expectation_suite_name = "expectation_retail_transaction"

# Create the suite
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)

# Create validator
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name
)

# Preview data
validator.head()

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,CustomerID,ProductID,Quantity,Price,TransactionDate,PaymentMethod,StoreLocation,ProductCategory,DiscountApplied(%),TotalAmount
0,109318,C,7,80.079844,12/26/2023 12:32,Cash,"176 Andrew Cliffs\nBaileyfort, HI 93354",Books,18.6771,455.862764
1,993229,C,4,75.195229,8/5/2023 0:00,Cash,"11635 William Well Suite 809\nEast Kara, MT 19483",Home Decor,14.121365,258.306546
2,579675,A,8,31.528816,3/11/2024 18:51,Cash,"910 Mendez Ville Suite 909\nPort Lauraland, MO...",Books,15.943701,212.015651
3,799826,D,5,98.880218,10/27/2023 22:00,PayPal,"87522 Sharon Corners Suite 500\nLake Tammy, MO...",Books,6.686337,461.343769
4,121413,A,7,93.188512,12/22/2023 11:38,Cash,"0070 Michelle Island Suite 143\nHoland, VA 80142",Electronics,4.030096,626.030484


## Expectation

In [11]:
# 1. Expect composite key uniqueness (CustomerID + TransactionDate + ProductID)
validator.expect_compound_columns_to_be_unique(
    column_list=["CustomerID", "TransactionDate", "ProductID"],
    ignore_row_if="any_value_is_missing",
    meta={
        "business_rule": "Each product purchase per transaction should be unique"
    }
)

Calculating Metrics:   0%|          | 0/7 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 100000,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [12]:
# 2. Expect quantity to be minimum 1
validator.expect_column_values_to_be_between("Quantity", min_value = 1)

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 100000,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [13]:
# 3. Expect PaymentMethod column to contain only a known set of values
valid_payment = ["Cash", "PayPal", "Credit Card", "Debit Card"]
validator.expect_column_values_to_be_in_set("PaymentMethod", valid_payment)

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 100000,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [14]:
# 4. Validate numeric DiscountApplied(%)
validator.expect_column_values_to_be_of_type(
    column="DiscountApplied(%)",
    type_="float64",
    meta={
        "data_quality": "Must be numeric for calculations"
    }
)

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "observed_value": "float64"
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [15]:
# 5. Expect TransactionDate to be properly formatted datetime
validator.expect_column_values_to_match_strftime_format(
    column="TransactionDate",
    strftime_format="%m/%d/%Y %H:%M",  
)

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 100000,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [16]:
# 6. ProductCategory name length should be reasonable
validator.expect_column_value_lengths_to_be_between(
    column="ProductCategory",
    min_value=3,
    max_value=30,
    meta={
        "description": "Category name should have reasonable character length"
    }
)

Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 100000,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [17]:
# 7. Check if Discount (%) is Logical
validator.expect_column_value_lengths_to_be_between(
    column="DiscountApplied(%)",
    min_value=0,
    max_value=100,
    meta={
        "description": "Discount (%) should be between 0 and 100"
    }
)

Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 100000,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [44]:
# 8. Validate store location format
validator.expect_column_values_to_match_regex(
    column="StoreLocation",
    regex=r"^.+\n.+,\s[A-Z]{2}\s\d{5}$",
    mostly=0.95,
    meta={
        "format": "Address line 1 City, ST ZIPCODE"
    }
)

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": false,
  "result": {
    "element_count": 100000,
    "unexpected_count": 10806,
    "unexpected_percent": 10.806000000000001,
    "partial_unexpected_list": [
      "USNV Harrell\nFPO AA 62814",
      "PSC 1498, Box 4142\nAPO AP 10928",
      "Unit 7268 Box 3644\nDPO AP 43969",
      "USNS David\nFPO AE 12953",
      "Unit 4486 Box 3431\nDPO AE 41617",
      "PSC 8454, Box 4823\nAPO AE 17356",
      "Unit 5493 Box 4915\nDPO AE 46180",
      "Unit 4248 Box 3478\nDPO AP 26267",
      "PSC 4308, Box 2125\nAPO AE 53765",
      "PSC 3555, Box 8474\nAPO AA 67962",
      "Unit 1535 Box 5709\nDPO AP 57706",
      "Unit 9800 Box 8766\nDPO AE 93292",
      "PSC 9458, Box 9421\nAPO AA 84039",
      "USNS Jackson\nFPO AA 77311",
      "Unit 4152 Box 6862\nDPO AA 32838",
      "PSC 5089, Box 2406\nAPO AE 06601",
      "Unit 9796 Box 6648\nDPO AA 42931",
      "PSC 5669, Box 2093\nAPO AE 56470",
      "Unit 3436 Box 2527\nDPO AP 61328",
      "PSC 6618, Box 0807\nAPO AA 20014"
    ],

In [45]:
# Save into Expectation Suite

validator.save_expectation_suite(discard_failed_expectations=False)

## Checkpoint

In [46]:
# Create a checkpoint

checkpoint_1 = context.add_or_update_checkpoint(
    name = 'checkpoint_1',
    validator = validator,
)

In [47]:
# Run a checkpoint

checkpoint_result = checkpoint_1.run()

Calculating Metrics:   0%|          | 0/10 [00:00<?, ?it/s]