In [0]:
import re
import string

from pyspark.sql.functions import col, date_format, lit, to_date, udf
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, DateType, LongType

In [0]:
def create_empty_delta_table(table_path,schema):
    """Write a zero-row Delta table with ``schema`` at ``table_path``.

    The write uses save mode ``"ignore"``, so the call is expected to leave
    existing data at ``table_path`` untouched.

    Args:
        table_path: Location the Delta table is saved to.
        schema: Schema passed to ``spark.createDataFrame`` for the empty frame.
    """
    # `spark` is the session object provided by the notebook environment.
    delta_writer = spark.createDataFrame([], schema=schema).write.format("delta")
    delta_writer.mode("ignore").save(table_path)

In [0]:
class BronzeBDS():
    """Streaming ingestion of raw JSON listings into the bronze Delta table.

    The stream reads JSON files from ``read_path``, renames the source
    columns to English snake_case names, drops rows that repeat a
    ``listing_id`` and appends the result to
    ``<write_path>/bronze_delta_table``.
    """

    def __init__(self,read_path,write_path,checkpoint_path):
        """Store the locations used by the bronze stream.

        Args:
            read_path: Directory the raw JSON files are read from.
            write_path: Base directory under which the bronze table is written.
            checkpoint_path: Base directory for the streaming checkpoint.
        """
        self.checkpoint_path = checkpoint_path
        self.write_path = write_path
        self.read_path = read_path

    def get_schema(self):
        """Return the explicit schema applied to the raw JSON files.

        Every field is a nullable string except ``latitude`` and
        ``longtitude``, which are nullable floats. The field order matters:
        ``standarize_column_names`` renames columns by position.
        """
        text_fields = [
            "Diện tích",
            "Mức giá",
            "Mặt tiền",
            "Đường vào",
            "Hướng nhà",
            "Hướng ban công",
            "Số tầng",
            "Số phòng ngủ",
            "Số toilet",
            "Pháp lý",
            "Nội thất",
            "Ngày đăng",
            "Ngày hết hạn",
            "Loại tin",
            "Mã tin",
            "Địa chỉ",
        ]
        fields = [StructField(name, StringType(), True) for name in text_fields]
        fields.extend(
            StructField(name, FloatType(), True)
            for name in ("latitude", "longtitude")
        )
        fields.append(StructField("url", StringType(), True))
        return StructType(fields)

    def get_raw_data(self):
        """Open a streaming reader over the multi-line JSON files in ``read_path``."""
        # NOTE: a "maxFilesPerTrigger" option used to be commented out here;
        # it is not applied.
        reader = spark.readStream.format('json')
        reader = reader.option("multiline", "true")
        reader = reader.schema(self.get_schema())
        return reader.load(f"{self.read_path}")

    def standarize_column_names(self,df):
        """Rename the columns of ``df`` positionally to the bronze column names.

        The i-th column of ``df`` receives the i-th name below; pairing stops
        at the shorter of the two sequences.

        Args:
            df: DataFrame whose columns follow the order of ``get_schema``.

        Returns:
            The DataFrame with renamed columns.
        """
        target_names = (
            "area",
            "price",
            "frontage",
            "alley_length_to_house",
            "house_direction",
            "balcony_direction",
            "floor_number",
            "bedroom_number",
            "toilet_number",
            "legal_document",
            "furniture",
            "uploaded_date",
            "expired_date",
            "listing_article_tier",
            "listing_id",
            "full_address",
            "latitude",
            "longtitude",
            "url",
        )

        renamed = df
        for current_name, target_name in zip(df.columns, target_names):
            renamed = renamed.withColumnRenamed(current_name, target_name)
        return renamed

    def append_bronze_data(self,bronze_df,trigger):
        """Start the streaming query that appends ``bronze_df`` to the bronze table.

        Args:
            bronze_df: Streaming DataFrame to write.
            trigger: ``'batch'`` to run with an ``availableNow`` trigger,
                otherwise a value passed as the ``processingTime`` trigger.

        Returns:
            The object returned by ``start`` for the running query.
        """
        target_path = f"{self.write_path}/bronze_delta_table"
        checkpoint_location = f"{self.checkpoint_path}/bronze_table_checkpoint"

        writer = bronze_df.writeStream.format('delta')
        writer = writer.queryName("bronze-ingestion")
        writer = writer.option("checkpointLocation", checkpoint_location)
        writer = writer.outputMode("append")

        if trigger != 'batch':
            return writer.trigger(processingTime = trigger).start(target_path)
        return writer.trigger(availableNow = True).start(target_path)

    def process(self,trigger='batch'):
        """Run the bronze pipeline: read, rename, de-duplicate, write.

        Args:
            trigger: Forwarded to ``append_bronze_data``.

        Returns:
            The streaming query started by ``append_bronze_data``.
        """
        print("\nStarting Bronze Stream...")
        # Make sure the input directory exists before the stream reads from it.
        dbutils.fs.mkdirs(self.read_path)
        table_path = f"{self.write_path}/bronze_delta_table"

        # Read + transform
        renamed_df = self.standarize_column_names(self.get_raw_data())
        deduplicated_df = renamed_df.dropDuplicates(["listing_id"])
        create_empty_delta_table(table_path, deduplicated_df.schema)

        # Write
        return self.append_bronze_data(deduplicated_df, trigger)

In [0]:

def remove_punctuation(input_string):
    """Return ``input_string`` with every ``string.punctuation`` character removed.

    Only the ASCII punctuation listed in ``string.punctuation`` is dropped;
    all other characters, including whitespace, are kept as they are.
    """
    # Mapping a code point to None tells str.translate to delete it.
    deletions = dict.fromkeys(map(ord, string.punctuation))
    return input_string.translate(deletions)


class SilverBDS():
    """Streaming transformation of bronze listings into the silver Delta table.

    The stream reads ``<read_path>/bronze_delta_table``, parses the raw text
    columns into typed values and merges every micro-batch into
    ``<write_path>/silver_delta_table`` keyed on ``listing_id``.
    """

    def __init__(self,read_path,write_path,checkpoint_path):
        """Store the locations used by the silver stream.

        Args:
            read_path: Base directory containing ``bronze_delta_table``.
            write_path: Base directory under which the silver table is written.
            checkpoint_path: Base directory for the streaming checkpoint.
        """
        self.read_path = read_path
        self.write_path = write_path
        self.checkpoint_path = checkpoint_path

    @staticmethod
    @udf(LongType())
    def extract_price(price):
        """Convert a price string into a whole number.

        The numeric part is multiplied by 10**9 when the text contains 'tỷ'
        and by 10**6 when it contains 'triệu'. Commas are read as decimal
        points.

        Returns:
            The rounded amount, or None when the input is null, names neither
            unit, or has no parsable number.
        """
        if price is None:
            return None
        price = price.replace(',', '.').strip()
        if 'tỷ' in price:
            multiplier = 10 ** 9
        elif 'triệu' in price:
            multiplier = 10 ** 6
        else:
            return None
        digits = re.sub(r'[^0-9.]', '', price)
        try:
            return round(float(digits) * multiplier)
        except ValueError:
            # No digits at all, or more than one decimal point: treat the
            # value as unknown instead of failing the whole micro-batch.
            return None

    @staticmethod
    @udf(FloatType())
    def extract_float(num,col_name=None):
        """Extract the number embedded in ``num``, rounded to one decimal.

        Args:
            num: Raw text such as a measurement with a unit suffix.
            col_name: Name of the column being parsed. It must be passed as a
                literal column (``lit(...)``); a plain Python string given to
                a UDF call is resolved as a column reference.

        Returns:
            The parsed float; ``0.0`` when ``num`` is null and ``col_name`` is
            ``"alley_length_to_house"``; None when ``num`` is null otherwise
            or contains no parsable number.
        """
        if num is None:
            # Must be a Python float: an int returned from a FloatType UDF is
            # turned into null by Spark.
            return 0.0 if col_name == "alley_length_to_house" else None
        # NOTE(review): a '.' used as a thousands separator would be read as a
        # decimal point here - confirm the source number format.
        cleaned = re.sub(r'[^0-9.]', '', num.replace(',', '.').strip())
        if not cleaned:
            return None
        try:
            return round(float(cleaned), 1)
        except ValueError:
            # e.g. "." or "1.2.3" left over after stripping non-numeric text.
            return None

    @staticmethod
    @udf(IntegerType())
    def create_price_per_area(price,area):
        """Return ``price / area`` rounded to the nearest integer.

        Returns None when either input is null, when ``area`` is zero, or when
        the result does not fit the 32-bit range of ``IntegerType``.
        """
        if price is None or area is None or area == 0:
            return None
        per_area = round(price / area)
        if not -2 ** 31 <= per_area <= 2 ** 31 - 1:
            return None
        return per_area

    # Shares its name with the module-level remove_punctuation function, but
    # this one is the column-level UDF (regex based, also strips whitespace at
    # both ends).
    @staticmethod
    @udf(StringType())
    def remove_punctuation(my_str):
        """Strip ``my_str`` and remove every character that is not a word
        character or whitespace; null stays null."""
        return None if my_str is None else re.sub(r'[^\w\s]','',my_str.strip())

    @staticmethod
    @udf(StructType([
        StructField("street", StringType(), True),
        StructField("ward", StringType(), True),
        StructField("district", StringType(), True),
        StructField("province", StringType(), True)
    ]))
    def split_full_address(address):
        """Split a ``", "``-separated address into its last four components.

        The components are read from the right: province, district, ward,
        street. Components missing on the left are returned as None.

        Returns:
            A ``(street, ward, district, province)`` tuple, or None when
            ``address`` is null.
        """
        if address is None:
            return None
        # Inside a method body the bare name resolves to the module-level
        # remove_punctuation function, not to the UDF attribute of this class.
        tail = [remove_punctuation(part) for part in address.split(", ")[-4:]]
        return tuple([None] * (4 - len(tail)) + tail)

    def read_bronze_data(self):
        """Open a streaming reader over ``<read_path>/bronze_delta_table``."""
        return ( spark.readStream
                    .format('delta')
                    .load(f"{self.read_path}/bronze_delta_table")
                )

    def transform_silver(self, bronze_df):
        """Parse the bronze text columns into the typed silver columns.

        Dates are parsed with the ``dd/MM/yyyy`` pattern, numeric text is
        converted through the UDFs of this class, ``price_per_area`` is added
        and ``full_address`` is replaced by a street/ward/district/province
        struct.

        Args:
            bronze_df: DataFrame with the bronze column names.

        Returns:
            The transformed DataFrame.
        """
        date_format_str="dd/MM/yyyy"
        alley_col = "alley_length_to_house"
        silver_df = bronze_df.select(
            col('listing_id'),
            to_date("uploaded_date", date_format_str).alias("uploaded_date"),
            to_date("expired_date", date_format_str).alias("expired_date"),
            col('listing_article_tier'),
            self.extract_float(col("area")).alias('area'),
            self.extract_price(col("price")).alias('price'),
            self.extract_float(col("frontage")).alias('frontage'),
            # The column name is wrapped in lit(): a bare string argument
            # would be resolved as a column and the UDF would receive that
            # column's value instead of its name.
            self.extract_float(col(alley_col), lit(alley_col)).alias(alley_col),
            col("house_direction"),
            col("balcony_direction"),
            # Cast first, then alias, so the output name is set explicitly.
            self.extract_float(col("floor_number")).cast(IntegerType()).alias('floor_number'),
            self.extract_float(col("bedroom_number")).cast(IntegerType()).alias('bedroom_number'),
            self.extract_float(col("toilet_number")).cast(IntegerType()).alias('toilet_number'),
            col("legal_document"),
            self.remove_punctuation(col("furniture")).alias('furniture'),
            col('url'),
            col('latitude'),
            col('longtitude'),
            col('full_address')
        )
        silver_df = silver_df.withColumn("price_per_area", self.create_price_per_area(col("price"), col("area")))

        silver_df = silver_df.withColumn("full_address", self.split_full_address(col("full_address")))

        return silver_df

    def upsert(self, silver_df, batch_id):
        """Merge one micro-batch into the silver Delta table on ``listing_id``.

        Matching rows are updated and new rows inserted. This is the
        ``foreachBatch`` callback of ``append_silver_data``.

        Args:
            silver_df: The micro-batch DataFrame.
            batch_id: Micro-batch identifier supplied by ``foreachBatch``;
                not used.
        """
        delta_table_path=f"{self.write_path}/silver_delta_table"
        tmp_view_name="silver_df_temp_view"
        # MERGE fails when several source rows match the same target row, so
        # keep a single row per listing_id within the batch.
        batch_df = silver_df.dropDuplicates(["listing_id"])
        batch_df.createOrReplaceTempView(tmp_view_name)
        merge_statement = f"""MERGE INTO delta.`{delta_table_path}` s
                USING {tmp_view_name} t
                ON s.listing_id = t.listing_id
                WHEN MATCHED THEN
                UPDATE SET *
                WHEN NOT MATCHED THEN
                INSERT *
            """
        # Run the statement on the session that owns the micro-batch so the
        # temporary view registered above is visible to it.
        silver_df._jdf.sparkSession().sql(merge_statement)

    def append_silver_data(self,silver_df,trigger):
        """Start the streaming query that upserts ``silver_df`` batch by batch.

        Args:
            silver_df: Streaming DataFrame to write.
            trigger: ``'batch'`` to run with an ``availableNow`` trigger,
                otherwise a value passed as the ``processingTime`` trigger.

        Returns:
            The object returned by ``start`` for the running query.
        """
        target_path = f"{self.write_path}/silver_delta_table"
        sQuery = (silver_df.writeStream
                    .queryName("silver-processing")
                    .format("delta")
                    .outputMode("update")
                    .foreachBatch(self.upsert)
                    .option("checkpointLocation",f"{self.checkpoint_path}/silver_table_checkpoint")
        )

        if trigger == 'batch':
            return sQuery.trigger(availableNow = True).start(target_path)
        return sQuery.trigger(processingTime = trigger).start(target_path)

    def process(self,trigger='batch'):
        """Run the silver pipeline: read bronze, transform, upsert.

        Args:
            trigger: Forwarded to ``append_silver_data``.

        Returns:
            The streaming query started by ``append_silver_data``.
        """
        print("\nStarting Silver Stream...", end='')

        # Read
        bronze_df = self.read_bronze_data()

        # Transform
        silver_df = self.transform_silver(bronze_df)
        # The MERGE target must exist before the first micro-batch runs.
        create_empty_delta_table(f"{self.write_path}/silver_delta_table",silver_df.schema)

        # Write
        return self.append_silver_data(silver_df,trigger)