In [26]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession

In [27]:
# Khởi tạo SparkSession
spark = SparkSession.builder \
    .appName("MongoDB and Postgres") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2,org.postgresql:postgresql:42.7.4") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/dbmycrawler") \
    .getOrCreate()

In [28]:
#doc du lieu tu mongo
df=spark.read.format("mongo").option("collection","tblcafeland").load()
#hien du lieu
df.show() 

+--------------------+----------+------------------+--------------------+--------------------+--------------------+-----------+--------------------+-----------------+--------+--------------------+--------------------+
|          Chủ đầu tư|Diện tích:|        Loại dự án|               Mô tả|           Ngày đăng|           Thành phố| Trạng thái|           Tên dự án|  Tổng vốn đầu tư|Xếp hạng|                 _id|               Đường|
+--------------------+----------+------------------+--------------------+--------------------+--------------------+-----------+--------------------+-----------------+--------+--------------------+--------------------+
|Công Ty Cổ Phần B...|   8.200m2|     Đất Nền Dự Án|The Long Eyes là ...|Cập nhật: 12/09/2...|           Trảng Bom|Đang mở bán|The Long Eyes: Dự...|          unknown|     5.0|{66f8f9c98ac17558...|   đường Sông Thao 5|
|Công ty Cổ phần Đ...|33.540,7m2|   Căn hộ chung cư|Golden City là că...|Cập nhật: 25/09/2...|        TP. Tây Ninh|Đang mở bán|G

In [29]:
#Xem kiểu dl các cột
df.printSchema()

root
 |-- Chủ đầu tư: string (nullable = true)
 |-- Diện tích:: string (nullable = true)
 |-- Loại dự án: string (nullable = true)
 |-- Mô tả: string (nullable = true)
 |-- Ngày đăng: string (nullable = true)
 |-- Thành phố: string (nullable = true)
 |-- Trạng thái: string (nullable = true)
 |-- Tên dự án: string (nullable = true)
 |-- Tổng vốn đầu tư: string (nullable = true)
 |-- Xếp hạng: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- Đường: string (nullable = true)



In [30]:
from pyspark.sql import functions as F

# Thay thế giá trị 'unknown' và chuỗi rỗng trong tất cả các cột
for column in df.columns:
    if column in df.columns:  # Kiểm tra xem cột có tồn tại trong DataFrame không
        try:
            df = df.withColumn(column, 
                               F.when(F.col(column) == 'unknown', 'chưa cập nhật')
                                .when(F.col(column) == '', 'chưa cập nhật')
                                .otherwise(F.col(column)))
            print(f"Processed column: {column}")  # In ra cột đã xử lý
        except Exception as e:
            print(f"Error processing column {column}: {e}")  # In ra lỗi nếu có

# Kiểm tra dữ liệu sau khi thay thế
df.show()


Processed column: Chủ đầu tư
Processed column: Diện tích:
Processed column: Loại dự án
Processed column: Mô tả
Processed column: Ngày đăng
Processed column: Thành phố
Processed column: Trạng thái
Processed column: Tên dự án
Processed column: Tổng vốn đầu tư
Processed column: Xếp hạng
Error processing column _id: [DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES] Cannot resolve "(_id = unknown)" due to data type mismatch: the left and right operands of the binary operator have incompatible types ("STRUCT<oid: STRING>" and "STRING").;
'Project [Chủ đầu tư#1190, Diện tích:#1203, Loại dự án#1216, Mô tả#1229, Ngày đăng#1242, Thành phố#1255, Trạng thái#1268, Tên dự án#1281, Tổng vốn đầu tư#1294, Xếp hạng#1307, CASE WHEN (_id#1115 = unknown) THEN chưa cập nhật WHEN (_id#1115 = ) THEN chưa cập nhật ELSE _id#1115 END AS _id#1320, Đường#1116]
+- Project [Chủ đầu tư#1190, Diện tích:#1203, Loại dự án#1216, Mô tả#1229, Ngày đăng#1242, Thành phố#1255, Trạng thái#1268, Tên dự án#1281, Tổng vốn đầu tư#1294, CAS

In [31]:
from pyspark.sql.functions import col, regexp_replace
# Loại bỏ chuỗi 'Cập nhật:' trong cột 'Ngày đăng'
df = df.withColumn("Ngày đăng", regexp_replace(col("Ngày đăng"), "Cập nhật:", ""))




In [32]:
#Xem dl các cột
df.show()

+--------------------+-------------+------------------+--------------------+--------------------+--------------------+-----------+--------------------+-----------------+--------+--------------------+--------------------+
|          Chủ đầu tư|   Diện tích:|        Loại dự án|               Mô tả|           Ngày đăng|           Thành phố| Trạng thái|           Tên dự án|  Tổng vốn đầu tư|Xếp hạng|                 _id|               Đường|
+--------------------+-------------+------------------+--------------------+--------------------+--------------------+-----------+--------------------+-----------------+--------+--------------------+--------------------+
|Công Ty Cổ Phần B...|      8.200m2|     Đất Nền Dự Án|The Long Eyes là ...|  12/09/2024 8:49 AM|           Trảng Bom|Đang mở bán|The Long Eyes: Dự...|    chưa cập nhật|     5.0|{66f8f9c98ac17558...|   đường Sông Thao 5|
|Công ty Cổ phần Đ...|   33.540,7m2|   Căn hộ chung cư|Golden City là că...|  25/09/2024 9:05 AM|        TP. Tây Nin

In [33]:
# Xóa bỏ dấu chấm ở các dòng m2
df = df.withColumn("Diện tích:", regexp_replace(col("Diện tích:"), "\\.", ""))

# Đổi dấu phẩy ở dòng có chứa 'ha' thành dấu chấm
df = df.withColumn("Diện tích:", regexp_replace(col("Diện tích:"), ",", "."))

In [34]:
from pyspark.sql.functions import col, when, regexp_extract
# Chuyển đổi giá trị diện tích, nhân với 10.000 nếu có 'ha'
df = df.withColumn(
    "Diện tích:",
    when(
        col("Diện tích:") == "chưa cập nhật",  # Giữ nguyên nếu giá trị là "chưa cập nhật"
        "chưa cập nhật"
    ).otherwise(
        when(
            col("Diện tích:").contains("ha"),
            regexp_extract(col("Diện tích:"), "([\\d.]+)", 1).cast("float") * 10000  # Nhân với 10.000 nếu có 'ha'
        ).otherwise(
            regexp_extract(col("Diện tích:"), "([\\d.]+)", 1).cast("float")  # Chuyển đổi giá trị m2
        )
    )
)
# Đổi tên cột thành "Diện tích(m²)"
df = df.withColumnRenamed("Diện tích:", "Diện tích (m²)")
# Xóa bỏ chữ 'ha' và 'm2'
df = df.withColumn("Diện tích (m²)", regexp_replace(col("Diện tích (m²)"), "ha|m2", ""))
#Xem dl các cột
df.show()

+--------------------+--------------+------------------+--------------------+--------------------+--------------------+-----------+--------------------+-----------------+--------+--------------------+--------------------+
|          Chủ đầu tư|Diện tích (m²)|        Loại dự án|               Mô tả|           Ngày đăng|           Thành phố| Trạng thái|           Tên dự án|  Tổng vốn đầu tư|Xếp hạng|                 _id|               Đường|
+--------------------+--------------+------------------+--------------------+--------------------+--------------------+-----------+--------------------+-----------------+--------+--------------------+--------------------+
|Công Ty Cổ Phần B...|        8200.0|     Đất Nền Dự Án|The Long Eyes là ...|  12/09/2024 8:49 AM|           Trảng Bom|Đang mở bán|The Long Eyes: Dự...|    chưa cập nhật|     5.0|{66f8f9c98ac17558...|   đường Sông Thao 5|
|Công ty Cổ phần Đ...|       33540.7|   Căn hộ chung cư|Golden City là că...|  25/09/2024 9:05 AM|        TP. Tâ

In [35]:
# Xóa bỏ dấu chấm ở các dòng "tỷ đồng"
df = df.withColumn("Tổng vốn đầu tư", regexp_replace(col("Tổng vốn đầu tư"), "\\.", ""))

# Đổi dấu phẩy ở dòng có chứa 'USD' thành dấu chấm
df = df.withColumn("Tổng vốn đầu tư", regexp_replace(col("Tổng vốn đầu tư"), ",", "."))

In [36]:
from pyspark.sql.functions import col, when, regexp_extract
# Chuyển đổi giá trị tổng vón đầu tư, nhân với 24 nếu có "USD"
df = df.withColumn(
    "Tổng vốn đầu tư",
    when(
        col("Tổng vốn đầu tư") == "chưa cập nhật",  # Giữ nguyên nếu giá trị là "chưa cập nhật"
        "chưa cập nhật"
    ).otherwise(
        when(
            col("Tổng vốn đầu tư").contains("USD"),
            regexp_extract(col("Tổng vốn đầu tư"), "([\\d.]+)", 1).cast("float") * 24  # Nhân 24 nếu có chữ triệu USD
        ).otherwise(
            regexp_extract(col("Tổng vốn đầu tư"), "([\\d.]+)", 1).cast("float")  
        )
    )
)
# Đổi tên cột thành "Tổng vốn đầu tư (tỷ đồng)"
df = df.withColumnRenamed("Tổng vốn đầu tư", "Tổng vốn đầu tư (tỷ đồng)")

#đổi tên cột thành phố
df=df.withColumnRenamed("Thành phố",'Vị trí')

# Hiển thị dữ liệu sau khi thay đổi
df.show()

+--------------------+--------------+------------------+--------------------+--------------------+--------------------+-----------+--------------------+-------------------------+--------+--------------------+--------------------+
|          Chủ đầu tư|Diện tích (m²)|        Loại dự án|               Mô tả|           Ngày đăng|              Vị trí| Trạng thái|           Tên dự án|Tổng vốn đầu tư (tỷ đồng)|Xếp hạng|                 _id|               Đường|
+--------------------+--------------+------------------+--------------------+--------------------+--------------------+-----------+--------------------+-------------------------+--------+--------------------+--------------------+
|Công Ty Cổ Phần B...|        8200.0|     Đất Nền Dự Án|The Long Eyes là ...|  12/09/2024 8:49 AM|           Trảng Bom|Đang mở bán|The Long Eyes: Dự...|            chưa cập nhật|     5.0|{66f8f9c98ac17558...|   đường Sông Thao 5|
|Công ty Cổ phần Đ...|       33540.7|   Căn hộ chung cư|Golden City là că...|  2

In [37]:
# Thay thế giá trị 0 trong cột 'Xếp hạng' bằng 'chưa có đánh giá'
df = df.withColumn("Xếp hạng", when(col("Xếp hạng") == '0', 'chưa có đánh giá').otherwise(col("Xếp hạng")))

# Hiển thị dữ liệu sau khi thay đổi
df.show()

+--------------------+--------------+------------------+--------------------+--------------------+--------------------+-----------+--------------------+-------------------------+----------------+--------------------+--------------------+
|          Chủ đầu tư|Diện tích (m²)|        Loại dự án|               Mô tả|           Ngày đăng|              Vị trí| Trạng thái|           Tên dự án|Tổng vốn đầu tư (tỷ đồng)|        Xếp hạng|                 _id|               Đường|
+--------------------+--------------+------------------+--------------------+--------------------+--------------------+-----------+--------------------+-------------------------+----------------+--------------------+--------------------+
|Công Ty Cổ Phần B...|        8200.0|     Đất Nền Dự Án|The Long Eyes là ...|  12/09/2024 8:49 AM|           Trảng Bom|Đang mở bán|The Long Eyes: Dự...|            chưa cập nhật|             5.0|{66f8f9c98ac17558...|   đường Sông Thao 5|
|Công ty Cổ phần Đ...|       33540.7|   Căn hộ c

In [38]:
from pyspark.sql.functions import col

# Danh sách cột mong muốn
desired_columns = [
    "Loại dự án",
    "Tên dự án",
    "Đường",
    "Vị trí",
    "Chủ đầu tư",
    "Trạng thái",
    "Diện tích (m²)",  # Đổi tên cột này cho đầy đủ
    "Ngày đăng",
    "Tổng vốn đầu tư (tỷ đồng)",  # Đổi tên cột này cho đầy đủ
    "Xếp hạng",
    "Mô tả"
]

# Kiểm tra danh sách các cột có trong DataFrame
existing_columns = df.columns
print("Các cột hiện có trong DataFrame:", existing_columns)

# Lọc ra các cột tồn tại trong DataFrame
valid_columns = [col for col in desired_columns if col in existing_columns]

# Sắp xếp lại DataFrame theo thứ tự các cột đã lọc
df = df.select(*valid_columns)

# Hiển thị dữ liệu sau khi thay đổi
df.show()


Các cột hiện có trong DataFrame: ['Chủ đầu tư', 'Diện tích (m²)', 'Loại dự án', 'Mô tả', 'Ngày đăng', 'Vị trí', 'Trạng thái', 'Tên dự án', 'Tổng vốn đầu tư (tỷ đồng)', 'Xếp hạng', '_id', 'Đường']
+------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------+--------------------+-------------------------+----------------+--------------------+
|        Loại dự án|           Tên dự án|               Đường|              Vị trí|          Chủ đầu tư| Trạng thái|Diện tích (m²)|           Ngày đăng|Tổng vốn đầu tư (tỷ đồng)|        Xếp hạng|               Mô tả|
+------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------+--------------------+-------------------------+----------------+--------------------+
|     Đất Nền Dự Án|The Long Eyes: Dự...|   đường Sông Thao 5|           Trảng Bom|Công Ty Cổ Phần B...|Đang mở bán|        8200.0|  12/09/2024 8:49 AM| 

In [39]:
from pyspark.sql.functions import when
from pyspark.sql.functions import monotonically_increasing_id

#thêm cột project_id 
df=df.withColumn('project_id',monotonically_increasing_id())

# Thêm cột project_type_id với giá trị tương ứng cho từng loại dự án
df= df.withColumn("project_type_id",
    when(df["Loại dự án"] == "Khu công nghiệp", 111)
    .when(df["Loại dự án"] == "Nhà Phố - Biệt Thự", 112)
    .when(df["Loại dự án"] == "Bất Động Sản Nghỉ Dưỡng", 113)
    .when(df["Loại dự án"] == "Loại Hình Khác", 114)
    .when(df["Loại dự án"] == "Căn hộ chung cư", 115)
    .when(df["Loại dự án"] == "Khu Đô Thị", 116)
    .when(df["Loại dự án"] == "Đất Nền Dự Án", 117)
    .otherwise(None)  # Gán None cho các giá trị không xác định
)

In [40]:
df.show()

+------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------+--------------------+-------------------------+----------------+--------------------+----------+---------------+
|        Loại dự án|           Tên dự án|               Đường|              Vị trí|          Chủ đầu tư| Trạng thái|Diện tích (m²)|           Ngày đăng|Tổng vốn đầu tư (tỷ đồng)|        Xếp hạng|               Mô tả|project_id|project_type_id|
+------------------+--------------------+--------------------+--------------------+--------------------+-----------+--------------+--------------------+-------------------------+----------------+--------------------+----------+---------------+
|     Đất Nền Dự Án|The Long Eyes: Dự...|   đường Sông Thao 5|           Trảng Bom|Công Ty Cổ Phần B...|Đang mở bán|        8200.0|  12/09/2024 8:49 AM|            chưa cập nhật|             5.0|The Long Eyes là ...|         0|            117|
|   Căn hộ chung cư|Gold

In [41]:
#bảng project
projects=df.select("Tên dự án",'project_type_id') \

    #thêm cột project_id để làm khóa chính và đổi tên các cột
projects=projects.withColumn('project_id',monotonically_increasing_id())\
                 .withColumnRenamed('project_id','Mã dự án') \
                 .withColumnRenamed('project_type_id','Mã loại dự án')

#bảng loại dự án
project_types=df.select("Loại dự án")
    #tạo khóa chính mã loại dự án
project_types= project_types.withColumn("project_type_id",
                                        when(col("Loại dự án") == "Khu công nghiệp", 111)
                                        .when(col("Loại dự án") == "Nhà Phố - Biệt Thự", 112)
                                        .when(col("Loại dự án") == "Bất Động Sản Nghỉ Dưỡng", 113)
                                        .when(col("Loại dự án") == "Loại Hình Khác", 114)
                                        .when(col("Loại dự án") == "Căn hộ chung cư", 115)
                                        .when(col("Loại dự án") == "Khu Đô Thị", 116)
                                        .when(col("Loại dự án") == "Đất Nền Dự Án", 117)
                                        .otherwise(None))

from pyspark.sql.functions import count

    # Tính số lượng dự án theo loại dự án
project_types = project_types.groupBy("project_type_id", "Loại dự án").agg(
    count("*").alias("Số lượng dự án")
)  
                
    # Loại bỏ các hàng trùng lặp
project_types = project_types.dropDuplicates(["project_type_id","Loại dự án","Số lượng dự án"])   

    #đổi tên các cột
project_types=project_types.withColumnRenamed('project_type_id','Mã loại dự án')\
                           .withColumnRenamed('Loại dự án','Tên loại dự án')
    # Sắp xếp DataFrame theo cột "Mã loại dự án" tăng dần
project_types=project_types.orderBy("Mã loại dự án")
                 
#bảng chi tiết dự án
project_details = df.select("project_id", "Tên dự án","project_type_id","Loại dự án","Đường","Vị trí","Diện tích (m²)","Chủ đầu tư","Tổng vốn đầu tư (tỷ đồng)","Mô tả")\
                    .withColumnRenamed('project_id', 'Mã dự án') \
                    .withColumnRenamed("Loại dự án", "Tên loại dự án") \
                    .withColumnRenamed('project_type_id', 'Mã loại dự án')
#bảng vị trí 
locations =df.select('project_id','Vị trí','Đường','Diện tích (m²)')\
            .withColumnRenamed('project_id','Mã dự án')
#bảng trạng thái  
status=df.select('project_id',"Tên dự án",'Trạng thái','Xếp hạng','Mô tả')\
         .withColumnRenamed('project_id','Mã dự án')

In [42]:
#thêm khóa ngoại cho bảng project_detail để nối với bảng project
project_details=project_details.join(projects.select(
    'Mã dự án'),on='Mã dự án',how='inner')

#thêm khóa ngoại cho bảng location để nối với bảng project
locations=locations.join(projects.select(
    'Mã dự án'),on='Mã dự án',how='inner')

#thêm khóa ngoại cho bảng status để nối với bảng project
status=status.join(projects.select(
    'Mã dự án'),on='Mã dự án',how='inner') 

#thêm khóa ngoại cho bảng project để nối với bảng project type
projects=projects.join(project_types.select(
    'Mã loại dự án'),on='Mã loại dự án',how='inner')    
       

In [43]:
# Kết nối tới PostgreSQL
jdbc_url = "jdbc:postgresql://localhost:5432/cafeland"
connection_properties = {
    "user": "postgres",
    "password": "123",#nhập mk tạo trong postgres
    "driver": "org.postgresql.Driver"
}

In [44]:
# Hàm ghi dữ liệu vào PostgreSQL

def write_to_postgres(df, table_name):
    try:
        df.write.jdbc(
            url=jdbc_url,
            table=table_name,
            mode="overwrite",
            properties=connection_properties
        )
        print(f"Đã ghi thành công bảng {table_name} vào PostgreSQL!")
    except Exception as e:
        print(f"Lỗi khi ghi bảng {table_name} vào PostgreSQL: {e}")

In [45]:
# Lưu bảng  vào PostgreSQL
write_to_postgres(projects, "projects")
write_to_postgres(locations, "locations")
write_to_postgres(project_details, "project_details")
write_to_postgres(project_types, "project_types")
write_to_postgres(status, "status")

Đã ghi thành công bảng projects vào PostgreSQL!
Đã ghi thành công bảng locations vào PostgreSQL!
Đã ghi thành công bảng project_details vào PostgreSQL!
Đã ghi thành công bảng project_types vào PostgreSQL!
Đã ghi thành công bảng status vào PostgreSQL!


In [46]:
# Dừng SparkSession
spark.stop()