In [0]:
# Incremental-load logic: first look up the max ingestion_timestamp already present in the downstream table,
# then build the upstream query so that only rows newer than that timestamp are loaded.

In [0]:
%sql
-- Return the current timestamp as column ts.
SELECT current_timestamp() AS ts

In [0]:
# New downstream table name: parsed_data

In [0]:
from pyspark.sql.functions import col, lit
from pyspark.sql.utils import AnalysisException

# Fallback watermark: early enough that the upstream filter selects every row.
default_timestamp = "1900-01-01 00:00:00"

try:
    # Highest ingestion_timestamp already written to the downstream table.
    result = spark.sql(
        "SELECT MAX(ingestion_timestamp) AS last_ts FROM mycatalog.hp_prd_data.parsed_data"
    ).collect()
except AnalysisException:
    # Treated as "table does not exist yet" (e.g. first run) -> load everything.
    print("Downstream table doesn't exist. Loading all data.")
    last_processed_timestamp = default_timestamp
else:
    # MAX() over an empty table yields NULL; fall back to the default in that case.
    last_ts = result[0]["last_ts"]
    last_processed_timestamp = default_timestamp if last_ts is None else last_ts


In [0]:
# Display the watermark that the upstream query will filter on (cell output).
last_processed_timestamp

In [0]:
# Read only the upstream rows ingested after the downstream watermark.
incremental_query = (
    "SELECT zipcode, raw_json FROM mycatalog.hp_prd_data.raw_data "
    f"WHERE ingestion_timestamp > '{last_processed_timestamp}'"
)
df = spark.sql(incremental_query)

In [0]:
# Number of upstream rows selected for this run (this is a Spark action, so it executes the query).
df.count()

In [0]:
import json
from pyspark.sql.functions import udf
from pyspark.sql.types import *
# Schema of one flattened home (one struct per listing returned by the UDF).
# Built from (name, type) pairs; every field is nullable (StructField default).
home_schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        # --- search context and listing identity ---
        ("search_zipcode", StringType()),
        ("mls#", StringType()),
        ("mls_status", StringType()),
        # --- price and size (kept as strings except house_price) ---
        ("house_price", LongType()),
        ("hoa", StringType()),                  # may be None or a string
        ("sqft", StringType()),                 # may be None or a float upstream; stored as string
        ("price_per_sqft", StringType()),
        ("lot_size", StringType()),
        ("beds", StringType()),
        ("baths", StringType()),
        ("fullbaths", StringType()),
        ("partialBaths", StringType()),
        ("location", StringType()),
        ("stories", StringType()),              # sample value 1.0; declared as string here
        # --- geography and address ---
        ("latitude", DoubleType()),
        ("longitude", DoubleType()),
        ("streetLine", StringType()),
        ("city", StringType()),
        ("state", StringType()),
        ("zip", StringType()),
        ("postalCode", StringType()),
        ("countryCode", StringType()),
        # --- numeric codes and ids (sample values in comments) ---
        ("searchStatus", IntegerType()),        # 1
        ("propertyType", IntegerType()),        # 6
        ("uiPropertyType", IntegerType()),      # 1
        ("listingType", IntegerType()),         # 1
        ("propertyId", LongType()),             # 38910260
        ("listingId", LongType()),              # 205114738
        ("dataSourceId", IntegerType()),
        ("marketId", IntegerType()),
        ("yearBuilt", IntegerType()),
        # --- open house and links ---
        ("openHouseStartFormatted", StringType()),
        ("openHouseEventName", StringType()),
        ("url", StringType()),
        # --- boolean flags ---
        ("isHot", BooleanType()),
        ("hasVirtualTour", BooleanType()),
        ("hasVideoTour", BooleanType()),
        ("has3DTour", BooleanType()),
        ("isActiveKeyListing", BooleanType()),
        ("isNewConstruction", BooleanType()),
        # --- free text and media ---
        ("listingRemarks", StringType()),
        ("scanUrl", StringType()),
        ("posterFrameUrl", StringType()),
    ]
])

# Define UDF that returns an array of flattened homes
def _nested_value(container, key):
    """Return ``container[key]["value"]``, or None if any level is missing, null, or not a dict."""
    node = container.get(key) if isinstance(container, dict) else None
    return node.get("value") if isinstance(node, dict) else None


def extract_listings(text, search_zipcode):
    """Parse one raw search response into a list of flattened listing dicts.

    Args:
        text: Raw response body. The first four characters are dropped before
            JSON decoding (presumably a non-JSON prefix on the stored payload;
            TODO confirm against the ingestion job).
        search_zipcode: Zipcode the search was issued for; copied onto every
            listing as ``search_zipcode``.

    Returns:
        A list with one dict per entry of ``payload.originalHomes.homes``.
        Wrapped fields (``{"value": ...}``) are unwrapped; a missing or null
        wrapper yields None for that field instead of discarding the listing.
        Returns an empty list when ``text`` is None, is not valid JSON, or
        does not contain a list of homes, so one bad payload never fails the
        Spark job.
    """
    try:
        data = json.loads(text[4:])
    except (TypeError, ValueError, RecursionError):
        # Deliberate best-effort: None/non-sliceable input (TypeError), invalid
        # JSON (JSONDecodeError is a ValueError), or pathological nesting.
        return []

    # Walk payload -> originalHomes -> homes, tolerating nulls at every level.
    payload = data.get("payload") if isinstance(data, dict) else None
    original_homes = payload.get("originalHomes") if isinstance(payload, dict) else None
    homes = original_homes.get("homes") if isinstance(original_homes, dict) else None
    if not isinstance(homes, list):
        return []

    results = []
    for row in homes:
        if not isinstance(row, dict):
            # Not a listing object; nothing to extract.
            continue

        # latLong is doubly nested: {"value": {"latitude": ..., "longitude": ...}}
        lat_long = _nested_value(row, "latLong")
        if not isinstance(lat_long, dict):
            lat_long = {}

        results.append({
            "search_zipcode": search_zipcode,
            "mls#": _nested_value(row, "mlsId"),
            "mls_status": row.get("mlsStatus"),
            "house_price": _nested_value(row, "price"),
            "hoa": _nested_value(row, "hoa"),
            "sqft": _nested_value(row, "sqFt"),
            "price_per_sqft": _nested_value(row, "pricePerSqFt"),
            "lot_size": _nested_value(row, "lotSize"),
            "beds": row.get("beds"),
            "baths": row.get("baths"),
            "fullbaths": row.get("fullbaths"),
            "partialBaths": row.get("partialBaths"),
            "location": _nested_value(row, "location"),
            "stories": row.get("stories"),
            "latitude": lat_long.get("latitude"),
            "longitude": lat_long.get("longitude"),
            "streetLine": _nested_value(row, "streetLine"),
            "city": row.get("city"),
            "state": row.get("state"),
            "zip": row.get("zip"),
            "postalCode": _nested_value(row, "postalCode"),
            "countryCode": row.get("countryCode"),
            "searchStatus": row.get("searchStatus"),
            "propertyType": row.get("propertyType"),
            "uiPropertyType": row.get("uiPropertyType"),
            "listingType": row.get("listingType"),
            "propertyId": row.get("propertyId"),
            "listingId": row.get("listingId"),
            "dataSourceId": row.get("dataSourceId"),
            "marketId": row.get("marketId"),
            "yearBuilt": _nested_value(row, "yearBuilt"),
            "openHouseStartFormatted": row.get("openHouseStartFormatted"),
            "openHouseEventName": row.get("openHouseEventName"),
            "url": row.get("url"),
            "isHot": row.get("isHot"),
            "hasVirtualTour": row.get("hasVirtualTour"),
            "hasVideoTour": row.get("hasVideoTour"),
            "has3DTour": row.get("has3DTour"),
            "isActiveKeyListing": row.get("isActiveKeyListing"),
            "isNewConstruction": row.get("isNewConstruction"),
            "listingRemarks": row.get("listingRemarks"),
            "scanUrl": row.get("scanUrl"),
            "posterFrameUrl": row.get("posterFrameUrl")
        })
    return results
# Wrap the parser as a Spark UDF whose result is an array of home_schema structs
extract_udf = udf(f=extract_listings, returnType=ArrayType(home_schema))


In [0]:
# Parse each raw_json payload into an array of listing structs; zipcode is passed through to the UDF.
homes_col = extract_udf("raw_json", "zipcode")
df_extracted = df.withColumn("homes_array", homes_col)

In [0]:
from pyspark.sql.functions import explode
import datetime

In [0]:

# One row per listing: explode the array, then promote the struct fields to top-level columns.
# explode() emits nothing for rows whose homes_array is empty or null.
exploded_df = df_extracted.select(explode("homes_array").alias("home"))
final_df = exploded_df.select("home.*")

In [0]:
%sql
-- Drop the dev table. IF EXISTS keeps this cell re-runnable once the table is already gone.
drop table if exists mycatalog.hp_prd_data.dev_parsed_data

In [0]:
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import TimestampType

# Stamp every parsed listing with the load time; the next run reads MAX(ingestion_timestamp) as its watermark.
ingestion_ts = current_timestamp().cast(TimestampType())
final_df = final_df.withColumn("ingestion_timestamp", ingestion_ts)


In [0]:

# Append this run's listings to the downstream table (the table is created if it does not exist yet).
final_df.write.saveAsTable("mycatalog.hp_prd_data.parsed_data", mode="append")

In [0]:
%sql
-- Preview a sample of the downstream table that this notebook appends to.
-- (dev_parsed_data is dropped earlier in this notebook, so selecting from it would fail.)
select * from mycatalog.hp_prd_data.parsed_data limit 10

In [0]:
%sql
-- Total number of rows in the downstream table.
SELECT count(*)
FROM mycatalog.hp_prd_data.parsed_data