# Perkenalan


Milestone 3

Nama  : Hisham Audiputra Suharto
Batch : CODA-006-RMT

Program ini dibuat untuk mengotomatisasi proses transform dan load data dari Kaggle ke MongoDB. Dataset yang digunakan adalah dataset Sales Performance.



# Transform Data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, sum, row_number
from pyspark.sql.window import Window
from great_expectations.data_context import FileDataContext
import shutil
import os

# Build (or reuse) the Spark session; the PostgreSQL JDBC driver is requested
# through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("WriteToPostgres")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .getOrCreate()
)

# Great Expectations file-backed context rooted at the current directory.
context = FileDataContext.create(project_root_dir='./')

# Location of the raw CSV.
path_data = "Dataset/Supplement_Sales_Weekly_Expanded.csv"

# Read the CSV with a header row and let Spark infer the column types.
raw_data = spark.read.options(header=True, inferSchema=True).csv(path_data)

def Add_column_ID(df, ordercolumnby, nameofnewcolumn):
    """Prepend a row-number column to a Spark DataFrame.

    Rows are numbered with ``row_number()`` over a window ordered by
    ``ordercolumnby``. The generated column is moved to the front and the
    remaining columns keep their relative order.

    Args:
        df: DataFrame to number.
        ordercolumnby: Column handed to ``Window.orderBy``.
        nameofnewcolumn: Name of the generated column.

    Returns:
        A DataFrame whose first column is ``nameofnewcolumn``.
    """
    # NOTE(review): the window has no partitionBy, so the numbering is
    # presumably computed on a single partition, and rows sharing the same
    # ordering value may be numbered in any order — confirm this is acceptable.
    numbered = df.withColumn(
        nameofnewcolumn,
        row_number().over(Window.orderBy(ordercolumnby)),
    )

    # Put the new column first, followed by every other column as-is.
    remaining = [name for name in numbered.columns if name != nameofnewcolumn]
    return numbered.select([nameofnewcolumn, *remaining])

# Number the rows by "Date" and expose the result as the leading "id" column.
raw_data_Clean = Add_column_ID(
    raw_data,
    ordercolumnby="Date",
    nameofnewcolumn="id",
)

def save_as_csv(df, output_dir, output_filename):
    """Write a Spark DataFrame to one CSV file with a chosen name.

    The DataFrame is coalesced to one partition and written to a staging
    folder ``<output_dir>/_tmp_output``. The ``part-*.csv`` file found there
    is then moved to ``<output_dir>/<output_filename>``. The staging folder
    is removed afterwards, including when any step fails.

    Args:
        df: DataFrame to write.
        output_dir: Directory that receives the final file.
        output_filename: Name of the final CSV file inside ``output_dir``.

    Raises:
        FileNotFoundError: If no ``part-*.csv`` file exists in the staging
            folder after the write.
    """
    tmp_path = os.path.join(output_dir, "_tmp_output")
    final_output_path = os.path.join(output_dir, output_filename)

    try:
        # Write to the staging directory (header included, existing output
        # overwritten).
        df.coalesce(1).write.mode("overwrite").option("header", "true").csv(tmp_path)

        # Locate the first part file in the staging directory.
        part_file = next(
            (
                name
                for name in os.listdir(tmp_path)
                if name.startswith("part-") and name.endswith(".csv")
            ),
            None,
        )
        if part_file is None:
            raise FileNotFoundError("CSV part file not found in temp folder.")

        # Move and rename the part file to its final location.
        shutil.move(os.path.join(tmp_path, part_file), final_output_path)
    finally:
        # Best-effort cleanup: never leave the staging folder behind, and do
        # not let a cleanup problem hide the original error.
        shutil.rmtree(tmp_path, ignore_errors=True)

    print(f"Saved: {final_output_path}")

# Persist the numbered DataFrame as a single CSV next to the raw dataset.
save_as_csv(
    raw_data_Clean,
    output_dir="Dataset",
    output_filename="Supplement_Sales_Weekly_Expanded_Clean.csv",
)

Saved: Dataset/Supplement_Sales_Weekly_Expanded_Clean.csv


In [93]:
# Print a preview of the raw DataFrame (as loaded, before the id column is added).
raw_data.show()

+----------+------------------+-----------+----------+-----+-------+--------+--------------+--------+--------+
|      Date|      Product Name|   Category|Units Sold|Price|Revenue|Discount|Units Returned|Location|Platform|
+----------+------------------+-----------+----------+-----+-------+--------+--------------+--------+--------+
|2020-01-06|      Whey Protein|    Protein|       143|31.98|4573.14|    0.03|             2|  Canada| Walmart|
|2020-01-06|         Vitamin C|    Vitamin|       139|42.51|5908.89|    0.04|             0|      UK|  Amazon|
|2020-01-06|          Fish Oil|      Omega|       161|12.91|2078.51|    0.25|             0|  Canada|  Amazon|
|2020-01-06|      Multivitamin|    Vitamin|       140|16.07| 2249.8|    0.08|             0|  Canada| Walmart|
|2020-01-06|       Pre-Workout|Performance|       157|35.47|5568.79|    0.25|             3|  Canada|   iHerb|
|2020-01-06|              BCAA| Amino Acid|       154|41.19|6343.26|    0.13|             1|      UK| Walmart|
|

In [92]:
# Print the column names and types of the DataFrame that includes the id column.
raw_data_Clean.printSchema()

root
 |-- id: integer (nullable = false)
 |-- Date: date (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- Revenue: double (nullable = true)
 |-- Discount: double (nullable = true)
 |-- Units Returned: integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Platform: string (nullable = true)



In [86]:
# Print a preview of the raw DataFrame again.
raw_data.show()

+----------+------------------+-----------+----------+-----+-------+--------+--------------+--------+--------+
|      Date|      Product Name|   Category|Units Sold|Price|Revenue|Discount|Units Returned|Location|Platform|
+----------+------------------+-----------+----------+-----+-------+--------+--------------+--------+--------+
|2020-01-06|      Whey Protein|    Protein|       143|31.98|4573.14|    0.03|             2|  Canada| Walmart|
|2020-01-06|         Vitamin C|    Vitamin|       139|42.51|5908.89|    0.04|             0|      UK|  Amazon|
|2020-01-06|          Fish Oil|      Omega|       161|12.91|2078.51|    0.25|             0|  Canada|  Amazon|
|2020-01-06|      Multivitamin|    Vitamin|       140|16.07| 2249.8|    0.08|             0|  Canada| Walmart|
|2020-01-06|       Pre-Workout|Performance|       157|35.47|5568.79|    0.25|             3|  Canada|   iHerb|
|2020-01-06|              BCAA| Amino Acid|       154|41.19|6343.26|    0.13|             1|      UK| Walmart|
|

# Data Validation

In [16]:
# Register the pandas datasource under a unique name. add_or_update_* is used
# (as for the suite and checkpoint in this notebook) so that re-running this
# cell against the persisted file context does not fail on an existing name.
datasource_name = 'supplement_sales'
datasource = context.sources.add_or_update_pandas(datasource_name)

# Register the cleaned CSV as a data asset of that datasource.
asset_name = 'sales_performance'
path_to_data = 'Dataset/Supplement_Sales_Weekly_Expanded_Clean.csv'
asset = datasource.add_csv_asset(asset_name, filepath_or_buffer=path_to_data)

# Build the batch request used to fetch data from the asset.
batch_request = asset.build_batch_request()

In [17]:
# Create (or refresh) the expectation suite.
expectation_suite_name = 'expectation-sales_performance_review-dataset'
context.add_or_update_expectation_suite(expectation_suite_name)

# Bind the batch request to that suite through a validator.
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)

# Display the first rows seen by the validator as a sanity check.
validator.head()

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,id,Date,Product Name,Category,Units Sold,Price,Revenue,Discount,Units Returned,Location,Platform
0,0,2020-01-06,Whey Protein,Protein,143,31.98,4573.14,0.03,2,Canada,Walmart
1,1,2020-01-06,Vitamin C,Vitamin,139,42.51,5908.89,0.04,0,UK,Amazon
2,2,2020-01-06,Fish Oil,Omega,161,12.91,2078.51,0.25,0,Canada,Amazon
3,3,2020-01-06,Multivitamin,Vitamin,140,16.07,2249.8,0.08,0,Canada,Walmart
4,4,2020-01-06,Pre-Workout,Performance,157,35.47,5568.79,0.25,3,Canada,iHerb


In [18]:
# Every value in the id column must be unique.
validator.expect_column_values_to_be_unique(column='id')

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 4384,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [19]:
# The Revenue column must be present in the dataset.
validator.expect_column_to_exist(
    column='Revenue',
)

Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

{
  "success": true,
  "result": {},
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [20]:
# Platform may only contain one of the three listed values.
validator.expect_column_values_to_be_in_set(
    column='Platform',
    value_set=["Walmart", "Amazon", "iHerb"],
)

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "element_count": 4384,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "partial_unexpected_list": [],
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [22]:
# Price must be stored as a numeric (integer or float) type.
validator.expect_column_values_to_be_in_type_list(
    column='Price',
    type_list=['integer', 'float'],
)

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "observed_value": "float64"
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [23]:
# The mean of Discount must lie between 0 and 1.
validator.expect_column_mean_to_be_between(
    column='Discount',
    min_value=0,
    max_value=1,
)

Calculating Metrics:   0%|          | 0/4 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "observed_value": 0.12439781021897811
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [24]:
# Revenue must be stored as a numeric (integer or float) type.
validator.expect_column_values_to_be_in_type_list(
    column='Revenue',
    type_list=['integer', 'float'],
)

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "observed_value": "float64"
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [25]:
# Discount must be stored as a numeric (integer or float) type.
validator.expect_column_values_to_be_in_type_list(
    column='Discount',
    type_list=['integer', 'float'],
)

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

{
  "success": true,
  "result": {
    "observed_value": "float64"
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [26]:
# Create (or refresh) a checkpoint named 'checkpoint' from the validator.
checkpoint = context.add_or_update_checkpoint(name='checkpoint', validator=validator)

In [28]:
# Run the checkpoint and keep the returned result object for inspection.
checkpoint_result = checkpoint.run()

Calculating Metrics: 0it [00:00, ?it/s]

In [62]:
# Write the numbered DataFrame to the cleaned CSV file again.
save_as_csv(
    raw_data_Clean,
    output_dir="Dataset",
    output_filename="Supplement_Sales_Weekly_Expanded_Clean.csv",
)

Saved: Dataset/Supplement_Sales_Weekly_Expanded_Clean.csv


# Google Slide

https://docs.google.com/presentation/d/1cCizkblByNhYRDtWuIUFWLRygMQSWQ4xemJdUG3GshY/edit?usp=sharing