In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, monotonically_increasing_id, split, when, lower, trim, udf
from pyspark.sql import Row
from pyspark.sql import functions as F
from minio import Minio
import pandas as pd
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from pyspark.sql.types import StringType
from fuzzywuzzy import fuzz

In [2]:
spark = SparkSession.builder \
    .appName("Silver") \
    .master('local[*]') \
    .config("spark.driver.maxResultSize", "6g") \
    .config("spark.driver.memory", "3g") \
    .config("spark.executor.memory", "3g") \
    .config('spark.dynamicAllocation.minExecutors' , '1') \
    .config('spark.dynamicAllocation.maxExecutors' , '2') \
    .config('spark.dynamicAllocation.enabled' , 'true') \
    .config("spark.hadoop.fs.s3a.access.key", 'sJ8IkEjav4gkDwjt2BxK') \
    .config("spark.hadoop.fs.s3a.secret.key", 'atZpF6WhWSD0tH2vSxylDHYpdOWZ11zdnSZ87ca8') \
    .config("spark.hadoop.fs.s3a.endpoint", 'http://127.0.0.1:9000') \
    .config("spark.hadoop.fs.s3a.impl", 'org.apache.hadoop.fs.s3a.S3AFileSystem') \
    .config("spark.hadoop.fs.s3a.path.style.access", 'true') \
    .config("spark.sql.extensions", 'io.delta.sql.DeltaSparkSessionExtension') \
    .config("spark.sql.catalog.spark_catalog", 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
    .getOrCreate()

#### Kết nối local (chỉnh sửa lại kết nối được với MinIO, sao cho lấy được file trong bucket để xử lý)

In [3]:
file_path = "s3a://bronze/business.parquet"
file_path_review = "s3a://bronze/review.parquet"

#### Xử lý bảng Business

In [4]:
business_raw = spark.read.parquet(file_path)
business_raw.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+-------+----------+-----------+--------------------+-----------+------------+-----+-----+
|                 _id|             address|          attributes|         business_id|          categories|        city|               hours|is_open|  latitude|  longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+-------+----------+-----------+--------------------+-----------+------------+-----+-----+
|{66c753fcd7db47b0...|87 Grasso Plaza S...|{null, null, null...|mpf3x-BjTdTEA3yCZ...|Shipping Centers,...|      Affton|{8:0-18:30, 0:0-0...|      1| 38.551126| -90.335695|       The UPS Store|      63123|          15|  3.0|   MO|
|{66c753fcd7db47b0...|         935 Race St|{null, u'none', n...|MTSW4McQd7CbVtyj

In [5]:
business_raw.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: string (nullable = true)
 |    |

#### Tách bảng Business thành các cột riêng biệt

##### Bảng đầu tiên: Business

In [9]:
business_df = business_raw.select("business_id", "name", "address", "city", "state", "postal_code", "latitude", "longitude", "stars", "review_count", "is_open")
business_df.show(5)

business_df.count()

+--------------------+--------------------+--------------------+------------+-----+-----------+----------+-----------+-----+------------+-------+
|         business_id|                name|             address|        city|state|postal_code|  latitude|  longitude|stars|review_count|is_open|
+--------------------+--------------------+--------------------+------------+-----+-----------+----------+-----------+-----+------------+-------+
|mpf3x-BjTdTEA3yCZ...|       The UPS Store|87 Grasso Plaza S...|      Affton|   MO|      63123| 38.551126| -90.335695|  3.0|          15|      1|
|MTSW4McQd7CbVtyjq...|  St Honore Pastries|         935 Race St|Philadelphia|   PA|      19107|39.9555052|-75.1555641|  4.0|          80|      1|
|tUFrWirKiKi_TAnsV...|              Target|5255 E Broadway Blvd|      Tucson|   AZ|      85711| 32.223236|-110.880452|  3.5|          22|      0|
|mWMc6_wTdE0EUBKIG...|Perkiomen Valley ...|       101 Walnut St|  Green Lane|   PA|      18054|40.3381827|-75.4716585|  4.5|

150346

In [10]:
business_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- stars: double (nullable = true)
 |-- review_count: integer (nullable = true)
 |-- is_open: integer (nullable = true)



In [14]:
# Biến toàn cục để lưu các thành phố đã chuẩn hóa
unique_cities = []

# Hàm chuẩn hóa tên thành phố dựa trên Fuzzy Matching
def fuzzy_match(city, threshold=85):
    global unique_cities
    for uc in unique_cities:
        if fuzz.ratio(city.lower(), uc.lower()) >= threshold:
            return uc
    unique_cities.append(city)
    return city

# Định nghĩa UDF
fuzzy_match_udf = udf(lambda city: fuzzy_match(city), StringType())

# Áp dụng UDF để chuẩn hóa tên thành phố
business_df_normalized = business_df.withColumn("city", fuzzy_match_udf(col("city")))

business_df_normalized.show()

+--------------------+--------------------+--------------------+--------------+-----+-----------+-------------+---------------+-----+------------+-------+
|         business_id|                name|             address|          city|state|postal_code|     latitude|      longitude|stars|review_count|is_open|
+--------------------+--------------------+--------------------+--------------+-----+-----------+-------------+---------------+-----+------------+-------+
|mpf3x-BjTdTEA3yCZ...|       The UPS Store|87 Grasso Plaza S...|        Affton|   MO|      63123|    38.551126|     -90.335695|  3.0|          15|      1|
|MTSW4McQd7CbVtyjq...|  St Honore Pastries|         935 Race St|  Philadelphia|   PA|      19107|   39.9555052|    -75.1555641|  4.0|          80|      1|
|tUFrWirKiKi_TAnsV...|              Target|5255 E Broadway Blvd|        Tucson|   AZ|      85711|    32.223236|    -110.880452|  3.5|          22|      0|
|mWMc6_wTdE0EUBKIG...|Perkiomen Valley ...|       101 Walnut St|    Gr

In [15]:
business_df_normalized.write.format("delta").mode("overwrite").save("s3a://silver/business")

#### Xử lý cột Attributes

##### Bảng thứ 2: Business_Attributes

In [9]:
# Lấy các cột từ attributes và business_id
business_attributes = business_raw.select(
    F.col("business_id"),
    *[F.col(f"attributes.{attr}").alias(attr) for attr in business_raw.select("attributes.*").columns]
)

# Tạo biểu thức stack động cho tất cả các cột attribute
stack_expr = f"stack({len(business_attributes.columns) - 1}, " + ", ".join(
    [f"'{i}', {col}" for i, col in enumerate(business_attributes.columns[1:]) if col is not None]
) + ") as (attribute_id, value)"

# Chuyển đổi DataFrame sử dụng biểu thức stack
business_attributes_convert = (
    business_attributes
    .selectExpr("business_id", stack_expr)
    .filter(F.col("value").isNotNull())  # Lọc bỏ các dòng có giá trị null trong cột value
    .select("business_id", F.col("attribute_id").cast("long"))  # Chuyển đổi attribute_id sang kiểu long
)

# Hiển thị DataFrame đã chuyển đổi
business_attributes_convert.show(5)

+--------------------+------------+
|         business_id|attribute_id|
+--------------------+------------+
|mpf3x-BjTdTEA3yCZ...|           8|
|MTSW4McQd7CbVtyjq...|           1|
|MTSW4McQd7CbVtyjq...|           6|
|MTSW4McQd7CbVtyjq...|           8|
|MTSW4McQd7CbVtyjq...|           9|
+--------------------+------------+
only showing top 5 rows



In [10]:
business_attributes_convert.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- attribute_id: long (nullable = true)



In [11]:
business_attributes_convert.write.format("delta").mode("overwrite").save("s3a://silver/business_attributes")

##### Bảng thứ 3: Attributes

In [12]:
attribute_names = business_raw.select("attributes.*").columns

# Tạo DataFrame Spark với các cột `id` và `name`
attributes_name = spark.createDataFrame([Row(attribute_id=i, attribute_name=name) for i, name in enumerate(attribute_names)])

# Hiển thị DataFrame `attributes_name`
attributes_name.show(10)

+------------+--------------------+
|attribute_id|      attribute_name|
+------------+--------------------+
|           0|    AcceptsInsurance|
|           1|             Alcohol|
|           2|            Ambience|
|           3|                BYOB|
|           4|         BYOBCorkage|
|           5|          BestNights|
|           6|         BikeParking|
|           7|BusinessAcceptsBi...|
|           8|BusinessAcceptsCr...|
|           9|     BusinessParking|
+------------+--------------------+
only showing top 10 rows



In [13]:
attributes_name.printSchema()

root
 |-- attribute_id: long (nullable = true)
 |-- attribute_name: string (nullable = true)



In [14]:
attributes_name.write.format("delta").mode("overwrite").save("s3a://silver/attributes")

#### Xử lý cột Hours

##### Bảng thứ 4: Business_Work_Hours

In [15]:
business_hours = business_raw.select(
    col("business_id"),
    col("hours.*")
)

stack_expr = "stack({}, {}) as (day_id, open_hours)".format(
    len(business_hours.columns[1:]),
    ", ".join(["'{}', {}".format(day, day) for day in business_hours.columns[1:]])
)

business_hours = business_hours.selectExpr("business_id", stack_expr)

business_hours = business_hours.na.drop(subset="open_hours").filter("open_hours != '0:0'")

business_hours_split = business_hours.withColumn("open_close", split(col("open_hours"), "-"))
business_hours = business_hours_split.withColumn("open_hours", col("open_close").getItem(0)) \
                                       .withColumn("close_hours", col("open_close").getItem(1))

business_hours = business_hours.drop("open_close")

business_hours = business_hours.filter(~((col("open_hours") == "0:0") | (col("close_hours") == "0:0")))

day_order = when(col("day_id") == "Monday", 1) \
            .when(col("day_id") == "Tuesday", 2) \
            .when(col("day_id") == "Wednesday", 3) \
            .when(col("day_id") == "Thursday", 4) \
            .when(col("day_id") == "Friday", 5) \
            .otherwise(6)  

business_hours = business_hours.withColumn("day_order", day_order)

business_hours = business_hours.orderBy("day_order").drop("day_order")

business_hours.show(10)

+--------------------+------+----------+-----------+
|         business_id|day_id|open_hours|close_hours|
+--------------------+------+----------+-----------+
|ROeacJQwBeh05Rqg7...|Monday|     11:30|      20:30|
|UJsufbvfyfONHeWdv...|Monday|      9:30|      21:30|
|qhDdDeI3K4jy2Kyzw...|Monday|      10:0|       21:0|
|tUFrWirKiKi_TAnsV...|Monday|       8:0|       22:0|
|WKMJwqnfZKsAae75R...|Monday|       8:0|       18:0|
|qkRM_2X51Yqxk3btl...|Monday|       9:0|       17:0|
|kfNv-JZpuN6TVNSO6...|Monday|      11:0|       21:0|
|9OG5YkX1g2GReZM0A...|Monday|      11:0|       22:0|
|noByYNtDLQAra9ccq...|Monday|      11:0|       18:0|
|MTSW4McQd7CbVtyjq...|Monday|       7:0|       20:0|
+--------------------+------+----------+-----------+
only showing top 10 rows



In [16]:
business_hours.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- day_id: string (nullable = true)
 |-- open_hours: string (nullable = true)
 |-- close_hours: string (nullable = true)



In [17]:
business_hours.write.format("delta").mode("overwrite").save("s3a://silver/business_work_hours")

#### Xử lý cột Categories

##### Bảng thứ 5: Business_Categories

In [18]:
business_categories = business_raw.select(
    col("business_id"),
    col("categories")
).filter(col("categories").isNotNull() & (col("categories") != ""))

# Remove spaces and convert to lower case
business_categories = business_categories.withColumn("categories", lower(trim(col("categories"))))

# Split the categories within each row
business_categories = business_categories.withColumn("category_list", split(col("categories"), ","))

# Explode the categories into separate individual rows
business_categories = business_categories.select(
    col("business_id"),
    explode(col("category_list")).alias("category")
)

# Drop duplicates
business_categories = business_categories.dropDuplicates()

# Create unique categories DataFrame and assign category_id
category_name = business_categories.select("category").distinct() \
    .withColumn("category_id", monotonically_increasing_id())

# Sort category_name by category_id and then by category name
category_name = category_name.orderBy(col("category_id").asc(), col("category").asc())

# Select category_id and category in the desired order
category_name = category_name.select("category_id", "category")

# Join to get business_categories_result
business_categories_result = business_categories.join(category_name, on="category", how="inner") \
    .select("business_id", "category_id")

# Show the results
business_categories_result.show(10)

+--------------------+-----------+
|         business_id|category_id|
+--------------------+-----------+
|aPNXGTDkf-4bjhyMB...|       1816|
|JX4tUpd09YFchLBuI...|       1348|
|Hwt3_mOEmU-t--ywc...|       1211|
|Hwt3_mOEmU-t--ywc...|       2146|
|pEm4xNCk8d0TF6A1g...|        778|
|P8brGDYVWjeW9GrKi...|       1474|
|SZU9c8V2GuREDN5Kg...|        604|
|Ucl9Vo5lwrUmYbV8D...|       1226|
|1E9o1SNo7UTf1XHTF...|       1617|
|cSigjSbOfHR_mHGTC...|       1438|
+--------------------+-----------+
only showing top 10 rows



In [19]:
business_categories_result.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- category_id: long (nullable = false)



In [20]:
business_categories_result.write.format("delta").mode("overwrite").save("s3a://silver/business_category")

##### Bảng thứ 6: Categories

In [21]:
category_name.show(10)

+-----------+--------------------+
|category_id|            category|
+-----------+--------------------+
|          0|    shipping centers|
|          1|event planning & ...|
|          2|            painters|
|          3|         art classes|
|          4| carpet installation|
|          5|             jewelry|
|          6|      acne treatment|
|          7|            notaries|
|          8|            climbing|
|          9|        post offices|
+-----------+--------------------+
only showing top 10 rows



In [22]:
category_name.printSchema()

root
 |-- category_id: long (nullable = false)
 |-- category: string (nullable = false)



In [23]:
category_name.write.format("delta").mode("overwrite").save("s3a://silver/category")

#### Xử lý bảng Reviews

##### Bảng thứ 7: Reviews

In [5]:
review_raw = spark.read.parquet(file_path_review)
review_raw.show(5)

+--------------------+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|                 _id|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|{66c75476b97ce792...|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|{66c75476b97ce792...|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|{66c75476b97ce792...|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|{66c75476b97ce792...|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...

In [6]:
review_raw.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- cool: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: integer (nullable = true)
 |-- user_id: string (nullable = true)



In [7]:
review_df = review_raw.select("review_id", "user_id", "business_id", "stars", "useful", "funny", "cool", "text", "date")

review_df.show(5)


+--------------------+--------------------+--------------------+-----+------+-----+----+--------------------+-------------------+
|           review_id|             user_id|         business_id|stars|useful|funny|cool|                text|               date|
+--------------------+--------------------+--------------------+-----+------+-----+----+--------------------+-------------------+
|AqPFMleE6RsU23_au...|_7bHUi9Uuf5__HHc_...|kxX2SOes4o-D3ZQBk...|  5.0|     1|    0|   1|Wow!  Yummy, diff...|2015-01-04 00:01:03|
|KU_O5udG6zpxOg-Vc...|mh_-eMZ6K5RLWhZyI...|XQfwVwDr-v0ZS3_Cb...|  3.0|     0|    0|   0|If you decide to ...|2018-07-07 22:09:11|
|saUsX_uimxRlCVr67...|8g_iMtfSiwikVnbP2...|YjUWPpI6HXG530lwP...|  3.0|     0|    0|   0|Family diner. Had...|2014-02-05 20:30:30|
|BiTunyQ73aT9WBnpR...|OyoGAe7OKpv6SyGZT...|7ATYjTIgM3jUlt4UM...|  5.0|     1|    0|   1|I've taken a lot ...|2012-01-03 15:28:18|
|Sx8TMOWLNuJBWer-0...|bcjbaE6dDog4jkNY9...|e4Vwtrqf-wpJfwesg...|  4.0|     1|    0|   1|Cu

In [8]:
review_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- useful: integer (nullable = true)
 |-- funny: integer (nullable = true)
 |-- cool: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- date: string (nullable = true)



In [9]:
review_df.write.format("delta").mode("overwrite").save("s3a://silver/reviews")

In [10]:
spark.stop()