In [23]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import struct, lower, regexp_replace, trim, explode, split, count, collect_list, lit, col
from pyspark.sql.types import StringType

In [24]:
spark = SparkSession.builder \
    .appName("Spark XML Example") \
    .config("spark.jars", 'C:\SPARK\spark-xml_2.12-0.16.0.jar') \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:4.4.3") \
    .config("spark.jars.repositories", "https://repo1.maven.org/maven2/") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .master("local[*]") \
    .getOrCreate()

 #   .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.0.0") \


In [7]:
# Đường dẫn đến file XML
xml_path = "mini_wiki-dum.xml"

# Đọc XML với rowTag là "page"
df = spark.read.format("xml") \
    .option("rowTag", "page") \
    .option("inferSchema", "true") \
    .load(xml_path)

# Hiển thị schema để thấy các thẻ con
df.printSchema()

root
 |-- id: long (nullable = true)
 |-- ns: long (nullable = true)
 |-- redirect: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _title: string (nullable = true)
 |-- revision: struct (nullable = true)
 |    |-- comment: struct (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _deleted: string (nullable = true)
 |    |-- contributor: struct (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _deleted: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- ip: string (nullable = true)
 |    |    |-- username: string (nullable = true)
 |    |-- format: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- minor: string (nullable = true)
 |    |-- model: string (nullable = true)
 |    |-- origin: long (nullable = true)
 |    |-- parentid: long (nullable = true)
 |    |-- sha1: string (nullable = true)
 |    |-- text: struct (nullable = true)
 |    |    

In [8]:
df_flat = df.select(
    col("id"),
    col("ns"),
    col("redirect._VALUE").alias("redirect_value"),
    col("redirect._title").alias("redirect_title"),
    col("revision.comment").alias("revision_comment"),
    col("revision.contributor.id").alias("contributor_id"),
    col("revision.contributor.ip").alias("contributor_ip"),
    col("revision.contributor.username").alias("contributor_username"),
    col("revision.format").alias("revision_format"),
    col("revision.id").alias("revision_id"),
    col("revision.minor").alias("revision_minor"),
    col("revision.model").alias("revision_model"),
    col("revision.origin").alias("revision_origin"),
    col("revision.parentid").alias("revision_parentid"),
    col("revision.sha1").alias("revision_sha1"),
    col("revision.text._VALUE").alias("content"),
    col("revision.text._bytes").alias("text_bytes"),
    col("revision.text._sha1").alias("text_sha1"),
    col("revision.text._xml:space").alias("text_xml_space"),
    col("revision.timestamp").alias("timestamp"),
    col("title")
)

# Kiểm tra kiểu dữ liệu của 'revision.comment'
if not isinstance(df_flat.schema["revision_comment"].dataType, StringType):
    # Nếu không là chuỗi
    df_flat = df_flat.withColumn("revision_comment", col("revision_comment._VALUE"))


# Hiển thị kết quả
df_flat.show()

+--------+---+--------------+--------------------+--------------------+--------------+--------------+--------------------+---------------+-----------+--------------+--------------+---------------+-----------------+--------------------+--------------------+----------+--------------------+--------------+-------------------+--------------------+
|      id| ns|redirect_value|      redirect_title|    revision_comment|contributor_id|contributor_ip|contributor_username|revision_format|revision_id|revision_minor|revision_model|revision_origin|revision_parentid|       revision_sha1|             content|text_bytes|           text_sha1|text_xml_space|          timestamp|               title|
+--------+---+--------------+--------------------+--------------------+--------------+--------------+--------------------+---------------+-----------+--------------+--------------+---------------+-----------------+--------------------+--------------------+----------+--------------------+--------------+-------

In [5]:
from pyspark.sql.functions import coalesce

# df_flat = df_flat.withColumn("redirect_title", coalesce(df_flat["redirect_title"], df_flat["title"]))

In [None]:
df_flat = df_flat.withColumn(
    "content", 
    lower(trim(regexp_replace(col("content"), r'[^a-zA-Z0-9]', ' ')))  # Thay thế tất cả các ký tự không phải chữ và số bằng khoảng trắng
).withColumn(
    "revision_comment", 
    lower(trim(regexp_replace(col("revision_comment"), r'[^a-zA-Z0-9]', ' ')))  # Thay thế tất cả các ký tự không phải chữ và số bằng khoảng trắng
).withColumn(
    "title", 
    lower(trim(regexp_replace(col("title"), r'[^a-zA-Z0-9]', ' ')))  # Thay thế tất cả các ký tự không phải chữ và số bằng khoảng trắng
)


In [9]:
# Xử lý null cho các cột
# df_flat = df_flat.withColumn("content", when(col("content").isNull(), lit("")).otherwise(col("content"))) \
#                  .withColumn("revision_comment", when(col("revision_comment").isNull(), lit("")).otherwise(col("revision_comment"))) \
#                  .withColumn("title", when(col("title").isNull(), lit("")).otherwise(col("title")))


# Áp dụng thao tác làm sạch và chuyển tất cả về chữ thường
df_flat = df_flat.withColumn(
    "content", 
    lower(trim(regexp_replace(regexp_replace(regexp_replace(col("content"), r'[^\w\s]', ' '), r'\s+', ' '), r'\d+', '')))
).withColumn(
    "revision_comment", 
    lower(trim(regexp_replace(regexp_replace(col("revision_comment"), r'[^\w\s]', ' '), r'\s+', ' ')))  # Xử lý comment
).withColumn(
    "title", 
    lower(trim(regexp_replace(regexp_replace(col("title"), r'[^\w\s]', ' '), r'\s+', ' ')))  # Xử lý comment
)

# Kiểm tra kết quả
df_flat.show(truncate=True)

+--------+---+--------------+--------------------+--------------------+--------------+--------------+--------------------+---------------+-----------+--------------+--------------+---------------+-----------------+--------------------+--------------------+----------+--------------------+--------------+-------------------+--------------------+
|      id| ns|redirect_value|      redirect_title|    revision_comment|contributor_id|contributor_ip|contributor_username|revision_format|revision_id|revision_minor|revision_model|revision_origin|revision_parentid|       revision_sha1|             content|text_bytes|           text_sha1|text_xml_space|          timestamp|               title|
+--------+---+--------------+--------------------+--------------------+--------------+--------------+--------------------+---------------+-----------+--------------+--------------+---------------+-----------------+--------------------+--------------------+----------+--------------------+--------------+-------

In [7]:
key_words = ["traditional anarchism", "Amoeba"]

# Lọc các dòng có giá trị của cột 'redirect_title' nằm trong danh sách 'key_words'
df_flat.filter(col("title").isin(key_words)).show()


+-----+---+--------------+--------------+--------------------+--------------+--------------+--------------------+---------------+-----------+--------------+--------------+---------------+-----------------+--------------------+--------------------+----------+--------------------+--------------+-------------------+--------------------+
|   id| ns|redirect_value|redirect_title|    revision_comment|contributor_id|contributor_ip|contributor_username|revision_format|revision_id|revision_minor|revision_model|revision_origin|revision_parentid|       revision_sha1|             content|text_bytes|           text_sha1|text_xml_space|          timestamp|               title|
+-----+---+--------------+--------------+--------------------+--------------+--------------+--------------------+---------------+-----------+--------------+--------------+---------------+-----------------+--------------------+--------------------+----------+--------------------+--------------+-------------------+--------------

In [10]:
# Ghép dữ liệu từ nhiều cột
df_new = df_flat.select(
    col("title"),
    explode(
        split(col("title"), r"\s+")  # Tokenize Column1
    ).alias("word")
).withColumn("location", lit("title")) \
 .union(df_flat.select(
    col("title"),
    explode(
        split(col("revision_comment"), r"\s+")  # Tokenize Column2
    ).alias("word")
).withColumn("location", lit("revision_comment"))) \
.union(df_flat.select(
    col("title"),
    explode(
        split(col("content"), r"\s+")
    ).alias("word")
).withColumn("location", lit("content")))

df_new.show(n=200, truncate=False)

+---------------------------------------------------------+------------+--------+
|title                                                    |word        |location|
+---------------------------------------------------------+------------+--------+
|mccabe lake                                              |mccabe      |title   |
|mccabe lake                                              |lake        |title   |
|miller lake nova scotia                                  |miller      |title   |
|miller lake nova scotia                                  |lake        |title   |
|miller lake nova scotia                                  |nova        |title   |
|miller lake nova scotia                                  |scotia      |title   |
|miller lake mooseland                                    |miller      |title   |
|miller lake mooseland                                    |lake        |title   |
|miller lake mooseland                                    |mooseland   |title   |
|vickers vagabon

In [9]:
# from sparknlp.pretrained import PretrainedPipeline

# # Lưu mô hình vào thư mục cục bộ
# lemma = LemmatizerModel.pretrained("lemma_antbnc", lang="en")
# lemma.write().overwrite().save("lemma_antbnc_model")


In [11]:
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import Tokenizer, LemmatizerModel
from pyspark.ml import Pipeline

# Sử dụng mô hình đã lưu
lemmatizer = LemmatizerModel.load("lemma_antbnc_model") \
    .setInputCols(["token"]) \
    .setOutputCol("lemma")

# Tạo DocumentAssembler
document_assembler = DocumentAssembler() \
    .setInputCol("word") \
    .setOutputCol("document")

# Tạo Tokenizer
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# Xây dựng Pipeline
pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    lemmatizer
])

# Thực thi pipeline
model = pipeline.fit(df_new)
result = model.transform(df_new)

# Hiển thị kết quả
result.select("lemma.result").show(truncate=False)


+-----------+
|result     |
+-----------+
|[mccabe]   |
|[lake]     |
|[miller]   |
|[lake]     |
|[nova]     |
|[scotia]   |
|[miller]   |
|[lake]     |
|[mooseland]|
|[vickers]  |
|[vagabond] |
|[moon]     |
|[lake]     |
|[nova]     |
|[scotia]   |
|[mountain] |
|[lake]     |
|[nova]     |
|[scotia]   |
|[murphys]  |
+-----------+
only showing top 20 rows



In [12]:
from pyspark.sql.functions import col

# Thêm cột mới 'first_lemma' với phần tử đầu tiên của mảng 'lemma.result'
result = result.withColumn("first_lemma", col("lemma.result").getItem(0))


In [13]:
result = result.drop("token")
result = result.drop("document")
result = result.drop("lemma")
result = result.drop("word")

In [14]:
result = result.withColumnRenamed("first_lemma", "word")

In [15]:
result.show(truncate=False)

+-------------------------+--------+---------+
|title                    |location|word     |
+-------------------------+--------+---------+
|mccabe lake              |title   |mccabe   |
|mccabe lake              |title   |lake     |
|miller lake nova scotia  |title   |miller   |
|miller lake nova scotia  |title   |lake     |
|miller lake nova scotia  |title   |nova     |
|miller lake nova scotia  |title   |scotia   |
|miller lake mooseland    |title   |miller   |
|miller lake mooseland    |title   |lake     |
|miller lake mooseland    |title   |mooseland|
|vickers vagabond         |title   |vickers  |
|vickers vagabond         |title   |vagabond |
|moon lake nova scotia    |title   |moon     |
|moon lake nova scotia    |title   |lake     |
|moon lake nova scotia    |title   |nova     |
|moon lake nova scotia    |title   |scotia   |
|mountain lake nova scotia|title   |mountain |
|mountain lake nova scotia|title   |lake     |
|mountain lake nova scotia|title   |nova     |
|mountain lak

In [16]:
print(result.count())

21312000


In [17]:
# Đếm số lần xuất hiện của mỗi từ trong từng vị trí
df_count = df_new.groupBy("word", "title", "location").agg(
    count("*").alias("count")
)

# Tổng hợp thành Inverted Index với số lần xuất hiện
inverted_index_with_count = df_count.groupBy("word").agg(
    collect_list(
        struct(
            col("title").alias("doc_id"),
            col("location"),
            col("count").alias("frequency")
        )
    ).alias("locations")
)
inverted_index_with_count.show(truncate=False)

print(inverted_index_with_count.count())

+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [18]:
inverted_index_with_count.write.parquet("parquetFile/final2.parquet")

In [None]:
from pyspark.sql.functions import col, substring

# Tạo cột 'first_letter' chứa chữ cái đầu của mỗi từ trong cột 'word'
df_with_letter = inverted_index_with_count.withColumn("first_letter", substring(col("word"), 1, 1))

# Lưu theo chữ cái đầu tiên của từ vào các thư mục con
df_with_letter.write.partitionBy("first_letter").parquet("parquetFile/final.parquet")

In [19]:
spark.stop()

In [29]:
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

# Đọc dữ liệu
old_df = spark.read.parquet("parquetFile/final.parquet")
inverted_index_with_count = spark.read.parquet("parquetFile/final2.parquet")

# Đổi tên các cột locations để tránh bị nhầm lẫn sau khi join
old_df = old_df.withColumnRenamed("locations", "locations_old")
inverted_index_with_count = inverted_index_with_count.withColumnRenamed("locations", "locations_new")

# Thực hiện join giữa hai bảng theo cột "word"
combined_df = old_df.join(broadcast(inverted_index_with_count), "word", "outer")

# Group theo "word" và gom tất cả các locations từ cả hai cột vào một danh sách duy nhất
combined_df_grouped = combined_df.groupBy("word").agg(
    # Kết hợp cả hai cột locations_old và locations_new thành một danh sách duy nhất
    F.flatten(F.array_union(F.collect_list("locations_old"), F.collect_list("locations_new"))).alias("locations")
)

# Cache DataFrame nếu cần thiết (nếu sẽ sử dụng lại nhiều lần)
combined_df_grouped.cache()

# Hiển thị kết quả
combined_df_grouped.show(truncate=False)


+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [44]:
from pyspark.sql import functions as F
df = spark.read.parquet("parquetFile/final.parquet")
# Danh sách các cột có tên bắt đầu bằng số

from pyspark.sql import functions as F

# Lọc các dòng mà "word" bắt đầu bằng số
df_with_number = df.filter(F.col("word").rlike("^[0-9]"))

# Lọc các dòng mà "word" không bắt đầu bằng số (chữ)
df_with_letter = df.filter(F.col("word").rlike("^[a-z]"))

df_with_letter = df_with_letter.withColumn("first_letter", F.substring("word", 1, 1))

print(df_with_number.count())

print(df_with_letter.count())

5697
1413618


In [45]:
df_with_letter.show()

+--------------------+--------------------+------------+
|                word|           locations|first_letter|
+--------------------+--------------------+------------+
|                a14d|[{vlb, revision_c...|           a|
|                a390|[{1012, revision_...|           a|
|                a400|[{proverb, revisi...|           a|
|                a443|[{zeno of citium,...|           a|
|                a64e|[{web, revision_c...|           a|
|                 a79|[{infantry, revis...|           a|
|                 a7d|[{ccd, revision_c...|           a|
|                a89e|[{armed forces of...|           a|
|       a__domaci_vel|[{scopolamine, co...|           a|
|       a__domaci_zra|[{april 16, conte...|           a|
|    a__filmvideo_jaz|[{february 21, co...|           a|
|     a__ln_domov_mct|[{bohemia, conten...|           a|
|a__protein_structure|[{denaturation bi...|           a|
|                a_a_|[{multiplication,...|           a|
|                a_al|[{1384, c

In [46]:
df_with_letter.write.partitionBy("first_letter").parquet("parquetFile/partitioned_by_first_letter.parquet")

df_with_number.write.parquet("parquetFile/word_with_number.parquet")

In [47]:
spark.stop()