In [1]:
from confluent_kafka import Producer
import scrapy
import os
from scrapy.crawler import CrawlerProcess
from pymongo import MongoClient
import json

class RugSpider(scrapy.Spider):
    """Scrape rug listings from therugshopuk.co.uk and publish each product to Kafka.

    The spider walks the paginated listing (starting at ``start_page`` and
    following the "next" link for at most ``max_pages`` pages), opens every
    product page to read its key features, and sends the resulting record as
    a JSON message to the configured Kafka topic. Each record is also yielded
    as a regular Scrapy item.

    Environment variables:
        KAFKA_BROKER: bootstrap servers (default ``localhost:9092``).
        KAFKA_TOPIC: destination topic (default ``rug_products``).
    """
    name = 'rug_spider'
    start_page = 1
    max_pages = 134

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Kafka configuration, overridable through the environment
        kafka_broker = os.getenv('KAFKA_BROKER', 'localhost:9092')
        self.kafka_topic = os.getenv('KAFKA_TOPIC', 'rug_products')
        self.producer = Producer({'bootstrap.servers': kafka_broker})

    def delivery_report(self, err, msg):
        """Kafka delivery callback: log whether a message reached the broker."""
        if err is not None:
            self.logger.error(f"Message delivery failed: {err}")
        else:
            self.logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}]")

    def start_requests(self):
        """Issue the request for the first listing page, tagging it with its page number."""
        yield scrapy.Request(
            f'https://www.therugshopuk.co.uk/rugs-by-type/rugs.html?page={self.start_page}',
            meta={'page': self.start_page}
        )

    def parse(self, response):
        """Parse one listing page.

        Yields a follow-up request per product (carrying the listing-level
        fields in ``meta['product_data']``) and, while ``max_pages`` has not
        been reached, a request for the next listing page.
        """
        # Collect the listing-level fields of every product on the current page
        products = response.css('div.product-item-info')
        for product in products:
            product_link = product.css('a.product-item-link::attr(href)').get()
            product_data = {
                'Name': product.css('img.product-image-photo.image::attr(alt)').get(default='N/A').strip(),
                'Old_Price': self.clean_price(product.css('span.old-price span.price::text').get(default='N/A').strip()),
                'Special_Price': self.clean_price(product.css('span.special-price span.price::text').get(default='N/A').strip()),
                'Save': self.clean_save_percentage(product.css('span.save-percentage::text').get(default='N/A').strip()),
            }
            if product_link:
                yield response.follow(product_link, self.parse_product, meta={'product_data': product_data})

        # Page number of the current response (set by the request that produced it)
        current_page = response.meta.get('page', 1)

        # Link to the next listing page, if any
        next_page = response.css('a.next::attr(href)').get()

        # Keep paginating only while a next page exists and the page cap is not reached
        if next_page and current_page < self.max_pages:
            yield response.follow(next_page, callback=self.parse, meta={'page': current_page + 1})

    @staticmethod
    def _last_text(feature):
        """Return the last text node under ``feature`` (stripped), or 'N/A' if it has none."""
        texts = feature.css('::text').getall()
        return texts[-1].strip() if texts else 'N/A'

    def parse_product(self, response):
        """Enrich the listing record with Material/Origin/Type and publish it.

        The record is sent to Kafka as JSON and also yielded as a Scrapy item.
        """
        product_data = response.meta['product_data']

        # Entries of the 'Key Features' list on the product page
        features = response.css('div.tab ul.as-list li')

        # Pick out the features we model; the label is the text of the entry's <span>
        for feature in features:
            text = feature.css('span::text').get(default='').strip()
            if "Material" in text:
                product_data['Material'] = feature.css('span.prod_mat::text').get(default='N/A').strip()
            elif "Origin" in text:
                product_data['Origin'] = self._last_text(feature)
            elif "Type" in text:
                product_data['Type'] = self._last_text(feature)

        # Queue the record for Kafka. produce() is asynchronous; poll(0) serves
        # pending delivery callbacks without blocking, so the crawl is not
        # stalled on a broker round-trip for every single product.
        payload = json.dumps(product_data)
        try:
            self.producer.produce(self.kafka_topic, value=payload, callback=self.delivery_report)
        except BufferError:
            # Local producer queue is full: drain it, then retry once.
            self.producer.flush()
            self.producer.produce(self.kafka_topic, value=payload, callback=self.delivery_report)
        self.producer.poll(0)

        # Also emit the record as a Scrapy item so it shows up in the crawl output
        yield product_data

    def clean_price(self, price_str):
        """Convert a price string such as '£1,234.50' to a float.

        Returns None for empty input, the 'N/A' placeholder, or text that is
        not a plain number once the currency sign and thousands separators
        are removed. Unparseable text is logged instead of raised so that one
        odd price cannot abort parsing of the whole listing page.
        """
        if not price_str or price_str == 'N/A':
            return None
        try:
            return float(price_str.replace('£', '').replace(',', '').strip())
        except ValueError:
            self.logger.warning(f"Could not parse price: {price_str!r}")
            return None

    def clean_save_percentage(self, save_str):
        """Strip the '%' sign from a saving label; return None for empty input or 'N/A'."""
        if save_str and save_str != 'N/A':
            return save_str.replace('%', '').strip()
        return None

    def close(self, reason):
        """Flush every queued Kafka message when the spider shuts down.

        The spider never creates a ``client`` attribute, so there is nothing
        else to release here. ``close()`` on the producer is called only when
        the installed client exposes it.
        """
        self.producer.flush()
        close_producer = getattr(self.producer, 'close', None)
        if callable(close_producer):
            close_producer()


# Configure the crawler process (INFO-level logging, DOWNLOAD_DELAY of 0.5)
# and run RugSpider in it. start() blocks until the crawl has finished.
crawler_settings = {
    'LOG_LEVEL': 'INFO',
    'DOWNLOAD_DELAY': 0.5,
}
process = CrawlerProcess(settings=crawler_settings)

process.crawl(RugSpider)
process.start()


2024-11-22 00:30:07 [scrapy.utils.log] INFO: Scrapy 2.11.2 started (bot: scrapybot)
2024-11-22 00:30:07 [scrapy.utils.log] INFO: Versions: lxml 5.3.0.0, libxml2 2.11.7, cssselect 1.2.0, parsel 1.9.1, w3lib 2.2.1, Twisted 24.7.0, Python 3.12.2 (tags/v3.12.2:6abddd9, Feb  6 2024, 21:26:36) [MSC v.1937 64 bit (AMD64)], pyOpenSSL 24.2.1 (OpenSSL 3.3.1 4 Jun 2024), cryptography 43.0.0, Platform Windows-11-10.0.22631-SP0
2024-11-22 00:30:07 [scrapy.addons] INFO: Enabled addons:
[]


See the documentation of the 'REQUEST_FINGERPRINTER_IMPLEMENTATION' setting for information on how to handle this deprecation.
  return cls(crawler)

2024-11-22 00:30:07 [scrapy.extensions.telnet] INFO: Telnet Password: 929ed7e6beaa921b
2024-11-22 00:30:07 [scrapy.middleware] INFO: Enabled extensions:
['scrapy.extensions.corestats.CoreStats',
 'scrapy.extensions.telnet.TelnetConsole',
 'scrapy.extensions.logstats.LogStats']
2024-11-22 00:30:07 [scrapy.crawler] INFO: Overridden settings:
{'DOWNLOAD_DELAY': 0.5, 'L

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import os

# Point Spark at the local Hadoop and Java installations
os.environ['hadoop.home.dir'] = "C:\\hadoop-3.3.0"
os.environ['HADOOP_HOME'] = "C:\\hadoop-3.3.0"
os.environ['JAVA_HOME'] = "C:\\Program Files\\Java\\jdk-11.0.10"

# Ensure the Hadoop binaries are on PATH. Append only when missing so that
# re-running this cell does not keep growing PATH.
hadoop_bin = os.path.join(os.environ['hadoop.home.dir'], 'bin')
if hadoop_bin not in os.environ['PATH'].split(os.pathsep):
    os.environ['PATH'] += os.pathsep + hadoop_bin

# Spark configuration: local master plus the connector packages used in this
# notebook (PostgreSQL JDBC, Kafka source/sink, MongoDB, SQL Server JDBC).
# NOTE(review): the Kafka connector version should match the installed Spark
# version - confirm against the environment.
conf = SparkConf() \
    .set("spark.jars.packages", "org.postgresql:postgresql:42.7.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,com.microsoft.sqlserver:mssql-jdbc:10.2.0.jre8") \
    .setMaster("local[*]") \
    .setAppName("kafka_spark_sql")

# getOrCreate() reuses a context that is already running in this kernel;
# calling the SparkContext constructor a second time raises an error.
sc = SparkContext.getOrCreate(conf=conf)
sqlC = SQLContext(sc)

# Kafka source settings
kafka_bootstrap_servers = "localhost:9092"
topic = "rug_products"

# Read the topic as a bounded *batch* (everything from the earliest to the
# latest offset at read time). The following cells call .show(), .write and
# .write.jdbc() on DataFrames derived from this one; those are batch-only
# operations and fail on a streaming DataFrame created with readStream.
raw_data = sqlC.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", topic) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

# Schema of the JSON payload produced by the spider
rug_schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Old_Price", DoubleType(), True),
    StructField("Special_Price", DoubleType(), True),
    StructField("Save", StringType(), True),
    StructField("Material", StringType(), True),
    StructField("Origin", StringType(), True),
    StructField("Type", StringType(), True)
])

# Decode the Kafka message value (bytes -> string -> struct) using the schema
rug_data = raw_data.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), rug_schema).alias("json_data"))

# Flatten the parsed struct into top-level columns
rug_selected = rug_data.select(
    "json_data.Name",
    "json_data.Old_Price",
    "json_data.Special_Price",
    "json_data.Save",
    "json_data.Material",
    "json_data.Origin",
    "json_data.Type"
)

# Preview a few parsed rows. This replaces the console streaming query whose
# awaitTermination() call blocked the kernel and kept later cells from running.
rug_selected.show(5, truncate=False)

In [2]:
# Inspect the schema of the parsed product columns
rug_selected.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Old_Price: double (nullable = true)
 |-- Special_Price: double (nullable = true)
 |-- Save: string (nullable = true)
 |-- Material: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Type: string (nullable = true)



In [3]:
# Handle missing data: drop every row that has a null in any of the required columns
from pyspark.sql import functions as F

required_columns = ['Name', 'Old_Price', 'Special_Price', 'Save', 'Material', 'Type']

# AND together one "is not null" test per required column
rows_complete = F.col(required_columns[0]).isNotNull()
for column_name in required_columns[1:]:
    rows_complete = rows_complete & F.col(column_name).isNotNull()

rug_cleand = rug_selected.filter(rows_complete)


In [6]:
# Replace the "N/A" placeholder with "Unknown", then keep only rows in which
# every modelled column is populated.
populated_columns = ['Name', 'Old_Price', 'Special_Price', 'Save', 'Material', 'Origin', 'Type']

all_populated = F.col(populated_columns[0]).isNotNull()
for column_name in populated_columns[1:]:
    all_populated = all_populated & F.col(column_name).isNotNull()

rug_cleand = rug_cleand.replace("N/A", "Unknown").where(all_populated)


In [7]:

# Turn "Unknown" into null and drop every row that has a null in any of the key columns
key_columns = ['Name', 'Old_Price', 'Special_Price', 'Save', 'Material', 'Origin', 'Type']

no_missing_values = F.col(key_columns[0]).isNotNull()
for column_name in key_columns[1:]:
    no_missing_values = no_missing_values & F.col(column_name).isNotNull()

rug_cleand = rug_cleand.replace("Unknown", None).where(no_missing_values)


In [8]:
# Display the cleaned data without truncating long column values
rug_cleand.show(truncate=False)

+-----------------------------------------------------------------------------+---------+-------------+-------+-----------------------------+----------------+------------------------+-----------+------+-------+-----------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Name                                                                         |Old_Price|Special_Price|Save   |Material                     |Cleaning_Process|Pattern                 |Pile_height|Weight|Origin |Type                         |Product_Reviews                                                                                                                                                                                                                                         |
+-------------------

In [None]:
from pyspark.sql.functions import regexp_replace

# Remove the literal "Save " prefix from the "Save" column
save_without_prefix = regexp_replace('Save', 'Save ', '')
rug_cleand = rug_cleand.withColumn('Save', save_without_prefix)

In [10]:
# Display the data again after stripping the "Save " prefix
rug_cleand.show(truncate=False)

+-----------------------------------------------------------------------------+---------+-------------+----+-----------------------------+----------------+------------------------+-----------+------+-------+-----------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Name                                                                         |Old_Price|Special_Price|Save|Material                     |Cleaning_Process|Pattern                 |Pile_height|Weight|Origin |Type                         |Product_Reviews                                                                                                                                                                                                                                         |
+-------------------------

ĐẨY DỮ LIỆU SẠCH LÊN MONGO

In [13]:
# MongoDB output URI
mongo_output_uri = "mongodb://localhost:27017/rugs.rugs_cleand"

# Write the 'rug_cleand' DataFrame to MongoDB in overwrite mode
mongo_writer = (
    rug_cleand.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("overwrite")
    .option("uri", mongo_output_uri)
)
mongo_writer.save()



TÁCH BẢNG ĐỂ MÔ HÌNH HOÁ

In [14]:
from pyspark.sql.functions import monotonically_increasing_id
# Assign the surrogate key ONCE, on the shared source DataFrame, so that the
# three tables below carry the same rug_id for the same product.
# monotonically_increasing_id() depends on partitioning, so generating it
# separately inside each select() gives no guarantee that the ids line up.
# The result is cached to keep the ids stable across the later write actions.
rugs_with_id = rug_cleand.withColumn("rug_id", monotonically_increasing_id()).cache()

# Each row already has a unique rug_id, so distinct() would be a no-op and is
# omitted; the ordering by rug_id is kept.

# 'Rugs': product details
Rugs = rugs_with_id.select(
    "rug_id",
    "Name",
    "Old_Price",
    "Special_Price",
    "Save",
    "Origin"
).orderBy("rug_id")

# 'Material': material of each product
Material = rugs_with_id.select(
    "rug_id",
    "Material"
).orderBy("rug_id")

# 'Type': type of each product
Type = rugs_with_id.select(
    "rug_id",
    "Type"
).orderBy("rug_id")

ĐẨY DỮ LIỆU LÊN CÁC TOPIC TRÊN KAFKA

In [15]:
from confluent_kafka.admin import AdminClient
import logging
from pyspark.sql import functions as F

# Kafka connection target and the destination topic for each modelled table
kafka_bootstrap_servers = "localhost:9092"
topic_rug, topic_material, topic_type = "RugTopic", "MaterialTopic", "TypeTopic"

# Logging configuration: ERROR level, timestamped messages
log_format = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=log_format, level=logging.ERROR)
logger = logging.getLogger(__name__)

def check_kafka_status(bootstrap_servers):
    """Check that the Kafka cluster at ``bootstrap_servers`` can be reached.

    Connectivity is probed by listing topics through an AdminClient with a
    10 second timeout. Prints a confirmation on success.

    Raises:
        ConnectionError: if creating the client or listing topics fails.
    """
    try:
        # Listing topics forces a round-trip to the cluster
        AdminClient({'bootstrap.servers': bootstrap_servers}).list_topics(timeout=10)
        print("Kafka is reachable.")
    except Exception as error:
        raise ConnectionError(f"Cannot connect to Kafka: {error}")

def write_to_kafka(df, topic):
    """Write a batch DataFrame to a Kafka topic as JSON messages.

    Missing values are replaced with defaults, a ``rug_id`` column is added
    when absent, and each row is sent with ``rug_id`` (as a string) as the
    message key and the whole row serialised as JSON as the message value.

    Args:
        df: DataFrame to publish.
        topic: name of the destination Kafka topic.

    Returns:
        True when the write completed, False when it failed. Failures are
        logged through ``logger`` rather than raised.
    """
    # Defaults for missing values, typed to match the columns they fill
    # (numeric zeros for the price columns, strings for the text columns).
    fill_values = {
        'Name': 'Unknown',
        'Old_Price': 0.0,
        'Special_Price': 0.0,
        'Save': '0',
        'Origin': 'Unknown',
        'Material': 'Unknown',
        'Type': 'Unknown'
    }

    try:
        check_kafka_status(kafka_bootstrap_servers)

        # Show the first 5 rows before filling
        df.show(5)

        # Fill defaults only for the columns this DataFrame actually has
        present_defaults = {
            column_name: default
            for column_name, default in fill_values.items()
            if column_name in df.columns
        }
        df = df.fillna(present_defaults)
        df.show(5)
        df.printSchema()

        # Add a "rug_id" column when the DataFrame does not have one yet
        if "rug_id" not in df.columns:
            df = df.withColumn("rug_id", F.monotonically_increasing_id().cast("string"))

        # Kafka expects 'key' and 'value' columns: key = rug_id, value = row as JSON
        df = df.selectExpr("CAST(rug_id AS STRING) AS key", "to_json(struct(*)) AS value")

        # Batch write to Kafka. No checkpointLocation is set: that option
        # belongs to streaming queries and has no effect on a batch write.
        df.write \
            .format("kafka") \
            .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
            .option("topic", topic) \
            .save()

        print(f"Data written to Kafka topic {topic} successfully!")
        return True
    except Exception as e:
        logger.error(f"Failed to write data to Kafka topic {topic}: {e}")
        return False

# Publish each modelled table to its own Kafka topic
for table_df, target_topic in (
    (Rugs, topic_rug),
    (Material, topic_material),
    (Type, topic_type),
):
    write_to_kafka(table_df, target_topic)


Kafka is reachable.
+------+--------------------+---------+-------------+----+-------+
|rug_id|                Name|Old_Price|Special_Price|Save| Origin|
+------+--------------------+---------+-------------+----+-------+
|     0|Cleo 013-0016 619...|    223.2|       169.08|  24|Belgium|
|     1|Nova NV10 Antique...|     59.0|         39.6|  33| Turkey|
|     2|Aurora AU17 Linea...|     89.0|         52.8|  41| Turkey|
|     3|Galleria 063 0529...|    100.8|        76.12|  24|Belgium|
|     4|My Lux Washable S...|     35.0|         27.0|  23|     UK|
+------+--------------------+---------+-------------+----+-------+
only showing top 5 rows

+------+--------------------+---------+-------------+----+-------+
|rug_id|                Name|Old_Price|Special_Price|Save| Origin|
+------+--------------------+---------+-------------+----+-------+
|     0|Cleo 013-0016 619...|    223.2|       169.08|  24|Belgium|
|     1|Nova NV10 Antique...|     59.0|         39.6|  33| Turkey|
|     2|Aurora AU

2024-11-21 02:15:26,884 - ERROR - Failed to write data to Kafka topic MaterialTopic: An error occurred while calling o205.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 33.0 failed 1 times, most recent failure: Lost task 0.0 in stage 33.0 (TID 21) (TZA executor driver): org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for MaterialTopic-0:120015 ms has passed since batch creation

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark

Kafka is reachable.
+------+--------------------+
|rug_id|                Type|
+------+--------------------+
|     0|              Modern|
|     1|         Traditional|
|     2|              Modern|
|     3|              Modern|
|     4|Modern , Plain , ...|
+------+--------------------+
only showing top 5 rows

+------+--------------------+
|rug_id|                Type|
+------+--------------------+
|     0|              Modern|
|     1|         Traditional|
|     2|              Modern|
|     3|              Modern|
|     4|Modern , Plain , ...|
+------+--------------------+
only showing top 5 rows

root
 |-- rug_id: long (nullable = false)
 |-- Type: string (nullable = false)

Data written to Kafka topic TypeTopic successfully!


KẾT NỐI ĐỂ TẠO BẢNG VÀ MÔ HÌNH HOÁ SAU ĐÓ ĐẨY DỮ LIỆU LÊN POSTGRES

In [16]:
import psycopg2
from pyspark.sql import SparkSession

# Reuse the Spark session of this kernel (or create one if none exists)
spark = SparkSession.builder \
    .appName("CreateTablesWithConstraints") \
    .getOrCreate()

# PostgreSQL connection settings. Values are taken from the environment when
# set, so real credentials do not have to live in the notebook; the fallbacks
# are the local development values used so far.
jdbc_url = os.getenv("POSTGRES_JDBC_URL", "jdbc:postgresql://localhost:5432/db_rug")
connection_properties = {
    "user": os.getenv("POSTGRES_USER", "tram2"),
    "password": os.getenv("POSTGRES_PASSWORD", "1111"),
    "driver": "org.postgresql.Driver"
}

def create_tables_and_constraints():
    """Create the Rug, Material and Type tables in PostgreSQL if they do not exist.

    Material and Type reference Rug through ``rug_id`` with ON DELETE CASCADE.
    Errors are printed rather than raised; the database connection is closed
    in every case.
    """
    conn = None
    try:
        # Reuse the credentials from connection_properties so they are defined in one place
        conn = psycopg2.connect(
            dbname="db_rug",
            user=connection_properties["user"],
            password=connection_properties["password"],
            host="localhost",
            port="5432"
        )

        with conn.cursor() as cursor:
            # Rug table: one row per product
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS Rug (
                    rug_id BIGSERIAL PRIMARY KEY,
                    Name VARCHAR(255),
                    Old_price FLOAT,
                    Special_Price FLOAT,
                    Save VARCHAR(255),
                    Origin VARCHAR(255)
                );
            """)

            # Material table. rug_id is a plain BIGINT: it is a foreign key
            # whose values come from Rug, so it must not own a sequence
            # (which is what BIGSERIAL would create).
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS Material (
                    rug_id BIGINT,
                    Material VARCHAR(255),
                    PRIMARY KEY (rug_id, Material),
                    CONSTRAINT fk_rug_material_rug_id FOREIGN KEY (rug_id) REFERENCES rug (rug_id) ON DELETE CASCADE
                );
            """)

            # Type table, same foreign-key layout as Material
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS Type (
                    rug_id BIGINT,
                    Type VARCHAR(255),
                    PRIMARY KEY (rug_id, Type),
                    CONSTRAINT fk_rug_type_rug_id FOREIGN KEY (rug_id) REFERENCES rug (rug_id) ON DELETE CASCADE
                );
            """)

        conn.commit()
        print("Tables created successfully.")
    except Exception as e:
        print("Error occurred while creating tables: ", e)
    finally:
        # Always release the connection, including when a statement failed
        if conn is not None:
            conn.close()

create_tables_and_constraints()


Tables created successfully.


In [17]:
def write_to_postgres(df, table_name):
    """Append a DataFrame to a PostgreSQL table over JDBC.

    Uses the module-level ``jdbc_url`` and ``connection_properties``.

    Args:
        df: DataFrame to write.
        table_name: name of the destination table.

    Returns:
        True when the write succeeded, False when it failed. Failures are
        printed rather than raised, so callers can check the result before
        writing tables that depend on this one.
    """
    try:
        df.write.jdbc(url=jdbc_url, table=table_name, mode="append", properties=connection_properties)
        print(f"Successfully wrote to {table_name}")
        return True
    except Exception as e:
        print(f"Error writing to {table_name}: {e}")
        return False

# Push the data to PostgreSQL in the correct order: rug first, then material and type
for table_df, table_name in (
    (Rugs, "rug"),
    (Material, "material"),
    (Type, "type"),
):
    write_to_postgres(table_df, table_name)
 


Successfully wrote to rug
Successfully wrote to material
Successfully wrote to type
