'''
=================================================

Milestone 3

Name : Afif Makruf<br>
Batch : RMT-002

This program performs a simple exploration of influencer merchandise sales data covering 2023-2024 and builds a Great Expectations suite.<br>
The results of this exploration serve as the reference for the ETL process.

'''
=================================================

# Data Exploration

Import the required libraries

In [1]:
import pyspark
from pyspark.sql import SparkSession

Start the data exploration

In [2]:
# Initialize the Spark session
spark = SparkSession.builder.getOrCreate()

In [3]:
dr = spark.read.csv('merch_sales.csv', header=True, inferSchema=True)
dr.show(5, vertical=True)  # vertical=True prints each record as a column/value list, so the number of columns and their values are easy to read

-RECORD 0--------------------------------------
 Order ID               | 189440               
 Order Date             | 2024-07-21           
 Product ID             | BF1543               
 Product Category       | Clothing             
 Buyer Gender           | Male                 
 Buyer Age              | 30                   
 Order Location         | New Jersey           
 International Shipping | No                   
 Sales Price            | 100                  
 Shipping Charges       | 0                    
 Sales per Unit         | 100                  
 Quantity               | 1                    
 Total Sales            | 100                  
 Rating                 | 4                    
 Review                 | The delivery team... 
-RECORD 1--------------------------------------
 Order ID               | 187385               
 Order Date             | 2024-07-20           
 Product ID             | BF1543               
 Product Category       | Clothing      

In [4]:
dr.printSchema()
dr.count()

root
 |-- Order ID: integer (nullable = true)
 |-- Order Date: date (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Product Category: string (nullable = true)
 |-- Buyer Gender: string (nullable = true)
 |-- Buyer Age: integer (nullable = true)
 |-- Order Location: string (nullable = true)
 |-- International Shipping: string (nullable = true)
 |-- Sales Price: integer (nullable = true)
 |-- Shipping Charges: integer (nullable = true)
 |-- Sales per Unit: integer (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Total Sales: integer (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Review: string (nullable = true)



7394

In [5]:
dr.describe().show(vertical=True)

-RECORD 0------------------------------------
 summary                | count              
 Order ID               | 7394               
 Product ID             | 7394               
 Product Category       | 7394               
 Buyer Gender           | 7394               
 Buyer Age              | 7394               
 Order Location         | 7394               
 International Shipping | 7394               
 Sales Price            | 7394               
 Shipping Charges       | 7394               
 Sales per Unit         | 7394               
 Quantity               | 7394               
 Total Sales            | 7394               
 Rating                 | 7394               
 Review                 | 7394               
-RECORD 1------------------------------------
 summary                | mean               
 Order ID               | 159735.13402758993 
 Product ID             | NULL               
 Product Category       | NULL               
 Buyer Gender           | NULL    

In [6]:
# Check for duplicated rows
dr.exceptAll(dr.dropDuplicates()).show(vertical=True)

(0 rows)
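The duplicate check works because `exceptAll` is a multiset difference: subtracting the de-duplicated rows from the full DataFrame leaves only the surplus copies. The sketch below mimics that behaviour in plain Python; `extra_copies` and the sample rows are hypothetical and are not taken from the dataset.

```python
from collections import Counter


def extra_copies(rows):
    """Return the surplus copies of each row, like rows.exceptAll(distinct rows)."""
    counts = Counter(rows)
    surplus = []
    for row, n in counts.items():
        # One copy survives de-duplication, so n - 1 copies remain after subtraction.
        surplus.extend([row] * (n - 1))
    return surplus


# Hypothetical (order_id, product_id) rows used only for illustration.
sample = [(1, "BF1543"), (2, "BF1544"), (2, "BF1544"), (2, "BF1544")]
duplicates = extra_copies(sample)
print(duplicates)
```

An empty result, as in the notebook output above, means every row occurs exactly once.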



## Conclusion

Based on the data exploration above:
- There are 15 columns
- There are 7394 rows
- There are no missing values
- Each column already has an appropriate data type
- There are no duplicated rows
- Column names do not yet follow the snake_case convention

What needs to be done in the Transform step:
- Rename the columns to snake_case
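The renaming step could look roughly like the sketch below. The helper `to_snake_case` is a hypothetical name, and the column list is copied from the `printSchema()` output above. This is one possible approach under those assumptions, not necessarily the transform used in the actual pipeline.

```python
import re


def to_snake_case(name: str) -> str:
    """Lower-case a column name and join its words with underscores."""
    words = re.findall(r"[A-Za-z0-9]+", name)
    return "_".join(word.lower() for word in words)


# Column names as reported by dr.printSchema()
columns = [
    "Order ID", "Order Date", "Product ID", "Product Category",
    "Buyer Gender", "Buyer Age", "Order Location", "International Shipping",
    "Sales Price", "Shipping Charges", "Sales per Unit", "Quantity",
    "Total Sales", "Rating", "Review",
]

renamed = [to_snake_case(column) for column in columns]
print(renamed)

# With the Spark DataFrame from the notebook, the new names could then be
# applied in one call, for example: dr = dr.toDF(*renamed)
```

Building the full list first makes it easy to confirm that the new names stay unique before touching the DataFrame.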

# Great Expectations

## Instantiate Data Context

Install the module and import the library

In [1]:
# Install Library
!pip install -q "great-expectations==0.18.19" "numpy<2"

In [2]:
# Import Library
from great_expectations.data_context import FileDataContext

In [3]:
context = FileDataContext.create(project_root_dir='./')

## Connect to Datasource

In [4]:
# Give a name to a Datasource. This name must be unique between Datasources.
datasource_name = 'csv-merch-sales-2023-2024'
datasource = context.sources.add_pandas(datasource_name)

# Give a name to a data asset
asset_name = 'sales-2023-2024'
path_to_data = '/Users/afifmakruf/Desktop/Hacktiv8/RMT-002/Repo/airflow-with-spark/logs/data_staging/P2M3_afif_makruf_data_cleaned.csv'
asset = datasource.add_csv_asset(asset_name, filepath_or_buffer=path_to_data)

# Build batch request
batch_request = asset.build_batch_request()

## Create an Expectation Suite and Validator

In [5]:
# Create an expectation suite
expectation_suite_name = 'expectation-merch-dataset'
context.add_or_update_expectation_suite(expectation_suite_name)

# Create a validator using above expectation suite
validator = context.get_validator(
    batch_request = batch_request,
    expectation_suite_name = expectation_suite_name
)

# Check the validator
validator.head()

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,order_id,order_date,product_id,product_category,buer_gender,buyer_age,order_location,international_shipping,sales_price,shipping_charges,sales_per_unit,quantity,total_sales,rating,review
0,189440,2024-07-21,BF1543,Clothing,Male,30,New Jersey,No,100,0,100,1,100,4,The delivery team handled the product with care.
1,187385,2024-07-20,BF1543,Clothing,Male,32,Las Vegas,No,100,0,100,1,100,3,Had slight delays but the product was in good ...
2,181844,2024-07-21,BF1544,Other,Female,26,Cardiff,Yes,9,40,49,1,49,2,Waste of Money.
3,197934,2024-08-19,BF1544,Other,Male,28,Pittsburgh,No,9,0,9,2,18,3,Had slight delays but the product was in good ...
4,122470,2024-01-06,BF1545,Other,Female,19,Miami,No,10,0,10,3,30,5,Lack of delivery delays is greatly appreciated.


## Expectations

Expectation 1 : Column `order_id` must be unique

In [6]:
validator.expect_column_values_to_be_unique('order_id')

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 7394,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

Expectation 2 : Column `buyer_age` value type must be float or int

In [7]:
validator.expect_column_values_to_be_in_type_list(
    column='buyer_age',
    type_list=['FLOAT', 'INT']
)
# Integer only

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "observed_value": "int64"
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

Expectation 2 : Column `sales_price` value type must be float or int

In [8]:
validator.expect_column_values_to_be_in_type_list(
    column='sales_price',
    type_list=['FLOAT', 'INT']
)

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "observed_value": "int64"
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

Expectation 2 : Column `shipping_charges` value type must be float or int

In [9]:
validator.expect_column_values_to_be_in_type_list(
    column='shipping_charges',
    type_list=['FLOAT', 'INT']
)

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "observed_value": "int64"
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

Expectation 2 : Column `sales_per_unit` value type must be float or int

In [10]:
validator.expect_column_values_to_be_in_type_list(
    column='sales_per_unit',
    type_list=['FLOAT', 'INT']
)

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "observed_value": "int64"
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

Expectation 2 : Column `quantity` value type must be float or int

In [11]:
validator.expect_column_values_to_be_in_type_list(
    column='quantity',
    type_list=['FLOAT', 'INT']
)
# integer

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "observed_value": "int64"
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

Expectation 2 : Column `total_sales` value type must be float or int

In [12]:
validator.expect_column_values_to_be_in_type_list(
    column='total_sales',
    type_list=['FLOAT', 'INT']
)

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "observed_value": "int64"
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

Expectation 2 : Column `rating` value type must be float or int

In [13]:
validator.expect_column_values_to_be_in_type_list(
    column='rating',
    type_list=['FLOAT', 'INT']
)
#Integer

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "observed_value": "int64"
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
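The seven type checks above differ only in the column name, so they could be expressed as a single loop. The sketch below assumes a `validator` object exposing `expect_column_values_to_be_in_type_list` as used in the cells above. `expect_numeric_columns` and `RecordingValidator` are hypothetical names, and the stand-in class exists only so the example runs without Great Expectations installed.

```python
NUMERIC_COLUMNS = [
    "buyer_age", "sales_price", "shipping_charges", "sales_per_unit",
    "quantity", "total_sales", "rating",
]


def expect_numeric_columns(validator, columns=NUMERIC_COLUMNS):
    """Apply the same type expectation to every column and collect the results."""
    results = {}
    for column in columns:
        results[column] = validator.expect_column_values_to_be_in_type_list(
            column=column,
            type_list=["FLOAT", "INT"],
        )
    return results


class RecordingValidator:
    """Hypothetical stand-in that records calls instead of validating data."""

    def __init__(self):
        self.calls = []

    def expect_column_values_to_be_in_type_list(self, column, type_list):
        self.calls.append((column, tuple(type_list)))
        return {"success": True}


demo = RecordingValidator()
results = expect_numeric_columns(demo)
print(list(results))
```

In the notebook, the real `validator` would be passed in place of `demo`.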

Expectation 3 : Column `total_sales` must exist so that the total transaction cost to be paid can be calculated

In [14]:
validator.expect_column_to_exist(column='total_sales')

Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

{
  "success": true,
  "result": {},
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

Expectation 4 : Column `product_category` must contain one of the following three values:
1. Clothing
2. Other
3. Ornaments

In [15]:
validator.expect_column_values_to_be_in_set('product_category', ['Clothing', 'Other', 'Ornaments'])

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 7394,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

Expectation 5 : Column `total_sales` must be greater than or equal to column `sales_price`

In [16]:
validator.expect_column_pair_values_a_to_be_greater_than_b(column_A='total_sales',column_B='sales_price', or_equal=True)

Calculating Metrics:   0%|          | 0/7 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 7394,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
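As a worked example of why this expectation is plausible, the five rows printed by `validator.head()` satisfy `sales_per_unit = sales_price + shipping_charges` and `total_sales = sales_per_unit * quantity`. The sketch below checks those relations on the displayed rows only. Whether they hold for all 7394 rows is an assumption that is not verified here, and `check_row` is a hypothetical helper.

```python
# Numeric columns of the five rows shown by validator.head()
rows = [
    {"sales_price": 100, "shipping_charges": 0, "sales_per_unit": 100, "quantity": 1, "total_sales": 100},
    {"sales_price": 100, "shipping_charges": 0, "sales_per_unit": 100, "quantity": 1, "total_sales": 100},
    {"sales_price": 9, "shipping_charges": 40, "sales_per_unit": 49, "quantity": 1, "total_sales": 49},
    {"sales_price": 9, "shipping_charges": 0, "sales_per_unit": 9, "quantity": 2, "total_sales": 18},
    {"sales_price": 10, "shipping_charges": 0, "sales_per_unit": 10, "quantity": 3, "total_sales": 30},
]


def check_row(row):
    """Evaluate the three relations for a single row."""
    return {
        "unit_matches": row["sales_per_unit"] == row["sales_price"] + row["shipping_charges"],
        "total_matches": row["total_sales"] == row["sales_per_unit"] * row["quantity"],
        "total_ge_price": row["total_sales"] >= row["sales_price"],
    }


checks = [check_row(row) for row in rows]
all_pass = all(all(result.values()) for result in checks)
print(all_pass)
```

If both relations hold, `total_sales >= sales_price` follows whenever `quantity` is at least 1 and `shipping_charges` is non-negative.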

Expectation 6 : Column `rating` value must be between 1 and 5

In [17]:
validator.expect_column_values_to_be_between(column="rating", min_value=1, max_value=5)

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 7394,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

Expectation 7 : Column `order_date` values must follow the Year-Month-Day format (`%Y-%m-%d`)

In [18]:
validator.expect_column_values_to_match_strftime_format('order_date', '%Y-%m-%d')

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 7394,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

Expectation 8 : Column `product_id` values must start with BF

In [19]:
validator.expect_column_values_to_match_regex('product_id', '^BF.*')

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 7394,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
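The pattern `^BF.*` only constrains the prefix, so any string beginning with `BF` passes. The sketch below compares it with a stricter, hypothetical pattern (`BF` followed by exactly four digits) inferred from the sample IDs shown earlier. Whether every real ID has that shape is an assumption.

```python
import re

PREFIX_PATTERN = re.compile(r"^BF.*")      # pattern used in the notebook
STRICT_PATTERN = re.compile(r"^BF\d{4}$")  # hypothetical stricter alternative

# The first three IDs appear in the notebook output; the rest are made up.
candidates = ["BF1543", "BF1544", "BF1545", "bf1543", "XBF1543", "BF15"]

prefix_ok = [pid for pid in candidates if PREFIX_PATTERN.search(pid)]
strict_ok = [pid for pid in candidates if STRICT_PATTERN.search(pid)]

print(prefix_ok)
print(strict_ok)
```

The prefix check accepts the truncated `BF15`, while the stricter pattern rejects it. Which of the two is appropriate depends on how product IDs are actually issued.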

Save into Expectation Suite

In [20]:
validator.save_expectation_suite(discard_failed_expectations=False)

Checkpoint

In [21]:
checkpoint_1 = context.add_or_update_checkpoint(
    name = 'checkpoint_1',
    validator = validator,
)

Run Checkpoint

In [22]:
checkpoint_result = checkpoint_1.run()

Calculating Metrics:   0%|          | 0/44 [00:00<?, ?it/s]
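After a run it is useful to list which expectations failed. The sketch below works on plain dictionaries shaped like the JSON results printed earlier in this notebook. `failed_expectations`, the labels, and the failing entry are hypothetical, and the code does not call the Great Expectations API.

```python
def failed_expectations(results):
    """Return the labels of expectation results whose "success" flag is false."""
    return [label for label, result in results.items() if not result["success"]]


results = {
    # Shape copied from the notebook output for Expectation 1
    "order_id is unique": {
        "success": True,
        "result": {"element_count": 7394, "unexpected_count": 0},
    },
    # Hypothetical failing result, included only to show the filter at work
    "rating between 1 and 5": {
        "success": False,
        "result": {"element_count": 7394, "unexpected_count": 3},
    },
}

failed = failed_expectations(results)
print(failed)
```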

Build Data Docs

In [23]:
context.build_data_docs()

{'local_site': 'file:///Users/afifmakruf/Desktop/Hacktiv8/RMT-002/Repo/p2-coda002-rmt-m3-afifmakruf/gx/uncommitted/data_docs/local_site/index.html'}

## Conclusion

Based on the Expectations created above, several things need to be done in the transform step, including:
- Columns with numeric data are validated. Incoming values that are not of type integer or float will be adjusted
- The `order_date` column has a prescribed format, `%Y-%m-%d`. Incoming values that do not match it will be adjusted
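The two adjustments could be sketched as below, using the `%Y-%m-%d` format checked in Expectation 7 as the target. The list of alternative input formats and the helper names `normalize_date` and `to_number` are assumptions made for illustration. The real pipeline may handle non-conforming values differently.

```python
from datetime import datetime

# Assumed input formats; only the first one is confirmed by the notebook.
KNOWN_DATE_FORMATS = ["%Y-%m-%d", "%d/%m/%Y", "%Y%m%d"]


def normalize_date(value):
    """Return the date as YYYY-MM-DD, or None if no known format matches."""
    for fmt in KNOWN_DATE_FORMATS:
        try:
            return datetime.strptime(value.strip(), fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    return None


def to_number(value):
    """Convert a value to int when possible, else float, else None."""
    text = str(value).strip()
    try:
        return int(text)
    except ValueError:
        pass
    try:
        return float(text)
    except ValueError:
        return None


print(normalize_date("21/07/2024"))
print(to_number(" 100 "))
```

Returning `None` for values that cannot be parsed keeps the decision about dropping or flagging such rows separate from the parsing itself.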