In [3]:
from etl.extractor.extractor import DataFrameExtractor
from pyspark.sql import SparkSession
from etl.tranform.transform import CustomerDataTransformer,PurchaseHistoryDataTransformer,ProductDataTransformer
from etl.validator_data import ValidatorImpl

from pyspark.sql.types import *
from pyspark.sql.functions import col, countDistinct, when, count, isnan

In [5]:
# Create (or reuse) the Spark session.
# The master URL "local[*]" and the "spark.jars" entry pointing at the MySQL
# Connector/J jar are passed to the builder as-is.
spark = (
    SparkSession.builder
    .appName("PySpark MySQL Connection")
    .master("local[*]")
    .config("spark.jars", "./connector/mysql-connector-java-8.0.30.jar")
    .getOrCreate()
)

**Extractor**

In [6]:
# Step 1: extract the data from each source.
# Each CSV is read through its own DataFrameExtractor bound to the shared session.
fas_purchase_df = DataFrameExtractor(spark).extract(
    "csv", "dataset/fashion_purchase_history.csv"
)
customer_df = DataFrameExtractor(spark).extract(
    "csv", "dataset/customer.csv"
)
product_df = DataFrameExtractor(spark).extract(
    "csv", "dataset/products.csv"
)

**Transform**

In [7]:
# Inspect the raw DataFrames before transforming them: schema, nulls, duplicates.
#
# Each heading is wrapped in ANSI SGR codes: '\033[1m' turns bold on and
# '\033[0m' resets it. Without the reset, the bold attribute stays active for
# everything printed afterwards in the same output stream, so the schemas and
# tables below the heading would be rendered bold as well.

# Check the data types
print('\033[1m' + "Kiểu dữ liệu:" + '\033[0m')
customer_df.printSchema()
fas_purchase_df.printSchema()
product_df.printSchema()

# Check for null values
print('\033[1m' + "Null:" + '\033[0m')
ValidatorImpl().check_null_values(customer_df)
ValidatorImpl().check_null_values(fas_purchase_df)
ValidatorImpl().check_null_values(product_df)

# Check for duplicated records.
# The second argument lists the columns passed to the validator as the
# duplicate key for that dataset.
print('\033[1m' + "Bản ghi lặp:" + '\033[0m')
ValidatorImpl().check_duplicate_records(customer_df, ['customer_id', 'first_name', 'last_name', 'gender', 'date_of_birth'])
ValidatorImpl().check_duplicate_records(fas_purchase_df, ['customer_id', 'item_purchased', 'date_purchase'])
ValidatorImpl().check_duplicate_records(product_df, ['item', 'category'])

[1mKiểu dữ liệu:
root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- date_of_birth: timestamp (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- signup_date: timestamp (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- item_purchased: string (nullable = true)
 |-- purchase_amount: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- date_purchase: date (nullable = true)
 |-- review_rating: double (nullable = true)
 |-- payment_method: string (nullable = true)

root
 |-- item: string (nullable = true)
 |-- category: string (nullable = true)

[1mNull:
+-----------+----------+---------+------+-------------+-----+------------+-----------+-------+----+
|customer_id|first_name|last_name|gender|date_of_bi

In [8]:
# Run each raw DataFrame through its dataset-specific transformer.
# Intended data type conversions:
#   - date_of_birth: Timestamp -> Date
#   - date_purchase: Date -> Timestamp
#   - review_rating: Double -> Float
# NOTE(review): the conversions themselves live in etl.tranform.transform and
# are not visible here — confirm there if the list above changes.
cleand_customer_df = CustomerDataTransformer().transform(customer_df)
cleand_fas_purchase_df = PurchaseHistoryDataTransformer().transform(fas_purchase_df)
cleand_product_df = ProductDataTransformer().transform(product_df)

In [9]:
# Check the data types of the transformed DataFrames
# (customer, purchase history, product — in that order).
for transformed_df in (cleand_customer_df, cleand_fas_purchase_df, cleand_product_df):
    transformed_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = false)
 |-- last_name: string (nullable = false)
 |-- gender: string (nullable = false)
 |-- date_of_birth: date (nullable = true)
 |-- email: string (nullable = false)
 |-- phone_number: string (nullable = false)
 |-- signup_date: timestamp (nullable = true)
 |-- address: string (nullable = false)
 |-- city: string (nullable = false)

root
 |-- customer_id: integer (nullable = true)
 |-- item_purchased: string (nullable = true)
 |-- purchase_amount: double (nullable = false)
 |-- quantity: integer (nullable = true)
 |-- date_purchase: timestamp (nullable = true)
 |-- review_rating: float (nullable = false)
 |-- payment_method: string (nullable = true)

root
 |-- item: string (nullable = true)
 |-- category: string (nullable = false)



In [10]:
# Check the transformed DataFrames for duplicated records.
# Each entry pairs a DataFrame with the key columns handed to the validator.
duplicate_checks = [
    (cleand_customer_df, ['customer_id', 'first_name', 'last_name', 'gender', 'date_of_birth']),
    (cleand_fas_purchase_df, ['customer_id', 'item_purchased', 'date_purchase']),
    (cleand_product_df, ['item', 'category']),
]
for checked_df, key_columns in duplicate_checks:
    # A fresh validator per call, as in the rest of the notebook.
    ValidatorImpl().check_duplicate_records(checked_df, key_columns)


+-----------+----------+---------+------+-------------+---------------+
|customer_id|first_name|last_name|gender|date_of_birth|duplicate_count|
+-----------+----------+---------+------+-------------+---------------+
+-----------+----------+---------+------+-------------+---------------+

+-----------+--------------+-------------+---------------+
|customer_id|item_purchased|date_purchase|duplicate_count|
+-----------+--------------+-------------+---------------+
+-----------+--------------+-------------+---------------+

+----+--------+---------------+
|item|category|duplicate_count|
+----+--------+---------------+
+----+--------+---------------+



**Load**