In [0]:
# %pip install pandas requests
# %pip install delta-spark


In [0]:
import requests
import os
import logging
import pandas as pd
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
import pytz  # Timezone handling
import stat  # File permissions



In [0]:
class TZFormatter(logging.Formatter):
    def __init__(self, fmt=None, datefmt='%Y-%m-%d %H:%M:%S %Z', tz=None):
        super().__init__(fmt=fmt, datefmt=datefmt)
        self.tz = tz

    def formatTime(self, record, datefmt=None):
        dt = datetime.fromtimestamp(record.created, tz=self.tz)
        return dt.strftime(datefmt or '%Y-%m-%d %H:%M:%S %Z')

def init_log(name='log', log_dir="/dbfs/tmp/logs", log_file="tiki.log"):
    if not os.path.exists(log_dir):
        os.makedirs(log_dir, exist_ok=True)

    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)

    if logger.hasHandlers():
        logger.handlers.clear()

    log_path = os.path.join(log_dir, log_file)
    file_handler = logging.FileHandler(log_path)
    file_handler.setLevel(logging.DEBUG)

    ho_chi_minh_tz = pytz.timezone('Asia/Ho_Chi_Minh')
    formatter = TZFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', tz=ho_chi_minh_tz)
    file_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    os.chmod(log_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)

    return logger

logger = init_log('tiki_logger')


#Mount Azure Storage to DataBricks

In [0]:
def mount_storage():
    try:
        dbutils.fs.mount(
            source="name_storage",
            mount_point="/mnt/tikistorage",  
            extra_configs={
                "Key"
            }
        )
        print("Mount thành công!")
    except Exception as e:
        print(f"Mount thất bại: {e}")

mount_storage()

Mount thành công!


In [0]:
display(dbutils.fs.mounts())


mountPoint,source,encryptionType
/databricks-datasets,databricks-datasets,
/databricks/mlflow-tracking,databricks/mlflow-tracking,sse-s3
/databricks-results,databricks-results,sse-s3
/databricks/mlflow-registry,databricks/mlflow-registry,sse-s3
/mnt/tikistorage,wasbs://datalake@tikistoragedata123.blob.core.windows.net,
/,DatabricksRoot,sse-s3


# Tiki Scraping

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
import requests
import pandas as pd
import logging
import os
from concurrent.futures import ThreadPoolExecutor


def init_log(logger_name, log_dir, log_file):
    """
    Initialize logger with specific configuration.
    """
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)

    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.INFO)

    log_path = os.path.join(log_dir, log_file)
    file_handler = logging.FileHandler(log_path, encoding='utf-8')
    file_handler.setLevel(logging.INFO)

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    return logger


logger = init_log('tiki', './logs', 'tiki.log')


def get_categories():
    """
    Fetch all categories from Tiki.vn.
    Returns a list of category links.
    """
    url = 'https://api.tiki.vn/raiden/v2/menu-config?platform=desktop'
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        data = response.json()
        return [item['link'].split("/c")[-1] for item in data.get("menu_block", {}).get("items", []) if 'link' in item]
    except Exception as e:
        logger.error(f"Error fetching categories: {e}")
        return []


def get_data(url):
    headers = {'User-Agent': 'Mozilla/5.0'}
    try:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        return response.json().get("data", [])
    except requests.exceptions.RequestException as e:
        logger.warning(f"Error fetching data from {url}: {e}")
        return []


def extract_product_fields(product):
    """
    Extract important fields from product data.
    """
    important_fields = {
        'id': product.get('id'),
        'sku': product.get('sku'),
        'name': product.get('name'),
        'price': product.get('price'),
        'list_price': product.get('list_price'),
        'discount': product.get('discount'),
        'discount_rate': product.get('discount_rate'),
        'rating_average': product.get('rating_average'),
        'review_count': product.get('review_count'),
        'order_count': product.get('order_count'),
        'favourite_count': product.get('favourite_count'),
        'thumbnail_url': product.get('thumbnail_url'),
        'quantity_sold': product.get('quantity_sold', {}).get('value') if isinstance(product.get('quantity_sold'), dict) else None,
        'original_price': product.get('original_price'),
        'seller_id': product.get('seller_id'),
        'seller': product.get('seller', {}).get('name') if isinstance(product.get('seller'), dict) else None,
        'seller_product_id': product.get('seller_product_id'),
        'brand_name': product.get('brand_name'),
        'category_l1_name': product.get('visible_impression_info', {}).get('amplitude', {}).get('category_l1_name'),
        'category_l2_name': product.get('visible_impression_info', {}).get('amplitude', {}).get('category_l2_name'),
        'category_l3_name': product.get('visible_impression_info', {}).get('amplitude', {}).get('category_l3_name')
    }
    return important_fields


def save_to_deltalake(data, delta_output_dir):
    """
    Save all data into a single Delta Lake table with schema overwrite enabled.
    """
    spark = SparkSession.builder \
        .appName("ProcessTikiData") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .getOrCreate()

    df = pd.DataFrame(data)
    spark_df = spark.createDataFrame(df)

    # Add processed time column
    spark_df = spark_df.withColumn("processed_time", current_timestamp())

    # Write data to Delta Lake with schema overwrite
    spark_df.write.format("delta") \
        .option("overwriteSchema", "true") \
        .mode("overwrite") \
        .save(delta_output_dir)

    logger.info(f"Dữ liệu đã được lưu vào Delta Lake tại {delta_output_dir}.")



def process_category(category, total_data):
    """
    Process data for a specific category and append to total_data.
    """
    try:
        page = 1
        while True:
            url = f'https://tiki.vn/api/personalish/v1/blocks/listings?limit=300&category={category}&page={page}'
            data = get_data(url)
            if not data:
                logger.info(f"No more data for category {category} at page {page}")
                break
            extracted_data = [extract_product_fields(item) for item in data]
            total_data.extend(extracted_data)
            logger.info(f"Fetched {len(data)} products from category {category}, page {page}")
            page += 1
    except Exception as e:
        logger.error(f"Error processing category {category}: {e}")


def crawl_data(output_dir):
    """
    Crawl data from all categories and save to Delta Lake.
    """
    categories = get_categories()
    if not categories:
        logger.error("No categories found to process.")
        return

    total_data = []

    # Use ThreadPoolExecutor to process categories in parallel
    with ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(lambda category: process_category(category, total_data), categories)

    if total_data:
        save_to_deltalake(total_data, output_dir)
        logger.info("Dữ liệu từ tất cả danh mục đã được lưu thành công.")
    else:
        logger.warning("Không có dữ liệu để lưu.")


if __name__ == "__main__":
    try:
        logger.info("Bắt đầu quy trình cào dữ liệu từ Tiki...")
        crawl_data(output_dir="/mnt/tikistorage/tiki_data")
        logger.info("Quy trình cào dữ liệu hoàn thành!")
    except Exception as e:
        logger.error(f"Lỗi xảy ra trong quá trình cào dữ liệu: {e}")


In [0]:
log_path = "./logs/tiki.log"

try:
    with open(log_path, 'r', encoding='utf-8') as log_file:
        print("=== Nội dung Log File ===")
        print(log_file.read())
except FileNotFoundError:
    print(f"Không tìm thấy log file tại: {log_path}")
except Exception as e:
    print(f"Lỗi khi đọc log file: {e}")


=== Nội dung Log File ===
2024-12-09 07:56:58,754 - tiki - INFO - Bắt đầu quy trình cào dữ liệu từ Tiki...
2024-12-09 07:56:59,950 - tiki - INFO - Fetched 40 products from category 8322, page 1
2024-12-09 07:57:00,025 - tiki - INFO - Fetched 40 products from category 1883, page 1
2024-12-09 07:57:00,038 - tiki - INFO - Fetched 40 products from category 1789, page 1
2024-12-09 07:57:00,346 - tiki - INFO - Fetched 40 products from category 2549, page 1
2024-12-09 07:57:00,419 - tiki - INFO - Fetched 40 products from category 8322, page 2
2024-12-09 07:57:00,432 - tiki - INFO - Fetched 40 products from category 1815, page 1
2024-12-09 07:57:00,717 - tiki - INFO - Fetched 40 products from category 1789, page 2
2024-12-09 07:57:01,003 - tiki - INFO - Fetched 40 products from category 1883, page 2
2024-12-09 07:57:01,051 - tiki - INFO - Fetched 40 products from category 8322, page 3
2024-12-09 07:57:01,149 - tiki - INFO - Fetched 40 products from category 2549, page 2
2024-12-09 07:57:01,160

# Read Delta

In [0]:
from pyspark.sql import SparkSession

# Khởi tạo SparkSession
spark = SparkSession.builder \
    .appName("Delta Lake Read tiki data") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Đường dẫn Delta Lake
delta_path = "link_storage"

# Đọc dữ liệu từ Delta Lake
try:
    df = spark.read.format("delta").load(delta_path)
    print("Dữ liệu đã được đọc thành công!")
    df.show(10)  # Hiển thị 10 dòng đầu tiên
except Exception as e:
    print(f"Lỗi khi đọc dữ liệu từ Delta Lake: {e}")


Dữ liệu đã được đọc thành công!
+---------+-------------+--------------------+-------+----------+--------+-------------+--------------+------------+-----------+---------------+--------------------+-------------+--------------+---------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+
|       id|          sku|                name|  price|list_price|discount|discount_rate|rating_average|review_count|order_count|favourite_count|       thumbnail_url|quantity_sold|original_price|seller_id|seller_product_id|brand_name|    category_l1_name|    category_l2_name|    category_l3_name|      processed_time|
+---------+-------------+--------------------+-------+----------+--------+-------------+--------------+------------+-----------+---------------+--------------------+-------------+--------------+---------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+
|276135518|218

In [0]:
df.select("category_l1_name").distinct().show()


+--------------------+
|    category_l1_name|
+--------------------+
|Thời Trang Cho Mẹ...|
|Ô Tô - Xe Máy - X...|
|       Điện Gia Dụng|
|Thiết Bị Số - Phụ...|
|   Đồ Chơi - Mẹ & Bé|
|    Chăm sóc nhà cửa|
| Thể Thao - Dã Ngoại|
|  Nhà Cửa - Đời Sống|
|     Bách Hóa Online|
|  Làm Đẹp - Sức Khỏe|
|       Thời trang nữ|
|       Nhà Sách Tiki|
|Điện Thoại - Máy ...|
|      Thời trang nam|
|Cross Border - Hà...|
| Điện Tử - Điện Lạnh|
|      Giày - Dép nam|
|Laptop - Máy Vi T...|
|                NGON|
| Phụ kiện thời trang|
+--------------------+
only showing top 20 rows



In [0]:
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- sku: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price: long (nullable = true)
 |-- list_price: long (nullable = true)
 |-- discount: long (nullable = true)
 |-- discount_rate: long (nullable = true)
 |-- rating_average: double (nullable = true)
 |-- review_count: long (nullable = true)
 |-- order_count: long (nullable = true)
 |-- favourite_count: long (nullable = true)
 |-- thumbnail_url: string (nullable = true)
 |-- quantity_sold: double (nullable = true)
 |-- original_price: long (nullable = true)
 |-- seller_id: long (nullable = true)
 |-- seller_product_id: long (nullable = true)
 |-- brand_name: string (nullable = true)
 |-- category_l1_name: string (nullable = true)
 |-- category_l2_name: string (nullable = true)
 |-- category_l3_name: string (nullable = true)
 |-- processed_time: timestamp (nullable = true)



# Process Data


In [0]:
from pyspark.sql.functions import col, sum

# Kiểm tra tổng số giá trị null cho mỗi cột
null_counts = df.select(
    [sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
)

null_counts.show()


+---+---+----+-----+----------+--------+-------------+--------------+------------+-----------+---------------+-------------+-------------+--------------+---------+-----------------+----------+----------------+----------------+----------------+--------------+
| id|sku|name|price|list_price|discount|discount_rate|rating_average|review_count|order_count|favourite_count|thumbnail_url|quantity_sold|original_price|seller_id|seller_product_id|brand_name|category_l1_name|category_l2_name|category_l3_name|processed_time|
+---+---+----+-----+----------+--------+-------------+--------------+------------+-----------+---------------+-------------+-------------+--------------+---------+-----------------+----------+----------------+----------------+----------------+--------------+
|  0|  0|   0|    0|         0|       0|            0|             0|           0|          0|              0|            0|        12058|             0|        0|                0|         0|               0|            24

In [0]:
df.select("quantity_sold").show(20, truncate=False)


+-------------+
|quantity_sold|
+-------------+
|185.0        |
|84.0         |
|156.0        |
|367.0        |
|285.0        |
|333.0        |
|255.0        |
|800.0        |
|588.0        |
|852.0        |
|1021.0       |
|53.0         |
|1691.0       |
|933.0        |
|523.0        |
|27.0         |
|3936.0       |
|1864.0       |
|11169.0      |
|285.0        |
+-------------+
only showing top 20 rows



In [0]:
df = df.fillna({"quantity_sold": 0})


In [0]:
null_count = df.filter(col("quantity_sold").isNull()).count()
print(f"Số lượng giá trị null trong cột quantity_sold: {null_count}")


Số lượng giá trị null trong cột quantity_sold: 0


In [0]:
df = df.fillna({'category_l2_name': 'Unknown', 'category_l3_name': 'Unknown'})


Tỷ lệ giảm giá


In [0]:
df.select("calculated_discount_rate").describe().show()


+-------+------------------------+
|summary|calculated_discount_rate|
+-------+------------------------+
|  count|                   50423|
|   mean|                     0.0|
| stddev|                     0.0|
|    min|                     0.0|
|    max|                     0.0|
+-------+------------------------+



In [0]:
df_grouped = df.groupBy("brand_name").agg(
    {"quantity_sold": "sum", "price": "avg"}
).withColumnRenamed("sum(quantity_sold)", "total_quantity_sold") \
 .withColumnRenamed("avg(price)", "average_price")

df_grouped.show()


+-----------------+-------------------+------------------+
|       brand_name|total_quantity_sold|     average_price|
+-----------------+-------------------+------------------+
|            OENON|             2166.0|          165000.0|
|         miDoctor|            71102.0|115839.65909090909|
|          Hồng Ký|               20.0|          285000.0|
|           Mamamy|             2341.0|          129993.6|
|            KOVER|               34.0|           52640.0|
|         peekaboo|               46.0|         1229000.0|
|           Chandi|             8758.0|          160400.0|
|            Ecoba|            15560.0|          179300.0|
|           SAKITO|              365.0|           79000.0|
|             ESNA|              103.0|           59000.0|
|       Cat's Eyes|               69.0|          153000.0|
|          OATSIDE|              122.0|203333.33333333334|
|             LiOA|             1333.0|217571.42857142858|
|            Total|               65.0|           90202.

## Save to Deltalake

In [0]:
output_path = "/mnt/tikistorage/processed_tiki_data"

try:
    # Ghi dữ liệu vào Delta Lake
    df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(output_path)
    print(f"Dữ liệu đã được lưu thành công vào {output_path}")
except Exception as e:
    print(f"Lỗi khi lưu dữ liệu vào Delta Lake: {e}")


Dữ liệu đã được lưu thành công vào /mnt/tikistorage/processed_tiki_data


In [0]:
from pyspark.sql import SparkSession

# Khởi tạo SparkSession
spark = SparkSession.builder.appName("Read Delta from Azure").getOrCreate()

# Đường dẫn thư mục Delta Lake trên Azure Blob Storage
delta_path = "link_storage"

# Cấu hình tài khoản Azure Blob Storage
spark.conf.set(
    "fs.azure.account.key.tikistoragedata123.blob.core.windows.net",
    "key"
)

try:
    df = spark.read.format("delta").load(delta_path)
    df.show(10)  
    df.printSchema()  
except Exception as e:
    print(f"Lỗi khi đọc Delta Lake: {e}")


+---------+-------------+--------------------+-------+----------+--------+-------------+--------------+------------+-----------+---------------+--------------------+-------------+--------------+---------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+------------------------+
|       id|          sku|                name|  price|list_price|discount|discount_rate|rating_average|review_count|order_count|favourite_count|       thumbnail_url|quantity_sold|original_price|seller_id|seller_product_id|brand_name|    category_l1_name|    category_l2_name|    category_l3_name|      processed_time|calculated_discount_rate|
+---------+-------------+--------------------+-------+----------+--------+-------------+--------------+------------+-----------+---------------+--------------------+-------------+--------------+---------+-----------------+----------+--------------------+--------------------+--------------------+------------------

In [0]:
df.select("category_l1_name").distinct().show()


+--------------------+
|    category_l1_name|
+--------------------+
|Thời Trang Cho Mẹ...|
|Ô Tô - Xe Máy - X...|
|       Điện Gia Dụng|
|Thiết Bị Số - Phụ...|
|   Đồ Chơi - Mẹ & Bé|
|    Chăm sóc nhà cửa|
| Thể Thao - Dã Ngoại|
|  Nhà Cửa - Đời Sống|
|     Bách Hóa Online|
|  Làm Đẹp - Sức Khỏe|
|       Thời trang nữ|
|       Nhà Sách Tiki|
|Điện Thoại - Máy ...|
|      Thời trang nam|
|Cross Border - Hà...|
| Điện Tử - Điện Lạnh|
|      Giày - Dép nam|
|Laptop - Máy Vi T...|
|                NGON|
| Phụ kiện thời trang|
+--------------------+
only showing top 20 rows

