In [None]:
import os

import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import col, regexp_replace, trim, count, when
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler

In [None]:
# Build (or reuse) the Spark session: MongoDB read/write connection URIs, the MongoDB
# Spark connector package, and the local PostgreSQL JDBC driver jar.
# NOTE(review): the Microsoft SQL Server connector ("com.microsoft.sqlserver.jdbc.spark")
# used at the end of the notebook is not configured here; presumably it must be put on
# the classpath separately — confirm.
spark_builder = (
    SparkSession.builder
    .appName("Fahasa")
    .config("spark.mongodb.read.connection.uri", "mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred")
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1/test.myCollection")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.0")
    .config("spark.jars", "postgresql-42.7.4.jar")
)
sparkSession = spark_builder.getOrCreate()

In [None]:
# Load the "Fahasa" collection of the "Fahasa" MongoDB database into a DataFrame.
mongo_reader = (
    sparkSession.read
    .format("mongodb")
    .option("database", "Fahasa")
    .option("collection", "Fahasa")
)
df = mongo_reader.load()
df.show()

In [None]:
# Report the shape (rows, columns) of the raw data as loaded from MongoDB.
row_count = df.count()
column_count = len(df.columns)

print(f"Số dòng hiện tại: {row_count}")
print(f"Số cột hiện tại: {column_count}")

In [None]:
# Encode stock status: "Luôn có" (always available) -> 1, "Hết hàng" (out of stock) -> 0;
# any other value is kept unchanged.
# NOTE(review): the pass-through branch is a string column, so Spark's type coercion will
# likely make the whole column a string ("1"/"0"/...) rather than an integer — confirm the
# intended output type.
status_code = (
    when(col("Status") == "Luôn có", 1)
    .when(col("Status") == "Hết hàng", 0)
    .otherwise(col("Status"))
)
df = df.withColumn("Status", status_code)

df.show()

In [None]:
price_cleaning = ['Price']

# Strip the " đ" currency suffix from every column listed in price_cleaning.
# NOTE(review): if raw prices use "." as a thousands separator (e.g. "79.000 đ"), the later
# integer cast would stop at the dot — confirm against the raw data.
for column in price_cleaning:
    df = df.withColumn(column, regexp_replace(col(column), " đ", ""))

df.show()

In [None]:
sold_cleaning = ['Sold']

# Remove the "Đã bán " ("Sold ") prefix, leaving only the count text (which may still carry
# a "k" thousands suffix, e.g. "1.2k").
for column in sold_cleaning:
    df = df.withColumn(column, regexp_replace(col(column), "Đã bán ", ""))

df.show()

In [None]:
vote_cleaning = ['Vote']

# Remove the parenthesised "(… đánh giá)" ("… reviews") annotation from each vote column.
for column in vote_cleaning:
    df = df.withColumn(column, regexp_replace(col(column), r"\(.*?\s*đánh giá\)", ""))

df.show()

In [None]:
rate_cleaning = ['Rate']

# Strip the "/5" scale suffix so only the numeric rating remains.
for column in rate_cleaning:
    df = df.withColumn(column, regexp_replace(col(column), "/5", ""))

df.show()

In [None]:
# YearPublish -> integer. With Spark's default (non-ANSI) casting, unparseable text becomes null.
year_as_int = F.col("YearPublish").cast("int")
df = df.withColumn("YearPublish", year_as_int)
df.printSchema()

In [None]:
# Price -> integer (currency suffix was removed earlier).
price_as_int = F.col("Price").cast("int")
df = df.withColumn("Price", price_as_int)
df.printSchema()

In [None]:
# Parse "Sold" into an integer count. Counts with a "k" (thousands) suffix such as "1.2k"
# must be expanded here: a plain integer cast turns them into null, and those rows are then
# discarded by the null filter on Sold further down.
sold_text = trim(col("Sold"))
sold_count = when(
    sold_text.like("%k"),
    # round() guards against binary floating-point error (e.g. 4.35 * 1000 = 4349.999...).
    F.round(regexp_replace(sold_text, "k$", "").cast("double") * 1000),
).otherwise(sold_text.cast("double"))
df = df.withColumn("Sold", sold_count.cast("integer"))
df.printSchema()

In [None]:
# Vote -> integer (the "(… đánh giá)" annotation was removed earlier).
vote_as_int = F.col("Vote").cast("int")
df = df.withColumn("Vote", vote_as_int)
df.printSchema()

In [None]:
# Rate -> float (the "/5" suffix was removed earlier).
rate_as_float = F.col("Rate").cast("float")
df = df.withColumn("Rate", rate_as_float)
df.printSchema()

In [None]:
# Total = Price * Sold + Price * 0.1.
# Price and Sold are 32-bit integers at this point; widen Price to long first so the
# Price * Sold product cannot silently overflow (or fail under ANSI mode).
# NOTE(review): "+ Price * 0.1" adds 10% of a single unit price, not of the revenue; if the
# intent was revenue plus 10% (Price * Sold * 1.1), the formula needs changing — confirm.
price_long = col("Price").cast("long")
df = df.withColumn("Total", (price_long * col("Sold")) + (price_long * 0.1))
df.show()

In [None]:
# Truncate Total to a whole number. A 64-bit integer is used because Price * Sold can
# exceed the 32-bit integer range.
df = df.withColumn("Total", col("Total").cast("long"))
df.printSchema()

In [None]:
describe_cleaning = ['Describe']

for column in describe_cleaning:
    # Keep letters, combining marks, digits and whitespace; remove everything else.
    # \p{M} must be kept: Vietnamese text in decomposed (NFD) form stores its diacritics as
    # combining marks, which a letters-only class would strip.
    df = df.withColumn(column, regexp_replace(col(column), "[^\\p{L}\\p{M}\\p{N}\\s]", ""))
    # Collapse runs of whitespace to a single space, then trim the ends.
    df = df.withColumn(column, regexp_replace(col(column), "\\s+", " "))
    df = df.withColumn(column, trim(col(column)))

df.show()

In [None]:
# Every field below must be present for a row to be kept.
required_columns = [
    "Link", "Title", "Code", "Author", "Publisher", "Supplier", "Level", "Grade",
    "YearPublish", "Price", "Sold", "Vote", "Rate", "Status", "Describe",
]

# Filter out incomplete rows BEFORE de-duplicating. Doing it the other way round can let
# dropDuplicates keep an incomplete copy of a record, which is then removed by the null
# filter, losing the record even though a complete duplicate existed.
# Note: dropDuplicates keeps an arbitrary row from each group of duplicates.
df = (
    df.na.drop(subset=required_columns)
    .dropDuplicates(["Link"])
    .dropDuplicates(["Title"])
    .dropDuplicates(["Code"])
)

df.show()

In [None]:
# Normalize "Sold" to an integer count, expanding a trailing "k" (thousands), e.g. "1.2k" -> 1200.
# An earlier cell already casts Sold to integer; on integer input this keeps the values
# unchanged and guarantees the column stays an integer (the previous version produced a
# float column and truncated values like 4.35 * 1000).
sold_text = col("Sold").cast("string")
sold_count = when(
    sold_text.like("%k"),
    F.round(regexp_replace(sold_text, "k$", "").cast("double") * 1000),
).otherwise(sold_text.cast("double"))
df = df.withColumn("Sold", sold_count.cast("integer"))

df.show()

In [None]:
# Number of rows that survived all cleaning steps.
row_count = df.count()
print(f"Số dòng sau khi làm sạch dữ liệu: {row_count}")

In [None]:
# Author dimension: one row per distinct author with surrogate key "PA<n>".
# row_number() over a window ordered by the (distinct, non-null) Author value yields the
# same IDs every time Spark re-evaluates this lazy plan. monotonically_increasing_id()
# depends on partitioning and may differ between evaluations, so the written Author table
# could disagree with the AuthorID values joined into the product/book data.
# The un-partitioned window moves these values to one partition (presumably few rows).
df_author = df.select("Author").distinct()
df_author = df_author.withColumn(
    "AuthorID",
    F.concat(F.lit("PA"), F.row_number().over(Window.orderBy("Author"))),
)

df_author.show(truncate=False)

In [None]:
# Publisher dimension: one row per distinct publisher with surrogate key "PP<n>".
# row_number() over an ordered window is deterministic across re-evaluations, unlike
# monotonically_increasing_id(), so IDs stay consistent between this table and the joins.
df_publisher = df.select("Publisher").distinct()
df_publisher = df_publisher.withColumn(
    "PublisherID",
    F.concat(F.lit("PP"), F.row_number().over(Window.orderBy("Publisher"))),
)

df_publisher.show(truncate=False)

In [None]:
# Supplier dimension: one row per distinct supplier with surrogate key "PS<n>".
# row_number() over an ordered window is deterministic across re-evaluations, unlike
# monotonically_increasing_id(), so IDs stay consistent between this table and the joins.
df_supplier = df.select("Supplier").distinct()
df_supplier = df_supplier.withColumn(
    "SupplierID",
    F.concat(F.lit("PS"), F.row_number().over(Window.orderBy("Supplier"))),
)

df_supplier.show(truncate=False)

In [None]:
# Level dimension: one row per distinct level with surrogate key "PT<n>".
# row_number() over an ordered window is deterministic across re-evaluations, unlike
# monotonically_increasing_id(), so IDs stay consistent between this table and the joins.
df_Level = df.select("Level").distinct()
df_Level = df_Level.withColumn(
    "LevelID",
    F.concat(F.lit("PT"), F.row_number().over(Window.orderBy("Level"))),
)

df_Level.show(truncate=False)

In [None]:
# Grade dimension: one row per distinct grade with surrogate key "PG<n>".
# row_number() over an ordered window is deterministic across re-evaluations, unlike
# monotonically_increasing_id(), so IDs stay consistent between this table and the joins.
df_Grade = df.select("Grade").distinct()
df_Grade = df_Grade.withColumn(
    "GradeID",
    F.concat(F.lit("PG"), F.row_number().over(Window.orderBy("Grade"))),
)

df_Grade.show(truncate=False)

In [None]:
# Link was only needed for de-duplication; remove it from the working DataFrame.
columns_to_drop = ["Link"]
df = df.drop(*columns_to_drop)
df.show()

In [None]:
book_columns = ["Title", "Code", "AuthorID", "PublisherID", "SupplierID", "LevelID", "GradeID", "YearPublish", "Price", "Sold", "Vote", "Rate", "Status", "Total"]

# Swap each descriptive attribute for its surrogate key by inner-joining every dimension
# table on its natural key, then keep only the product columns.
df_product = (
    df.join(df_author, "Author")
    .join(df_publisher, "Publisher")
    .join(df_supplier, "Supplier")
    .join(df_Level, "Level")
    .join(df_Grade, "Grade")
    .select(*book_columns)
)

# Surrogate key "PB<n>". Code was de-duplicated during cleaning, so ordering by it gives the
# same BookID on every re-evaluation (monotonically_increasing_id() does not guarantee that).
df_product = df_product.withColumn(
    "BookID",
    F.concat(F.lit("PB"), F.row_number().over(Window.orderBy("Code"))),
)

# Put BookID first.
df_product = df_product.select("BookID", *book_columns)

df_product.show()

In [None]:
# Re-attach the descriptive dimension values (Author, Publisher, Supplier, Level, Grade) to
# each product row by joining the dimension tables back on their generated IDs.
# NOTE(review): the result keeps both IDs and names (denormalized) and has no Describe
# column; confirm this, rather than df_product, is what should be stored as "Book".
dimension_joins = [
    (df_author, "AuthorID"),
    (df_publisher, "PublisherID"),
    (df_supplier, "SupplierID"),
    (df_Level, "LevelID"),
    (df_Grade, "GradeID"),
]
df_book = df_product
for dim_table, join_key in dimension_joins:
    df_book = df_book.join(dim_table, join_key)

df_book.show()

### PostgreSQL

Điền `username` và `password` của PostgreSQL vào các ô bên dưới trước khi chạy.

In [None]:
# Write the Author dimension to PostgreSQL, replacing the table if it already exists.
table = "Author"
servername = "localhost"
port = 5432
dbname = "Fahasa"
# Credentials can be supplied through environment variables instead of editing the cell.
username = os.environ.get("FAHASA_PG_USER", "fahasa")
password = os.environ.get("FAHASA_PG_PASSWORD", "fahasa")

jdbc_url = f"jdbc:postgresql://{servername}:{port}/{dbname}"

# Spark's JDBC data source reads the connection string from the "url" option.
(
    df_author.write.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", table)
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save()
)

In [None]:
# Write the Publisher dimension to PostgreSQL, replacing the table if it already exists.
table = "Publisher"
servername = "localhost"
port = 5432
dbname = "Fahasa"
# Credentials can be supplied through environment variables instead of editing the cell.
username = os.environ.get("FAHASA_PG_USER", "fahasa")
password = os.environ.get("FAHASA_PG_PASSWORD", "fahasa")

jdbc_url = f"jdbc:postgresql://{servername}:{port}/{dbname}"

# Spark's JDBC data source reads the connection string from the "url" option.
(
    df_publisher.write.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", table)
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save()
)

In [None]:
# Write the Supplier dimension to PostgreSQL, replacing the table if it already exists.
table = "Supplier"
servername = "localhost"
port = 5432
dbname = "Fahasa"
# Credentials can be supplied through environment variables instead of editing the cell.
username = os.environ.get("FAHASA_PG_USER", "fahasa")
password = os.environ.get("FAHASA_PG_PASSWORD", "fahasa")

jdbc_url = f"jdbc:postgresql://{servername}:{port}/{dbname}"

# Spark's JDBC data source reads the connection string from the "url" option.
(
    df_supplier.write.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", table)
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save()
)

In [None]:
# Write the Level dimension to PostgreSQL, replacing the table if it already exists.
table = "Level"
servername = "localhost"
port = 5432
dbname = "Fahasa"
# Credentials can be supplied through environment variables instead of editing the cell.
username = os.environ.get("FAHASA_PG_USER", "fahasa")
password = os.environ.get("FAHASA_PG_PASSWORD", "fahasa")

jdbc_url = f"jdbc:postgresql://{servername}:{port}/{dbname}"

# Spark's JDBC data source reads the connection string from the "url" option.
(
    df_Level.write.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", table)
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save()
)

In [None]:
# Write the Grade dimension to PostgreSQL, replacing the table if it already exists.
table = "Grade"
servername = "localhost"
port = 5432
dbname = "Fahasa"
# Credentials can be supplied through environment variables instead of editing the cell.
username = os.environ.get("FAHASA_PG_USER", "fahasa")
password = os.environ.get("FAHASA_PG_PASSWORD", "fahasa")

jdbc_url = f"jdbc:postgresql://{servername}:{port}/{dbname}"

# Spark's JDBC data source reads the connection string from the "url" option.
(
    df_Grade.write.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", table)
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save()
)

In [None]:
# Write the Book table to PostgreSQL, replacing the table if it already exists.
table = "Book"
servername = "localhost"
port = 5432
dbname = "Fahasa"
# Credentials can be supplied through environment variables instead of editing the cell.
username = os.environ.get("FAHASA_PG_USER", "fahasa")
password = os.environ.get("FAHASA_PG_PASSWORD", "fahasa")

jdbc_url = f"jdbc:postgresql://{servername}:{port}/{dbname}"

# Spark's JDBC data source reads the connection string from the "url" option.
(
    df_book.write.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", table)
    .option("user", username)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .mode("overwrite")
    .save()
)

### Microsoft SQL

Điền `servername`, `username` và `password` của Microsoft SQL Server vào các ô bên dưới trước khi chạy.

In [None]:
# Write the Author dimension to Microsoft SQL Server via the Spark connector, replacing the table.
connector = "com.microsoft.sqlserver.jdbc.spark"
# Credentials can be supplied through environment variables instead of editing the cell.
username = os.environ.get("FAHASA_MSSQL_USER", "sa")
password = os.environ.get("FAHASA_MSSQL_PASSWORD", "")
dbname = "Fahasa"
servername = "localhost"
table = "dbo.Author"

jdbc_url = f"jdbc:sqlserver://{servername};databaseName={dbname}"

# The connector, like Spark's JDBC source, takes the connection string as the "url" option.
(
    df_author.write.format(connector)
    .option("url", jdbc_url)
    .option("dbtable", table)
    .option("user", username)
    .option("password", password)
    .mode("overwrite")
    .save()
)

In [None]:
# Write the Publisher dimension to Microsoft SQL Server via the Spark connector, replacing the table.
connector = "com.microsoft.sqlserver.jdbc.spark"
# Credentials can be supplied through environment variables instead of editing the cell.
username = os.environ.get("FAHASA_MSSQL_USER", "sa")
password = os.environ.get("FAHASA_MSSQL_PASSWORD", "")
dbname = "Fahasa"
servername = "localhost"
table = "dbo.Publisher"

jdbc_url = f"jdbc:sqlserver://{servername};databaseName={dbname}"

# The connector, like Spark's JDBC source, takes the connection string as the "url" option.
(
    df_publisher.write.format(connector)
    .option("url", jdbc_url)
    .option("dbtable", table)
    .option("user", username)
    .option("password", password)
    .mode("overwrite")
    .save()
)

In [None]:
# Write the Supplier dimension to Microsoft SQL Server via the Spark connector, replacing the table.
connector = "com.microsoft.sqlserver.jdbc.spark"
# Credentials can be supplied through environment variables instead of editing the cell.
username = os.environ.get("FAHASA_MSSQL_USER", "sa")
password = os.environ.get("FAHASA_MSSQL_PASSWORD", "")
dbname = "Fahasa"
servername = "localhost"
table = "dbo.Supplier"

jdbc_url = f"jdbc:sqlserver://{servername};databaseName={dbname}"

# The connector, like Spark's JDBC source, takes the connection string as the "url" option.
(
    df_supplier.write.format(connector)
    .option("url", jdbc_url)
    .option("dbtable", table)
    .option("user", username)
    .option("password", password)
    .mode("overwrite")
    .save()
)

In [None]:
# Write the Level dimension to Microsoft SQL Server via the Spark connector, replacing the table.
connector = "com.microsoft.sqlserver.jdbc.spark"
# Credentials can be supplied through environment variables instead of editing the cell.
username = os.environ.get("FAHASA_MSSQL_USER", "sa")
password = os.environ.get("FAHASA_MSSQL_PASSWORD", "")
dbname = "Fahasa"
servername = "localhost"
table = "dbo.Level"

jdbc_url = f"jdbc:sqlserver://{servername};databaseName={dbname}"

# The connector, like Spark's JDBC source, takes the connection string as the "url" option.
(
    df_Level.write.format(connector)
    .option("url", jdbc_url)
    .option("dbtable", table)
    .option("user", username)
    .option("password", password)
    .mode("overwrite")
    .save()
)

In [None]:
# Write the Grade dimension to Microsoft SQL Server via the Spark connector, replacing the table.
connector = "com.microsoft.sqlserver.jdbc.spark"
# Credentials can be supplied through environment variables instead of editing the cell.
username = os.environ.get("FAHASA_MSSQL_USER", "sa")
password = os.environ.get("FAHASA_MSSQL_PASSWORD", "")
dbname = "Fahasa"
servername = "localhost"
table = "dbo.Grade"

jdbc_url = f"jdbc:sqlserver://{servername};databaseName={dbname}"

# The connector, like Spark's JDBC source, takes the connection string as the "url" option.
(
    df_Grade.write.format(connector)
    .option("url", jdbc_url)
    .option("dbtable", table)
    .option("user", username)
    .option("password", password)
    .mode("overwrite")
    .save()
)

In [None]:
# Write the Book table to Microsoft SQL Server via the Spark connector, replacing the table.
connector = "com.microsoft.sqlserver.jdbc.spark"
# Credentials can be supplied through environment variables instead of editing the cell.
username = os.environ.get("FAHASA_MSSQL_USER", "sa")
password = os.environ.get("FAHASA_MSSQL_PASSWORD", "")
dbname = "Fahasa"
servername = "localhost"
table = "dbo.Book"

jdbc_url = f"jdbc:sqlserver://{servername};databaseName={dbname}"

# The connector, like Spark's JDBC source, takes the connection string as the "url" option.
(
    df_book.write.format(connector)
    .option("url", jdbc_url)
    .option("dbtable", table)
    .option("user", username)
    .option("password", password)
    .mode("overwrite")
    .save()
)