In [None]:
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.Duration
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.{types => T}
import org.apache.spark.sql.expressions.Window
import java.sql.Timestamp
import spark.implicits._

In [None]:
// Print the tables and views the catalog lists for the current database.
// Uses an explicit method call instead of postfix-operator syntax, which needs
// `scala.language.postfixOps` and is discouraged.
spark.catalog.listTables().show()

In [None]:
/**
 * Reads the `bronze_houses` table and derives the cleaned bronze DataFrame.
 *
 * Adds the bookkeeping columns `_filename`, `_processing_time`, `_start` and `_end`,
 * renames the hyphenated columns to snake_case, splits/trims several text columns,
 * drops `_metadata` and `data-pos`, caches the result and finally moves `data_pk`
 * and `data_lk` to the front of the column list.
 *
 * @return the cached bronze DataFrame with `data_pk` and `data_lk` as its first two columns
 */
def get_bronze_table(): org.apache.spark.sql.DataFrame = {
    val keyColumns = Seq("data_pk", "data_lk")
    val sourceFileName = $"_metadata.file_name"

    // NOTE(review): `table(...)` resolves a catalog table, so the "jdbc" format set here
    // looks unused -- confirm and remove if so.
    val rawBronze = (spark.read
        .format("jdbc")
        .table("bronze_houses")
    )

    val cleanedBronze = (rawBronze
        .withColumn("_filename", sourceFileName)
        .withColumn("_processing_time", F.current_timestamp())
        // The first 10 characters of the file name are parsed as a yyyy-MM-dd date.
        // Spark's substring is 1-based, so the start position is written as 1.
        .withColumn("_start", F.to_date(F.substring(sourceFileName, 1, 10), "yyyy-MM-dd"))
        // Typed NULL placeholder. Casting an empty string only produces NULL while
        // ANSI mode is off, so the NULL is spelled out explicitly.
        .withColumn("_end", F.lit(null).cast(T.TimestampType))
        .withColumnRenamed("data-pk", "data_pk")
        .withColumnRenamed("data-lk", "data_lk")
        .withColumnRenamed("ldp-description-text", "ldp_description_text")
        .withColumnRenamed("ldp-phone-link", "ldp_phone_link")
        .withColumnRenamed("property-info-address-citystatezip", "property_info_address_citystatezip")
        .withColumnRenamed("property-info-address-main", "property_info_address_main")
        .withColumnRenamed("agent-phone", "agent_phone")
        .withColumnRenamed("data-est", "data_est")
        // Keep only the text before the first newline / space.
        .withColumn("beds", F.split($"beds", "\n").getItem(0))
        .withColumn("price", F.split($"price", " ").getItem(0))
        .withColumn("property_info_address_citystatezip", F.trim($"property_info_address_citystatezip"))
        .withColumn("property_info_address_main", F.trim($"property_info_address_main"))
        .withColumn("full_bathrooms", F.replace($"full_bathrooms", F.lit("Full Bathrooms"), F.lit("")))
        .withColumn("sqft", F.split($"sqft", "\n").getItem(0))
        .drop("_metadata", "data-pos")
        .cache()
        //.where($"_start" < "2025-02-09") // TODO; for testing
    )

    // Rearrange the columns into a more intuitive order: key columns first, then the rest
    // in their existing order.
    val orderedNames = keyColumns ++ cleanedBronze.columns.filterNot(name => keyColumns.contains(name))

    cleanedBronze.select(orderedNames.map(F.col): _*)
}
// Build the bronze frame once, then run an action on it; the cell's value is the row count.
val df_bronze = get_bronze_table()
df_bronze.count()

In [None]:
// The silver table uses exactly the bronze schema.
val df_silver_schema: T.StructType = df_bronze.schema

In [None]:
// Load the existing silver table, or an empty placeholder if it has not been created yet.

In [None]:
// Snapshot of the silver table as it is before this run.
// `tableExists` asks the catalog directly instead of listing every table, filtering the
// listing with a lambda and pattern-matching on the resulting count.
val df_silver_before =
    if (spark.catalog.tableExists("silver_houses")) {
        spark.read.table("silver_houses").cache()
    } else {
        // Table does not exist yet: start from an empty frame carrying the bronze schema.
        spark.createDataFrame(spark.sparkContext.emptyRDD[Row], df_silver_schema)
    }
df_silver_before.count()

In [None]:
// Joins the pre-run silver rows ("a") to bronze ("b") on data_pk using the given join type.
def silverRowsJoinedToBronze(joinType: String) =
    df_silver_before.as("a").join(
        df_bronze.as("b"),
        $"a.data_pk" === $"b.data_pk",
        joinType
    )

// Semi join: silver rows whose data_pk also occurs in bronze.
val df_silver_existing_entries_with_new_changes = silverRowsJoinedToBronze("semi")

// Anti join: silver rows whose data_pk does not occur in bronze.
val df_silver_existing_entries_without_changes = silverRowsJoinedToBronze("anti")

(
    df_silver_existing_entries_with_new_changes.count(),
    df_silver_existing_entries_without_changes.count()
)

In [None]:
// Find the distinct versions across bronze and the affected silver rows: rows are
// partitioned by every non-bookkeeping column and only the one with the earliest `_start`
// in each partition is kept.
val bookkeepingColumns = Seq("_start", "_end", "_processing_time", "_filename")
val columns_to_compare = df_bronze.columns.filterNot(name => bookkeepingColumns.contains(name)).map(F.col)
val windowSpec_3 = Window.partitionBy(columns_to_compare: _*).orderBy("_start")
val new_data_3 = (df_bronze
    // Match columns by name rather than by position, so a silver table read back from
    // storage with a different column order cannot be silently misaligned.
    .unionByName(df_silver_existing_entries_with_new_changes)
    .withColumn("_rn", F.row_number().over(windowSpec_3))
    .where($"_rn" === 1)
    .drop("_rn")
)
new_data_3.count()

In [None]:
// Set end dates: within each data_pk, ordered by `_start`, a row ends where the next row
// starts; the last row of each data_pk gets the far-future timestamp 9999-12-31.
val new_data_4_cols = Seq("data_pk").map(F.col)
val windowSpec_4 = Window.partitionBy(new_data_4_cols: _*).orderBy("_start")
val startOfNextRow = F.lead($"_start", 1).over(windowSpec_4)
val farFutureEnd = F.lit(Timestamp.valueOf("9999-12-31 00:00:00.000"))
val new_data_4 = new_data_3.withColumn("_end", F.coalesce(startOfNextRow, farFutureEnd))
new_data_4.count()

In [None]:
// Final silver content: the re-versioned rows plus the silver rows whose data_pk does not
// occur in bronze. Columns are matched by name rather than by position, so a different
// column order in the stored silver table cannot silently misalign the data.
val results = new_data_4.unionByName(df_silver_existing_entries_without_changes)
results.count()

In [None]:
// Write the result to a staging table as parquet, overwriting any previous staging table.
val stagingWriter = results.write.format("parquet").mode("overwrite")
stagingWriter.saveAsTable("silver_houses_temp")

In [None]:
// Remove the current silver table, if there is one.
val dropSilverStatement = "DROP TABLE IF EXIStS silver_houses"
spark.sql(dropSilverStatement)

In [None]:
// Copy the staging table into the table named `silver_houses`.
val stagedSilver = spark.read.table("silver_houses_temp")
stagedSilver.write.saveAsTable("silver_houses")

In [None]:
// Spot check: print all silver rows for one data_pk, ordered by `_start`.
// Uses an explicit method call instead of postfix-operator syntax.
spark.sql("select * from silver_houses where data_pk = '0dceq922d0y66' order by _start").show()