<a href="https://www.kaggle.com/code/duysakura/visualization-2-0?scriptVersionId=294200800" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [1]:
# Pin PySpark to the 3.5 line: the Spark session below pulls hadoop-aws 3.3.4,
# which has to match the Hadoop client bundled with the installed PySpark.
# `%pip` (rather than `!pip`) installs into the environment of the running kernel.
%pip install -q "pyspark==3.5.*"



In [2]:
import boto3
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from kaggle_secrets import UserSecretsClient

In [3]:
# All credentials come from Kaggle user secrets; nothing sensitive is hardcoded.
user_secrets = UserSecretsClient()

# --- MinIO (S3-compatible object storage) ---
MINIO_ACCESS_KEY = user_secrets.get_secret("MINIO_ACCESS_KEY")
MINIO_ENDPOINT = user_secrets.get_secret("MINIO_ENDPOINT")
MINIO_SECRET_KEY = user_secrets.get_secret("MINIO_SECRET_KEY")
MINIO_BUCKET_NAME = 'olist-data'

# Keyword arguments expanded into boto3.client('s3', ...).
MINIO_CONFIG = dict(
    endpoint_url=MINIO_ENDPOINT,
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY,
)

# --- MySQL (JDBC target) ---
MYSQL_URL = user_secrets.get_secret("MYSQL_URL")

# JDBC connection properties, keyed by property name.
MYSQL_PROPERTIES = {
    jdbc_property: user_secrets.get_secret(secret_label)
    for jdbc_property, secret_label in (
        ('user', "MYSQL_USER"),
        ('password', "MYSQL_PASSWORD"),
        ('driver', "MYSQL_DRIVER"),
    )
}

In [4]:
def write_to_mysql(df, table_name, mode='overwrite', num_partitions=4, batch_size=5000):
    """Write a Spark DataFrame to a MySQL table over JDBC.

    The JDBC URL is taken from the module-level ``MYSQL_URL`` and the
    connection properties from ``MYSQL_PROPERTIES``. A fixed set of
    Connector/J options is appended to the URL, but only those that the URL
    does not already specify, so a value configured in the secret is never
    duplicated or contradicted.

    Errors are reported on stdout and are NOT re-raised, so one failing table
    does not abort a loop over several tables. Use the return value to detect
    a failure.

    Args:
        df: Spark DataFrame to persist.
        table_name: Name of the destination table.
        mode: Spark save mode passed to the JDBC writer (default 'overwrite').
        num_partitions: Partition count passed to ``coalesce`` before writing.
        batch_size: Value of the JDBC ``batchsize`` write option.

    Returns:
        bool: True when the write completed, False when any exception occurred.
    """
    # NOTE(review): useSSL=false / allowPublicKeyRetrieval=true relax transport
    # security; confirm this is acceptable for the target database.
    default_params = {
        "rewriteBatchedStatements": "true",
        "allowPublicKeyRetrieval": "true",
        "useSSL": "false",
    }

    try:
        # Split off the query string to see which options are already set.
        _, has_query, query = MYSQL_URL.partition("?")
        already_set = {pair.split("=", 1)[0] for pair in query.split("&") if pair}
        missing = [
            f"{name}={value}"
            for name, value in default_params.items()
            if name not in already_set
        ]

        jdbc_url = MYSQL_URL
        if missing:
            if not has_query:
                jdbc_url += "?"
            elif query and not query.endswith("&"):
                jdbc_url += "&"
            jdbc_url += "&".join(missing)

        df.coalesce(num_partitions).write.jdbc(
            url=jdbc_url,
            table=table_name,
            mode=mode,
            properties={
                **MYSQL_PROPERTIES,
                "batchsize": str(batch_size),
                "isolationLevel": "NONE"
            }
        )

        print(f"Lưu bảng {table_name} thành công")
        return True

    except Exception as e:
        # Deliberate best-effort: report and signal failure instead of raising.
        print(f"Lỗi khi ghi bảng {table_name}: {e}")
        return False

In [5]:
# Maven coordinates resolved at session start: the S3A connector, the AWS SDK
# it relies on, and the MySQL JDBC driver.
packages = [
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.540",
    "com.mysql:mysql-connector-j:8.2.0",
]

# Local Spark session whose s3a:// filesystem points at the MinIO endpoint.
spark = (
    SparkSession.builder
    .appName("Olist's Brazilian E-Commerce")
    .config("spark.jars.packages", ",".join(packages))
    # S3A endpoint and static credentials.
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    # S3A connector behaviour.
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
    )
    .config("spark.hadoop.fs.s3a.fast.upload", "true")
    .master("local[*]")
    .getOrCreate()
)

# Keep the cell output readable: only warnings and errors from Spark.
spark.sparkContext.setLogLevel("WARN")

:: loading settings :: url = jar:file:/usr/local/lib/python3.12/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
com.mysql#mysql-connector-j added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-370df4b7-cf1e-4c39-a643-2730986f27a1;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.540 in central
	found com.mysql#mysql-connector-j;8.2.0 in central
	found com.google.protobuf#protobuf-java;3.21.9 in central
downloading https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar ...
	[SUCCESSFUL ] org.apache.hadoop#hadoop-aws;3.3.4!hadoop-aws.jar (113ms)
downloading https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.540/aws-java-sdk-bundle-1.12.540.jar ...
	[SUCCESSFUL ] com.amazonaws#a

In [6]:
# End-to-end pipeline: load the raw CSV files from MinIO, clean and enrich them
# with Spark SQL, build the fact/dimension tables and write them to MySQL.
s3 = boto3.client('s3', **MINIO_CONFIG)

try:
    # list_objects_v2 returns the listing in pages, so walk every page instead
    # of reading only the first response.
    bucket_has_objects = False
    csv_keys = []
    for page in s3.get_paginator('list_objects_v2').paginate(Bucket=MINIO_BUCKET_NAME):
        for obj in page.get('Contents', []):
            bucket_has_objects = True
            if obj['Key'].endswith('.csv'):
                csv_keys.append(obj['Key'])

    if not bucket_has_objects:
        print("Bucket rỗng")
        # SystemExit is not an Exception subclass, so the handler at the bottom
        # of this cell does not intercept it.
        sys.exit(0)

    # I. Load the raw data from MinIO and register each CSV file as a temp view
    base_url = f"s3a://{MINIO_BUCKET_NAME}"
    for file_name in csv_keys:
        df = spark.read.csv(f"{base_url}/{file_name}", header=True, multiLine=True, inferSchema=True)
        # Name the view after the file itself: a folder prefix in the object key
        # would otherwise end up inside the view name.
        base_name = file_name.rsplit('/', 1)[-1]
        table_name = base_name.replace('olist_', '').replace('_dataset.csv', '').replace('.csv', '')
        df.createOrReplaceTempView(table_name)

    # II. Clean the data
    # 1. Replace product category names with their English translation
    #    (falling back to the original name, then to 'unknown')
    df_clean_products = spark.sql("""
        select 
            p.product_id,
            coalesce(pt.product_category_name_english, p.product_category_name, 'unknown') product_category_name,
            p.product_photos_qty,
            p.product_weight_g,
            p.product_length_cm,
            p.product_height_cm,
            p.product_width_cm
        from products p
        left join product_category_name_translation pt on p.product_category_name = pt.product_category_name
    """)
    df_clean_products.createOrReplaceTempView("products")

    # 2. Normalize geolocation: one row per zip-code prefix, coordinates averaged,
    #    city name in Title Case without diacritics.
    #    The two strings passed to translate() must line up character by
    #    character, so both are generated from a single table instead of being
    #    typed by hand. Besides the Vietnamese set, the table covers 'ç' and 'ü',
    #    which occur in Brazilian city names.
    accent_groups = {
        'a': 'áàảãạâấầẩẫậăắằẳẵặ',
        'e': 'éèẻẽẹêếềểễệ',
        'i': 'íìỉĩị',
        'o': 'óòỏõọôốồổỗộơớờởỡợ',
        'u': 'úùủũụưứừửữựü',
        'y': 'ýỳỷỹỵ',
        'd': 'đ',
        'c': 'ç',
    }
    accented_lower = ''.join(accent_groups.values())
    plain_lower = ''.join(base * len(variants) for base, variants in accent_groups.items())
    accented_chars = accented_lower + accented_lower.upper()
    plain_chars = plain_lower + plain_lower.upper()

    df_clean_geolocation = spark.sql(f"""
        select 
            geolocation_zip_code_prefix,
            avg(geolocation_lat) geolocation_lat,
            avg(geolocation_lng) geolocation_lng,
            initcap(
                translate(
                    first(geolocation_city), 
                    '{accented_chars}',
                    '{plain_chars}'
                )
            ) geolocation_city,
            first(geolocation_state) geolocation_state
        from geolocation
        group by geolocation_zip_code_prefix
    """)
    df_clean_geolocation.createOrReplaceTempView("geolocation")

    # 3. Drop orders that have no customer delivery date
    df_clean_orders = spark.sql("""
        select * from orders
        where order_delivered_customer_date is not null
    """)
    df_clean_orders.createOrReplaceTempView("orders")

    # 4. Drop reviews with invalid dates and normalize the date columns to timestamp.
    #    Cast first, then filter: with Spark's default (non-ANSI) casting, text that
    #    cannot be parsed as a timestamp becomes NULL, so filtering after the cast
    #    also removes rows whose date columns hold unparseable values.
    df_clean_order_reviews = (
        spark.table("order_reviews")
        .withColumn("review_creation_date", col("review_creation_date").cast("timestamp"))
        .withColumn("review_answer_timestamp", col("review_answer_timestamp").cast("timestamp"))
        .where(
            col("review_creation_date").isNotNull()
            & col("review_answer_timestamp").isNotNull()
        )
        .withColumnRenamed("review_answer_timestamp", "review_answer_date")
    )
    df_clean_order_reviews.createOrReplaceTempView("order_reviews")

    # III. Enrich the data
    # 1. Compute the delivery time in days and flag orders delivered after the estimate
    df_enrich_orders = spark.sql("""
        select 
            *,
            datediff(order_delivered_customer_date, order_purchase_timestamp) actual_delivery_days,
            (order_delivered_customer_date > order_estimated_delivery_date) is_late
        from orders
    """)
    df_enrich_orders.createOrReplaceTempView("orders")

    # 2. Compute the total value (price + freight) of each order item
    df_enrich_order_items = spark.sql("""
        select 
            *,
            round((price + freight_value), 2) total_item_value
        from order_items
    """)
    df_enrich_order_items.createOrReplaceTempView("order_items")

    # 3. Assign each state (and therefore each city) to a region
    region_data = [
        ('AC', 'North'), ('AP', 'North'), ('AM', 'North'), ('PA', 'North'), ('RO', 'North'), ('RR', 'North'), ('TO', 'North'),
        ('AL', 'Northeast'), ('BA', 'Northeast'), ('CE', 'Northeast'), ('MA', 'Northeast'), ('PB', 'Northeast'), ('PE', 'Northeast'), ('PI', 'Northeast'), ('RN', 'Northeast'), ('SE', 'Northeast'),
        ('DF', 'Central-West'), ('GO', 'Central-West'), ('MT', 'Central-West'), ('MS', 'Central-West'),
        ('ES', 'Southeast'), ('MG', 'Southeast'), ('RJ', 'Southeast'), ('SP', 'Southeast'),
        ('PR', 'South'), ('RS', 'South'), ('SC', 'South')
    ]
    df_regions = spark.createDataFrame(region_data, ["geolocation_state", "region"])
    df_regions.createOrReplaceTempView("regions")
    df_enrich_geolocation = spark.sql("""
        select
            g.*,
            r.region geolocation_region
        from geolocation g
        left join regions r on g.geolocation_state = r.geolocation_state
    """)
    df_enrich_geolocation.createOrReplaceTempView("geolocation")

    # IV. Modeling: build the fact and dimension tables
    df_fact_order_items = spark.sql("""
        select 
            o.order_id,
            o.customer_id,
            oi.product_id,
            oi.seller_id,
            o.order_purchase_timestamp,
            o.order_delivered_customer_date,
            oi.price,
            oi.freight_value,
            oi.total_item_value,
            o.actual_delivery_days,
            o.is_late
        from orders o
        join order_items oi on o.order_id = oi.order_id
    """)

    df_dim_customers = spark.sql("""
        select 
            c.customer_id,
            c.customer_unique_id,
            g.*
        from customers c
        left join geolocation g on c.customer_zip_code_prefix = g.geolocation_zip_code_prefix
    """)

    df_dim_sellers = spark.sql("""
        select 
            s.seller_id,
            g.*
        from sellers s
        left join geolocation g on s.seller_zip_code_prefix = g.geolocation_zip_code_prefix
    """)

    # Destination table name -> DataFrame to persist.
    tables = {
        'fact_order_items': df_fact_order_items,
        'fact_order_payments': spark.table('order_payments'),
        'fact_order_reviews': spark.table('order_reviews'),
        'dim_customers': df_dim_customers,
        'dim_products': spark.table('products'),
        'dim_sellers': df_dim_sellers
    }

    for table_name, df in tables.items():
        write_to_mysql(df, table_name)

    print("Đã hoàn thiện quá trình nhập dữ liệu vào database")

except Exception as e:
    # Best-effort: report the failure and let the remaining cells run.
    print(f"Lỗi: {e}")

26/01/26 21:19:05 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

Lưu bảng fact_order_items thành công


                                                                                

Lưu bảng fact_order_payments thành công


                                                                                

Lưu bảng fact_order_reviews thành công


                                                                                

Lưu bảng dim_customers thành công


                                                                                

Lưu bảng dim_products thành công


                                                                                

Lưu bảng dim_sellers thành công
Đã hoàn thiện quá trình nhập dữ liệu vào database


In [7]:
# Shut down the Spark session created earlier in this notebook.
spark.stop()