In [1]:
from pyspark.sql import SparkSession
import os

In [2]:
spark = SparkSession.builder.appName("Spark-Transform-Data").getOrCreate()
sc = spark.sparkContext

your 131072x1 screen size is bogus. expect trouble
24/11/14 13:33:38 WARN Utils: Your hostname, Huy resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/11/14 13:33:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/14 13:33:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Xây dựng hàm đọc và hàm biến đổi dữ liệu

In [3]:
def load_data(product_type):
    """Đọc dữ liệu từ máy và lưu dưới dạng Spark DataFrame.

    Args:
        product_type (str): Loại sản phẩm

    Returns:
        tuple: Gồm hai Spark DataFrames:
               - `current_df`: DataFrame chứa dữ liệu hiện tại
               - `previous_df`: DataFrame chứa dữ liệu từ lần chạy trước (hoặc None nếu không tồn tại)
    """
    previous_df = spark.read.csv(f'./data/raw/last_run/tgdd_{product_type}_data.csv', header=True) \
                     if os.path.exists(f'./data/raw/last_run/tgdd_{product_type}_data.csv') else None
    current_df = spark.read.csv(f'./data/raw/tgdd_{product_type}_data.csv', header=True)
    
    return current_df, previous_df
   

In [4]:
from functions import string_to_int_udf, split_name_product_udf, convert_storage_udf
import pyspark.sql.functions as F

def transform_data(spark_df, product_type):
    """Biến đổi dữ liệu

    Args:
        spark_df (Spark DataFrame): DataFrame chứa dữ liệu cần được biến đổi
        product_type (str): Loại sản phẩm

    Returns:
        Spark DataFrame: Dữ liệu sau khi được biến đổi.
    """    
    transformed_spark_df = spark_df.withColumn("name_and_core", split_name_product_udf(spark_df.name))
    transformed_spark_df = transformed_spark_df.withColumn("Name", F.col("name_and_core.names")) \
                                                 .withColumn("Core", F.col("name_and_core.cores")) 
        
    transformed_spark_df= transformed_spark_df.withColumn("OldPrice", string_to_int_udf(F.col("old_price")))
    transformed_spark_df= transformed_spark_df.withColumn("CurrentPrice", string_to_int_udf(F.col("current_price")))
    transformed_spark_df= transformed_spark_df.withColumn("GiftValue", string_to_int_udf(F.col("gift")))
    transformed_spark_df= transformed_spark_df.withColumnRenamed('brand', 'Brand')\
                                        .withColumnRenamed('total_rating', 'TotalRating')\
                                        .withColumnRenamed('avg_score', 'AverageScore')\
                                        .withColumnRenamed('link', 'ProductLink')
                                        
    transformed_spark_df = transformed_spark_df.withColumn('DiscountPercent', F.round(((F.col("OldPrice") - F.col("CurrentPrice")) / F.col("OldPrice") * 100), 2))
    transformed_spark_df = transformed_spark_df.orderBy(['Name', 'CurrentPrice'], ascending = [1, 0])\
                                         .withColumn("ProductType", F.lit('Laptop'))\
                                         .withColumn('Brand', F.when(transformed_spark_df.Brand == 'MacBook', 'Apple').otherwise(transformed_spark_df.Brand))
    if product_type == 'laptop':                                            
        transformed_spark_df= transformed_spark_df.withColumn("RAM", convert_storage_udf(F.col("ram")))
        transformed_spark_df= transformed_spark_df.withColumn("SSD", convert_storage_udf(F.col("ssd")))
        transformed_spark_df = transformed_spark_df.select(['Name', 'Brand', 'ProductType', 'Core', 'RAM', 'SSD', 'OldPrice', 'CurrentPrice', \
                                                            "DiscountPercent", "GiftValue", 'TotalRating', 'AverageScore', 'ProductLink'])
    else:
        transformed_spark_df = transformed_spark_df.withColumnRenamed('screen', 'Screen')
        transformed_spark_df = transformed_spark_df.select(['Name', 'ProductType', 'Brand', 'Screen', 'OldPrice', 'CurrentPrice', \
                                                           "DiscountPercent", 'GiftValue', 'TotalRating', 'AverageScore', 'ProductLink'])
    
    return transformed_spark_df

#### Danh mục laptop

In [5]:
current_laptop_df, previous_laptop_df = load_data('laptop')
if previous_laptop_df:
    new_records = current_laptop_df.subtract(previous_laptop_df)
    if new_records:
        transformed_laptop_df = transform_data(new_records, 'laptop')
else:
    transformed_laptop_df = transform_data(current_laptop_df, 'laptop')
transformed_laptop_df.show(truncate=False)    

                                                                                

+-----------------------------------+------+-----------+-------------+---+----+---------+------------+---------------+---------+-----------+------------+------------------------------------------------------------------------------------+
|Name                               |Brand |ProductType|Core         |RAM|SSD |OldPrice |CurrentPrice|DiscountPercent|GiftValue|TotalRating|AverageScore|ProductLink                                                                         |
+-----------------------------------+------+-----------+-------------+---+----+---------+------------+---------------+---------+-----------+------------+------------------------------------------------------------------------------------+
|Lenovo Gaming LOQ 15ARP9           |Lenovo|Laptop     |R7 7435HS    |24 |512 |29490000 |26990000    |8.48           |NULL     |0          |0.0         |https://www.thegioididong.com/laptop/lenovo-loq-15arp9-r7-83jc0040vn                |
|Lenovo Gaming LOQ 15IAX9           |Lenovo|

#### Danh mục máy tính bảng

In [6]:
current_tablet_df, previous_tablet_df = load_data('may-tinh-bang')
if previous_tablet_df:
    new_records = current_tablet_df.subtract(previous_tablet_df)
    if new_records:
        transformed_tablet_df = transform_data(new_records, 'may-tinh-bang')
else:
    transformed_tablet_df = transform_data(current_tablet_df, 'may-tinh-bang')
transformed_tablet_df.show(truncate=False)  

+-------------------------------------+-----------+-------+----------------------+--------+------------+---------------+---------+-----------+------------+-------------------------------------------------------------------------------+
|Name                                 |ProductType|Brand  |Screen                |OldPrice|CurrentPrice|DiscountPercent|GiftValue|TotalRating|AverageScore|ProductLink                                                                    |
+-------------------------------------+-----------+-------+----------------------+--------+------------+---------------+---------+-----------+------------+-------------------------------------------------------------------------------+
|Lenovo Tab                           |Laptop     |Lenovo |IPS LCD 10.95""       |6690000 |6390000     |4.48           |NULL     |0          |0.0         |https://www.thegioididong.com/may-tinh-bang/lenovo-tab-m11-4g-8gb-128gb        |
|Lenovo Tab                           |Laptop     |Lenov

#### Danh mục điện thoại

In [7]:
current_phone_df, previous_phone_df = load_data('dtdd')
if previous_phone_df:
    new_records = current_phone_df.subtract(previous_phone_df)
    if new_records:
        transformed_phone_df = transform_data(new_records, 'dtdd')
else:
    transformed_phone_df = transform_data(current_phone_df, 'dtdd')
transformed_phone_df.show(truncate=False)  

+--------------------------+-----------+--------------+--------------+--------+------------+---------------+---------+-----------+------------+-------------------------------------------------------+
|Name                      |ProductType|Brand         |Screen        |OldPrice|CurrentPrice|DiscountPercent|GiftValue|TotalRating|AverageScore|ProductLink                                            |
+--------------------------+-----------+--------------+--------------+--------+------------+---------------+---------+-----------+------------+-------------------------------------------------------+
|ASUS Zenfone Max          |Laptop     |Asus (Zenfone)|HD+ 6.26""    |NULL    |NULL        |NULL           |NULL     |0          |0.0         |https://www.thegioididong.com/dtdd/asus-zenfone-max-m2 |
|Benco 4G G3Mẫu mới        |Laptop     |Benco         |QQVGA 1.77""  |NULL    |380000      |NULL           |NULL     |0          |0.0         |https://www.thegioididong.com/dtdd/benco-4g-g3         |


#### Lấy thông tin toàn bộ các thương hiệu

In [8]:
laptop_brands_df = transformed_laptop_df.select('Brand').distinct()
tablet_brands_df = transformed_tablet_df.select('Brand').distinct()
phone_brands_df = transformed_phone_df.select('Brand').distinct()

brands = laptop_brands_df.union(tablet_brands_df.union(phone_brands_df)).dropDuplicates().orderBy('Brand')
brands = brands.withColumn('ID', F.monotonically_increasing_id() + 1)
brands = brands.select(["ID", "Brand"])
brands.show()

+---+--------------+
| ID|         Brand|
+---+--------------+
|  1|Asus (Zenfone)|
|  2|         Benco|
|  3|         HONOR|
|  4|          Itel|
|  5|        Lenovo|
|  6|       Masstel|
|  7|        Mobell|
|  8|         Nokia|
|  9|          OPPO|
| 10|       Samsung|
| 11|           TCL|
| 12|         Tecno|
| 13|       Viettel|
| 14|        Xiaomi|
| 15|iPhone (Apple)|
| 16|        realme|
| 17|          vivo|
+---+--------------+



#### Tạo DataFrame chứa toàn bộ dữ liệu

In [9]:
# Tạo các view để sử dụng Spark SQL
transformed_laptop_df.createOrReplaceTempView("laptops")
transformed_tablet_df.createOrReplaceTempView("tablets")
transformed_phone_df.createOrReplaceTempView("phones")
brands.createOrReplaceTempView("brands")

In [10]:
def retrieve_product_info(product_type):
    """ Trích xuất các thông tin chung của từng sản phẩm
    
    Args:
        product_type(str): Loại sản phẩm
        
    Returns:
        Spark DataFrame: Dữ liệu sau khi được trích xuất
    """
    query = f""" SELECT 
                    Name, 
                    ProductType, 
                    brands.ID as BrandID, 
                    OldPrice, 
                    CurrentPrice, 
                    DiscountPercent, 
                    GiftValue, 
                    ProductLink
                FROM {product_type} JOIN brands 
                ON {product_type}.Brand = brands.Brand
            """
    product_info = spark.sql(query)
    return product_info

laptops_info = retrieve_product_info('laptops')
tablets_info = retrieve_product_info('tablets')
phones_info = retrieve_product_info('phones')

product_info = laptops_info.union(tablets_info.union(phones_info)).orderBy('Name', ascending=[1])
product_info = product_info.withColumn('ID', F.monotonically_increasing_id() + 1)
product_info = product_info.select(["ID", "Name", "ProductType", "BrandID", "OldPrice", "CurrentPrice", "DiscountPercent", "GiftValue", "ProductLink"])
product_info.show(truncate=False)

24/11/14 13:33:52 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


+---+----------------------------------+-----------+-------+--------+------------+---------------+---------+-----------------------------------------------------------------------------+
|ID |Name                              |ProductType|BrandID|OldPrice|CurrentPrice|DiscountPercent|GiftValue|ProductLink                                                                  |
+---+----------------------------------+-----------+-------+--------+------------+---------------+---------+-----------------------------------------------------------------------------+
|1  |ASUS Zenfone Max                  |Laptop     |1      |NULL    |NULL        |NULL           |NULL     |https://www.thegioididong.com/dtdd/asus-zenfone-max-m2                       |
|2  |Benco 4G G3Mẫu mới                |Laptop     |2      |NULL    |380000      |NULL           |NULL     |https://www.thegioididong.com/dtdd/benco-4g-g3                               |
|3  |HONOR 200 5G                      |Laptop     |3      |12990

In [11]:
def retrieve_product_spec(product_type):
    """ Trích xuất các thông số kĩ thuật của từng sản phẩm
    
    Args:
        product_type(str): Loại sản phẩm
        
    Returns:
        Spark DataFrame: Dữ liệu sau khi được trích xuất
    """
    if product_type == 'laptops':
        query = f"""SELECT 
                       Name, Core, RAM, SSD, 'NULL' AS Screen
                    FROM {product_type}
                """
    else:
        query = f"""SELECT 
                       Name, 'NULL' AS Core, 'NULL' AS RAM, 'NULL' AS SSD, Screen
                    FROM {product_type}
                """
        
    product_spec = spark.sql(query)
    return product_spec

laptops_spec = retrieve_product_spec('laptops')
tablets_spec = retrieve_product_spec('tablets')
phones_spec = retrieve_product_spec('phones')

product_spec = laptops_spec.union(tablets_spec.union(phones_spec)).orderBy("Name", ascending=[1])
product_spec = product_spec.withColumn('ProductID', F.monotonically_increasing_id() + 1)
product_spec = product_spec.select(["ProductID", "Name", "Core", "RAM", "SSD", "Screen"])
product_spec.show(truncate=False)

+---------+----------------------------------+-----------+----+----+--------------+
|ProductID|Name                              |Core       |RAM |SSD |Screen        |
+---------+----------------------------------+-----------+----+----+--------------+
|1        |ASUS Zenfone Max                  |NULL       |NULL|NULL|HD+ 6.26""    |
|2        |Benco 4G G3Mẫu mới                |NULL       |NULL|NULL|QQVGA 1.77""  |
|3        |HONOR 200 5G                      |NULL       |NULL|NULL|1.5K 6.7""    |
|4        |HONOR 90 5G 12GB/256GB            |NULL       |NULL|NULL|1.5K 6.7""    |
|5        |HONOR X5 Plus 4GB/64GB            |NULL       |NULL|NULL|HD+ 6.56""    |
|6        |HONOR X6a 4GB/128GB               |NULL       |NULL|NULL|HD+ 6.56""    |
|7        |HONOR X6b                         |NULL       |NULL|NULL|HD+ 6.56""    |
|8        |HONOR X7c 8GB/256GBMẫu mới        |NULL       |NULL|NULL|HD+ 6.77 ""   |
|9        |HONOR X8b 8GB/512GB               |NULL       |NULL|NULL|Full HD+

In [12]:
def retrieve_product_rating(product_type):
    """ Trích xuất các thoog+ng tin đánh giá của khách hàng đối với từng sản phẩm
    
    Args:
        product_type(str): Loại sản phẩm
        
    Returns:
        Spark DataFrame: Dữ liệu sau khi được trích xuất
    """
    query = f"""SELECT 
                    Name, TotalRating, AverageScore
                FROM {product_type}
               """

    product_rating = spark.sql(query)
    return product_rating

laptops_rating = retrieve_product_rating('laptops')
tablets_rating = retrieve_product_rating('tablets')
phones_rating = retrieve_product_rating('phones')

product_rating = laptops_rating.union(tablets_rating.union(phones_rating)).orderBy("Name", ascending=[1])
product_rating = product_rating.withColumn('ProductID', F.monotonically_increasing_id() + 1)
product_rating = product_rating.select(["ProductID", "Name", "TotalRating", "AverageScore"])
product_rating.show(truncate=False)

+---------+----------------------------------+-----------+------------+
|ProductID|Name                              |TotalRating|AverageScore|
+---------+----------------------------------+-----------+------------+
|1        |ASUS Zenfone Max                  |0          |0.0         |
|2        |Benco 4G G3Mẫu mới                |0          |0.0         |
|3        |HONOR 200 5G                      |0          |0.0         |
|4        |HONOR 90 5G 12GB/256GB            |0          |0.0         |
|5        |HONOR X5 Plus 4GB/64GB            |0          |0.0         |
|6        |HONOR X6a 4GB/128GB               |0          |0.0         |
|7        |HONOR X6b                         |0          |0.0         |
|8        |HONOR X7c 8GB/256GBMẫu mới        |0          |0.0         |
|9        |HONOR X8b 8GB/512GB               |0          |0.0         |
|10       |Itel it2600                       |0          |0.0         |
|11       |Itel it9211                       |0          |0.0   

In [None]:
# Lưu dữ liệu đã biến đổi và trích xuất vào máy
file_path = os.getcwd()
data_path = os.path.join(file_path, 'data/processed')

if not os.path.exists(data_path):
    os.makedirs(data_path)
    
brands.toPandas().to_csv(f'{data_path}/brands.csv', index=False, header=True, mode='a')
product_info.toPandas().to_csv(f'{data_path}/product_info.csv', index=False, header=True, mode='a')
product_spec.toPandas().to_csv(f'{data_path}/product_spec.csv', index=False, header=True, mode='a')
product_rating.toPandas().to_csv(f'{data_path}/product_rating.csv', index=False, header=True, mode='a')